/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@Tag(value="integration")
public class PauseResumeIntegrationTest {
    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45L);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static Properties producerConfig;
    private static Properties consumerConfig;
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE;
    private static final String INPUT_STREAM_1 = "input-stream-1";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private static final String OUTPUT_STREAM_1 = "output-stream-1";
    private static final String OUTPUT_STREAM_2 = "output-stream-2";
    private static final String TOPOLOGY1 = "topology1";
    private static final String TOPOLOGY2 = "topology2";
    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA;
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA;
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2;
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA_ALL;
    private String appId;
    private KafkaStreams kafkaStreams;
    private KafkaStreams kafkaStreams2;
    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;

    private static Stream<Boolean> parameters() {
        return Stream.of(Boolean.TRUE, Boolean.FALSE);
    }

    @BeforeAll
    public static void startCluster() throws Exception {
        CLUSTER.start();
        producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
        consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics(TestInfo testInfo) throws InterruptedException {
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
        this.appId = IntegrationTestUtils.safeUniqueTestName(PauseResumeIntegrationTest.class, testInfo);
    }

    private Properties props(boolean stateUpdaterEnabled) {
        Properties properties = new Properties();
        properties.put("application.id", this.appId);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("state.dir", TestUtils.tempDirectory((String)this.appId).getPath());
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Long().getClass());
        properties.put("commit.interval.ms", (Object)1000L);
        properties.put("statestore.cache.max.bytes", (Object)0);
        properties.put("auto.offset.reset", "earliest");
        properties.put("heartbeat.interval.ms", (Object)100);
        properties.put("session.timeout.ms", (Object)1000);
        properties.put("__state.updater.enabled__", (Object)stateUpdaterEnabled);
        return properties;
    }

    @AfterEach
    public void shutdown() throws InterruptedException {
        for (KafkaStreams streams : Arrays.asList(this.kafkaStreams, this.kafkaStreams2, this.streamsNamedTopologyWrapper)) {
            if (streams == null) continue;
            streams.close(Duration.ofSeconds(30L));
        }
    }

    private static void produceToInputTopics(String topic, Collection<KeyValue<String, Long>> records) {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, (Time)PauseResumeIntegrationTest.CLUSTER.time);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void shouldPauseAndResumeKafkaStreams(boolean stateUpdaterEnabled) throws Exception {
        this.kafkaStreams = this.buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.kafkaStreams.pause();
        Assert.assertTrue((boolean)this.kafkaStreams.isPaused());
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        IntegrationTestUtils.waitUntilStreamsHasPolled(this.kafkaStreams, 2);
        this.assertTopicSize(OUTPUT_STREAM_1, 5);
        this.kafkaStreams.resume();
        Assert.assertFalse((boolean)this.kafkaStreams.isPaused());
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
        this.assertTopicSize(OUTPUT_STREAM_1, 10);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void shouldAllowForTopologiesToStartPaused(boolean stateUpdaterEnabled) throws Exception {
        this.kafkaStreams = this.buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
        this.kafkaStreams.pause();
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertTrue((boolean)this.kafkaStreams.isPaused());
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        IntegrationTestUtils.waitUntilStreamsHasPolled(this.kafkaStreams, 2);
        this.assertTopicSize(OUTPUT_STREAM_1, 0);
        this.kafkaStreams.resume();
        Assert.assertFalse((boolean)this.kafkaStreams.isPaused());
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.assertTopicSize(OUTPUT_STREAM_1, 5);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(boolean stateUpdaterEnabled) throws Exception {
        this.streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(this.props(stateUpdaterEnabled));
        NamedTopologyBuilder builder1 = this.getNamedTopologyBuilder1();
        NamedTopologyBuilder builder2 = this.getNamedTopologyBuilder2();
        this.streamsNamedTopologyWrapper.start(Arrays.asList(builder1.build(), builder2.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streamsNamedTopologyWrapper), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
        this.assertTopicSize(OUTPUT_STREAM_1, 5);
        this.assertTopicSize(OUTPUT_STREAM_2, 5);
        this.streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
        Assert.assertTrue((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isPaused());
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA2);
        this.assertTopicSize(OUTPUT_STREAM_1, 5);
        this.assertTopicSize(OUTPUT_STREAM_2, 10);
        this.streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(boolean stateUpdaterEnabled) throws Exception {
        this.streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(this.props(stateUpdaterEnabled));
        NamedTopologyBuilder builder1 = this.getNamedTopologyBuilder1();
        NamedTopologyBuilder builder2 = this.getNamedTopologyBuilder2();
        this.streamsNamedTopologyWrapper.start(Arrays.asList(builder1.build(), builder2.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streamsNamedTopologyWrapper), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
        this.streamsNamedTopologyWrapper.pause();
        Assert.assertTrue((boolean)this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertTrue((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertTrue((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        IntegrationTestUtils.waitUntilStreamsHasPolled((KafkaStreams)this.streamsNamedTopologyWrapper, 2);
        this.assertTopicSize(OUTPUT_STREAM_1, 5);
        this.assertTopicSize(OUTPUT_STREAM_2, 5);
        this.streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertTrue((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
        this.assertTopicSize(OUTPUT_STREAM_1, 10);
        this.assertTopicSize(OUTPUT_STREAM_2, 5);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void shouldAllowForNamedTopologiesToStartPaused(boolean stateUpdaterEnabled) throws Exception {
        this.streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(this.props(stateUpdaterEnabled));
        NamedTopologyBuilder builder1 = this.getNamedTopologyBuilder1();
        NamedTopologyBuilder builder2 = this.getNamedTopologyBuilder2();
        this.streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
        this.streamsNamedTopologyWrapper.start(Arrays.asList(builder1.build(), builder2.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streamsNamedTopologyWrapper), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertTrue((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        this.assertTopicSize(OUTPUT_STREAM_1, 0);
        this.assertTopicSize(OUTPUT_STREAM_2, 0);
        this.streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertFalse((boolean)this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void pauseResumeShouldWorkAcrossInstances(boolean stateUpdaterEnabled) throws Exception {
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.kafkaStreams = this.buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
        this.kafkaStreams.pause();
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertTrue((boolean)this.kafkaStreams.isPaused());
        this.kafkaStreams2 = this.buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
        this.kafkaStreams2.pause();
        this.kafkaStreams2.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams2), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertTrue((boolean)this.kafkaStreams2.isPaused());
        this.assertTopicSize(OUTPUT_STREAM_1, 0);
        this.kafkaStreams2.close();
        this.kafkaStreams2.cleanUp();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams2), KafkaStreams.State.NOT_RUNNING, STARTUP_TIMEOUT);
        this.kafkaStreams.resume();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
    }

    @ParameterizedTest
    @MethodSource(value={"parameters"})
    public void pausedTopologyShouldNotRestoreStateStores(boolean stateUpdaterEnabled) throws Exception {
        Properties properties1 = this.props(stateUpdaterEnabled);
        properties1.put("num.standby.replicas", (Object)1);
        Properties properties2 = this.props(stateUpdaterEnabled);
        properties2.put("num.standby.replicas", (Object)1);
        PauseResumeIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.kafkaStreams = this.buildKafkaStreams(OUTPUT_STREAM_1, properties1);
        this.kafkaStreams2 = this.buildKafkaStreams(OUTPUT_STREAM_1, properties2);
        this.kafkaStreams.start();
        this.kafkaStreams2.start();
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.kafkaStreams, this.kafkaStreams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        this.awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.kafkaStreams.close();
        this.kafkaStreams2.close();
        this.kafkaStreams = this.buildKafkaStreams(OUTPUT_STREAM_1, properties1);
        this.kafkaStreams2 = this.buildKafkaStreams(OUTPUT_STREAM_1, properties2);
        this.kafkaStreams.cleanUp();
        this.kafkaStreams2.cleanUp();
        this.kafkaStreams.pause();
        this.kafkaStreams2.pause();
        this.kafkaStreams.start();
        this.kafkaStreams2.start();
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.kafkaStreams, this.kafkaStreams2), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        this.assertStreamsLocalStoreLagStaysConstant(this.kafkaStreams);
        this.assertStreamsLocalStoreLagStaysConstant(this.kafkaStreams2);
    }

    private void assertStreamsLocalStoreLagStaysConstant(KafkaStreams streams) throws InterruptedException {
        TestUtils.waitForCondition(() -> !streams.allLocalStorePartitionLags().isEmpty(), (String)"Lags for local store partitions were not found within the timeout!");
        IntegrationTestUtils.waitUntilStreamsHasPolled(streams, 2);
        long stateStoreLag1 = ((LagInfo)((Map)streams.allLocalStorePartitionLags().get("test-store")).get(0)).offsetLag();
        IntegrationTestUtils.waitUntilStreamsHasPolled(streams, 2);
        long stateStoreLag2 = ((LagInfo)((Map)streams.allLocalStorePartitionLags().get("test-store")).get(0)).offsetLag();
        Assert.assertTrue((stateStoreLag1 > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)stateStoreLag1, (long)stateStoreLag2);
    }

    private KafkaStreams buildKafkaStreams(String outputTopic, boolean stateUpdaterEnabled) {
        return this.buildKafkaStreams(outputTopic, this.props(stateUpdaterEnabled));
    }

    private KafkaStreams buildKafkaStreams(String outputTopic, Properties properties) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_STREAM_1).groupByKey().count(Materialized.as((String)"test-store")).toStream().to(outputTopic);
        return new KafkaStreams(builder.build(properties), properties);
    }

    private void assertTopicSize(String topicName, int size) {
        Assert.assertEquals((long)IntegrationTestUtils.getTopicSize(consumerConfig, topicName), (long)size);
    }

    private void awaitOutput(String topicName, int count, List<KeyValue<String, Long>> output) throws Exception {
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, topicName, count), (Matcher)CoreMatchers.equalTo(output));
    }

    private NamedTopologyBuilder getNamedTopologyBuilder1() {
        NamedTopologyBuilder builder1 = this.streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY1);
        builder1.stream(INPUT_STREAM_1).groupByKey().count().toStream().to(OUTPUT_STREAM_1);
        return builder1;
    }

    private NamedTopologyBuilder getNamedTopologyBuilder2() {
        NamedTopologyBuilder builder2 = this.streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY2);
        builder2.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        return builder2;
    }

    static {
        IN_MEMORY_STORE = Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store"));
        STANDARD_INPUT_DATA = Arrays.asList(KeyValue.pair((Object)"A", (Object)100L), KeyValue.pair((Object)"B", (Object)200L), KeyValue.pair((Object)"A", (Object)300L), KeyValue.pair((Object)"C", (Object)400L), KeyValue.pair((Object)"C", (Object)-50L));
        COUNT_OUTPUT_DATA = Arrays.asList(KeyValue.pair((Object)"A", (Object)1L), KeyValue.pair((Object)"B", (Object)1L), KeyValue.pair((Object)"A", (Object)2L), KeyValue.pair((Object)"C", (Object)1L), KeyValue.pair((Object)"C", (Object)2L));
        COUNT_OUTPUT_DATA2 = Arrays.asList(KeyValue.pair((Object)"A", (Object)3L), KeyValue.pair((Object)"B", (Object)2L), KeyValue.pair((Object)"A", (Object)4L), KeyValue.pair((Object)"C", (Object)3L), KeyValue.pair((Object)"C", (Object)4L));
        COUNT_OUTPUT_DATA_ALL = new ArrayList<KeyValue<String, Long>>(){
            {
                this.addAll(COUNT_OUTPUT_DATA);
                this.addAll(COUNT_OUTPUT_DATA2);
            }
        };
    }
}

