/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={IntegrationTest.class})
public class StoreQueryIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(StoreQueryIntegrationTest.class);

    // Broker count handed to the embedded cluster below.
    private static final int NUM_BROKERS = 1;
    // Source of the "localhost:<port>" application.server values; streamsConfiguration()
    // pre-increments it once per generated configuration.
    private static int port = 0;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final String TABLE_NAME = "source-table";

    // Started in before() and stopped in after().
    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
    @Rule
    public TestName testName = new TestName();
    // Instances created through the create*KafkaStreams helpers; after() closes all of them.
    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
    // Clock shared with the embedded cluster; produceValueRange() passes it to the producer helper.
    private final MockTime mockTime;

    /** Binds the test's mock clock to the one owned by the embedded cluster. */
    public StoreQueryIntegrationTest() {
        mockTime = cluster.time;
    }

    /**
     * Starts the embedded cluster and creates the input topic before each test.
     *
     * <p>Also rewinds the shared {@code port} counter. The tests treat the first instance as the
     * active one when the active host's port is odd, which only holds if that instance was
     * configured with an odd port. One test builds a single configuration, so leaving the static
     * counter untouched would flip the parity for every test that runs after it in the same JVM.
     *
     * @throws InterruptedException declared by the cluster helpers called here
     * @throws IOException declared by the cluster helpers called here
     */
    @Before
    public void before() throws InterruptedException, IOException {
        // Each test starts numbering at 1 again, so the first configuration always gets an odd port.
        port = 0;
        cluster.start();
        // The tests query partitions 0 and 1; presumably the arguments are (name, partitions, replication).
        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
    }

    /**
     * Closes every Kafka Streams instance registered for cleanup and then stops the cluster.
     *
     * <p>Each instance is closed independently, and the cluster is stopped in a {@code finally}
     * block, so one failing {@code close()} cannot leak the remaining instances or the broker.
     * The first close failure is rethrown after cleanup, with later ones attached as suppressed.
     */
    @After
    public void after() {
        RuntimeException closeFailure = null;
        try {
            for (final KafkaStreams kafkaStreams : streamsToCleanup) {
                try {
                    kafkaStreams.close();
                } catch (final RuntimeException e) {
                    if (closeFailure == null) {
                        closeFailure = e;
                    } else {
                        closeFailure.addSuppressed(e);
                    }
                }
            }
        } finally {
            streamsToCleanup.clear();
            cluster.stop();
        }
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * Runs two instances, produces 100 values for key 1 and polls until a default store query
     * finds the key only on the instance that the key metadata reports as the active host.
     * The active instance is recognised by the parity of its application.server port.
     */
    @Test
    public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
        final int messageCount = 100;
        final int key = 1;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        getStreamsBuilderWithTopology(builder, processed);
        final KafkaStreams first = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams second = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(first, second), Duration.ofSeconds(60L));

        produceValueRange(key, 0, messageCount);
        // One permit is released per processed record, so this waits for the whole batch.
        final boolean batchProcessed = processed.tryAcquire(messageCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(batchProcessed, Matchers.is(Matchers.equalTo(true)));

        until(() -> {
            final KeyQueryMetadata metadata =
                first.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
            final ReadOnlyKeyValueStore<Integer, Integer> firstStore = IntegrationTestUtils.getStore(TABLE_NAME, first, storeType);
            final ReadOnlyKeyValueStore<Integer, Integer> secondStore = IntegrationTestUtils.getStore(TABLE_NAME, second, storeType);

            final boolean firstIsActive = metadata.activeHost().port() % 2 == 1;
            // Exactly one of the two stores is expected to hold the key.
            final Matcher<Object> expectedInFirst = firstIsActive ? Matchers.notNullValue() : Matchers.nullValue();
            final Matcher<Object> expectedInSecond = firstIsActive ? Matchers.nullValue() : Matchers.notNullValue();
            try {
                MatcherAssert.assertThat(firstStore.get(key), Matchers.is(expectedInFirst));
                MatcherAssert.assertThat(secondStore.get(key), Matchers.is(expectedInSecond));
                return true;
            } catch (final InvalidStateStoreException retriable) {
                verifyRetrievableException(retriable);
                LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                return false;
            }
        });
    }

    /**
     * Runs two instances, produces 100 values for key 1 and polls until partition-specific
     * queries behave as expected:
     * <ul>
     *   <li>the active instance returns the key from the key's own partition;</li>
     *   <li>the other instance returns {@code null} when asked for the partition the key does
     *       not belong to;</li>
     *   <li>the active instance throws {@link InvalidStateStoreException} when asked for that
     *       other partition.</li>
     * </ul>
     * The expected error message is built from the computed partition and the store name rather
     * than hard-coding partition 1, so it stays correct if the key's partition changes.
     */
    @Test
    public void shouldQuerySpecificActivePartitionStores() throws Exception {
        final int batch1NumMessages = 100;
        final int key = 1;
        final Semaphore semaphore = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        getStreamsBuilderWithTopology(builder, semaphore);
        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60L));

        produceValueRange(key, 0, batch1NumMessages);
        // One permit is released per processed record, so this waits for the whole batch.
        MatcherAssert.assertThat(semaphore.tryAcquire(batch1NumMessages, 60L, TimeUnit.SECONDS), Matchers.is(Matchers.equalTo(true)));

        until(() -> {
            final KeyQueryMetadata keyQueryMetadata =
                kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
            final int keyPartition = keyQueryMetadata.partition();
            final int keyDontBelongPartition = keyPartition == 0 ? 1 : 0;

            // The first instance is identified by the odd application.server port.
            final boolean kafkaStreams1IsActive = keyQueryMetadata.activeHost().port() % 2 == 1;
            final KafkaStreams activeInstance = kafkaStreams1IsActive ? kafkaStreams1 : kafkaStreams2;
            final KafkaStreams otherInstance = kafkaStreams1IsActive ? kafkaStreams2 : kafkaStreams1;

            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> keyPartitionParam =
                StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).withPartition(keyPartition);
            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherPartitionParam =
                StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).withPartition(keyDontBelongPartition);

            final ReadOnlyKeyValueStore<Integer, Integer> activeStore = IntegrationTestUtils.getStore(activeInstance, keyPartitionParam);
            MatcherAssert.assertThat(activeStore, Matchers.is(Matchers.notNullValue()));

            final String missingPartitionMessage =
                "The specified partition " + keyDontBelongPartition + " for store " + TABLE_NAME + " does not exist.";
            try {
                MatcherAssert.assertThat(activeStore.get(key), Matchers.is(Matchers.notNullValue()));
                MatcherAssert.assertThat(
                    IntegrationTestUtils.getStore(otherInstance, otherPartitionParam).get(key),
                    Matchers.is(Matchers.nullValue()));
                final InvalidStateStoreException exception = Assert.assertThrows(
                    InvalidStateStoreException.class,
                    () -> IntegrationTestUtils.getStore(activeInstance, otherPartitionParam).get(key));
                MatcherAssert.assertThat(exception.getMessage(), Matchers.containsString(missingPartitionMessage));
                return true;
            } catch (final InvalidStateStoreException exception) {
                verifyRetrievableException(exception);
                LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                return false;
            }
        });
    }

    /**
     * Runs two instances, produces 100 values for key 1 and waits until the key can be read from
     * the store of both instances when the query is issued with the stale-store flag set.
     */
    @Test
    public void shouldQueryAllStalePartitionStores() throws Exception {
        final int messageCount = 100;
        final int key = 1;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        getStreamsBuilderWithTopology(builder, processed);
        final KafkaStreams first = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams second = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(first, second), Duration.ofSeconds(60L));

        produceValueRange(key, 0, messageCount);
        // One permit is released per processed record, so this waits for the whole batch.
        final boolean batchProcessed = processed.tryAcquire(messageCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(batchProcessed, Matchers.is(Matchers.equalTo(true)));

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
        // The store is looked up again on every poll.
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(TABLE_NAME, first, true, storeType).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(TABLE_NAME, second, true, storeType).get(key) != null,
            "store2 cannot find results for key");
    }

    /**
     * Runs two instances, produces 100 values for key 1 and verifies stale-store queries that
     * target a single partition: both instances return the key for the key's own partition, and
     * both return {@code null} for the other partition.
     */
    @Test
    public void shouldQuerySpecificStalePartitionStores() throws Exception {
        final int messageCount = 100;
        final int key = 1;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        getStreamsBuilderWithTopology(builder, processed);
        final KafkaStreams first = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams second = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(first, second), Duration.ofSeconds(60L));

        produceValueRange(key, 0, messageCount);
        // One permit is released per processed record, so this waits for the whole batch.
        final boolean batchProcessed = processed.tryAcquire(messageCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(batchProcessed, Matchers.is(Matchers.equalTo(true)));

        final KeyQueryMetadata metadata =
            first.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
        final int owningPartition = metadata.partition();
        final int foreignPartition = owningPartition == 0 ? 1 : 0;

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> owningParams =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(owningPartition);
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(first, owningParams).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(second, owningParams).get(key) != null,
            "store2 cannot find results for key");

        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> foreignParams =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(foreignPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> firstForeignStore = IntegrationTestUtils.getStore(first, foreignParams);
        final ReadOnlyKeyValueStore<Integer, Integer> secondForeignStore = IntegrationTestUtils.getStore(second, foreignParams);
        MatcherAssert.assertThat(firstForeignStore.get(key), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(secondForeignStore.get(key), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Same scenario as the single-partition stale-store test, but each instance is configured
     * with two stream threads and the key's partition is resolved with an
     * {@link IntegerSerializer} instead of a custom partitioner.
     */
    @Test
    public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws Exception {
        final int messageCount = 100;
        final int key = 1;
        final int numStreamThreads = 2;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        getStreamsBuilderWithTopology(builder, processed);

        final Properties firstConfig = streamsConfiguration();
        firstConfig.put("num.stream.threads", numStreamThreads);
        final Properties secondConfig = streamsConfiguration();
        secondConfig.put("num.stream.threads", numStreamThreads);

        final KafkaStreams first = createKafkaStreams(builder, firstConfig);
        final KafkaStreams second = createKafkaStreams(builder, secondConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(first, second), Duration.ofSeconds(60L));

        // Both instances must report more than one local thread.
        Assert.assertTrue(first.metadataForLocalThreads().size() > 1);
        Assert.assertTrue(second.metadataForLocalThreads().size() > 1);

        produceValueRange(key, 0, messageCount);
        // One permit is released per processed record, so this waits for the whole batch.
        final boolean batchProcessed = processed.tryAcquire(messageCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(batchProcessed, Matchers.is(Matchers.equalTo(true)));

        final KeyQueryMetadata metadata = first.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
        final int owningPartition = metadata.partition();
        final int foreignPartition = owningPartition == 0 ? 1 : 0;

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> owningParams =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(owningPartition);
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(first, owningParams).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(second, owningParams).get(key) != null,
            "store2 cannot find results for key");

        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> foreignParams =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(foreignPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> firstForeignStore = IntegrationTestUtils.getStore(first, foreignParams);
        final ReadOnlyKeyValueStore<Integer, Integer> secondForeignStore = IntegrationTestUtils.getStore(second, foreignParams);
        MatcherAssert.assertThat(firstForeignStore.get(key), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(secondForeignStore.get(key), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Multi-threaded, partition-specific stale-store scenario run through the named-topology
     * wrapper: both instances start the same topology under the name "topology-A", and every
     * metadata and store lookup is scoped to that name.
     */
    @Test
    public void shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology() throws Exception {
        final int messageCount = 100;
        final int key = 1;
        final int numStreamThreads = 2;
        final String topologyA = "topology-A";
        final Semaphore processed = new Semaphore(0);

        final Properties firstConfig = streamsConfiguration();
        firstConfig.put("num.stream.threads", numStreamThreads);
        final Properties secondConfig = streamsConfiguration();
        secondConfig.put("num.stream.threads", numStreamThreads);

        final KafkaStreamsNamedTopologyWrapper first = createNamedTopologyKafkaStreams(firstConfig);
        final KafkaStreamsNamedTopologyWrapper second = createNamedTopologyKafkaStreams(secondConfig);
        final List<KafkaStreams> instances = Arrays.asList(first, second);

        final NamedTopologyBuilder firstBuilder = first.newNamedTopologyBuilder(topologyA, firstConfig);
        getStreamsBuilderWithTopology(firstBuilder, processed);
        final NamedTopologyBuilder secondBuilder = second.newNamedTopologyBuilder(topologyA, secondConfig);
        getStreamsBuilderWithTopology(secondBuilder, processed);

        first.start(firstBuilder.build());
        second.start(secondBuilder.build());
        IntegrationTestUtils.waitForApplicationState(instances, KafkaStreams.State.RUNNING, Duration.ofSeconds(60L));

        // Both instances must report more than one local thread.
        Assert.assertTrue(first.metadataForLocalThreads().size() > 1);
        Assert.assertTrue(second.metadataForLocalThreads().size() > 1);

        produceValueRange(key, 0, messageCount);
        // One permit is released per processed record, so this waits for the whole batch.
        final boolean batchProcessed = processed.tryAcquire(messageCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(batchProcessed, Matchers.is(Matchers.equalTo(true)));

        final KeyQueryMetadata metadata = first.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer(), topologyA);
        final int owningPartition = metadata.partition();
        final int foreignPartition = owningPartition == 0 ? 1 : 0;

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> owningParams =
            NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType(topologyA, TABLE_NAME, storeType)
                .enableStaleStores()
                .withPartition(owningPartition);
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(first, owningParams).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(second, owningParams).get(key) != null,
            "store2 cannot find results for key");

        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> foreignParams =
            NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType(topologyA, TABLE_NAME, storeType)
                .enableStaleStores()
                .withPartition(foreignPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> firstForeignStore = IntegrationTestUtils.getStore(first, foreignParams);
        final ReadOnlyKeyValueStore<Integer, Integer> secondForeignStore = IntegrationTestUtils.getStore(second, foreignParams);
        MatcherAssert.assertThat(firstForeignStore.get(key), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(secondForeignStore.get(key), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Starts a single instance with one stream thread, adds a second thread, produces 100 values
     * for each of keys 1, 2 and 3, and checks that all three keys are readable. It then removes
     * a thread again and repeats the same check.
     */
    @Test
    public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Exception {
        final int messagesPerKey = 100;
        final int key = 1;
        final int key2 = 2;
        final int key3 = 3;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        getStreamsBuilderWithTopology(builder, processed);
        final Properties config = streamsConfiguration();
        config.put("num.stream.threads", 1);
        final KafkaStreams streams = createKafkaStreams(builder, config);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(60L));

        // Polling condition used after both the add and the remove: every key must be readable.
        final TestCondition allKeysQueryable = () -> {
            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
            final ReadOnlyKeyValueStore<Integer, Integer> store = IntegrationTestUtils.getStore(TABLE_NAME, streams, storeType);
            try {
                MatcherAssert.assertThat(store.get(key), Matchers.is(Matchers.notNullValue()));
                MatcherAssert.assertThat(store.get(key2), Matchers.is(Matchers.notNullValue()));
                MatcherAssert.assertThat(store.get(key3), Matchers.is(Matchers.notNullValue()));
                return true;
            } catch (final InvalidStateStoreException retriable) {
                verifyRetrievableException(retriable);
                LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                return false;
            }
        };

        final Optional<String> addedThread = streams.addStreamThread();
        MatcherAssert.assertThat(addedThread.isPresent(), Matchers.is(true));
        until(() -> streams.state().isRunningOrRebalancing());

        produceValueRange(key, 0, messagesPerKey);
        produceValueRange(key2, 0, messagesPerKey);
        produceValueRange(key3, 0, messagesPerKey);
        // One permit is released per processed record; three keys were produced.
        final boolean allProcessed = processed.tryAcquire(3 * messagesPerKey, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(allProcessed, Matchers.is(Matchers.equalTo(true)));

        until(() -> KafkaStreams.State.RUNNING.equals(streams.state()));
        until(allKeysQueryable);

        final Optional<String> removedThread = streams.removeStreamThread();
        MatcherAssert.assertThat(removedThread.isPresent(), Matchers.is(true));
        until(() -> streams.state().isRunningOrRebalancing());
        until(() -> KafkaStreams.State.RUNNING.equals(streams.state()));
        until(allKeysQueryable);
    }

    /**
     * Asserts that the exception's message matches one of the three failures the tests treat as
     * retriable: a stream thread in PARTITIONS_ASSIGNED, a stream thread in STARTING, or a store
     * that may have migrated to another instance.
     *
     * @param exception the exception caught while reading from the store
     */
    private void verifyRetrievableException(Exception exception) {
        final Matcher<String> partitionsAssigned = Matchers.containsString(
            "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING");
        final Matcher<String> storeMigrated = Matchers.containsString(
            "The state store, source-table, may have migrated to another instance");
        final Matcher<String> threadStarting = Matchers.containsString(
            "Cannot get state store source-table because the stream thread is STARTING, not RUNNING");

        MatcherAssert.assertThat(
            "Unexpected exception thrown while getting the value from store.",
            exception.getMessage(),
            Matchers.is(Matchers.anyOf(partitionsAssigned, storeMigrated, threadStarting)));
    }

    /**
     * Polls {@code condition} every 500 ms until it reports {@code true} or 60 seconds elapse.
     *
     * <p>Returns as soon as the condition is met, without a trailing sleep. If the deadline
     * passes first, an {@link AssertionError} is thrown so that a test cannot pass while its
     * condition was never satisfied.
     *
     * @param condition the check to evaluate repeatedly
     * @throws AssertionError if the condition is still unmet when the deadline passes
     * @throws RuntimeException if the condition throws; checked exceptions are wrapped and
     *     runtime exceptions are rethrown unchanged
     */
    private static void until(TestCondition condition) {
        final long timeoutMs = 60000L;
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                if (condition.conditionMet()) {
                    return;
                }
                Thread.sleep(500L);
            } catch (final InterruptedException e) {
                // Keep the interrupt visible to the test runner before bailing out.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (final RuntimeException e) {
                throw e;
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }
        throw new AssertionError("Condition was not met within " + timeoutMs + " ms");
    }

    /**
     * Adds the test topology to {@code builder}: the input topic is read as a table with Integer
     * keys and values, materialized under the queryable store name with caching disabled, and
     * each record passing through the resulting stream releases one permit on {@code semaphore}.
     *
     * <p>The type arguments on {@code Materialized.as} are spelled out because the chained
     * {@code withCachingDisabled()} call gives the compiler no target type to infer them from.
     * The serdes are passed without raw casts so the key and value types stay checked. Store
     * types are fully qualified to avoid touching the import block.
     *
     * @param builder the builder that receives the topology
     * @param semaphore released once per record seen by the stream
     */
    private void getStreamsBuilderWithTopology(StreamsBuilder builder, Semaphore semaphore) {
        builder
            .table(
                INPUT_TOPIC_NAME,
                Consumed.with(Serdes.Integer(), Serdes.Integer()),
                Materialized
                    .<Integer, Integer, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as(TABLE_NAME)
                    .withCachingDisabled())
            .toStream()
            .peek((k, v) -> semaphore.release());
    }

    /**
     * Builds a Kafka Streams instance from the builder's topology and registers it so that
     * {@code after()} closes it.
     *
     * @param builder source of the topology, built with {@code config}
     * @param config configuration used both to build the topology and to create the instance
     * @return the newly created instance, not yet started
     */
    private KafkaStreams createKafkaStreams(StreamsBuilder builder, Properties config) {
        final KafkaStreams created = new KafkaStreams(builder.build(config), config);
        streamsToCleanup.add(created);
        return created;
    }

    /**
     * Creates a named-topology wrapper from {@code config} and registers it so that
     * {@code after()} closes it.
     *
     * @param config configuration for the wrapper
     * @return the newly created wrapper, not yet started
     */
    private KafkaStreamsNamedTopologyWrapper createNamedTopologyKafkaStreams(Properties config) {
        final KafkaStreamsNamedTopologyWrapper created = new KafkaStreamsNamedTopologyWrapper(config);
        streamsToCleanup.add(created);
        return created;
    }

    /**
     * Synchronously produces one record per value in {@code [start, endExclusive)} to the input
     * topic, all under the same key.
     *
     * @param key key shared by every produced record
     * @param start first value, inclusive
     * @param endExclusive end of the value range, exclusive
     */
    private void produceValueRange(int key, int start, int endExclusive) {
        final Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", cluster.bootstrapServers());
        producerConfig.put("key.serializer", IntegerSerializer.class);
        producerConfig.put("value.serializer", IntegerSerializer.class);

        final List<KeyValue<Integer, Integer>> records = new ArrayList<>();
        for (int value = start; value < endExclusive; value++) {
            records.add(KeyValue.pair(key, value));
        }

        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC_NAME, records, producerConfig, mockTime);
    }

    /**
     * Builds a fresh Streams configuration for the current test.
     *
     * <p>Each call takes the next value of the shared {@code port} counter for
     * {@code application.server} and a new temporary directory for {@code state.dir}. The
     * application id is derived from the test name.
     *
     * @return a new, independent {@link Properties} object
     */
    private Properties streamsConfiguration() {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        // The counter is advanced exactly once per configuration.
        final int serverPort = ++port;

        final Properties props = new Properties();
        props.put("topology.optimization", "all");
        props.put("application.id", "app-" + safeTestName);
        props.put("application.server", "localhost:" + serverPort);
        props.put("bootstrap.servers", cluster.bootstrapServers());
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        props.put("default.key.serde", Serdes.Integer().getClass());
        props.put("default.value.serde", Serdes.Integer().getClass());
        props.put("num.standby.replicas", 1);
        props.put("max.poll.records", 100);
        props.put("heartbeat.interval.ms", 200);
        props.put("session.timeout.ms", 1000);
        // Stored as a Long, unlike the Integer-valued settings above.
        props.put("commit.interval.ms", 100L);
        return props;
    }
}

