/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test for the repartition topic produced by a {@code groupBy(...).count()} topology.
 *
 * <p>The test starts a single-broker embedded cluster, runs the topology with very small
 * {@code segment.ms} / {@code segment.bytes} topic overrides, and then checks three things in order:
 * the repartition topic exists with the expected configs, its on-disk size becomes non-zero, and its
 * on-disk size later drops to at most {@link #PURGE_SEGMENT_BYTES}.
 */
@Category({IntegrationTest.class})
public class PurgeRepartitionTopicIntegrationTest {
    private static final int NUM_BROKERS = 1;
    /** Broker whose log directories are inspected; also the key used to look up the describe result. */
    private static final int BROKER_ID = 0;
    private static final int NUM_RECORDS = 1000;
    /** Upper bound, in milliseconds, for each wait in the test. */
    private static final long TIMEOUT_MS = 60000L;

    private static final String INPUT_TOPIC = "input-stream";
    private static final String APPLICATION_ID = "restore-test";
    private static final String REPARTITION_TOPIC =
        APPLICATION_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition";

    /** Used as the broker retention check interval, the topic {@code segment.ms} and the commit interval. */
    private static final int PURGE_INTERVAL_MS = 10;
    /** Used as the topic {@code segment.bytes}; the producer batch size is set to half of it. */
    private static final int PURGE_SEGMENT_BYTES = 2000;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, brokerConfig());

    // Re-created by setup() for every test and released again in shutdown().
    private static AdminClient adminClient;
    private static KafkaStreams kafkaStreams;

    // The cluster's clock is shared with the Streams instance and advanced by the size check below.
    private final Time time = CLUSTER.time;

    /**
     * Builds the broker overrides for the embedded cluster.
     *
     * @return properties setting the retention check interval and the file delete delay
     */
    private static Properties brokerConfig() {
        final Properties props = new Properties();
        props.put("log.retention.check.interval.ms", PURGE_INTERVAL_MS);
        props.put("file.delete.delay.ms", 0);
        return props;
    }

    /**
     * Creates the single-partition, single-replica input topic once for the class.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
    }

    /**
     * Creates the admin client used for verification and builds (but does not start) the
     * Streams application under test.
     */
    @Before
    public void setup() {
        final Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        adminClient = AdminClient.create(adminConfig);

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath());
        // Tiny segments so that the internal topic rolls segments almost immediately.
        streamsConfiguration.put(StreamsConfig.topicPrefix("segment.ms"), PURGE_INTERVAL_MS);
        streamsConfiguration.put(StreamsConfig.topicPrefix("segment.bytes"), PURGE_SEGMENT_BYTES);
        // Batches are capped at half a segment.
        streamsConfiguration.put(StreamsConfig.producerPrefix("batch.size"), PURGE_SEGMENT_BYTES / 2);
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, PURGE_INTERVAL_MS);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC)
            .groupBy(MockMapper.selectKeyKeyValueMapper())
            .count();
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time);
    }

    /**
     * Stops the Streams application and releases the admin client created in {@link #setup()}.
     *
     * @throws IOException declared for compatibility with the existing signature
     */
    @After
    public void shutdown() throws IOException {
        try {
            if (kafkaStreams != null) {
                kafkaStreams.close(30L, TimeUnit.SECONDS);
            }
        } finally {
            // A new client is created before every test, so it has to be closed after every test;
            // the finally block makes sure this happens even if closing Streams throws.
            if (adminClient != null) {
                adminClient.close();
                adminClient = null;
            }
        }
    }

    /**
     * Produces {@link #NUM_RECORDS} records, starts the application, and waits for the repartition
     * topic to be created with the expected configs, to contain data, and finally to shrink to at
     * most {@link #PURGE_SEGMENT_BYTES} bytes. Despite its name, this method does not inspect any
     * restored state.
     *
     * @throws InterruptedException if waiting for a condition is interrupted
     * @throws ExecutionException if producing the input records fails
     */
    @Test
    public void shouldRestoreState() throws InterruptedException, ExecutionException {
        final Collection<KeyValue<Integer, Integer>> messages = new ArrayList<>(NUM_RECORDS);
        for (int i = 0; i < NUM_RECORDS; ++i) {
            messages.add(new KeyValue<>(i, i));
        }
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            INPUT_TOPIC,
            messages,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
            time.milliseconds());

        kafkaStreams.start();

        TestUtils.waitForCondition(
            new RepartitionTopicCreatedWithExpectedConfigs(),
            TIMEOUT_MS,
            "Repartition topic " + REPARTITION_TOPIC + " not created with the expected configs after " + TIMEOUT_MS + " ms.");

        // First the topic has to receive something...
        TestUtils.waitForCondition(
            new RepartitionTopicVerified(new TopicSizeVerifier() {
                @Override
                public boolean verify(final long currentSize) {
                    return currentSize > 0L;
                }
            }),
            TIMEOUT_MS,
            "Repartition topic " + REPARTITION_TOPIC + " not received data after " + TIMEOUT_MS + " ms.");

        // ...and then its size has to come back down to no more than one segment.
        TestUtils.waitForCondition(
            new RepartitionTopicVerified(new TopicSizeVerifier() {
                @Override
                public boolean verify(final long currentSize) {
                    return currentSize <= PURGE_SEGMENT_BYTES;
                }
            }),
            TIMEOUT_MS,
            "Repartition topic " + REPARTITION_TOPIC + " not purged data after " + TIMEOUT_MS + " ms.");
    }

    /** Predicate over the current on-disk size, in bytes, of the repartition topic partition. */
    private interface TopicSizeVerifier {
        boolean verify(long currentSize);
    }

    /**
     * Condition that is met once any log directory on broker {@link #BROKER_ID} reports a replica of
     * partition 0 of the repartition topic whose size satisfies the supplied verifier.
     */
    private class RepartitionTopicVerified implements TestCondition {
        private final TopicSizeVerifier verifier;

        RepartitionTopicVerified(final TopicSizeVerifier verifier) {
            this.verifier = verifier;
        }

        @Override
        public final boolean conditionMet() {
            // Move the shared clock forward by one purge interval before every check.
            // NOTE(review): presumably CLUSTER.time is a mock clock, so this does not block - confirm.
            time.sleep(PURGE_INTERVAL_MS);

            try {
                final Map<String, DescribeLogDirsResponse.LogDirInfo> logDirs = adminClient
                    .describeLogDirs(Collections.singleton(BROKER_ID))
                    .values()
                    .get(BROKER_ID)
                    .get();

                final TopicPartition partition = new TopicPartition(REPARTITION_TOPIC, 0);
                for (final DescribeLogDirsResponse.LogDirInfo logDir : logDirs.values()) {
                    final DescribeLogDirsResponse.ReplicaInfo replicaInfo = logDir.replicaInfos.get(partition);
                    if (replicaInfo != null && verifier.verify(replicaInfo.size)) {
                        return true;
                    }
                }
            } catch (final InterruptedException e) {
                // Keep the interrupt visible to the caller's wait loop instead of swallowing it.
                Thread.currentThread().interrupt();
            } catch (final Exception ignored) {
                // Deliberately best-effort: a failed lookup counts as "not yet" and the caller retries.
            }
            return false;
        }
    }

    /**
     * Condition that is met once the repartition topic is listed by the cluster and its
     * {@code cleanup.policy}, {@code segment.ms} and {@code segment.bytes} match the test settings.
     */
    private static final class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {

        @Override
        public final boolean conditionMet() {
            try {
                final Set<String> topics = adminClient.listTopics().names().get();
                if (!topics.contains(REPARTITION_TOPIC)) {
                    return false;
                }

                final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, REPARTITION_TOPIC);
                final Config config = adminClient
                    .describeConfigs(Collections.singleton(resource))
                    .values()
                    .get(resource)
                    .get();

                return config.get("cleanup.policy").value().equals("delete")
                    && config.get("segment.ms").value().equals(Integer.toString(PURGE_INTERVAL_MS))
                    && config.get("segment.bytes").value().equals(Integer.toString(PURGE_SEGMENT_BYTES));
            } catch (final InterruptedException e) {
                // Keep the interrupt visible to the caller's wait loop instead of swallowing it.
                Thread.currentThread().interrupt();
                return false;
            } catch (final Exception ignored) {
                // Any lookup failure (including a missing config entry) counts as "not yet".
                return false;
            }
        }
    }
}

