/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category(value={IntegrationTest.class})
public abstract class AbstractResetIntegrationTest {
    /** JUnit rule that fails any single test running longer than 600 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    // Embedded cluster used by every test. It is never assigned in this class;
    // presumably concrete subclasses set it before prepareTest() runs — TODO confirm.
    static EmbeddedKafkaCluster cluster;
    // Mock clock taken from cluster.time in setCurrentTime(); used to timestamp produced records.
    private static MockTime mockTime;
    // Streams instance of the currently running test; closed in cleanupTest().
    protected static KafkaStreams streams;
    // Admin client created lazily in prepareEnvironment() and closed in afterClassCleanup().
    protected static Admin adminClient;
    /** JUnit rule exposing the current test method; used to derive the application id. */
    @Rule
    public final TestName testName = new TestName();
    // The four Properties below are rebuilt by prepareConfigs(String).
    // Bootstrap servers plus optional SSL settings; merged into the other configs.
    protected Properties commonClientConfig;
    // Configuration passed to every KafkaStreams instance created by the tests.
    protected Properties streamsConfig;
    // Producer configuration (Long keys, String values) used to write input records.
    private Properties producerConfig;
    // Consumer configuration (Long keys, Long values) used to read result topics.
    protected Properties resultConsumerConfig;
    /** JUnit rule providing a temporary folder whose root is used as the Streams state.dir. */
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    // Topic names used by the test topologies.
    protected static final String INPUT_TOPIC = "inputTopic";
    protected static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    // NOTE(review): these constants are not referenced by name in this file. The literals
    // 2000 (session.timeout.ms) and 30000 (= 2000 * 15, wait timeouts) appear instead,
    // presumably because compile-time constants were inlined — confirm before changing values.
    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
    protected static final int CLEANUP_CONSUMER_TIMEOUT = 2000;
    protected static final int TIMEOUT_MULTIPLIER = 15;
    /**
     * Supplies the SSL client settings for the cluster under test.
     *
     * <p>When the result is non-null, callers in this class read the entries
     * {@code ssl.truststore.location} and {@code ssl.truststore.password} (the latter
     * is cast to {@link Password}) and switch the clients to {@code security.protocol=SSL}.
     *
     * @return the SSL settings, or {@code null} to leave the clients without SSL configuration
     */
    abstract Map<String, Object> getClientSslConfig();

    /**
     * Releases the shared admin client after all tests of the class have run.
     *
     * <p>Does nothing when no client exists; otherwise closes it with a ten-second
     * timeout and clears the static reference.
     */
    @AfterClass
    public static void afterClassCleanup() {
        if (adminClient == null) {
            return;
        }
        adminClient.close(Duration.ofSeconds(10L));
        adminClient = null;
    }

    /**
     * Creates the shared admin client on first use and initialises the mock clock.
     *
     * <p>The clock update is retried until {@link #setCurrentTime()} reports success.
     */
    private void prepareEnvironment() {
        if (adminClient == null) {
            adminClient = Admin.create(commonClientConfig);
        }

        // Keep trying: setCurrentTime() returns false when the clock rejected the timestamp.
        boolean clockInitialised;
        do {
            clockInitialised = setCurrentTime();
        } while (!clockInitialised);
    }

    /**
     * Points {@code mockTime} at the cluster's clock and sets it to the start of the
     * next wall-clock second.
     *
     * @return {@code true} if the clock accepted the timestamp, {@code false} if it
     *         rejected it with an {@link IllegalArgumentException}
     */
    private boolean setCurrentTime() {
        try {
            mockTime = cluster.time;
            // Round up to the next full second.
            // NOTE(review): presumably so window boundaries are identical across runs — confirm.
            final long currentSecond = System.currentTimeMillis() / 1000L;
            mockTime.setCurrentTimeMs((currentSecond + 1L) * 1000L);
            return true;
        } catch (final IllegalArgumentException ignored) {
            // Deliberately ignored: the caller retries until the clock accepts a value.
            return false;
        }
    }

