/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.utils.MockTime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={IntegrationTest.class})
public class QueryableStateIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
    // Timeout, in milliseconds, for the retrying verification helpers.
    private static final long DEFAULT_TIMEOUT_MS = 120000L;
    // Number of brokers in the embedded cluster (the CLUSTER initializer below uses the same value).
    private static final int NUM_BROKERS = 1;
    // Embedded Kafka cluster shared by all tests in this class.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Partition count of the streamThree topic (createTopics() creates it with the same value).
    private static final int STREAM_THREE_PARTITIONS = 4;
    // Clock taken from CLUSTER in the constructor.
    private final MockTime mockTime;
    // Topic names: the constructor sets the base names; createTopics() appends a per-test suffix.
    private String streamOne;
    private String streamTwo;
    private String streamThree;
    private String streamConcurrent;
    private String outputTopic;
    private String outputTopicConcurrent;
    private String outputTopicConcurrentWindowed;
    private String outputTopicThree;
    // Two days expressed in milliseconds; used as the time-window size in createCountStream().
    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2L, TimeUnit.DAYS);
    // Partition count of the streamTwo topic (createTopics() creates it with the same value).
    private static final int STREAM_TWO_PARTITIONS = 2;
    // Replication factor for the explicitly partitioned topics (same value as used in createTopics()).
    private static final int NUM_REPLICAS = 1;
    // Base Streams configuration, rebuilt for every test in before().
    private Properties streamsConfiguration;
    // Input sentences, loaded by getInputValues().
    private List<String> inputValues;
    // Distinct words of inputValues (split on "\\W+"), populated in before().
    private Set<String> inputValuesKeys;
    // Streams instance owned by the running test; closed in shutdown() when non-null.
    private KafkaStreams kafkaStreams;
    // Order key-value pairs by key, then by value; both are assigned in before().
    private Comparator<KeyValue<String, String>> stringComparator;
    private Comparator<KeyValue<String, Long>> stringLongComparator;
    // JUnit rule exposing the current test method; used to derive unique topic and application names.
    @Rule
    public TestName testName;

    /**
     * Sets up the per-instance defaults: the test-name rule, the clock of the shared
     * embedded cluster, and the base names of all input and output topics.
     * {@code createTopics()} later appends a test-specific suffix to each topic name.
     */
    public QueryableStateIntegrationTest() {
        testName = new TestName();
        mockTime = CLUSTER.time;

        // input topics
        streamOne = "stream-one";
        streamTwo = "stream-two";
        streamThree = "stream-three";
        streamConcurrent = "stream-concurrent";

        // output topics
        outputTopic = "output";
        outputTopicConcurrent = "output-concurrent";
        outputTopicConcurrentWindowed = "output-concurrent-windowed";
        outputTopicThree = "output-three";
    }

    /**
     * Makes every topic name unique for the current test by appending
     * {@code "-" + safeUniqueTestName}, then creates all topics on the embedded cluster.
     *
     * @throws Exception if topic creation fails
     */
    private void createTopics() throws Exception {
        final String suffix = "-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName);

        streamOne += suffix;
        streamConcurrent += suffix;
        streamThree += suffix;
        outputTopic += suffix;
        outputTopicConcurrent += suffix;
        outputTopicConcurrentWindowed += suffix;
        outputTopicThree += suffix;
        streamTwo += suffix;

        CLUSTER.createTopics(streamOne, streamConcurrent);
        // Only these two topics are created with an explicit partition count and replication factor.
        CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
        CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, NUM_REPLICAS);
        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
    }

    /**
     * Loads the input sentences from the class-path resource
     * {@code QueryableStateIntegrationTest/inputValues.txt}, one sentence per line.
     * If the resource is missing or cannot be read, a warning is logged and a built-in
     * default list is returned instead.
     *
     * @return the lines of the resource, or the default sentences when it cannot be read
     */
    private List<String> getInputValues() {
        List<String> input = new ArrayList<>();
        final ClassLoader classLoader = getClass().getClassLoader();
        // Platform-specific form, used only in the log message below.
        final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
        // Class-loader resource names are always '/'-separated, regardless of File.separator.
        final String resourceName = "QueryableStateIntegrationTest/inputValues.txt";
        // Going through toURI()/File(URI) decodes percent-escapes (e.g. spaces) that URL.getFile() leaves in the path.
        try (BufferedReader reader = new BufferedReader(new FileReader(
                new File(Objects.requireNonNull(classLoader.getResource(resourceName)).toURI())))) {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                input.add(line);
            }
        } catch (final Exception e) {
            // Deliberate best-effort: any failure (missing resource, non-file URL, I/O error) falls back to defaults.
            log.warn("Unable to read '{}{}{}'. Using default inputValues list", "resources", File.separator, fileName);
            input = Arrays.asList(
                "hello world",
                "all streams lead to kafka",
                "streams",
                "kafka streams",
                "the cat in the hat",
                "green eggs and ham",
                "that Sam i am",
                "up the creek without a paddle",
                "run forest run",
                "a tank full of gas",
                "eat sleep rave repeat",
                "one jolly sailor",
                "king of the world");
        }
        return input;
    }

    /**
     * Per-test setup: creates the topics, builds a fresh Streams configuration, installs the
     * key-value comparators, and loads the input sentences together with the set of their words.
     *
     * @throws Exception if topic creation or temp-directory creation fails
     */
    @Before
    public void before() throws Exception {
        createTopics();

        streamsConfiguration = new Properties();
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        streamsConfiguration.put("application.id", "app-" + safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("commit.interval.ms", 100);
        streamsConfiguration.put("auto.offset.reset", "earliest");

        // Order by key first, then by value.
        stringComparator = Comparator.comparing((KeyValue<String, String> kv) -> kv.key).thenComparing(kv -> kv.value);
        stringLongComparator = Comparator.comparing((KeyValue<String, Long> kv) -> kv.key).thenComparingLong(kv -> kv.value);

        inputValues = getInputValues();
        inputValuesKeys = new HashSet<>();
        // Every run of non-word characters separates two keys.
        inputValues.forEach(sentence -> Collections.addAll(inputValuesKeys, sentence.split("\\W+")));
    }

    /**
     * Per-test teardown: closes the Streams instance stored in {@code kafkaStreams} (if any)
     * with a 30 second timeout, purges the local state directory and deletes all topics.
     *
     * @throws Exception if purging state or deleting topics fails
     */
    @After
    public void shutdown() throws Exception {
        final KafkaStreams running = kafkaStreams;
        if (running != null) {
            running.close(Duration.ofSeconds(30L));
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        CLUSTER.deleteAllTopicsAndWait(0L);
    }

    /**
     * Builds (but does not start) a word-count application reading from {@code inputTopic}.
     * Each value is split on {@code \W+} and grouped by word; the words are counted twice:
     * <ul>
     *   <li>a plain count, materialized as {@code storeName + "-" + inputTopic} and written to {@code outputTopic};</li>
     *   <li>a count over time windows of {@code WINDOW_SIZE} ms, materialized as
     *       {@code windowStoreName + "-" + inputTopic} and written, keyed by the plain word,
     *       to {@code windowOutputTopic}.</li>
     * </ul>
     *
     * @param inputTopic           topic holding the text lines
     * @param outputTopic          topic receiving the plain counts
     * @param windowOutputTopic    topic receiving the windowed counts
     * @param storeName            prefix of the key-value store name
     * @param windowStoreName      prefix of the window store name
     * @param streamsConfiguration configuration for the new instance
     * @return the configured, not yet started, Streams instance
     */
    private KafkaStreams createCountStream(String inputTopic, String outputTopic, String windowOutputTopic, String storeName, String windowStoreName, Properties streamsConfiguration) {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<String> stringSerde = Serdes.String();
        final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));

        final KGroupedStream<String, String> groupedByWord = textLines
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .groupBy(MockMapper.selectValueMapper());

        // Plain word count.
        groupedByWord
            .count(Materialized.as(storeName + "-" + inputTopic))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        // Windowed word count; the windowed key is unwrapped back to the plain word before writing.
        groupedByWord
            .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE)))
            .count(Materialized.as(windowStoreName + "-" + inputTopic))
            .toStream((key, value) -> key.key())
            .to(windowOutputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(builder.build(), streamsConfiguration);
    }

    /**
     * Checks the local store-partition lags reported by each Streams instance.
     * For instance {@code i}, the total number of partition entries across all stores must equal
     * {@code partitionsPerStreamsInstance.get(i)}; when that expectation is positive, the set of
     * reported store names must also equal {@code stores}.
     *
     * @param streamsList                  instances to inspect
     * @param stores                       expected store names
     * @param partitionsPerStreamsInstance expected number of store partitions, by instance index
     */
    private void verifyOffsetLagFetch(List<KafkaStreams> streamsList, Set<String> stores, List<Integer> partitionsPerStreamsInstance) {
        int instance = 0;
        for (final KafkaStreams streams : streamsList) {
            final Map<String, ? extends Map<Integer, ?>> lagsByStore = streams.allLocalStorePartitionLags();
            final int expected = partitionsPerStreamsInstance.get(instance++);
            final int actual = lagsByStore.values().stream().mapToInt(Map::size).sum();
            MatcherAssert.assertThat(actual, IsEqual.equalTo(expected));
            // An instance expected to hold no partitions is not required to report the store names.
            if (expected > 0) {
                MatcherAssert.assertThat(lagsByStore.keySet(), IsEqual.equalTo(stores));
            }
        }
    }

    /**
     * Repeatedly (until {@code timeout} ms elapse) verifies that every key in {@code keys} can be
     * located and read from the key-value store {@code storeName}.
     * <p>
     * For each key the query metadata is looked up on {@code streams}; the host reported by the
     * older {@code metadataForKey} lookup must equal the active host. The store is then fetched
     * (stale queries enabled) either from the instance whose index equals the active host's port
     * ({@code pickInstanceByPort == true}) or from {@code streams} itself, in which case standby
     * hosts must exist. Keys with no metadata, no store, no value or an unexpected exception are
     * collected and reported together through {@code assertNoKVKeyFailures}.
     * <p>
     * An {@link InvalidStateStoreException} is tolerated only if {@code stateListener} recorded at
     * least one REBALANCING state; otherwise the retry loop is aborted with a
     * {@link NoRetryException}.
     *
     * @param streamsList        all instances, indexed by the port set in {@code application.server}
     * @param streams            instance used for metadata lookups
     * @param stateListener      listener attached to {@code streams}
     * @param keys               keys that must be queryable
     * @param storeName          name of the key-value store
     * @param timeout            retry timeout in milliseconds
     * @param pickInstanceByPort whether to query the active host's instance rather than {@code streams}
     * @throws Exception if the verification still fails when the timeout expires
     */
    @Deprecated
    private void verifyAllKVKeys(List<KafkaStreams> streamsList, KafkaStreams streams, KafkaStreamsTest.StateListenerStub stateListener, Set<String> keys, String storeName, long timeout, boolean pickInstanceByPort) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(timeout, () -> {
            final List<String> noMetadataKeys = new ArrayList<>();
            final List<String> nullStoreKeys = new ArrayList<>();
            final List<String> nullValueKeys = new ArrayList<>();
            final Map<String, Exception> exceptionalKeys = new TreeMap<>();
            final StringSerializer serializer = new StringSerializer();

            for (final String key : keys) {
                try {
                    final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, serializer);
                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, serializer);
                    if (queryMetadata == null || queryMetadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
                        noMetadataKeys.add(key);
                        continue;
                    }
                    MatcherAssert.assertThat(metadata.hostInfo(), IsEqual.equalTo(queryMetadata.activeHost()));
                    if (!pickInstanceByPort) {
                        MatcherAssert.assertThat("Should have standbys to query from", !queryMetadata.standbyHosts().isEmpty());
                    }

                    // The tests configure application.server as "localhost:<index>", so the port doubles as list index.
                    final int index = queryMetadata.activeHost().port();
                    final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
                    final ReadOnlyKeyValueStore<String, Long> store =
                        IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.keyValueStore());
                    if (store == null) {
                        nullStoreKeys.add(key);
                        continue;
                    }
                    if (store.get(key) == null) {
                        nullValueKeys.add(key);
                    }
                } catch (final InvalidStateStoreException e) {
                    // A missing entry is treated as "no rebalance seen" so the intended assertion is raised
                    // instead of a NullPointerException from unboxing.
                    final Long rebalances = stateListener.mapStates.get(KafkaStreams.State.REBALANCING);
                    if (rebalances == null || rebalances < 1L) {
                        throw new NoRetryException(new AssertionError(String.format(
                            "Received %s for key %s and expected at least one rebalancing state, but had none",
                            e.getClass().getName(), key)));
                    }
                } catch (final Exception e) {
                    exceptionalKeys.put(key, e);
                }
            }

            assertNoKVKeyFailures(storeName, timeout, noMetadataKeys, nullStoreKeys, nullValueKeys, exceptionalKeys);
        });
    }

    /**
     * Repeatedly (until {@code timeout} ms elapse) verifies that every key in {@code keys} can be
     * located and fetched from the window store {@code storeName} for the range
     * {@code [from, to]} (epoch milliseconds).
     * <p>
     * For each key the query metadata is looked up on {@code streams}; the host reported by the
     * older {@code metadataForKey} lookup must equal the active host. With
     * {@code pickInstanceByPort == true} no standby hosts may be reported and the store is fetched
     * from the instance whose index equals the active host's port; otherwise standby hosts must
     * exist and the store is fetched from {@code streams} itself (stale queries enabled). Keys with
     * no metadata, no store, a {@code null} fetch result or an unexpected exception are collected
     * and reported together through {@code assertNoKVKeyFailures}.
     * <p>
     * An {@link InvalidStateStoreException} is tolerated only if {@code stateListenerStub} recorded
     * at least one REBALANCING state; otherwise the retry loop is aborted with a
     * {@link NoRetryException}.
     *
     * @param streamsList        all instances, indexed by the port set in {@code application.server}
     * @param streams            instance used for metadata lookups
     * @param stateListenerStub  listener attached to {@code streams}
     * @param keys               keys that must be queryable
     * @param storeName          name of the window store
     * @param from               start of the fetch range, epoch milliseconds
     * @param to                 end of the fetch range, epoch milliseconds
     * @param timeout            retry timeout in milliseconds
     * @param pickInstanceByPort whether to query the active host's instance rather than {@code streams}
     * @throws Exception if the verification still fails when the timeout expires
     */
    @Deprecated
    private void verifyAllWindowedKeys(List<KafkaStreams> streamsList, KafkaStreams streams, KafkaStreamsTest.StateListenerStub stateListenerStub, Set<String> keys, String storeName, Long from, Long to, long timeout, boolean pickInstanceByPort) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(timeout, () -> {
            final List<String> noMetadataKeys = new ArrayList<>();
            final List<String> nullStoreKeys = new ArrayList<>();
            final List<String> nullValueKeys = new ArrayList<>();
            final Map<String, Exception> exceptionalKeys = new TreeMap<>();
            final StringSerializer serializer = new StringSerializer();

            for (final String key : keys) {
                try {
                    final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, serializer);
                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, serializer);
                    if (queryMetadata == null || queryMetadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
                        noMetadataKeys.add(key);
                        continue;
                    }
                    MatcherAssert.assertThat(metadata.hostInfo(), IsEqual.equalTo(queryMetadata.activeHost()));
                    if (pickInstanceByPort) {
                        MatcherAssert.assertThat(queryMetadata.standbyHosts().size(), IsEqual.equalTo(0));
                    } else {
                        MatcherAssert.assertThat("Should have standbys to query from", !queryMetadata.standbyHosts().isEmpty());
                    }

                    // The tests configure application.server as "localhost:<index>", so the port doubles as list index.
                    final int index = queryMetadata.activeHost().port();
                    final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
                    final ReadOnlyWindowStore<String, Long> store =
                        IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.windowStore());
                    if (store == null) {
                        nullStoreKeys.add(key);
                        continue;
                    }
                    // Close the iterator once inspected so it does not leak; a null resource is simply skipped.
                    try (WindowStoreIterator<Long> fetched = store.fetch(key, Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))) {
                        if (fetched == null) {
                            nullValueKeys.add(key);
                        }
                    }
                } catch (final InvalidStateStoreException e) {
                    // A missing entry is treated as "no rebalance seen" so the intended assertion is raised
                    // instead of a NullPointerException from unboxing.
                    final Long rebalances = stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING);
                    if (rebalances == null || rebalances < 1L) {
                        throw new NoRetryException(new AssertionError(String.format(
                            "Received %s for key %s and expected at least one rebalancing state, but had none",
                            e.getClass().getName(), key)));
                    }
                } catch (final Exception e) {
                    exceptionalKeys.put(key, e);
                }
            }

            assertNoKVKeyFailures(storeName, timeout, noMetadataKeys, nullStoreKeys, nullValueKeys, exceptionalKeys);
        });
    }

    /**
     * Asserts that none of the failure collections contains a key. The assertion message lists
     * every failing key by category and, for keys that raised an exception, the indented stack
     * trace of that exception.
     *
     * @param storeName       store that was queried (used in the message)
     * @param timeout         timeout in milliseconds (used in the message)
     * @param noMetadataKeys  keys for which no metadata was available
     * @param nullStoreKeys   keys for which no store was available
     * @param nullValueKeys   keys for which no value was available
     * @param exceptionalKeys keys that raised an exception, with that exception
     * @throws IOException if reading back a rendered stack trace fails
     */
    private void assertNoKVKeyFailures(String storeName, long timeout, List<String> noMetadataKeys, List<String> nullStoreKeys, List<String> nullValueKeys, Map<String, Exception> exceptionalKeys) throws IOException {
        final StringBuilder reason = new StringBuilder();
        reason.append(String.format("Not all keys are available for store %s in %d ms", storeName, timeout));
        if (!noMetadataKeys.isEmpty()) {
            reason.append("\n    * No metadata is available for these keys: ").append(noMetadataKeys);
        }
        if (!nullStoreKeys.isEmpty()) {
            reason.append("\n    * No store is available for these keys: ").append(nullStoreKeys);
        }
        if (!nullValueKeys.isEmpty()) {
            reason.append("\n    * No value is available for these keys: ").append(nullValueKeys);
        }
        if (!exceptionalKeys.isEmpty()) {
            reason.append("\n    * Exceptions were raised for the following keys: ");
            for (final Map.Entry<String, Exception> entry : exceptionalKeys.entrySet()) {
                reason.append(String.format("\n        %s:", entry.getKey()));

                // Render the stack trace to a string, then re-read it line by line to indent it.
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                entry.getValue().printStackTrace(new PrintStream(baos));
                // try-with-resources closes the reader and lets any failure propagate to the caller.
                try (BufferedReader reader = new BufferedReader(new StringReader(baos.toString()))) {
                    String line = reader.readLine();
                    while (line != null) {
                        reason.append("\n            ").append(line);
                        line = reader.readLine();
                    }
                }
            }
        }

        final boolean noFailures = noMetadataKeys.isEmpty()
            && nullStoreKeys.isEmpty()
            && nullValueKeys.isEmpty()
            && exceptionalKeys.isEmpty();
        MatcherAssert.assertThat(reason.toString(), noFailures);
    }

    /**
     * A store that exists in the topology can be obtained, while asking for the unknown store
     * name {@code "no-table"} must fail with an {@link InvalidStateStoreException} carrying the
     * expected message.
     */
    @Test
    public void shouldRejectNonExistentStoreName() throws InterruptedException {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        final String input = uniqueTestName + "-input";
        final String storeName = uniqueTestName + "-input-table";

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(input, Materialized.as(storeName).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

        final Properties properties = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", IntegrationTestUtils.safeUniqueTestName(getClass(), testName)),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())));

        CLUSTER.createTopic(input);

        try (KafkaStreams streams = IntegrationTestUtils.getRunningStreams(properties, builder, true)) {
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            MatcherAssert.assertThat(store, Matchers.notNullValue());

            final InvalidStateStoreException exception = Assert.assertThrows(
                InvalidStateStoreException.class,
                () -> streams.store(StoreQueryParameters.fromNameAndType("no-table", QueryableStoreTypes.keyValueStore())));

            MatcherAssert.assertThat(
                exception.getMessage(),
                Matchers.is("Cannot get state store no-table because no such store is registered in the topology."));
        }
    }

    /**
     * A table store can be obtained as a key-value store, but using it through the session-store
     * type must fail on first access with an {@link InvalidStateStoreException} naming both the
     * requested and the actual store type.
     */
    @Test
    public void shouldRejectWronglyTypedStore() throws InterruptedException {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        final String input = uniqueTestName + "-input";
        final String storeName = uniqueTestName + "-input-table";

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(input, Materialized.as(storeName).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

        CLUSTER.createTopic(input);

        final Properties properties = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", uniqueTestName + "-app"),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())));

        try (KafkaStreams streams = IntegrationTestUtils.getRunningStreams(properties, builder, true)) {
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            MatcherAssert.assertThat(store, Matchers.notNullValue());

            // Obtaining the wrongly typed store is expected to succeed here; the failure is asserted on the fetch below.
            final ReadOnlySessionStore<String, String> sessionStore =
                streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.sessionStore()));
            final InvalidStateStoreException exception = Assert.assertThrows(
                InvalidStateStoreException.class,
                () -> sessionStore.fetch("a"));

            MatcherAssert.assertThat(
                exception.getMessage(),
                Matchers.is("Cannot get state store " + storeName + " because the queryable store type [class org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType] does not accept the actual store type [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."));
        }
    }

    /**
     * Runs two word-count instances (no standby replicas configured) against {@code streamThree},
     * verifies that all keys are queryable on both, closes the second instance and verifies that
     * all keys stay queryable through the first one. The expected local store-partition counts
     * are checked at every step: 4/4 while both run, 4/0 right after the close, and eventually 8/0.
     */
    @Test
    public void shouldBeAbleToQueryDuringRebalance() throws Exception {
        final int numThreads = 2;
        final List<KafkaStreams> streamsList = new ArrayList<>(numThreads);
        final List<KafkaStreamsTest.StateListenerStub> listeners = new ArrayList<>(numThreads);

        // Produce the input once, synchronously, before any instance starts.
        new ProducerRunnable(streamThree, inputValues, 1).run();

        final String kvStore = "word-count-store-" + streamThree;
        final String windowedStore = "windowed-word-count-store-" + streamThree;

        for (int instance = 0; instance < numThreads; instance++) {
            final Properties props = (Properties) streamsConfiguration.clone();
            props.put("state.dir", TestUtils.tempDirectory("shouldBeAbleToQueryDuringRebalance-" + instance).getPath());
            // The port is the instance's index in streamsList; the verify helpers rely on that.
            props.put("application.server", "localhost:" + instance);
            props.put("client.id", "instance-" + instance);

            final KafkaStreams streams = createCountStream(
                streamThree, outputTopicThree, outputTopicConcurrentWindowed,
                "word-count-store", "windowed-word-count-store", props);
            final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub();
            streams.setStateListener(listener);
            listeners.add(listener);
            streamsList.add(streams);
        }

        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(60L));

        final Set<String> stores = Utils.mkSet(kvStore, windowedStore);
        verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 4));

        try {
            waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);

            for (int instance = 0; instance < streamsList.size(); instance++) {
                verifyAllKVKeys(streamsList, streamsList.get(instance), listeners.get(instance),
                    inputValuesKeys, kvStore, DEFAULT_TIMEOUT_MS, true);
                verifyAllWindowedKeys(streamsList, streamsList.get(instance), listeners.get(instance),
                    inputValuesKeys, windowedStore, 0L, WINDOW_SIZE, DEFAULT_TIMEOUT_MS, true);
            }
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 4));

            // Close every instance but the first.
            final Duration closeTimeout = Duration.ofSeconds(60L);
            for (int instance = 1; instance < streamsList.size(); instance++) {
                MatcherAssert.assertThat(
                    String.format("Streams instance %s did not close in %d ms", instance, closeTimeout.toMillis()),
                    streamsList.get(instance).close(closeTimeout));
            }

            IntegrationTestUtils.waitForApplicationState(streamsList.subList(1, numThreads), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 0));

            IntegrationTestUtils.waitForApplicationState(streamsList.subList(0, 1), KafkaStreams.State.RUNNING, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 0));

            // Query through the remaining instance.
            verifyAllKVKeys(streamsList, streamsList.get(0), listeners.get(0),
                inputValuesKeys, kvStore, DEFAULT_TIMEOUT_MS, true);
            verifyAllWindowedKeys(streamsList, streamsList.get(0), listeners.get(0),
                inputValuesKeys, windowedStore, 0L, WINDOW_SIZE, DEFAULT_TIMEOUT_MS, true);
            TestUtils.retryOnExceptionWithTimeout(DEFAULT_TIMEOUT_MS,
                () -> verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0)));
        } finally {
            streamsList.forEach(KafkaStreams::close);
        }
    }

    /**
     * Runs two word-count instances with one standby replica each against {@code streamThree},
     * verifies that all keys are queryable (stale queries allowed, standbys required) on both,
     * closes the second instance and verifies that all keys stay queryable through the first one.
     * The expected local store-partition counts are 8/8 while both run and 8/0 after the close.
     */
    @Test
    public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception {
        final int numThreads = 2;
        final List<KafkaStreams> streamsList = new ArrayList<>(numThreads);
        final List<KafkaStreamsTest.StateListenerStub> listeners = new ArrayList<>(numThreads);

        // Produce the input once, synchronously, before any instance starts.
        new ProducerRunnable(streamThree, inputValues, 1).run();

        final String kvStore = "word-count-store-" + streamThree;
        final String windowedStore = "windowed-word-count-store-" + streamThree;
        final Set<String> stores = Utils.mkSet(kvStore, windowedStore);

        for (int instance = 0; instance < numThreads; instance++) {
            final Properties props = (Properties) streamsConfiguration.clone();
            props.put("application.server", "localhost:" + instance);
            props.put("client.id", "instance-" + instance);
            props.put("num.standby.replicas", 1);
            props.put("state.dir", TestUtils.tempDirectory("shouldBeAbleQueryStandbyStateDuringRebalance-" + instance).getPath());

            final KafkaStreams streams = createCountStream(
                streamThree, outputTopicThree, outputTopicConcurrentWindowed,
                "word-count-store", "windowed-word-count-store", props);
            final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub();
            streams.setStateListener(listener);
            listeners.add(listener);
            streamsList.add(streams);
        }

        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(60L));
        verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 8));

        try {
            waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);

            for (int instance = 0; instance < streamsList.size(); instance++) {
                verifyAllKVKeys(streamsList, streamsList.get(instance), listeners.get(instance),
                    inputValuesKeys, kvStore, DEFAULT_TIMEOUT_MS, false);
                verifyAllWindowedKeys(streamsList, streamsList.get(instance), listeners.get(instance),
                    inputValuesKeys, windowedStore, 0L, WINDOW_SIZE, DEFAULT_TIMEOUT_MS, false);
            }
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 8));

            // Close every instance but the first.
            final Duration closeTimeout = Duration.ofSeconds(60L);
            for (int instance = 1; instance < streamsList.size(); instance++) {
                MatcherAssert.assertThat(
                    String.format("Streams instance %s did not close in %d ms", instance, closeTimeout.toMillis()),
                    streamsList.get(instance).close(closeTimeout));
            }

            IntegrationTestUtils.waitForApplicationState(streamsList.subList(1, numThreads), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0));

            // Query through the remaining instance.
            verifyAllKVKeys(streamsList, streamsList.get(0), listeners.get(0),
                inputValuesKeys, kvStore, DEFAULT_TIMEOUT_MS, false);
            verifyAllWindowedKeys(streamsList, streamsList.get(0), listeners.get(0),
                inputValuesKeys, windowedStore, 0L, WINDOW_SIZE, DEFAULT_TIMEOUT_MS, false);
            TestUtils.retryOnExceptionWithTimeout(DEFAULT_TIMEOUT_MS,
                () -> verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0)));
        } finally {
            streamsList.forEach(KafkaStreams::close);
        }
    }

    /** Runs the shared {@code verifyCanQueryState} scenario with an argument of zero. */
    @Test
    public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception {
        final int cacheSizeBytes = 0;
        verifyCanQueryState(cacheSizeBytes);
    }

    /** Runs the shared {@code verifyCanQueryState} scenario with an argument of 10 * 1024 * 1024. */
    @Test
    public void shouldBeAbleToQueryStateWithNonZeroSizedCache() throws Exception {
        final int cacheSizeBytes = 10 * 1024 * 1024;
        verifyCanQueryState(cacheSizeBytes);
    }

    /**
     * Verifies that the stores materialized by {@code KTable.filter} ("queryFilter") and
     * {@code KTable.filterNot} ("queryFilterNot") are queryable and hold complementary content.
     *
     * <p>The predicate keeps keys containing "kafka", so the filter store must contain only
     * that entry while the filterNot store must contain every other produced entry.
     *
     * @throws Exception if producing, starting the application, or waiting for a condition fails
     */
    @Test
    public void shouldBeAbleToQueryFilterState() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Long().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = new String[]{"hello", "goodbye", "welcome", "go", "kafka"};
        final Set<KeyValue<String, Long>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], 1L),
            new KeyValue<>(keys[1], 1L),
            new KeyValue<>(keys[2], 3L),
            new KeyValue<>(keys[3], 5L),
            new KeyValue<>(keys[4], 2L)));
        // Only the "kafka" entry satisfies the predicate below.
        final Set<KeyValue<String, Long>> expectedBatch1 =
            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            batch1,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()),
            this.mockTime);

        final Predicate<String, Long> filterPredicate = (key, value) -> key.contains("kafka");
        final KTable<String, Long> t1 = builder.table(this.streamOne);
        final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
        t1.filterNot(filterPredicate, Materialized.as("queryFilterNot"));
        t2.toStream().to(this.outputTopic);

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        this.waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> myFilterStore =
            IntegrationTestUtils.getStore("queryFilter", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        final ReadOnlyKeyValueStore<String, Long> myFilterNotStore =
            IntegrationTestUtils.getStore("queryFilterNot", this.kafkaStreams, QueryableStoreTypes.keyValueStore());

        // Filter store: matching entries present, everything else absent.
        for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
            TestUtils.waitForCondition(
                () -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
                "Cannot get expected result");
        }
        for (final KeyValue<String, Long> batchEntry : batch1) {
            if (expectedBatch1.contains(batchEntry)) {
                continue;
            }
            TestUtils.waitForCondition(
                () -> myFilterStore.get(batchEntry.key) == null,
                "Cannot get null result");
        }

        // FilterNot store: the mirror image of the filter store.
        for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
            TestUtils.waitForCondition(
                () -> myFilterNotStore.get(expectedEntry.key) == null,
                "Cannot get null result");
        }
        for (final KeyValue<String, Long> batchEntry : batch1) {
            if (expectedBatch1.contains(batchEntry)) {
                continue;
            }
            TestUtils.waitForCondition(
                () -> batchEntry.value.equals(myFilterNotStore.get(batchEntry.key)),
                "Cannot get expected result");
        }
    }

    /**
     * Verifies that the store materialized by {@code KTable.mapValues} ("queryMapValues")
     * is queryable and contains each produced string value converted to a {@code Long}.
     *
     * <p>After the point lookups, the entries returned by a range query from "hello" to
     * "kafka" are printed to standard output; the iterator is closed afterwards.
     *
     * @throws Exception if producing, starting the application, or fetching the store fails
     */
    @Test
    public void shouldBeAbleToQueryMapValuesState() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = new String[]{"hello", "goodbye", "welcome", "go", "kafka"};
        final Set<KeyValue<String, String>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], "1"),
            new KeyValue<>(keys[1], "1"),
            new KeyValue<>(keys[2], "3"),
            new KeyValue<>(keys[3], "5"),
            new KeyValue<>(keys[4], "2")));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            batch1,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        // NOTE(review): the table is left raw as in the existing code; the Long::valueOf
        // mapper may need an explicit ValueMapper<String, Long> cast to compile - confirm.
        KTable t1 = builder.table(this.streamOne);
        t1.mapValues(Long::valueOf, Materialized.as((String)"queryMapValues").withValueSerde(Serdes.Long())).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        this.waitUntilAtLeastNumRecordProcessed(this.outputTopic, 5);

        final ReadOnlyKeyValueStore<String, Long> myMapStore =
            IntegrationTestUtils.getStore("queryMapValues", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        for (final KeyValue<String, String> batchEntry : batch1) {
            Assert.assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
        }

        // Store iterators hold underlying resources and must be closed once consumed.
        try (KeyValueIterator<String, Long> range = myMapStore.range("hello", "kafka")) {
            while (range.hasNext()) {
                System.out.println(range.next());
            }
        }
    }

    /**
     * Verifies that {@code prefixScan("go", ...)} on the "queryMapValues" store returns
     * exactly the entries for "go" and "goodbye", in that order.
     *
     * <p>The iterator is closed after use, and the number of returned entries is asserted
     * so that an empty or truncated scan cannot pass silently.
     *
     * @throws Exception if producing, starting the application, or fetching the store fails
     */
    @Test
    public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = new String[]{"hello", "goodbye", "welcome", "go", "kafka"};
        final Set<KeyValue<String, String>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], "1"),
            new KeyValue<>(keys[1], "1"),
            new KeyValue<>(keys[2], "3"),
            new KeyValue<>(keys[3], "5"),
            new KeyValue<>(keys[4], "2")));
        // Expected scan output: "go" first, then "goodbye".
        final List<KeyValue<String, Long>> expectedPrefixScanResult = Arrays.asList(
            new KeyValue<>(keys[3], 5L),
            new KeyValue<>(keys[1], 1L));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            batch1,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        // NOTE(review): the table is left raw as in the existing code; the Long::valueOf
        // mapper may need an explicit ValueMapper<String, Long> cast to compile - confirm.
        KTable t1 = builder.table(this.streamOne);
        t1.mapValues(Long::valueOf, Materialized.as((String)"queryMapValues").withValueSerde(Serdes.Long())).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        this.waitUntilAtLeastNumRecordProcessed(this.outputTopic, 5);

        final ReadOnlyKeyValueStore<String, Long> myMapStore =
            IntegrationTestUtils.getStore("queryMapValues", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        int index = 0;
        try (KeyValueIterator<String, Long> range = myMapStore.prefixScan("go", Serdes.String().serializer())) {
            while (range.hasNext()) {
                Assert.assertEquals(expectedPrefixScanResult.get(index++), range.next());
            }
        }
        // Without this check a scan that yields nothing would make the loop above a no-op.
        Assert.assertEquals(
            "prefixScan returned an unexpected number of entries",
            (long)expectedPrefixScanResult.size(),
            (long)index);
    }

    /**
     * Verifies that a store materialized by {@code mapValues} downstream of a {@code filter}
     * is queryable: only the entry whose key contains "kafka" is present (with its value
     * converted to a {@code Long}); every other produced key must be absent.
     *
     * @throws Exception if producing, starting the application, or fetching the store fails
     */
    @Test
    public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = new String[]{"hello", "goodbye", "welcome", "go", "kafka"};
        final Set<KeyValue<String, String>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], "1"),
            new KeyValue<>(keys[1], "1"),
            new KeyValue<>(keys[2], "3"),
            new KeyValue<>(keys[3], "5"),
            new KeyValue<>(keys[4], "2")));
        final Set<KeyValue<String, Long>> expectedBatch1 =
            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            batch1,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        // Typed so that the lambda can call String methods on the key.
        final Predicate<String, String> filterPredicate = (key, value) -> key.contains("kafka");
        // NOTE(review): tables are left raw as in the existing code; the Long::valueOf
        // mapper may need an explicit ValueMapper<String, Long> cast to compile - confirm.
        KTable t1 = builder.table(this.streamOne);
        KTable t2 = t1.filter(filterPredicate, Materialized.as((String)"queryFilter"));
        KTable t3 = t2.mapValues(Long::valueOf, Materialized.as((String)"queryMapValues").withValueSerde(Serdes.Long()));
        t3.toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        this.waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> myMapStore =
            IntegrationTestUtils.getStore("queryMapValues", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
            Assert.assertEquals(expectedEntry.value, myMapStore.get(expectedEntry.key));
        }
        for (final KeyValue<String, String> batchEntry : batch1) {
            // Compare in the mapped (String -> Long) domain to decide whether the entry
            // should have survived the filter.
            final KeyValue<String, Long> batchEntryMapValue =
                new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
            if (!expectedBatch1.contains(batchEntryMapValue)) {
                Assert.assertNull(myMapStore.get(batchEntry.key));
            }
        }
    }

    /**
     * Builds a topology that counts records per key (store "my-count") and per key and
     * time window (store "windowed-count"), produces one record for each of five keys,
     * and then checks both stores through the interactive-query API.
     *
     * @param cacheSizeBytes value written to {@code cache.max.bytes.buffering}
     * @throws Exception if producing, starting the application, or querying fails
     */
    private void verifyCanQueryState(int cacheSizeBytes) throws Exception {
        final String storeName = "my-count";
        final String windowStoreName = "windowed-count";
        final String[] keys = new String[]{"hello", "goodbye", "welcome", "go", "kafka"};

        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)cacheSizeBytes);

        // Each input record uses the key as its value; each key is expected to be counted once.
        final Set<KeyValue<String, String>> batch1 = new TreeSet<>(this.stringComparator);
        final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(this.stringLongComparator);
        for (final String key : keys) {
            batch1.add(new KeyValue<>(key, key));
            expectedCount.add(new KeyValue<>(key, 1L));
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            batch1,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        final StreamsBuilder builder = new StreamsBuilder();
        KStream s1 = builder.stream(this.streamOne);
        s1.groupByKey()
            .count(Materialized.as(storeName))
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        s1.groupByKey()
            .windowedBy((Windows)TimeWindows.of(Duration.ofMillis(WINDOW_SIZE)))
            .count(Materialized.as(windowStoreName));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        this.waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> myCount =
            IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        final ReadOnlyWindowStore<String, Long> windowStore =
            IntegrationTestUtils.getStore(windowStoreName, this.kafkaStreams, QueryableStoreTypes.windowStore());
        this.verifyCanGetByKey(keys, expectedCount, expectedCount, windowStore, myCount);
        this.verifyRangeAndAll(expectedCount, myCount);
    }

    /**
     * Counts eight identical records into the "count-by-key" store, closes the application,
     * starts a fresh instance on the same topology, and waits until the store can be
     * fetched again and still reports a count of 8 for key "hello".
     *
     * <p>While the restarted instance throws {@link InvalidStateStoreException} the
     * condition simply reports "not yet" and is retried.
     *
     * @throws Exception if producing, starting the application, or waiting times out
     */
    @Test
    public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
        final String storeName = "count-by-key";
        final int maxWaitMs = 30000;

        final StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(this.streamThree);
        stream.groupByKey().count(Materialized.as(storeName));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);

        KeyValue hello = KeyValue.pair((Object)"hello", (Object)"hello");
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamThree,
            Collections.nCopies(8, hello),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition(
            () -> Long.valueOf(8L).equals(store.get((Object)"hello")),
            maxWaitMs,
            "wait for count to be 8");

        // Restart: close the running instance and bring up a new one on the same topology.
        this.kafkaStreams.close();
        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams, maxWaitMs);

        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore restored = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
                Assert.assertEquals((Object)8L, (Object)restored.get((Object)"hello"));
                return true;
            }
            catch (InvalidStateStoreException ise) {
                // Store not queryable yet; let waitForCondition retry.
                return false;
            }
        }, maxWaitMs, "waiting for store " + storeName);
    }

    /**
     * Verifies that the "store" state store stays queryable after a stream thread has died.
     *
     * <p>The reducer throws once (guarded by {@code beforeFailure}) the first time it sees
     * an aggregate longer than one character; the uncaught-exception handler records the
     * failure in {@code failed}. The application is configured with two stream threads.
     * After the failure the store must converge to one of the accepted aggregates for
     * keys "a" and "b".
     *
     * @throws Exception if producing, starting the application, or waiting times out
     */
    @Test
    @Deprecated
    public void shouldAllowToQueryAfterThreadDied() throws Exception {
        final String storeName = "store";
        final int maxWaitMs = 30000;
        final AtomicBoolean beforeFailure = new AtomicBoolean(true);
        final AtomicBoolean failed = new AtomicBoolean(false);

        final StreamsBuilder builder = new StreamsBuilder();
        // Typed stream so the reducer lambda can use String operations on its arguments.
        final KStream<String, String> input = builder.stream(this.streamOne);
        input.groupByKey().reduce((value1, value2) -> {
            // Fail exactly once, and only after an aggregate has already been built up.
            if (value1.length() > 1 && beforeFailure.compareAndSet(true, false)) {
                throw new RuntimeException("Injected test exception");
            }
            return value1 + value2;
        }, Materialized.as(storeName)).toStream().to(this.outputTopic);

        this.streamsConfiguration.put("num.stream.threads", (Object)2);
        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams, maxWaitMs);

        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            Arrays.asList(
                KeyValue.pair("a", "1"),
                KeyValue.pair("a", "2"),
                KeyValue.pair("b", "3"),
                KeyValue.pair("b", "4")),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        final ReadOnlyKeyValueStore<String, String> store =
            IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition(
            () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
            maxWaitMs,
            "wait for agg to be <a,12> and <b,34>");

        // This record makes the reducer see a multi-character aggregate and throw.
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamOne,
            Collections.singleton(KeyValue.pair("a", "5")),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);
        TestUtils.waitForCondition(failed::get, maxWaitMs, "wait for thread to fail");

        final ReadOnlyKeyValueStore<String, String> store2 =
            IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        try {
            TestUtils.waitForCondition(
                () -> ("125".equals(store2.get("a")) || "1225".equals(store2.get("a")) || "12125".equals(store2.get("a")))
                    && ("34".equals(store2.get("b")) || "344".equals(store2.get("b")) || "3434".equals(store2.get("b"))),
                maxWaitMs,
                "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
        }
        catch (Throwable t2) {
            // Surface the actual store content to make a timeout diagnosable.
            throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: " + store2.get("b"), t2);
        }
    }

    /**
     * Checks the {@code range} and {@code all} queries of a key-value count store.
     *
     * <p>The range query from "go" to "kafka" must yield the entries for "go", "goodbye",
     * "hello" and "kafka", each with count 1; the full scan must equal {@code expectedCount}.
     * Both iterators are closed once drained.
     *
     * @param expectedCount the complete expected content of the store
     * @param myCount       the store under test
     */
    private void verifyRangeAndAll(Set<KeyValue<String, Long>> expectedCount, ReadOnlyKeyValueStore<String, Long> myCount) {
        final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(this.stringLongComparator);
        expectedRangeResults.addAll(Arrays.asList(
            new KeyValue<>("hello", 1L),
            new KeyValue<>("go", 1L),
            new KeyValue<>("goodbye", 1L),
            new KeyValue<>("kafka", 1L)));

        try (KeyValueIterator<String, Long> range = myCount.range("go", "kafka")) {
            while (range.hasNext()) {
                countRangeResults.add(range.next());
            }
        }
        try (KeyValueIterator<String, Long> all = myCount.all()) {
            while (all.hasNext()) {
                countAllResults.add(all.next());
            }
        }

        MatcherAssert.assertThat(countRangeResults, IsEqual.equalTo(expectedRangeResults));
        MatcherAssert.assertThat(countAllResults, IsEqual.equalTo(expectedCount));
    }

    /**
     * Polls both stores by key until each has produced a result for every key or 30 seconds
     * have elapsed, then asserts that the collected results equal the expectations.
     *
     * @param keys                keys to look up in both stores
     * @param expectedWindowState expected entries collected from the window store
     * @param expectedCount       expected entries collected from the key-value store
     * @param windowStore         window store queried through {@code fetch}
     * @param myCount             key-value store queried through {@code get}
     * @throws Exception if the polling sleep is interrupted
     */
    private void verifyCanGetByKey(String[] keys, Set<KeyValue<String, Long>> expectedWindowState, Set<KeyValue<String, Long>> expectedCount, ReadOnlyWindowStore<String, Long> windowStore, ReadOnlyKeyValueStore<String, Long> myCount) throws Exception {
        final Set<KeyValue<String, Long>> seenWindowed = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> seenCounts = new TreeSet<>(this.stringLongComparator);
        final long deadline = System.currentTimeMillis() + 30000L;

        while (true) {
            final boolean allKeysSeen = seenWindowed.size() >= keys.length && seenCounts.size() >= keys.length;
            if (allKeysSeen || System.currentTimeMillis() >= deadline) {
                break;
            }
            Thread.sleep(10L);
            for (final String key : keys) {
                seenWindowed.addAll(this.fetch(windowStore, key));
                final Long count = myCount.get(key);
                if (count != null) {
                    seenCounts.add(new KeyValue<>(key, count));
                }
            }
        }

        MatcherAssert.assertThat(seenWindowed, IsEqual.equalTo(expectedWindowState));
        MatcherAssert.assertThat(seenCounts, IsEqual.equalTo(expectedCount));
    }

    /**
     * Reads the current value for every key from both stores and asserts that no value is
     * smaller than the one previously recorded in the corresponding expectation map.
     *
     * <p>Both expectation maps are updated in place with the values just observed, so
     * repeated calls check that the values never decrease.
     *
     * @param keys                  keys to look up
     * @param expectedWindowedCount last observed windowed counts; updated by this call
     * @param expectedCount         last observed counts; updated by this call
     * @param windowStore           window store queried through {@code fetchMap}
     * @param keyValueStore         key-value store queried through {@code get}
     */
    private void verifyGreaterOrEqual(String[] keys, Map<String, Long> expectedWindowedCount, Map<String, Long> expectedCount, ReadOnlyWindowStore<String, Long> windowStore, ReadOnlyKeyValueStore<String, Long> keyValueStore) {
        final Map<String, Long> observedWindowed = new HashMap<>();
        final Map<String, Long> observedCounts = new HashMap<>();
        for (final String key : keys) {
            observedWindowed.putAll(this.fetchMap(windowStore, key));
            final Long count = keyValueStore.get(key);
            if (count != null) {
                observedCounts.put(key, count);
            }
        }
        this.assertNotDecreasedAndRecord(observedWindowed, expectedWindowedCount);
        this.assertNotDecreasedAndRecord(observedCounts, expectedCount);
    }

    /**
     * Asserts that each observed value is at least the previously recorded one (when there
     * is one) and then stores the observed value in {@code recorded}.
     */
    private void assertNotDecreasedAndRecord(Map<String, Long> observed, Map<String, Long> recorded) {
        for (final Map.Entry<String, Long> current : observed.entrySet()) {
            final String key = current.getKey();
            if (recorded.containsKey(key)) {
                final Long previous = recorded.get(key);
                Assert.assertTrue(current.getValue() >= previous);
            }
            recorded.put(key, current.getValue());
        }
    }

    /**
     * Blocks until at least {@code numRecs} record values have been received from
     * {@code topic}, using a consumer in group "queryable-state-consumer" that reads from
     * the earliest offset with String keys and Long values. The wait is given 120000 ms.
     *
     * @param topic   topic to consume from
     * @param numRecs minimum number of records to wait for
     * @throws Exception propagated from the underlying wait utility
     */
    private void waitUntilAtLeastNumRecordProcessed(String topic, int numRecs) throws Exception {
        final long timeoutMs = 120000L;

        final Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProps.setProperty("group.id", "queryable-state-consumer");
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", LongDeserializer.class.getName());

        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerProps, topic, numRecs, timeoutMs);
    }

    /**
     * Fetches the first windowed value for {@code key} between epoch 0 and the current
     * wall-clock time and returns it paired with the key.
     *
     * <p>The window-store iterator is closed before returning.
     *
     * @param store window store to query
     * @param key   key to look up
     * @return a singleton set with {@code (key, firstValue)}, or an empty set when the
     *         fetch yields no entries
     */
    private Set<KeyValue<String, Long>> fetch(ReadOnlyWindowStore<String, Long> store, String key) {
        try (WindowStoreIterator<Long> fetch = store.fetch(key, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(System.currentTimeMillis()))) {
            if (fetch.hasNext()) {
                final KeyValue<Long, Long> next = fetch.next();
                return Collections.singleton(KeyValue.pair(key, next.value));
            }
        }
        return Collections.emptySet();
    }

    /**
     * Fetches the first windowed value for {@code key} between epoch 0 and the current
     * wall-clock time and returns it as a single-entry map.
     *
     * <p>The window-store iterator is closed before returning.
     *
     * @param store window store to query
     * @param key   key to look up
     * @return a singleton map {@code key -> firstValue}, or an empty map when the fetch
     *         yields no entries
     */
    private Map<String, Long> fetchMap(ReadOnlyWindowStore<String, Long> store, String key) {
        try (WindowStoreIterator<Long> fetch = store.fetch(key, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(System.currentTimeMillis()))) {
            if (fetch.hasNext()) {
                final KeyValue<Long, Long> next = fetch.next();
                return Collections.singletonMap(key, next.value);
            }
        }
        return Collections.emptyMap();
    }

    /**
     * Runnable that repeatedly sends a fixed list of string values (without keys) to a
     * topic until the requested number of iterations is reached or {@link #shutdown()} is
     * called. One iteration sends every value in the list once.
     */
    private class ProducerRunnable implements Runnable {
        private final String topic;
        private final List<String> inputValues;
        private final int numIterations;
        private int currIteration = 0;
        boolean shutdown = false;

        /**
         * @param topic         topic to send to
         * @param inputValues   values sent, in order, on every iteration
         * @param numIterations number of iterations after which the loop stops
         */
        ProducerRunnable(String topic, List<String> inputValues, int numIterations) {
            this.topic = topic;
            this.inputValues = inputValues;
            this.numIterations = numIterations;
        }

        /** Marks one more full pass over the input values as completed. */
        private synchronized void incrementIteration() {
            this.currIteration = this.currIteration + 1;
        }

        /** @return the number of completed iterations */
        synchronized int getCurrIteration() {
            return this.currIteration;
        }

        /** Requests the send loop to stop before its next iteration. */
        synchronized void shutdown() {
            this.shutdown = true;
        }

        /**
         * Creates a String/String producer against the test cluster with {@code acks=all}
         * and runs the send loop; the producer is closed when the loop ends.
         */
        @Override
        public void run() {
            final Properties settings = new Properties();
            settings.put("bootstrap.servers", CLUSTER.bootstrapServers());
            settings.put("acks", "all");
            settings.put("key.serializer", StringSerializer.class);
            settings.put("value.serializer", StringSerializer.class);

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(settings, new StringSerializer(), new StringSerializer())) {
                while (true) {
                    // Iteration budget is checked first, then the shutdown flag.
                    if (this.getCurrIteration() >= this.numIterations || this.shutdown) {
                        break;
                    }
                    for (final String payload : this.inputValues) {
                        producer.send(new ProducerRecord<>(this.topic, payload));
                    }
                    this.incrementIteration();
                }
            }
        }
    }
}

