/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.Properties;
import kafka.admin.AdminClient;
import kafka.server.KafkaConfig$;
import kafka.tools.StreamsResetter;
import kafka.utils.MockTime;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.collection.immutable.List;

@Category(value={IntegrationTest.class})
public class ResetIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    private static final String APP_ID = "cleanup-integration-test";
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
    private static final int TIMEOUT_MULTIPLIER = 5;
    private static int testNo;
    private static AdminClient adminClient;
    private MockTime mockTime;
    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();

    @AfterClass
    public static void globalCleanup() {
        if (adminClient != null) {
            adminClient.close();
            adminClient = null;
        }
    }

    @Before
    public void cleanup() throws Exception {
        ++testNo;
        this.mockTime = ResetIntegrationTest.CLUSTER.time;
        long alignedTime = (System.currentTimeMillis() / 1000L + 1L) * 1000L;
        this.mockTime.setCurrentTimeMs(alignedTime);
        if (adminClient == null) {
            adminClient = AdminClient.createSimplePlaintext((String)CLUSTER.bootstrapServers());
        }
        while (true) {
            Thread.sleep(50L);
            try {
                TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Test consumer group active even after waiting 10000 ms.");
            }
            catch (TimeoutException e) {
                continue;
            }
            break;
        }
        this.prepareInputData();
    }

    @Test
    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
        Properties streamsConfiguration = this.prepareTest(4);
        Properties resultTopicConsumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)"cleanup-integration-test-standard-consumer-outputTopic", LongDeserializer.class, LongDeserializer.class);
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)this.setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
        streams.start();
        java.util.List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        java.util.List result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 40);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INTERMEDIATE_USER_TOPIC, Collections.singleton(new KeyValue((Object)-1L, (Object)"badRecord-ShouldBeSkipped")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class), this.mockTime.milliseconds());
        streams = new KafkaStreams((TopologyBuilder)this.setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
        streams.cleanUp();
        this.cleanGlobal(INTERMEDIATE_USER_TOPIC);
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
        streams.start();
        java.util.List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        java.util.List resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result));
        MatcherAssert.assertThat(resultRerun2, (Matcher)CoreMatchers.equalTo(result2));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(INTERMEDIATE_USER_TOPIC);
        CLUSTER.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
    }

    @Test
    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
        Properties streamsConfiguration = this.prepareTest(1);
        Properties resultTopicConsumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)"cleanup-integration-test-standard-consumer-outputTopic", LongDeserializer.class, LongDeserializer.class);
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)this.setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
        streams.start();
        java.util.List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        streams = new KafkaStreams((TopologyBuilder)this.setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
        streams.cleanUp();
        this.cleanGlobal(null);
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(null);
        streams.start();
        java.util.List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactive, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(null);
    }

    private Properties prepareTest(int threads) throws Exception {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", APP_ID + testNo);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("default.key.serde", Serdes.Long().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("num.stream.threads", (Object)threads);
        streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        streamsConfiguration.put("commit.interval.ms", (Object)100);
        streamsConfiguration.put("heartbeat.interval.ms", (Object)100);
        streamsConfiguration.put("session.timeout.ms", "2000");
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        return streamsConfiguration;
    }

    private void prepareInputData() throws Exception {
        CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)0L, (Object)"aaa")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)1L, (Object)"bbb")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)0L, (Object)"ccc")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)1L, (Object)"ddd")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)0L, (Object)"eee")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(10L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)1L, (Object)"fff")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)0L, (Object)"ggg")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)1L, (Object)"hhh")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)0L, (Object)"iii")), producerConfig, this.mockTime.milliseconds());
        this.mockTime.sleep(1L);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue((Object)1L, (Object)"jjj")), producerConfig, this.mockTime.milliseconds());
    }

    private KStreamBuilder setupTopologyWithIntermediateUserTopic(String outputTopic2) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream input = builder.stream(new String[]{INPUT_TOPIC});
        input.map((KeyValueMapper)new KeyValueMapper<Long, String, KeyValue<Long, String>>(){

            public KeyValue<Long, String> apply(Long key, String value) {
                return new KeyValue((Object)key, (Object)value);
            }
        }).groupByKey().count("global-count").to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
        input.through(INTERMEDIATE_USER_TOPIC).groupByKey().count((Windows)TimeWindows.of((long)35L).advanceBy(10L), "count").toStream().map((KeyValueMapper)new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>(){

            public KeyValue<Long, Long> apply(Windowed<Long> key, Long value) {
                return new KeyValue((Object)(key.window().start() + key.window().end()), (Object)value);
            }
        }).to(Serdes.Long(), Serdes.Long(), outputTopic2);
        return builder;
    }

    private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() {
        KStreamBuilder builder = new KStreamBuilder();
        KStream input = builder.stream(new String[]{INPUT_TOPIC});
        input.map((KeyValueMapper)new KeyValueMapper<Long, String, KeyValue<Long, Long>>(){

            public KeyValue<Long, Long> apply(Long key, String value) {
                return new KeyValue((Object)key, (Object)key);
            }
        }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
        return builder;
    }

    private void cleanGlobal(String intermediateUserTopic) {
        String[] parameters = intermediateUserTopic != null ? new String[]{"--application-id", APP_ID + testNo, "--bootstrap-servers", CLUSTER.bootstrapServers(), "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC, "--intermediate-topics", INTERMEDIATE_USER_TOPIC} : new String[]{"--application-id", APP_ID + testNo, "--bootstrap-servers", CLUSTER.bootstrapServers(), "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", "2000");
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)0L, (long)exitCode);
    }

    private void assertInternalTopicsGotDeleted(String intermediateUserTopic) throws Exception {
        if (intermediateUserTopic != null) {
            CLUSTER.waitForRemainingTopics(30000L, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets", intermediateUserTopic);
        } else {
            CLUSTER.waitForRemainingTopics(30000L, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets");
        }
    }

    static {
        Properties props = new Properties();
        props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), (Object)-1L);
        CLUSTER = new EmbeddedKafkaCluster(1, props);
        testNo = 0;
        adminClient = null;
    }

    private class WaitUntilConsumerGroupGotClosed
    implements TestCondition {
        private WaitUntilConsumerGroupGotClosed() {
        }

        public boolean conditionMet() {
            return ((List)adminClient.describeConsumerGroup(ResetIntegrationTest.APP_ID + testNo, 0L).consumers().get()).isEmpty();
        }
    }
}

