/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=600L)
@Tag(value="integration")
public class RestoreIntegrationTest {
    /** Logger used by this test class and its nested restore listener. */
    private static final Logger log = LoggerFactory.getLogger(RestoreIntegrationTest.class);
    /** Sleep applied by {@link TestStateRestoreListener} inside every {@code onBatchRestored} callback. */
    private static final Duration RESTORATION_DELAY = Duration.ofSeconds(1L);
    /** Number of brokers the embedded cluster is created with. */
    private static final int NUM_BROKERS = 1;
    /** Embedded Kafka cluster shared by all tests of this class. */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    /** Application id of the current test, derived from the test name in {@link #createTopics(TestInfo)}. */
    private String appId;
    /** Name of the per-test input topic ({@code appId + "-input-stream"}). */
    private String inputStream;
    /** Number of records written per call to {@code createStateForRestoration}. */
    private final int numberOfKeys = 10000;
    /** Streams client closed automatically in {@link #shutdown()} when a test assigns it. */
    private KafkaStreams kafkaStreams;
    /** Every configuration handed out by {@code props(...)}; used to purge local state after each test. */
    private final List<Properties> streamsConfigurations = new ArrayList<>();

    /**
     * Starts the shared embedded Kafka cluster once, before any test of this class runs.
     *
     * @throws IOException if the cluster fails to start
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded Kafka cluster after all tests of this class have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Derives a unique application id from the running test and creates that test's
     * input topic with two partitions and replication factor one.
     *
     * @param testInfo JUnit metadata of the test about to run
     * @throws InterruptedException if topic creation is interrupted
     */
    @BeforeEach
    public void createTopics(TestInfo testInfo) throws InterruptedException {
        final String uniqueName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.appId = uniqueName;
        this.inputStream = uniqueName + "-input-stream";
        CLUSTER.createTopic(this.inputStream, 2, 1);
    }

    /**
     * Builds the default Streams configuration with the internal state-updater flag set explicitly.
     *
     * @param stateUpdaterEnabled value stored under {@code __state.updater.enabled__}
     * @return the configuration, already registered for post-test cleanup
     */
    private Properties props(boolean stateUpdaterEnabled) {
        final Map<String, Object> overrides =
            Utils.mkMap(Utils.mkEntry("__state.updater.enabled__", stateUpdaterEnabled));
        return this.props(Utils.mkObjectProperties(overrides));
    }