    /**
     * Rebuilds the four client configurations used by a test.
     *
     * <p>The common settings (bootstrap servers and, when available, SSL truststore
     * settings) are built first and then merged into the producer, result-consumer and
     * streams configurations.
     *
     * @param appID application id of the current test; used to derive the result consumer's group id
     */
    private void prepareConfigs(String appID) {
        // --- settings shared by every client ---
        commonClientConfig = new Properties();
        commonClientConfig.put("bootstrap.servers", cluster.bootstrapServers());
        final Map<String, Object> ssl = getClientSslConfig();
        if (ssl != null) {
            commonClientConfig.put("ssl.truststore.location", ssl.get("ssl.truststore.location"));
            final Password truststorePassword = (Password) ssl.get("ssl.truststore.password");
            commonClientConfig.put("ssl.truststore.password", truststorePassword.value());
            commonClientConfig.put("security.protocol", "SSL");
        }

        // --- producer for the input records: Long keys, String values ---
        producerConfig = new Properties();
        producerConfig.put("acks", "all");
        producerConfig.put("key.serializer", LongSerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);
        producerConfig.putAll(commonClientConfig);

        // --- consumer for the result topics: Long keys, Long values ---
        resultConsumerConfig = new Properties();
        resultConsumerConfig.put("group.id", appID + "-result-consumer");
        resultConsumerConfig.put("auto.offset.reset", "earliest");
        resultConsumerConfig.put("key.deserializer", LongDeserializer.class);
        resultConsumerConfig.put("value.deserializer", LongDeserializer.class);
        resultConsumerConfig.putAll(commonClientConfig);

        // --- Kafka Streams application ---
        streamsConfig = new Properties();
        streamsConfig.put("state.dir", testFolder.getRoot().getPath());
        streamsConfig.put("default.key.serde", Serdes.Long().getClass());
        streamsConfig.put("default.value.serde", Serdes.String().getClass());
        streamsConfig.put("cache.max.bytes.buffering", 0);
        streamsConfig.put("commit.interval.ms", 100L);
        streamsConfig.put("heartbeat.interval.ms", 100);
        streamsConfig.put("auto.offset.reset", "earliest");
        // STREAMS_CONSUMER_TIMEOUT is the compile-time constant 2000, so the value is still "2000".
        streamsConfig.put("session.timeout.ms", Integer.toString(STREAMS_CONSUMER_TIMEOUT));
        streamsConfig.putAll(commonClientConfig);
    }

    /**
     * Brings the cluster into the starting state for a test.
     *
     * <p>Builds the configurations, prepares the admin client and mock clock, waits for
     * the application's consumer group to be empty, deletes all topics, recreates the
     * input/output topics and produces the ten input records.
     *
     * @throws Exception if any of the preparation steps fails
     */
    void prepareTest() throws Exception {
        final long emptyGroupTimeoutMs = 30000L;
        final long topicDeletionTimeoutMs = 120000L;

        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        prepareConfigs(appID);
        prepareEnvironment();

        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);

        cluster.deleteAllTopicsAndWait(topicDeletionTimeoutMs);
        cluster.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

