/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.List;

/**
 * Shared scenario code for integration tests of the Streams application reset tool
 * ({@link StreamsResetter}).
 *
 * <p>Each scenario runs a small topology against the embedded cluster, records its output,
 * resets the application (local state via {@link KafkaStreams#cleanUp()}, broker-side state via
 * {@link StreamsResetter}), runs the topology again and asserts that the second run produces the
 * same output as the first.
 *
 * <p>Concrete subclasses are expected to assign {@link #cluster} before calling
 * {@link #beforePrepareTest()} and may override {@link #getClientSslConfig()} to run the same
 * scenarios with additional client configuration.
 */
abstract class AbstractResetIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class);

    static final int NUM_BROKERS = 1;

    private static final String APP_ID = "cleanup-integration-test";
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";

    /** Session timeout (ms) configured for the Streams application's consumers. */
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
    /** Session timeout (ms) configured for the reset tool's consumer. */
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
    /** Factor applied to the session timeouts above when waiting for a group to become inactive. */
    private static final int TIMEOUT_MULTIPLIER = 5;

    private static final long STREAMS_GROUP_WAIT_MS = TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT;
    private static final long CLEANUP_GROUP_WAIT_MS = TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT;

    private static final String STREAMS_GROUP_STILL_ACTIVE =
        "Streams Application consumer group did not time out after " + STREAMS_GROUP_WAIT_MS + " ms.";
    private static final String RESET_TOOL_GROUP_STILL_ACTIVE =
        "Reset Tool consumer group did not time out after " + CLEANUP_GROUP_WAIT_MS + " ms.";
    private static final String TEST_GROUP_STILL_ACTIVE =
        "Test consumer group active even after waiting " + CLEANUP_GROUP_WAIT_MS + " ms.";

    private static AdminClient adminClient = null;
    private static KafkaAdminClient kafkaAdminClient = null;

    /** Incremented once per test; appended to {@link #APP_ID} so every test uses its own application id. */
    private static int testNo = 0;

    static EmbeddedKafkaCluster cluster;
    static String bootstrapServers;
    static MockTime mockTime;

    private final TestCondition consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();

    /**
     * Closes and clears the shared admin clients, if they were created.
     * Safe to call repeatedly.
     */
    static void afterClassGlobalCleanup() {
        if (adminClient != null) {
            adminClient.close();
            adminClient = null;
        }
        if (kafkaAdminClient != null) {
            kafkaAdminClient.close(10L, TimeUnit.SECONDS);
            kafkaAdminClient = null;
        }
    }

    /**
     * Per-test setup: bumps the test counter, aligns the cluster's mock clock, lazily creates the
     * shared admin clients, waits until the consumer-group query succeeds and finally (re)creates
     * the topics and produces the input records.
     *
     * @throws Exception if waiting is interrupted or topic/record preparation fails
     */
    void beforePrepareTest() throws Exception {
        ++testNo;
        mockTime = cluster.time;
        bootstrapServers = cluster.bootstrapServers();

        // Round the mock clock up to the next full second. Presumably this keeps the window
        // boundaries of the windowed topology identical between runs -- TODO confirm.
        final long alignedTime = (System.currentTimeMillis() / 1000L + 1L) * 1000L;
        mockTime.setCurrentTimeMs(alignedTime);

        Properties sslConfig = getClientSslConfig();
        if (sslConfig == null) {
            sslConfig = new Properties();
            sslConfig.put("bootstrap.servers", bootstrapServers);
        }

        if (adminClient == null) {
            adminClient = AdminClient.create(sslConfig);
        }
        if (kafkaAdminClient == null) {
            kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(sslConfig);
        }

        // Poll until the consumer-group query completes without a Kafka TimeoutException.
        boolean groupQuerySucceeded = false;
        while (!groupQuerySucceeded) {
            Thread.sleep(50L);
            try {
                TestUtils.waitForCondition(consumerGroupInactive, CLEANUP_GROUP_WAIT_MS, TEST_GROUP_STILL_ACTIVE);
                groupQuerySucceeded = true;
            } catch (final TimeoutException ignored) {
                // Deliberately retried: presumably the group coordinator is not reachable yet
                // right after cluster start-up -- TODO confirm.
            }
        }

        prepareInputData();
    }

    /**
     * Hook for subclasses that need extra client configuration (for example SSL settings).
     *
     * @return the extra client configuration, or {@code null} if none is required
     */
    Properties getClientSslConfig() {
        return null;
    }

    /**
     * Scenario without an intermediate user topic: run the application, reset it, run it again
     * and assert that both runs wrote the same first ten records to {@code outputTopic}.
     *
     * @throws Exception if any wait times out, the reset tool fails or the results differ
     */
    void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
        final Properties sslConfig = getClientSslConfig();
        final Properties streamsConfiguration = prepareTest();
        final Properties resultTopicConsumerConfig = resultTopicConsumerConfig(sslConfig);

        // First run.
        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
        streams.start();
        final java.util.List<KeyValue<Long, Long>> result =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition(consumerGroupInactive, STREAMS_GROUP_WAIT_MS, STREAMS_GROUP_STILL_ACTIVE);

        // Reset: local state first, then broker-side state through the reset tool.
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
        streams.cleanUp();
        cleanGlobal(null, sslConfig);
        TestUtils.waitForCondition(consumerGroupInactive, CLEANUP_GROUP_WAIT_MS, RESET_TOOL_GROUP_STILL_ACTIVE);
        assertInternalTopicsGotDeleted(null);

        // Second run must reproduce the first run's output.
        streams.start();
        final java.util.List<KeyValue<Long, Long>> resultRerun =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, CoreMatchers.equalTo(result));

        TestUtils.waitForCondition(consumerGroupInactive, CLEANUP_GROUP_WAIT_MS, RESET_TOOL_GROUP_STILL_ACTIVE);
        cleanGlobal(null, sslConfig);
    }

    /**
     * Scenario with an intermediate user topic ({@code userTopic}): run the application, write an
     * extra record to the intermediate topic, reset the application, run it again (writing the
     * windowed result to {@code outputTopic2_rerun}) and assert that both runs produced the same
     * output on both result topics.
     *
     * @throws Exception if any wait times out, the reset tool fails or the results differ
     */
    void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
        cluster.createTopic(INTERMEDIATE_USER_TOPIC);

        final Properties sslConfig = getClientSslConfig();
        final Properties streamsConfiguration = prepareTest();
        final Properties resultTopicConsumerConfig = resultTopicConsumerConfig(sslConfig);

        // First run.
        KafkaStreams streams =
            new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
        streams.start();
        final java.util.List<KeyValue<Long, Long>> result =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        final java.util.List<KeyValue<Long, Long>> result2 =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 40);
        streams.close();
        TestUtils.waitForCondition(consumerGroupInactive, STREAMS_GROUP_WAIT_MS, STREAMS_GROUP_STILL_ACTIVE);

        // Write one extra record to the intermediate topic before resetting. The rerun results
        // are asserted equal to the first run below, so this record must not be processed again
        // (presumably the reset tool moves the intermediate topic's offsets past it -- TODO confirm).
        mockTime.sleep(1L);
        // Work on a copy so the SSL properties handed to cleanGlobal() below stay untouched.
        final Properties producerConfig = copyOrEmpty(sslConfig);
        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            INTERMEDIATE_USER_TOPIC,
            Collections.singleton(new KeyValue<Long, String>(-1L, "badRecord-ShouldBeSkipped")),
            producerConfig,
            mockTime.milliseconds());

        // Reset: local state first, then broker-side state through the reset tool.
        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
        streams.cleanUp();
        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
        TestUtils.waitForCondition(consumerGroupInactive, CLEANUP_GROUP_WAIT_MS, RESET_TOOL_GROUP_STILL_ACTIVE);
        assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);

        // Second run must reproduce the first run's output on both result topics.
        streams.start();
        final java.util.List<KeyValue<Long, Long>> resultRerun =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        final java.util.List<KeyValue<Long, Long>> resultRerun2 =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
        streams.close();
        MatcherAssert.assertThat(resultRerun, CoreMatchers.equalTo(result));
        MatcherAssert.assertThat(resultRerun2, CoreMatchers.equalTo(result2));

        TestUtils.waitForCondition(consumerGroupInactive, CLEANUP_GROUP_WAIT_MS, RESET_TOOL_GROUP_STILL_ACTIVE);
        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);

        cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
    }

    /**
     * Builds the Streams configuration for the current test and purges any local state left
     * under the freshly chosen state directory.
     *
     * @return a new {@link Properties} instance; the object returned by
     *         {@link #getClientSslConfig()} is copied, not modified
     * @throws IOException if purging the local state fails
     */
    private Properties prepareTest() throws IOException {
        final Properties streamsConfiguration = copyOrEmpty(getClientSslConfig());
        streamsConfiguration.put("application.id", APP_ID + testNo);
        streamsConfiguration.put("bootstrap.servers", bootstrapServers);
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("default.key.serde", Serdes.Long().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("cache.max.bytes.buffering", 0);
        streamsConfiguration.put("commit.interval.ms", 100);
        streamsConfiguration.put("heartbeat.interval.ms", 100);
        streamsConfiguration.put("session.timeout.ms", String.valueOf(STREAMS_CONSUMER_TIMEOUT));
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("internal.leave.group.on.close", true);

        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        return streamsConfiguration;
    }

    /**
     * Recreates the input and output topics and produces ten input records with keys alternating
     * between 0 and 1. The mock clock advances 10 ms before each of the first six records and
     * 1 ms before each of the last four.
     *
     * @throws Exception if topic recreation or producing fails
     */
    private void prepareInputData() throws Exception {
        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

        final Properties producerConfig = copyOrEmpty(getClientSslConfig());
        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));

        final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};
        for (int i = 0; i < values.length; i++) {
            final long advanceMs = i < 6 ? 10L : 1L;
            final long key = i % 2;
            produceInputRecord(producerConfig, advanceMs, key, values[i]);
        }
    }

    /** Advances the mock clock by {@code advanceMs} and writes one record to the input topic at that time. */
    private void produceInputRecord(final Properties producerConfig,
                                    final long advanceMs,
                                    final long key,
                                    final String value) throws Exception {
        mockTime.sleep(advanceMs);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            INPUT_TOPIC,
            Collections.singleton(new KeyValue<Long, String>(key, value)),
            producerConfig,
            mockTime.milliseconds());
    }

    /**
     * Builds the topology that counts records per key into {@code outputTopic} and, via the
     * intermediate topic {@code userTopic}, counts records per key in hopping windows
     * (size 35 ms, advance 10 ms) into {@code outputTopic2}.
     *
     * @param outputTopic2 name of the topic receiving the windowed counts
     * @return the built topology
     */
    private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        // Identity map before the grouping; the key/value pair is passed through unchanged.
        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                @Override
                public KeyValue<Long, String> apply(final Long key, final String value) {
                    return new KeyValue<Long, String>(key, value);
                }
            })
            .groupByKey()
            .count()
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

        input.through(INTERMEDIATE_USER_TOPIC)
            .groupByKey()
            .windowedBy(TimeWindows.of(35L).advanceBy(10L))
            .count()
            .toStream()
            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
                @Override
                public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
                    // The output key is window start + window end; the record key itself is dropped.
                    return new KeyValue<Long, Long>(key.window().start() + key.window().end(), value);
                }
            })
            .to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Builds the topology that maps every input record to {@code (key, key)} and writes it to
     * {@code outputTopic}.
     *
     * @return the built topology
     */
    private Topology setupTopologyWithoutIntermediateUserTopic() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
                @Override
                public KeyValue<Long, Long> apply(final Long key, final String value) {
                    return new KeyValue<Long, Long>(key, key);
                }
            })
            .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Runs {@link StreamsResetter} for the current test's application id and asserts that it
     * exits with code 0.
     *
     * <p>The intermediate-topic arguments and the SSL config file are added independently of each
     * other, so a caller supplying both gets both.
     *
     * @param intermediateUserTopic intermediate topic to pass to the tool, or {@code null} for none
     * @param sslConfig             SSL client settings to hand to the tool through a temporary
     *                              config file, or {@code null} for none
     * @throws Exception if the config file cannot be written or the tool throws
     */
    private void cleanGlobal(final String intermediateUserTopic, final Properties sslConfig) throws Exception {
        final java.util.List<String> parameterList = new java.util.ArrayList<String>();
        Collections.addAll(parameterList,
            "--application-id", APP_ID + testNo,
            "--bootstrap-servers", bootstrapServers,
            "--input-topics", INPUT_TOPIC);
        if (intermediateUserTopic != null) {
            // NOTE(review): --zookeeper is only passed in this variant; presumably it checks that
            // the tool still accepts the argument -- confirm.
            Collections.addAll(parameterList,
                "--intermediate-topics", intermediateUserTopic,
                "--zookeeper", "localhost:2181");
        }
        if (sslConfig != null) {
            Collections.addAll(parameterList, "--config-file", writeSslConfigFile(sslConfig).getAbsolutePath());
        }
        final String[] parameters = parameterList.toArray(new String[0]);

        final Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", 100);
        cleanUpConfig.put("session.timeout.ms", String.valueOf(CLEANUP_CONSUMER_TIMEOUT));

        log.info("Calling StreamsResetter with parameters {} and configs {}", parameterList, cleanUpConfig);
        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals(0, exitCode);
    }

    /**
     * Writes the SSL protocol and trust-store settings from {@code sslConfig} to a temporary
     * properties file. The writer is closed even if a write fails.
     */
    private static File writeSslConfigFile(final Properties sslConfig) throws IOException {
        final File configFile = TestUtils.tempFile();
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(configFile))) {
            writer.write("security.protocol=SSL\n");
            writer.write("ssl.truststore.location=" + sslConfig.get("ssl.truststore.location") + "\n");
            writer.write("ssl.truststore.password=" + sslConfig.get("ssl.truststore.password") + "\n");
        }
        return configFile;
    }

    /**
     * Waits (up to 30 s) until the cluster's topics are the input/output topics, the offsets
     * topic and, if given, the intermediate user topic.
     *
     * @param intermediateUserTopic additional topic expected to remain, or {@code null}
     */
    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
        if (intermediateUserTopic != null) {
            cluster.waitForRemainingTopics(30000L, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
                "__consumer_offsets", intermediateUserTopic);
        } else {
            cluster.waitForRemainingTopics(30000L, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
                "__consumer_offsets");
        }
    }

    /** Builds the consumer configuration used to read the result topics, layered over a copy of {@code sslConfig}. */
    private Properties resultTopicConsumerConfig(final Properties sslConfig) {
        final Properties config = copyOrEmpty(sslConfig);
        config.putAll(TestUtils.consumerConfig(
            bootstrapServers,
            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
            LongDeserializer.class,
            LongDeserializer.class));
        return config;
    }

    /** Returns a new {@link Properties} holding the entries of {@code base}, or an empty one if {@code base} is null. */
    private static Properties copyOrEmpty(final Properties base) {
        final Properties copy = new Properties();
        if (base != null) {
            copy.putAll(base);
        }
        return copy;
    }

    /**
     * Condition that holds once the consumer group of the current test's application
     * ({@code APP_ID + testNo}, the id used by both the Streams application and the reset tool)
     * reports no consumers.
     */
    private static final class WaitUntilConsumerGroupGotClosed implements TestCondition {
        @Override
        public boolean conditionMet() {
            // Query the per-test group id; the bare APP_ID group is never joined by anything.
            final List<?> consumers =
                (List<?>) adminClient.describeConsumerGroup(APP_ID + testNo, 0L).consumers().get();
            return consumers.isEmpty();
        }
    }
}