    /**
     * Builds the default Streams configuration for the current test and layers
     * {@code extraProperties} on top of it (extra entries win on key collisions).
     * The result is remembered in {@code streamsConfigurations} so that
     * {@link #shutdown()} can purge its local state.
     *
     * @param extraProperties overrides and additions applied after the defaults
     * @return the merged configuration
     */
    private Properties props(Properties extraProperties) {
        final Properties config = new Properties();
        // Identity and connectivity.
        config.put("application.id", this.appId);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory(this.appId).getPath());
        // Serdes and consumption defaults.
        config.put("default.key.serde", Serdes.IntegerSerde.class);
        config.put("default.value.serde", Serdes.IntegerSerde.class);
        config.put("auto.offset.reset", "earliest");
        // Caching disabled, commit every second.
        config.put("statestore.cache.max.bytes", 0);
        config.put("commit.interval.ms", 1000L);
        // Caller-supplied entries are applied last so they override the defaults above.
        config.putAll(extraProperties);
        this.streamsConfigurations.add(config);
        return config;
    }

    /**
     * Closes the Streams client a test stored in {@code kafkaStreams} (if any) and purges
     * the local state of every configuration created through {@code props(...)}.
     *
     * <p>The purge runs in a {@code finally} block so that state directories are still
     * cleaned up when closing the client throws.
     *
     * @throws Exception if closing the client or purging local state fails
     */
    @AfterEach
    public void shutdown() throws Exception {
        try {
            if (this.kafkaStreams != null) {
                this.kafkaStreams.close(Duration.ofSeconds(30L));
            }
        } finally {
            IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfigurations);
            this.streamsConfigurations.clear();
        }
    }

    /**
     * Writes records including a {@code null} value for a key to the input topic, runs a
     * table topology over it, then wipes the local state and restarts the application to
     * check that it still processes new input afterwards.
     *
     * <p>Each {@link KafkaStreams} instance is closed in a {@code finally} block so that a
     * failed wait does not leave a running client behind.
     *
     * @throws Exception if topic creation, producing, or waiting for the output fails
     */
    @Test
    public void shouldRestoreNullRecord() throws Exception {
        final String applicationId = "restoration-test-app";
        final String stateStoreName = "stateStore";
        final String inputTopic = "input";
        final String outputTopic = "output";

        final StreamsBuilder builder = new StreamsBuilder();
        final Properties streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.IntegerSerde.class.getName(),
            Serdes.BytesSerde.class.getName(),
            new Properties());

        CLUSTER.createTopics(inputTopic);
        CLUSTER.createTopics(outputTopic);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

        builder.table(
                inputTopic,
                Materialized.<Integer, Bytes>as(Stores.persistentTimestampedKeyValueStore(stateStoreName))
                    .withKeySerde(Serdes.Integer())
                    .withValueSerde(Serdes.Bytes())
                    .withCachingDisabled())
            .toStream()
            .to(outputTopic);

        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class);
        // The second record carries a null value for key 3.
        final List<KeyValue<Integer, Bytes>> initialKeyValues = Arrays.asList(
            KeyValue.pair(3, new Bytes(new byte[]{3})),
            KeyValue.pair(3, null),
            KeyValue.pair(1, new Bytes(new byte[]{1})));
        IntegrationTestUtils.produceKeyValuesSynchronously(inputTopic, initialKeyValues, producerConfig, new MockTime());

        final Properties consumerConfig;
        KafkaStreams streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
        try {
            streams.start();
            consumerConfig =
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class);
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, outputTopic, initialKeyValues);
        } finally {
            streams.close();
        }
        // Wipe the local state so the restarted instance has to rebuild its store.
        streams.cleanUp();

        final List<KeyValue<Integer, Bytes>> newKeyValues =
            Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3])));
        IntegrationTestUtils.produceKeyValuesSynchronously(inputTopic, newKeyValues, producerConfig, new MockTime());

        streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
        try {
            streams.start();
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, outputTopic, newKeyValues);
        } finally {
            streams.close();
        }
    }

    /**
     * Restores a read-only state store from its source topic, starting from pre-written
     * checkpoints and a committed offset placed {@code offsetLimitDelta} records before the
     * end of each partition, then checks both the number of restored records and the number
     * of records processed afterwards.
     *
     * @param stateUpdaterEnabled whether the state updater is enabled for this run
     * @throws Exception if setup or one of the waits fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(boolean stateUpdaterEnabled) throws Exception {
        final int offsetLimitDelta = 1000;
        final int offsetCheckpointed = 1000;
        final AtomicInteger numReceived = new AtomicInteger(0);
        final Topology topology = new Topology();
        final Properties props = this.props(stateUpdaterEnabled);

        this.createStateForRestoration(this.inputStream, 0);
        this.setCommittedOffset(this.inputStream, offsetLimitDelta);

        // Write a checkpoint of (offsetCheckpointed - 1) for both partitions of the input topic.
        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
        for (int partition = 0; partition < 2; partition++) {
            final File checkpointFile =
                new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, partition)), ".checkpoint");
            new OffsetCheckpoint(checkpointFile).write(
                Collections.singletonMap(new TopicPartition(this.inputStream, partition), (long) offsetCheckpointed - 1));
        }

        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        topology.addReadOnlyStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("store"),
                new Serdes.IntegerSerde(),
                new Serdes.StringSerde()),
            "readOnlySource",
            new IntegerDeserializer(),
            new StringDeserializer(),
            this.inputStream,
            "readOnlyProcessor",
            () -> new ReadOnlyStoreProcessor(numReceived, offsetLimitDelta, shutdownLatch));

        this.kafkaStreams = new KafkaStreams(topology, props);
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                startupLatch.countDown();
            }
        });
        final AtomicLong restored = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new IntegrationTestUtils.TrackingStateRestoreListener(restored));
        this.kafkaStreams.start();

        Assertions.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
        // 10000 - 2 * 1000 - 2 * 1000 = 6000 records are expected to be restored.
        final long expectedRestored = (long) this.numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2;
        MatcherAssert.assertThat(restored.get(), IsEqual.equalTo(expectedRestored));

        Assertions.assertTrue(shutdownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(numReceived.get(), IsEqual.equalTo(offsetLimitDelta * 2));
    }

    /**
     * Same scenario as the read-only-store variant, but the store is materialized by a
     * {@code builder.table(...)} topology built with {@code topology.optimization=all}.
     *
     * @param stateUpdaterEnabled whether the state updater is enabled for this run
     * @throws Exception if setup or one of the waits fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldRestoreStateFromSourceTopicForGlobalTable(boolean stateUpdaterEnabled) throws Exception {
        final int offsetLimitDelta = 1000;
        final int offsetCheckpointed = 1000;
        final AtomicInteger numReceived = new AtomicInteger(0);
        final StreamsBuilder builder = new StreamsBuilder();
        final Properties props = this.props(stateUpdaterEnabled);
        props.put("topology.optimization", "all");

        this.createStateForRestoration(this.inputStream, 0);
        this.setCommittedOffset(this.inputStream, offsetLimitDelta);

        // Write a checkpoint of (offsetCheckpointed - 1) for both partitions of the input topic.
        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
        for (int partition = 0; partition < 2; partition++) {
            final File checkpointFile =
                new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, partition)), ".checkpoint");
            new OffsetCheckpoint(checkpointFile).write(
                Collections.singletonMap(new TopicPartition(this.inputStream, partition), (long) offsetCheckpointed - 1));
        }

        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        builder.table(
                this.inputStream,
                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store")
                    .withKeySerde(Serdes.Integer())
                    .withValueSerde(Serdes.Integer()))
            .toStream()
            .foreach((key, value) -> {
                if (numReceived.incrementAndGet() == offsetLimitDelta * 2) {
                    shutdownLatch.countDown();
                }
            });

        this.kafkaStreams = new KafkaStreams(builder.build(props), props);
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                startupLatch.countDown();
            }
        });
        final AtomicLong restored = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new IntegrationTestUtils.TrackingStateRestoreListener(restored));
        this.kafkaStreams.start();

        Assertions.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
        // 10000 - 2 * 1000 - 2 * 1000 = 6000 records are expected to be restored.
        final long expectedRestored = (long) this.numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2;
        MatcherAssert.assertThat(restored.get(), IsEqual.equalTo(expectedRestored));

        Assertions.assertTrue(shutdownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(numReceived.get(), IsEqual.equalTo(offsetLimitDelta * 2));
    }

    /**
     * Restores the table's store from a dedicated changelog topic that was pre-populated
     * and checkpointed before start-up, then checks the restored-record count and that all
     * records written to the input topic are processed.
     *
     * @param stateUpdaterEnabled whether the state updater is enabled for this run
     * @throws Exception if setup or one of the waits fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldRestoreStateFromChangelogTopic(boolean stateUpdaterEnabled) throws Exception {
        final int offsetCheckpointed = 1000;
        final String changelog = this.appId + "-store-changelog";
        CLUSTER.createTopic(changelog, 2, 1);

        final AtomicInteger numReceived = new AtomicInteger(0);
        final StreamsBuilder builder = new StreamsBuilder();
        final Properties props = this.props(stateUpdaterEnabled);

        // Fill the changelog first, then the input topic with a disjoint key range.
        this.createStateForRestoration(changelog, 0);
        this.createStateForRestoration(this.inputStream, this.numberOfKeys);

        // Write a checkpoint of (offsetCheckpointed - 1) for both changelog partitions.
        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
        for (int partition = 0; partition < 2; partition++) {
            final File checkpointFile =
                new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, partition)), ".checkpoint");
            new OffsetCheckpoint(checkpointFile).write(
                Collections.singletonMap(new TopicPartition(changelog, partition), (long) offsetCheckpointed - 1));
        }

        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        builder.table(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
            .toStream()
            .foreach((key, value) -> {
                if (numReceived.incrementAndGet() == this.numberOfKeys) {
                    shutdownLatch.countDown();
                }
            });

        this.kafkaStreams = new KafkaStreams(builder.build(), props);
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                startupLatch.countDown();
            }
        });
        final AtomicLong restored = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new IntegrationTestUtils.TrackingStateRestoreListener(restored));
        this.kafkaStreams.start();

        Assertions.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
        // 10000 - 2 * 1000 = 8000 records are expected to be restored.
        final long expectedRestored = (long) this.numberOfKeys - 2 * offsetCheckpointed;
        MatcherAssert.assertThat(restored.get(), IsEqual.equalTo(expectedRestored));

        Assertions.assertTrue(shutdownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(numReceived.get(), IsEqual.equalTo(this.numberOfKeys));
    }

    /**
     * Checks that an application whose only store has change-logging disabled reaches
     * {@code RUNNING} from {@code REBALANCING} within 30 seconds.
     *
     * @param stateUpdaterEnabled whether the state updater is enabled for this run
     * @throws InterruptedException if the wait for start-up is interrupted
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldSuccessfullyStartWhenLoggingDisabled(boolean stateUpdaterEnabled) throws InterruptedException {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Integer, Integer> stream = builder.stream(this.inputStream);
        stream.groupByKey()
            .reduce(
                Integer::sum,
                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());

        final CountDownLatch startupLatch = new CountDownLatch(1);
        this.kafkaStreams = new KafkaStreams(builder.build(), this.props(stateUpdaterEnabled));
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                startupLatch.countDown();
            }
        });
        this.kafkaStreams.start();

        Assertions.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
    }

    /**
     * Produces three records, then runs a topology whose processor writes every record
     * into an LRU store with change-logging disabled, and checks that all three records
     * are processed.
     *
     * <p>The start-up wait is asserted (like in the sibling tests) so that a client that
     * never reaches {@code RUNNING} is reported as such instead of surfacing later as a
     * processing timeout.
     *
     * @param stateUpdaterEnabled whether the state updater is enabled for this run
     * @throws InterruptedException if one of the waits is interrupted
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldProcessDataFromStoresWithLoggingDisabled(boolean stateUpdaterEnabled) throws InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.inputStream,
            Arrays.asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3)),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
            CLUSTER.time);

        // The store is named after the input topic; the processor looks it up by that name.
        final KeyValueBytesStoreSupplier lruMapSupplier = Stores.lruMap(this.inputStream, 10);
        final StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder =
            new KeyValueStoreBuilder<>(lruMapSupplier, Serdes.Integer(), Serdes.Integer(), CLUSTER.time)
                .withLoggingDisabled();

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(storeBuilder);

        final KStream<Integer, Integer> stream = streamsBuilder.stream(this.inputStream);
        // One count per produced record.
        final CountDownLatch processorLatch = new CountDownLatch(3);
        stream.process(() -> new KeyValueStoreProcessor(this.inputStream, processorLatch), this.inputStream);

        final Topology topology = streamsBuilder.build();
        this.kafkaStreams = new KafkaStreams(topology, this.props(stateUpdaterEnabled));

        final CountDownLatch latch = new CountDownLatch(1);
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                latch.countDown();
            }
        });
        this.kafkaStreams.start();

        Assertions.assertTrue(latch.await(30L, TimeUnit.SECONDS));
        Assertions.assertTrue(processorLatch.await(30L, TimeUnit.SECONDS));
    }

    /**
     * Starts two Streams clients configured with one standby replica each, closes the
     * second one, and checks on the surviving client that no additional records were
     * restored and that the number of closed stores changed by the expected amount.
     *
     * <p>If start-up or the initial waits fail, both clients are closed and the original
     * failure is propagated, so the test stops at the real cause instead of continuing
     * against closed clients.
     *
     * @param stateUpdaterEnabled whether the state updater is enabled for this run
     * @throws Exception if start-up or one of the waits fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean stateUpdaterEnabled) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            this.inputStream,
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as(RestoreIntegrationTest.getCloseCountingStore("store")));
        this.createStateForRestoration(this.inputStream, 0);

        // Each client gets its own state directory and one standby replica.
        Properties props1 = this.props(stateUpdaterEnabled);
        props1.put("num.standby.replicas", 1);
        props1.put("state.dir", TestUtils.tempDirectory(this.appId + "-1").getPath());
        IntegrationTestUtils.purgeLocalStreamsState(props1);
        KafkaStreams streams1 = new KafkaStreams(builder.build(), props1);

        Properties props2 = this.props(stateUpdaterEnabled);
        props2.put("num.standby.replicas", 1);
        props2.put("state.dir", TestUtils.tempDirectory(this.appId + "-2").getPath());
        IntegrationTestUtils.purgeLocalStreamsState(props2);
        KafkaStreams streams2 = new KafkaStreams(builder.build(), props2);

        // State listeners run on Streams threads, hence the concurrent sets.
        Set<KafkaStreams.State> transitionedStates1 = Collections.newSetFromMap(new ConcurrentHashMap<>());
        Set<KafkaStreams.State> transitionedStates2 = Collections.newSetFromMap(new ConcurrentHashMap<>());
        IntegrationTestUtils.TrackingStateRestoreListener restoreListener = new IntegrationTestUtils.TrackingStateRestoreListener();
        IntegrationTestUtils.TrackingStandbyUpdateListener standbyUpdateListener = new IntegrationTestUtils.TrackingStandbyUpdateListener();
        streams1.setStandbyUpdateListener(standbyUpdateListener);
        streams2.setStandbyUpdateListener(standbyUpdateListener);
        streams1.setGlobalStateRestoreListener(restoreListener);
        streams1.setStateListener((newState, oldState) -> transitionedStates1.add(newState));
        streams2.setStateListener((newState, oldState) -> transitionedStates2.add(newState));

        boolean started = false;
        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(streams1, streams2), Duration.ofSeconds(60L));
            IntegrationTestUtils.waitForCompletion(streams1, 1, 30000L);
            IntegrationTestUtils.waitForCompletion(streams2, 1, 30000L);
            IntegrationTestUtils.waitForStandbyCompletion(streams1, 1, 30000L);
            IntegrationTestUtils.waitForStandbyCompletion(streams2, 1, 30000L);
            started = true;
        } finally {
            // On any failure (exception or assertion error) release both clients and let
            // the original failure propagate.
            if (!started) {
                try {
                    streams1.close();
                } finally {
                    streams2.close();
                }
            }
        }

        // Baselines taken once both clients are running and caught up.
        int initialStoreCloseCount = CloseCountingInMemoryStore.numStoresClosed();
        long initialNumRestoredCount = restoreListener.totalNumRestored();
        transitionedStates1.clear();
        transitionedStates2.clear();
        try {
            streams2.close();
            this.waitForTransitionTo(transitionedStates2, KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            this.waitForTransitionTo(transitionedStates1, KafkaStreams.State.REBALANCING, Duration.ofSeconds(60L));
            this.waitForTransitionTo(transitionedStates1, KafkaStreams.State.RUNNING, Duration.ofSeconds(60L));
            IntegrationTestUtils.waitForCompletion(streams1, 1, 30000L);
            IntegrationTestUtils.waitForStandbyCompletion(streams1, 1, 30000L);
            // No further restoration must have happened on the surviving client.
            MatcherAssert.assertThat(restoreListener.totalNumRestored(), CoreMatchers.equalTo(initialNumRestoredCount));
            // Exactly two more store closes are expected after stopping the second client.
            MatcherAssert.assertThat(CloseCountingInMemoryStore.numStoresClosed(), IsEqual.equalTo(initialStoreCloseCount + 2));
        }
        finally {
            streams1.close();
        }
        this.waitForTransitionTo(transitionedStates1, KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
        // Closing the first client is expected to account for two further store closes.
        MatcherAssert.assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
    }

    /**
     * Runs a reduce topology once to populate its state, purges the local state, restarts
     * the client with a slow restore listener, and then starts a second client while the
     * first is still restoring. Verifies that the listener callbacks for start, batch,
     * suspend and end are invoked on the respective clients.
     *
     * @throws Exception if topic creation, producing, or one of the waits fails
     */
    @Test
    public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception {
        final String inputTopic = "inputTopic";
        final String outputTopic = "outputTopic";
        CLUSTER.createTopic(inputTopic, 5, 1);
        CLUSTER.createTopic(outputTopic, 5, 1);

        // Both clients restore in batches of at most five records.
        final Map<String, Object> kafkaStreams1Configuration = Utils.mkMap(
            Utils.mkEntry("state.dir", TestUtils.tempDirectory(this.appId).getPath() + "-ks1"),
            Utils.mkEntry("client.id", this.appId + "-ks1"),
            Utils.mkEntry(StreamsConfig.restoreConsumerPrefix("max.poll.records"), 5));
        final Map<String, Object> kafkaStreams2Configuration = Utils.mkMap(
            Utils.mkEntry("state.dir", TestUtils.tempDirectory(this.appId).getPath() + "-ks2"),
            Utils.mkEntry("client.id", this.appId + "-ks2"),
            Utils.mkEntry(StreamsConfig.restoreConsumerPrefix("max.poll.records"), 5));

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Topology.AutoOffsetReset.EARLIEST))
            .groupByKey()
            .reduce((oldVal, newVal) -> newVal)
            .toStream()
            .to(outputTopic);

        final List<KeyValue<Integer, Integer>> sampleData = IntStream.range(0, 100)
            .mapToObj(i -> new KeyValue<Integer, Integer>(i, i))
            .collect(Collectors.toList());
        this.sendEvents(inputTopic, sampleData);

        // First run: process everything, then drop the local state.
        this.kafkaStreams = this.startKafkaStreams(builder, null, kafkaStreams1Configuration);
        this.validateReceivedMessages(sampleData, outputTopic);
        this.kafkaStreams.close(Duration.ofMillis(60000L));
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfigurations);

        // Second run: the listener sleeps in every batch callback.
        final TestStateRestoreListener kafkaStreams1StateRestoreListener =
            new TestStateRestoreListener("ks1", RESTORATION_DELAY);
        this.kafkaStreams = this.startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);
        Assertions.assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationStarts());
        Assertions.assertTrue(kafkaStreams1StateRestoreListener.awaitUntilBatchRestoredIsCalled());

        final TestStateRestoreListener kafkaStreams2StateRestoreListener =
            new TestStateRestoreListener("ks2", RESTORATION_DELAY);
        try (KafkaStreams kafkaStreams2 = this.startKafkaStreams(builder, kafkaStreams2StateRestoreListener, kafkaStreams2Configuration)) {
            TestUtils.waitForCondition(
                () -> KafkaStreams.State.RUNNING == kafkaStreams2.state(),
                60000L,
                () -> "kafkaStreams2 never transitioned to a RUNNING state.");
            Assertions.assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends());
            Assertions.assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts());
            Assertions.assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationEnds());
            Assertions.assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationEnds());
        }
    }

    /**
     * Waits until the final key/value records read from {@code outputTopic} match
     * {@code expectedRecords}, using an Integer/Integer consumer that reads from the
     * earliest offset.
     *
     * @param expectedRecords records expected on the output topic
     * @param outputTopic topic to read from
     * @throws Exception if the expected records are not received
     */
    private void validateReceivedMessages(List<KeyValue<Integer, Integer>> expectedRecords, String outputTopic) throws Exception {
        final String integerDeserializer = IntegerDeserializer.class.getName();
        final Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "group-" + this.appId);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", integerDeserializer);
        consumerProperties.setProperty("value.deserializer", integerDeserializer);

        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerProperties, outputTopic, expectedRecords);
    }

    /**
     * Builds the topology, creates a Streams client from the default configuration plus
     * {@code extraConfiguration}, installs the given restore listener and starts the client.
     *
     * @param streamsBuilder builder whose topology is run
     * @param stateRestoreListener listener to install; passed through unchanged (callers may pass {@code null})
     * @param extraConfiguration configuration entries layered over the defaults
     * @return the started client; the caller is responsible for closing it
     */
    private KafkaStreams startKafkaStreams(StreamsBuilder streamsBuilder, StateRestoreListener stateRestoreListener, Map<String, Object> extraConfiguration) {
        final Properties mergedConfig = this.props(Utils.mkObjectProperties(extraConfiguration));
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), mergedConfig);
        streams.setGlobalStateRestoreListener(stateRestoreListener);
        streams.start();
        return streams;
    }

    /**
     * Synchronously produces the given Integer/Integer records to {@code topic}, using the
     * embedded cluster's clock.
     *
     * @param topic destination topic
     * @param events records to produce
     */
    private void sendEvents(String topic, List<KeyValue<Integer, Integer>> events) {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            IntegerSerializer.class,
            new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, events, producerConfig, CLUSTER.time);
    }

    /**
     * Creates a store supplier that hands out a new {@link CloseCountingInMemoryStore}
     * on every {@code get()} call.
     *
     * @param name store name reported by the supplier and given to each created store
     * @return the supplier, with metrics scope {@code close-counting}
     */
    private static KeyValueBytesStoreSupplier getCloseCountingStore(final String name) {
        return new KeyValueBytesStoreSupplier() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public KeyValueStore<Bytes, byte[]> get() {
                // A fresh instance per call.
                return new CloseCountingInMemoryStore(name);
            }

            @Override
            public String metricsScope() {
                return "close-counting";
            }
        };
    }

    /**
     * Produces {@code numberOfKeys} Integer/Integer records to {@code changelogTopic}.
     * Record {@code i} uses {@code startingOffset + i} as both key and value. The producer
     * is closed before the method returns.
     *
     * @param changelogTopic topic to write to
     * @param startingOffset first key/value to write
     */
    private void createStateForRestoration(String changelogTopic, int startingOffset) {
        final Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());

        try (KafkaProducer<Integer, Integer> producer =
                 new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
            IntStream.range(0, this.numberOfKeys)
                .map(i -> startingOffset + i)
                .forEach(value -> producer.send(new ProducerRecord<>(changelogTopic, value, value)));
        }
    }

    /**
     * Commits, for the application's consumer group, an offset that lies {@code limitDelta}
     * records before the current end of partitions 0 and 1 of {@code topic}.
     *
     * <p>The consumer is managed by try-with-resources so it is closed even when
     * assigning, seeking or committing throws.
     *
     * @param topic topic whose two partitions get a committed offset
     * @param limitDelta distance from the end offset at which to commit
     */
    private void setCommittedOffset(String topic, int limitDelta) {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        // Use the application id as group id so the offsets are committed for the app's group.
        consumerConfig.put("group.id", this.appId);
        consumerConfig.put("client.id", "commit-consumer");
        consumerConfig.put("key.deserializer", IntegerDeserializer.class);
        consumerConfig.put("value.deserializer", IntegerDeserializer.class);

        try (KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfig)) {
            List<TopicPartition> partitions = Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1));
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);
            for (TopicPartition partition : partitions) {
                // position() resolves the pending seek-to-end into a concrete offset.
                long position = consumer.position(partition);
                consumer.seek(partition, position - (long) limitDelta);
            }
            consumer.commitSync();
        }
    }

    /**
     * Waits until {@code state} shows up in the set of observed state transitions.
     *
     * @param observed states recorded so far by a state listener
     * @param state state to wait for
     * @param timeout maximum time to wait
     * @throws Exception if the wait is interrupted or otherwise fails
     */
    private void waitForTransitionTo(Set<KafkaStreams.State> observed, KafkaStreams.State state, Duration timeout) throws Exception {
        final long timeoutMs = timeout.toMillis();
        TestUtils.waitForCondition(
            () -> observed.contains(state),
            timeoutMs,
            () -> "Client did not transition to " + state + " on time. Observed transitions: " + observed);
    }

    private static class ReadOnlyStoreProcessor
    implements Processor<Integer, String, Void, Void> {
        private final AtomicInteger numReceived;
        private final int offsetLimitDelta;
        private final CountDownLatch shutdownLatch;
        KeyValueStore<Integer, String> store;

        public ReadOnlyStoreProcessor(AtomicInteger numReceived, int offsetLimitDelta, CountDownLatch shutdownLatch) {
            this.numReceived = numReceived;
            this.offsetLimitDelta = offsetLimitDelta;
            this.shutdownLatch = shutdownLatch;
        }

        public void init(ProcessorContext<Void, Void> context) {
            this.store = (KeyValueStore)context.getStateStore("store");
        }

        public void process(Record<Integer, String> record) {
            this.store.put(record.key(), record.value());
            if (this.numReceived.incrementAndGet() == this.offsetLimitDelta * 2) {
                this.shutdownLatch.countDown();
            }
        }
    }

    public static class KeyValueStoreProcessor
    implements Processor<Integer, Integer, Void, Void> {
        private final String topic;
        private final CountDownLatch processorLatch;
        private KeyValueStore<Integer, Integer> store;

        KeyValueStoreProcessor(String topic, CountDownLatch processorLatch) {
            this.topic = topic;
            this.processorLatch = processorLatch;
        }

        public void init(ProcessorContext<Void, Void> context) {
            this.store = (KeyValueStore)context.getStateStore(this.topic);
        }

        public void process(Record<Integer, Integer> record) {
            if (record.key() != null) {
                this.store.put(record.key(), record.value());
                this.processorLatch.countDown();
            }
        }
    }

    static class CloseCountingInMemoryStore
    extends InMemoryKeyValueStore {
        static AtomicInteger numStoresClosed = new AtomicInteger(0);

        CloseCountingInMemoryStore(String name) {
            super(name);
        }

        public void close() {
            numStoresClosed.incrementAndGet();
            super.close();
        }

        static int numStoresClosed() {
            return numStoresClosed.get();
        }
    }

    private static final class TestStateRestoreListener
    implements StateRestoreListener {
        private final String instanceName;
        private final Duration onBatchRestoredSleepDuration;
        private final CountDownLatch onRestoreStartLatch = new CountDownLatch(1);
        private final CountDownLatch onRestoreEndLatch = new CountDownLatch(1);
        private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch(1);
        private final CountDownLatch onBatchRestoredLatch = new CountDownLatch(1);

        TestStateRestoreListener(String instanceName, Duration onBatchRestoredSleepDuration) {
            this.onBatchRestoredSleepDuration = onBatchRestoredSleepDuration;
            this.instanceName = instanceName;
        }

        boolean awaitUntilRestorationStarts() throws InterruptedException {
            return TestStateRestoreListener.awaitLatchWithTimeout(this.onRestoreStartLatch);
        }

        boolean awaitUntilRestorationSuspends() throws InterruptedException {
            return TestStateRestoreListener.awaitLatchWithTimeout(this.onRestoreSuspendedLatch);
        }

        boolean awaitUntilRestorationEnds() throws InterruptedException {
            return TestStateRestoreListener.awaitLatchWithTimeout(this.onRestoreEndLatch);
        }

        public boolean awaitUntilBatchRestoredIsCalled() throws InterruptedException {
            return TestStateRestoreListener.awaitLatchWithTimeout(this.onBatchRestoredLatch);
        }

        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            log.info("[{}] called onRestoreStart. topicPartition={}, storeName={}, startingOffset={}, endingOffset={}", new Object[]{this.instanceName, topicPartition, storeName, startingOffset, endingOffset});
            this.onRestoreStartLatch.countDown();
        }

        public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
            log.info("[{}] called onBatchRestored. topicPartition={}, storeName={}, batchEndOffset={}, numRestored={}", new Object[]{this.instanceName, topicPartition, storeName, batchEndOffset, numRestored});
            Utils.sleep((long)this.onBatchRestoredSleepDuration.toMillis());
            this.onBatchRestoredLatch.countDown();
        }

        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            log.info("[{}] called onRestoreEnd. topicPartition={}, storeName={}, totalRestored={}", new Object[]{this.instanceName, topicPartition, storeName, totalRestored});
            this.onRestoreEndLatch.countDown();
        }

        public void onRestoreSuspended(TopicPartition topicPartition, String storeName, long totalRestored) {
            log.info("[{}] called onRestoreSuspended. topicPartition={}, storeName={}, totalRestored={}", new Object[]{this.instanceName, topicPartition, storeName, totalRestored});
            this.onRestoreSuspendedLatch.countDown();
        }

        private static boolean awaitLatchWithTimeout(CountDownLatch latch) throws InterruptedException {
            return latch.await(60000L, TimeUnit.MILLISECONDS);
        }
    }
}