        add10InputElements();
    }

    /**
     * Stops the Streams instance of the finished test, if any, and purges the local
     * Streams state described by {@code streamsConfig}.
     *
     * @throws Exception if purging the local state fails
     */
    void cleanupTest() throws Exception {
        final KafkaStreams current = streams;
        if (current != null) {
            current.close(Duration.ofSeconds(30L));
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    /**
     * Produces ten records to the input topic, one at a time.
     *
     * <p>Keys alternate between {@code 0} and {@code 1}. Before each send the mock
     * clock is advanced by 10 ms and the record is stamped with the resulting time.
     */
    private void add10InputElements() {
        final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};
        for (int i = 0; i < values.length; i++) {
            // Even positions use key 0, odd positions use key 1.
            final KeyValue<Long, String> record = KeyValue.pair((long) (i % 2), values[i]);
            mockTime.sleep(10L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                INPUT_TOPIC,
                Collections.singleton(record),
                producerConfig,
                mockTime.milliseconds());
        }
    }

    /**
     * Runs the application once, then resets it while naming all but the first internal
     * topic via {@code --internal-topics}, and checks that the first internal topic is
     * among the topics that remain.
     */
    @Test
    public void testResetWhenInternalTopicsAreSpecified() throws Exception {
        final long emptyGroupTimeoutMs = 30000L;
        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        streamsConfig.put("application.id", appID);

        // Run the application until all ten results have been produced, then stop it.
        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
        streams.start();
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);

        // Reset, passing every internal topic except the first one to the tool.
        streams.cleanUp();
        final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
            .filter(StreamsResetter::matchesInternalTopicFormat)
            .collect(Collectors.toList());
        final List<String> topicsForReset = internalTopics.subList(1, internalTopics.size());
        cleanGlobal(false, "--internal-topics", String.join(",", topicsForReset), appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);

        // The internal topic that was not passed to the tool must still exist.
        assertInternalTopicsGotDeleted(internalTopics.get(0));
    }

    /**
     * Runs the simple topology, resets the application, runs it again and asserts that
     * the second run yields the same ten results as the first.
     */
    @Test
    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
        final long emptyGroupTimeoutMs = 30000L;
        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        streamsConfig.put("application.id", appID);

        // First run.
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
        streams.start();
        final List<KeyValue<Long, Long>> firstRun =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);

        // Reset: wipe local state, then run the reset tool.
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
        streams.cleanUp();
        cleanGlobal(false, null, null, appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);
        assertInternalTopicsGotDeleted(null);

        // Second run must reproduce the first run's output.
        streams.start();
        final List<KeyValue<Long, Long>> secondRun =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(secondRun, CoreMatchers.equalTo(firstRun));

        // Final reset once the group is empty again.
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);
        cleanGlobal(false, null, null, appID);
    }

    /** Reprocessing scenario routed through the user-managed intermediate topic. */
    @Test
    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
        final boolean useRepartitioned = false;
        testReprocessingFromScratchAfterResetWithIntermediateUserTopic(useRepartitioned);
    }

    /** Reprocessing scenario routed through {@code repartition()} instead of a user topic. */
    @Test
    public void testReprocessingFromScratchAfterResetWithIntermediateInternalTopic() throws Exception {
        final boolean useRepartitioned = true;
        testReprocessingFromScratchAfterResetWithIntermediateUserTopic(useRepartitioned);
    }

    /**
     * Shared body of the two intermediate-topic reprocessing tests.
     *
     * <p>Runs the topology, resets the application, runs it again writing the windowed
     * output to the rerun topic, and asserts both outputs equal those of the first run.
     * In the user-topic variant an extra record is written to the intermediate topic
     * between the runs and that topic's contents are verified as well.
     *
     * @param useRepartitioned {@code true} to route through {@code repartition()},
     *                         {@code false} to route through the user-managed intermediate topic
     * @throws Exception if any step of the scenario fails
     */
    private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(boolean useRepartitioned) throws Exception {
        final boolean useUserTopic = !useRepartitioned;
        final long emptyGroupTimeoutMs = 30000L;

        if (useUserTopic) {
            cluster.createTopic(INTERMEDIATE_USER_TOPIC);
        }
        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        streamsConfig.put("application.id", appID);

        // ---- first run ----
        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
        streams.start();
        final List<KeyValue<Long, Long>> firstCounts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        final List<KeyValue<Long, Long>> firstWindowed =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);

        // Write one extra record to the intermediate topic between the runs.
        // NOTE(review): the value suggests the rerun is expected to skip it — confirm.
        mockTime.sleep(1L);
        final KeyValue<Long, String> badMessage = new KeyValue<>(-1L, "badRecord-ShouldBeSkipped");
        if (useUserTopic) {
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                INTERMEDIATE_USER_TOPIC,
                Collections.singleton(badMessage),
                producerConfig,
                mockTime.milliseconds());
        }

        // ---- reset ----
        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2_RERUN), streamsConfig);
        streams.cleanUp();
        cleanGlobal(useUserTopic, null, null, appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);
        assertInternalTopicsGotDeleted(useUserTopic ? INTERMEDIATE_USER_TOPIC : null);

        // ---- second run ----
        streams.start();
        final List<KeyValue<Long, Long>> rerunCounts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        final List<KeyValue<Long, Long>> rerunWindowed =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
        streams.close();

        MatcherAssert.assertThat(rerunCounts, CoreMatchers.equalTo(firstCounts));
        MatcherAssert.assertThat(rerunWindowed, CoreMatchers.equalTo(firstWindowed));

        if (useUserTopic) {
            // Expected layout of the intermediate topic: ten records from the first run,
            // the extra record at index 10, then the same ten records from the rerun.
            final Properties intermediateConsumerConfig = TestUtils.consumerConfig(
                cluster.bootstrapServers(),
                appID + "-result-consumer",
                LongDeserializer.class,
                StringDeserializer.class,
                commonClientConfig);
            final List<KeyValue<Long, String>> intermediate =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(intermediateConsumerConfig, INTERMEDIATE_USER_TOPIC, 21);
            for (int i = 0; i < 10; i++) {
                MatcherAssert.assertThat(intermediate.get(i), CoreMatchers.equalTo(intermediate.get(i + 11)));
            }
            MatcherAssert.assertThat(intermediate.get(10), CoreMatchers.equalTo(badMessage));
        }

        // ---- final reset and cleanup ----
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, emptyGroupTimeoutMs);
        cleanGlobal(useUserTopic, null, null, appID);
        if (useUserTopic) {
            cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
        }
    }

    /**
     * Builds the two-branch topology used by the intermediate-topic tests.
     *
     * <p>Branch one re-maps every input record, counts per key and writes the counts to
     * {@code OUTPUT_TOPIC}. Branch two passes the input through either an internal
     * repartition topic or the user-managed intermediate topic, counts per key in
     * 35 ms windows advancing by 10 ms, and writes each count to {@code outputTopic2}
     * keyed by the sum of the window's start and end.
     *
     * <p>The streams are declared with their type parameters: with raw {@code KStream}
     * the windowed key in the final {@code map} is an {@code Object}, so
     * {@code key.window()} cannot be resolved.
     *
     * @param useRepartitioned {@code true} to use {@code repartition()}, {@code false} to
     *                         write to and re-read from the intermediate user topic
     * @param outputTopic2     topic that receives the windowed counts
     * @return the built topology
     */
    private Topology setupTopologyWithIntermediateTopic(boolean useRepartitioned, String outputTopic2) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        // Branch one: per-key count of the re-mapped input.
        input.map(KeyValue::new)
            .groupByKey()
            .count()
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

        // Choose the path the records take before the windowed aggregation.
        final KStream<Long, String> stream;
        if (useRepartitioned) {
            stream = input.repartition();
        } else {
            input.to(INTERMEDIATE_USER_TOPIC);
            stream = builder.stream(INTERMEDIATE_USER_TOPIC);
        }

        // Branch two: hopping-window count, keyed by window start + window end.
        stream.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(35L)).advanceBy(Duration.ofMillis(10L)))
            .count()
            .toStream()
            .map((key, value) -> new KeyValue<>(key.window().start() + key.window().end(), value))
            .to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Builds the single-branch topology: every input record is re-mapped so that its
     * key is also its value, and the result is written to {@code OUTPUT_TOPIC} with
     * Long serdes for key and value.
     *
     * @return the built topology
     */
    protected Topology setupTopologyWithoutIntermediateUserTopic() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        // Drop the original value and emit the key twice.
        final KStream<Long, Long> keyedByKey = input.map((key, value) -> new KeyValue<>(key, key));
        keyedByKey.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Runs the {@link StreamsResetter} tool for the given application and reports
     * whether it exited successfully.
     *
     * <p>The argument list always contains the application id, the bootstrap servers
     * and the input topic. When {@link #getClientSslConfig()} returns a non-null map,
     * the SSL settings are written to a temporary properties file that is passed via
     * {@code --config-file}.
     *
     * @param withIntermediateTopics whether to pass the intermediate user topic via {@code --intermediate-topics}
     * @param resetScenario          extra option appended to the arguments, or {@code null} for none
     * @param resetScenarioArg       extra value appended after {@code resetScenario}, or {@code null} for none
     * @param appID                  application id to reset
     * @return {@code true} if the tool returned exit code 0
     * @throws Exception if the SSL config file cannot be written or the tool throws
     */
    protected boolean tryCleanGlobal(boolean withIntermediateTopics, String resetScenario, String resetScenarioArg, String appID) throws Exception {
        final List<String> parameterList = new ArrayList<>(Arrays.asList(
            "--application-id", appID,
            "--bootstrap-servers", cluster.bootstrapServers(),
            "--input-topics", INPUT_TOPIC));
        if (withIntermediateTopics) {
            parameterList.add("--intermediate-topics");
            parameterList.add(INTERMEDIATE_USER_TOPIC);
        }

        final Map<String, Object> sslConfig = getClientSslConfig();
        if (sslConfig != null) {
            final File configFile = TestUtils.tempFile();
            // try-with-resources closes the writer even when one of the writes fails.
            try (BufferedWriter writer = new BufferedWriter(new FileWriter(configFile))) {
                writer.write("security.protocol=SSL\n");
                writer.write("ssl.truststore.location=" + sslConfig.get("ssl.truststore.location") + "\n");
                writer.write("ssl.truststore.password=" + ((Password) sslConfig.get("ssl.truststore.password")).value() + "\n");
            }
            parameterList.add("--config-file");
            parameterList.add(configFile.getAbsolutePath());
        }

        if (resetScenario != null) {
            parameterList.add(resetScenario);
        }
        if (resetScenarioArg != null) {
            parameterList.add(resetScenarioArg);
        }
        final String[] parameters = parameterList.toArray(new String[0]);

        // Consumer settings handed to the reset tool.
        final Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", 100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));

        return new StreamsResetter().run(parameters, cleanUpConfig) == 0;
    }

    /**
     * Runs the reset tool via {@link #tryCleanGlobal(boolean, String, String, String)}
     * and fails the test unless it reports success.
     *
     * @param withIntermediateTopics whether to pass the intermediate user topic to the tool
     * @param resetScenario          extra option for the tool, or {@code null}
     * @param resetScenarioArg       extra value for the tool, or {@code null}
     * @param appID                  application id to reset
     * @throws Exception if running the tool throws
     */
    protected void cleanGlobal(boolean withIntermediateTopics, String resetScenario, String resetScenarioArg, String appID) throws Exception {
        Assert.assertTrue(tryCleanGlobal(withIntermediateTopics, resetScenario, resetScenarioArg, appID));
    }

    /**
     * Waits up to 30 seconds for the cluster's remaining topics to be the four test
     * topics plus {@code __consumer_offsets}, and optionally one more topic.
     *
     * @param additionalExistingTopic a further topic expected to remain, or {@code null} for none
     * @throws Exception if waiting on the cluster fails
     */
    protected void assertInternalTopicsGotDeleted(String additionalExistingTopic) throws Exception {
        final List<String> expectedTopics = new ArrayList<>(Arrays.asList(
            INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets"));
        if (additionalExistingTopic != null) {
            // Same position as in the explicit argument list: after the fixed topics.
            expectedTopics.add(additionalExistingTopic);
        }
        cluster.waitForRemainingTopics(30000L, expectedTopics.toArray(new String[0]));
    }
}

