/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.CompositeStateListener;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class IntegrationTestUtils {
    /** Default timeout value (60000); presumably milliseconds -- TODO confirm against callers. */
    public static final long DEFAULT_TIMEOUT = 60000L;
    /** SLF4J logger bound to this class. */
    private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);

    /**
     * Re-issues {@code request} against {@code kafkaStreams} until the partition results of the
     * response contain every partition in {@code partitions}, sleeping 100 ms between attempts.
     *
     * @param kafkaStreams the instance to query
     * @param request      the query to run
     * @param partitions   the partitions that must all appear in the partition results
     * @return the first response whose partition-result keys cover {@code partitions}
     * @throws TimeoutException if no such response is seen before the 60000 ms deadline
     */
    public static <R> StateQueryResult<R> iqv2WaitForPartitions(final KafkaStreams kafkaStreams, final StateQueryRequest<R> request, final Set<Integer> partitions) {
        final long deadline = System.currentTimeMillis() + 60000L;
        do {
            if (Thread.currentThread().isInterrupted()) {
                Assert.fail("Test was interrupted.");
            }
            final StateQueryResult<R> response = kafkaStreams.query(request);
            final Set<Integer> answered = response.getPartitionResults().keySet();
            if (answered.containsAll(partitions)) {
                return response;
            }
            Utils.sleep(100L);
        } while (System.currentTimeMillis() < deadline);
        throw new TimeoutException("The query never returned the desired partitions");
    }

    /**
     * Re-issues {@code request} until the response contains at least one result and none of its
     * results is a failure with reason {@code NOT_UP_TO_BOUND} or {@code NOT_PRESENT}.
     * Sleeps 100 ms between attempts.
     *
     * @param kafkaStreams the instance to query
     * @param request      the query to run
     * @return the first response that satisfies the condition above
     * @throws TimeoutException if the 60000 ms deadline passes first; the message carries the last response
     */
    public static <R> StateQueryResult<R> iqv2WaitForResult(final KafkaStreams kafkaStreams, final StateQueryRequest<R> request) {
        final long deadline = System.currentTimeMillis() + 60000L;
        StateQueryResult<R> lastResponse;
        do {
            if (Thread.currentThread().isInterrupted()) {
                Assert.fail("Test was interrupted.");
            }
            lastResponse = kafkaStreams.query(request);
            final LinkedList<QueryResult<R>> everyResult = IntegrationTestUtils.getAllResults(lastResponse);
            final boolean settled = !everyResult.isEmpty()
                && everyResult.stream().noneMatch(IntegrationTestUtils::needToWait);
            if (settled) {
                return lastResponse;
            }
            Utils.sleep(100L);
        } while (System.currentTimeMillis() < deadline);
        throw new TimeoutException("The query never returned within the bound. Last result: " + lastResponse);
    }

    /**
     * Collects the per-partition results of {@code result} into one list and appends the global
     * result when it is non-null.
     */
    private static <R> LinkedList<QueryResult<R>> getAllResults(final StateQueryResult<R> result) {
        final LinkedList<QueryResult<R>> collected = new LinkedList<>(result.getPartitionResults().values());
        final QueryResult<R> global = result.getGlobalResult();
        if (global != null) {
            collected.add(global);
        }
        return collected;
    }

    /**
     * Returns {@code true} when {@code queryResult} is a failure whose reason is
     * {@code NOT_UP_TO_BOUND} or {@code NOT_PRESENT}.
     */
    private static <R> boolean needToWait(final QueryResult<R> queryResult) {
        if (!queryResult.isFailure()) {
            return false;
        }
        final FailureReason reason = queryResult.getFailureReason();
        return FailureReason.NOT_UP_TO_BOUND.equals(reason) || FailureReason.NOT_PRESENT.equals(reason);
    }

    /**
     * Builds a name from the simple class name followed by the test method name, replacing each of
     * the characters {@code : . [ ]}, space and {@code =} with an underscore.
     */
    public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
        String name = testClass.getSimpleName() + testName.getMethodName();
        for (final char unsafe : new char[] {':', '.', '[', ']', ' ', '='}) {
            name = name.replace(unsafe, '_');
        }
        return name;
    }

    /**
     * Passes the directory named by the {@code state.dir} property to {@code Utils.delete}, but
     * only when its normalized absolute path starts with the path of {@code TestUtils.IO_TMP_DIR}.
     * Does nothing when the property is absent.
     *
     * @param streamsConfiguration configuration to read {@code state.dir} from
     * @throws IOException declared for the {@code Utils.delete} call
     */
    public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException {
        final String tmpDir = TestUtils.IO_TMP_DIR.getPath();
        final String path = streamsConfiguration.getProperty("state.dir");
        if (path == null) {
            return;
        }
        final File node = Paths.get(path).normalize().toFile();
        // Guard: never delete anything outside the test temp directory.
        if (node.getAbsolutePath().startsWith(tmpDir)) {
            Utils.delete(new File(node.getAbsolutePath()));
        }
    }

    /**
     * Applies {@link #purgeLocalStreamsState(Properties)} to each configuration in iteration order.
     *
     * @throws IOException propagated from the first configuration whose purge fails
     */
    public static void purgeLocalStreamsState(final Collection<Properties> streamsConfigurations) throws IOException {
        final Iterator<Properties> configs = streamsConfigurations.iterator();
        while (configs.hasNext()) {
            IntegrationTestUtils.purgeLocalStreamsState(configs.next());
        }
    }

    /**
     * Convenience overload: resets the cluster and recreates {@code topics} with a partition
     * count of 1.
     */
    public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) {
        final int singlePartition = 1;
        IntegrationTestUtils.cleanStateBeforeTest(cluster, singlePartition, topics);
    }

    /**
     * Calls {@code deleteAllTopicsAndWait(60000)} on the cluster, then creates each of
     * {@code topics} with {@code partitionCount} as the second argument and 1 as the third
     * argument of {@code createTopic}.
     *
     * @param cluster        the embedded cluster to reset
     * @param partitionCount partition count passed to {@code createTopic}
     * @param topics         names of the topics to create
     * @throws RuntimeException wrapping an {@link InterruptedException}; the thread's interrupt
     *                          flag is restored before rethrowing so callers can still observe it
     */
    public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final int partitionCount, final String... topics) {
        try {
            cluster.deleteAllTopicsAndWait(60000L);
            for (final String topic : topics) {
                cluster.createTopic(topic, partitionCount, 1);
            }
        } catch (final InterruptedException e) {
            // Wrapping alone would silently clear the interrupt; put the flag back first.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Best-effort teardown: calls {@code driver.cleanUp()} and then
     * {@code cluster.deleteAllTopicsAndWait(60000)}. Any {@link RuntimeException} or
     * {@link InterruptedException} is logged at WARN level and otherwise ignored.
     */
    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
        try {
            driver.cleanUp();
            cluster.deleteAllTopicsAndWait(60000L);
        } catch (final RuntimeException | InterruptedException failure) {
            final Throwable cause = failure;
            LOG.warn("Ignoring failure to clean test state", cause);
        }
    }

    /**
     * Produces {@code records} to {@code topic} without transactions; delegates to the overload
     * that takes an {@code enableTransactions} flag, passing {@code false}.
     */
    public static <K, V> void produceKeyValuesSynchronously(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time) {
        final boolean enableTransactions = false;
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, enableTransactions);
    }

    /**
     * Produces {@code records} with the given {@code headers} and without transactions; delegates
     * to the full overload, passing {@code false} for {@code enableTransactions}.
     */
    public static <K, V> void produceKeyValuesSynchronously(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Time time) {
        final boolean enableTransactions = false;
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, enableTransactions);
    }

    /**
     * Produces {@code records} without headers; delegates to the full overload, passing
     * {@code null} for {@code headers}.
     */
    public static <K, V> void produceKeyValuesSynchronously(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions) {
        final Headers noHeaders = null;
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, noHeaders, time, enableTransactions);
    }

    /**
     * Sends every record to {@code topic} with a {@code null} partition, stamping each one with
     * {@code time.milliseconds()} and calling {@code time.sleep(1)} after each send. When
     * {@code enableTransactions} is set, all sends happen inside one transaction that is committed
     * at the end; otherwise the producer is flushed. The producer is closed before returning.
     *
     * @param topic              destination topic
     * @param records            key/value pairs to send, in iteration order
     * @param producerConfig     configuration for the newly created producer
     * @param headers            headers attached to every record (may be {@code null})
     * @param time               source of record timestamps
     * @param enableTransactions whether to wrap the sends in a transaction
     */
    public static <K, V> void produceKeyValuesSynchronously(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Time time, final boolean enableTransactions) {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(producerConfig)) {
            if (enableTransactions) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            for (final KeyValue<K, V> record : records) {
                final long timestamp = time.milliseconds();
                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
                time.sleep(1L);
            }
            if (!enableTransactions) {
                producer.flush();
            } else {
                producer.commitTransaction();
            }
        }
    }

    /**
     * Produces {@code records} with a fixed {@code timestamp} and without transactions; delegates
     * to the overload that takes an {@code enableTransactions} flag, passing {@code false}.
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Long timestamp) {
        final boolean enableTransactions = false;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, enableTransactions);
    }

    /**
     * Produces {@code records} with a fixed {@code timestamp} and no headers; delegates to the full
     * overload, passing {@code null} for {@code headers}.
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Long timestamp, final boolean enableTransactions) {
        final Headers noHeaders = null;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, noHeaders, timestamp, enableTransactions);
    }

    /**
     * Sends every record to {@code topic} with a {@code null} partition and the same
     * {@code timestamp}. When {@code enableTransactions} is set, the sends are wrapped in a single
     * committed transaction. No explicit flush is issued in the non-transactional case; the
     * producer is closed by try-with-resources before returning.
     *
     * @param topic              destination topic
     * @param records            key/value pairs to send, in iteration order
     * @param producerConfig     configuration for the newly created producer
     * @param headers            headers attached to every record (may be {@code null})
     * @param timestamp          timestamp applied to every record
     * @param enableTransactions whether to wrap the sends in a transaction
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Long timestamp, final boolean enableTransactions) {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(producerConfig)) {
            if (enableTransactions) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            for (final KeyValue<K, V> pair : records) {
                final ProducerRecord<K, V> outgoing = new ProducerRecord<>(topic, null, timestamp, pair.key, pair.value, headers);
                producer.send(outgoing);
            }
            if (enableTransactions) {
                producer.commitTransaction();
            }
        }
    }

    /**
     * Sends every entry of {@code toProduce} to {@code topic}, using each entry's own timestamp
     * and the given partition (or {@code null} when {@code partition} is empty). With {@code eos}
     * the sends are wrapped in one committed transaction; otherwise the producer is flushed.
     * Afterwards every send future is awaited so that a failed send surfaces as an exception.
     *
     * @param producerConfig configuration for the newly created producer
     * @param eos            whether to produce inside a transaction
     * @param topic          destination topic
     * @param partition      explicit partition, or empty to pass {@code null}
     * @param toProduce      records to send, in list order
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *                          {@link InterruptedException} raised while awaiting a send; on
     *                          interruption the thread's interrupt flag is restored first
     */
    public static <V, K> void produceSynchronously(final Properties producerConfig, final boolean eos, final String topic, final Optional<Integer> partition, final List<KeyValueTimestamp<K, V>> toProduce) {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(producerConfig)) {
            if (eos) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            final List<Future<?>> futures = new LinkedList<>();
            for (final KeyValueTimestamp<K, V> record : toProduce) {
                futures.add(producer.send(new ProducerRecord<>(topic, partition.orElse(null), record.timestamp(), record.key(), record.value(), null)));
            }
            if (eos) {
                producer.commitTransaction();
            } else {
                producer.flush();
            }
            for (final Future<?> future : futures) {
                try {
                    future.get();
                } catch (final InterruptedException e) {
                    // Keep the interrupt visible to callers instead of clearing it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (final ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * For each record: begins a transaction, sends the record with {@code timestamp}, waits for
     * the send future, and then aborts the transaction. The producer is closed before returning.
     *
     * @throws Exception whatever the producer calls or the awaited send future throw
     */
    public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Long timestamp) throws Exception {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(producerConfig)) {
            producer.initTransactions();
            for (final KeyValue<K, V> pair : records) {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>(topic, null, timestamp, pair.key, pair.value)).get();
                producer.abortTransaction();
            }
        }
    }

    /**
     * Produces the given values without transactions; delegates to the overload that takes an
     * {@code enableTransactions} flag, passing {@code false}.
     */
    public static <V> void produceValuesSynchronously(final String topic, final Collection<V> records, final Properties producerConfig, final Time time) {
        final boolean enableTransactions = false;
        IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, enableTransactions);
    }

    /**
     * Wraps each value in a {@code KeyValue} with a {@code null} key and hands the resulting list
     * to {@code produceKeyValuesSynchronously}.
     */
    public static <V> void produceValuesSynchronously(final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions) {
        final List<KeyValue<Object, V>> keyless = new ArrayList<>();
        records.forEach(value -> keyless.add(new KeyValue<Object, V>(null, value)));
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, keyless, producerConfig, time, enableTransactions);
    }

    /**
     * Polls {@code streams.metrics()} (without sleeping between polls) until at least
     * {@code expectedPartitions} {@code records-lag} metrics exist whose {@code client-id} tag
     * does NOT end in {@code restore-consumer}, and their values sum to zero.
     *
     * @param streams             the instance whose metrics are inspected
     * @param expectedPartitions  minimum number of matching lag metrics
     * @param timeoutMilliseconds how long to keep polling
     * @throws RuntimeException if the condition is not met within the timeout
     */
    public static void waitForCompletion(final KafkaStreams streams, final int expectedPartitions, final long timeoutMilliseconds) {
        final long start = System.currentTimeMillis();
        int lagMetrics;
        double totalLag;
        do {
            lagMetrics = 0;
            totalLag = 0.0;
            for (final Metric metric : streams.metrics().values()) {
                final boolean isLagMetric = metric.metricName().name().equals("records-lag");
                if (isLagMetric && !metric.metricName().tags().get("client-id").endsWith("restore-consumer")) {
                    lagMetrics++;
                    totalLag += ((Number) metric.metricValue()).doubleValue();
                }
            }
            if (lagMetrics >= expectedPartitions && totalLag == 0.0) {
                return;
            }
        } while (System.currentTimeMillis() - start < timeoutMilliseconds);
        throw new RuntimeException(String.format("Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", lagMetrics, expectedPartitions, totalLag));
    }

    /**
     * Polls {@code streams.metrics()} (without sleeping between polls) until at least
     * {@code expectedPartitions} {@code records-lag} metrics exist whose {@code client-id} tag
     * ends in {@code restore-consumer}, and their values sum to zero.
     *
     * @param streams             the instance whose metrics are inspected
     * @param expectedPartitions  minimum number of matching lag metrics
     * @param timeoutMilliseconds how long to keep polling
     * @throws RuntimeException if the condition is not met within the timeout
     */
    public static void waitForStandbyCompletion(final KafkaStreams streams, final int expectedPartitions, final long timeoutMilliseconds) {
        final long start = System.currentTimeMillis();
        int lagMetrics;
        double totalLag;
        do {
            lagMetrics = 0;
            totalLag = 0.0;
            for (final Metric metric : streams.metrics().values()) {
                final boolean isLagMetric = metric.metricName().name().equals("records-lag");
                if (isLagMetric && metric.metricName().tags().get("client-id").endsWith("restore-consumer")) {
                    lagMetrics++;
                    totalLag += ((Number) metric.metricValue()).doubleValue();
                }
            }
            if (lagMetrics >= expectedPartitions && totalLag == 0.0) {
                return;
            }
        } while (System.currentTimeMillis() - start < timeoutMilliseconds);
        throw new RuntimeException(String.format("Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", lagMetrics, expectedPartitions, totalLag));
    }

    /**
     * Same as the four-argument overload with a wait time of 60000 ms.
     */
    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws Exception {
        final long waitTime = 60000L;
        return IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, waitTime);
    }

    /**
     * Reads from {@code topic} with a fresh consumer, retrying until at least
     * {@code expectedNumRecords} consumer records have been accumulated or {@code waitTime}
     * elapses.
     *
     * @param consumerConfig     configuration for the consumer
     * @param topic              topic to read from
     * @param expectedNumRecords minimum number of records to collect
     * @param waitTime           retry budget in milliseconds, also passed to each read
     * @return every record accumulated across the attempts
     * @throws Exception whatever the retry helper or consumer throw, including the final assertion failure
     */
    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, final long waitTime) throws Exception {
        final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
        final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig)) {
            TestUtils.retryOnExceptionWithTimeout(waitTime, () -> {
                final List<ConsumerRecord<K, V>> batch = IntegrationTestUtils.readRecords(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(batch);
                MatcherAssert.assertThat(reason, accumData.size(), Matchers.is(Matchers.greaterThanOrEqualTo(expectedNumRecords)));
            });
        }
        return accumData;
    }

    /**
     * Same as the four-argument overload with a wait time of 60000 ms.
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws Exception {
        final long waitTime = 60000L;
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, waitTime);
    }

    /**
     * Reads key/value pairs from {@code topic} with a fresh consumer, retrying until at least
     * {@code expectedNumRecords} pairs have been accumulated or {@code waitTime} elapses. The
     * assertion message includes the data accumulated so far.
     *
     * @return every pair accumulated across the attempts
     * @throws Exception whatever the retry helper or consumer throw, including the final assertion failure
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, final long waitTime) throws Exception {
        final List<KeyValue<K, V>> accumData = new ArrayList<>();
        final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig)) {
            TestUtils.retryOnExceptionWithTimeout(waitTime, () -> {
                final List<KeyValue<K, V>> batch = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(batch);
                final String detailedReason = reason + ",  currently accumulated data is " + accumData;
                MatcherAssert.assertThat(detailedReason, accumData.size(), Matchers.is(Matchers.greaterThanOrEqualTo(expectedNumRecords)));
            });
        }
        return accumData;
    }

    /**
     * Reads key/value/timestamp triples from {@code topic} with a fresh consumer, retrying until
     * at least {@code expectedNumRecords} have been accumulated or {@code waitTime} elapses.
     *
     * @return every triple accumulated across the attempts
     * @throws Exception whatever the retry helper or consumer throw, including the final assertion failure
     */
    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, final long waitTime) throws Exception {
        final List<KeyValueTimestamp<K, V>> accumData = new ArrayList<>();
        final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig)) {
            TestUtils.retryOnExceptionWithTimeout(waitTime, () -> {
                final List<KeyValueTimestamp<K, V>> batch = IntegrationTestUtils.readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(batch);
                MatcherAssert.assertThat(reason, accumData.size(), Matchers.is(Matchers.greaterThanOrEqualTo(expectedNumRecords)));
            });
        }
        return accumData;
    }

    /**
     * Same as the four-argument overload with a wait time of 60000 ms.
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final List<KeyValue<K, V>> expectedRecords) throws Exception {
        final long waitTime = 60000L;
        return IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime);
    }

    /**
     * Timestamp-aware variant: delegates to the private five-argument implementation with a wait
     * time of 60000 ms and {@code withTimestamp = true}.
     */
    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, final String topic, final List<KeyValueTimestamp<K, V>> expectedRecords) throws Exception {
        final long waitTime = 60000L;
        final boolean withTimestamp = true;
        return IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, withTimestamp);
    }

    /**
     * Delegates to the private five-argument implementation with {@code withTimestamp = false}.
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final List<KeyValue<K, V>> expectedRecords, final long waitTime) throws Exception {
        final boolean withTimestamp = false;
        return IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, withTimestamp);
    }

    /**
     * Reads from {@code topic} until the accumulated records, restricted to those contained in
     * {@code expectedRecords} and grouped by key, equal {@code expectedRecords} grouped by key
     * (that is, the per-key order matches and records not in the expected list are ignored).
     *
     * @param consumerConfig  configuration for the consumer
     * @param topic           topic to read from
     * @param expectedRecords the records that must be observed
     * @param waitTime        maximum time to wait in milliseconds, also passed to each read
     * @param withTimestamp   {@code true} if {@code T} is {@code KeyValueTimestamp}, {@code false}
     *                        if it is {@code KeyValue}
     * @return every record accumulated across the attempts, including unexpected ones
     * @throws Exception whatever the wait helper or consumer throw
     */
    @SuppressWarnings("unchecked")
    private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final List<T> expectedRecords, final long waitTime, final boolean withTimestamp) throws Exception {
        final List<T> accumData = new ArrayList<>();
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig)) {
            final TestCondition valuesRead = () -> {
                final List<T> readData;
                if (withTimestamp) {
                    readData = (List<T>) IntegrationTestUtils.readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedRecords.size());
                } else {
                    readData = (List<T>) IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, expectedRecords.size());
                }
                accumData.addAll(readData);

                // Drop intermediate records that are not part of the expected output.
                final List<T> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());

                // Group both sides by key so that only the per-key ordering is compared.
                final Map<Object, List<T>> finalAccumData = new HashMap<>();
                for (final T kv : accumulatedActual) {
                    final Object key = withTimestamp ? ((KeyValueTimestamp<?, ?>) kv).key() : ((KeyValue<?, ?>) kv).key;
                    finalAccumData.computeIfAbsent(key, ignored -> new ArrayList<>()).add(kv);
                }
                final Map<Object, List<T>> finalExpected = new HashMap<>();
                for (final T kv : expectedRecords) {
                    final Object key = withTimestamp ? ((KeyValueTimestamp<?, ?>) kv).key() : ((KeyValue<?, ?>) kv).key;
                    finalExpected.computeIfAbsent(key, ignored -> new ArrayList<>()).add(kv);
                }
                return finalAccumData.equals(finalExpected);
            };
            // Build the message lazily so "(got ...)" shows what was read, not the initial empty list.
            TestUtils.waitForCondition(valuesRead, waitTime, () -> "Did not receive all " + expectedRecords + " records from topic " + topic + " (got " + accumData + ")");
        }
        return accumData;
    }

    /**
     * Same as the four-argument overload with a wait time of 60000 ms.
     */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws Exception {
        final long waitTime = 60000L;
        return IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, waitTime);
    }

    /**
     * Reads record values from {@code topic} with a fresh consumer, retrying until at least
     * {@code expectedNumRecords} values have been accumulated or {@code waitTime} elapses.
     *
     * @return every value accumulated across the attempts
     * @throws Exception whatever the retry helper or consumer throw, including the final assertion failure
     */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, final long waitTime) throws Exception {
        final List<V> accumData = new ArrayList<>();
        final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
        try (KafkaConsumer<Object, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig)) {
            TestUtils.retryOnExceptionWithTimeout(waitTime, () -> {
                final List<V> batch = IntegrationTestUtils.readValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(batch);
                MatcherAssert.assertThat(reason, accumData.size(), Matchers.is(Matchers.greaterThanOrEqualTo(expectedNumRecords)));
            });
        }
        return accumData;
    }

    /**
     * Waits, partition by partition, for metadata to be propagated to {@code servers}. All
     * partitions share one overall deadline of {@code timeout} ms; each partition gets whatever
     * time is left.
     *
     * @throws AssertionError       if the deadline has already passed when a partition's turn comes
     * @throws InterruptedException propagated from the per-partition wait
     */
    public static void waitForTopicPartitions(final List<KafkaServer> servers, final List<TopicPartition> partitions, final long timeout) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeout;
        for (final TopicPartition tp : partitions) {
            final long budget = deadline - System.currentTimeMillis();
            if (budget > 0L) {
                IntegrationTestUtils.waitUntilMetadataIsPropagated(servers, tp.topic(), tp.partition(), budget);
            } else {
                throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + timeout);
            }
        }
    }

    /**
     * Retries until every broker's metadata cache has partition info for the given topic
     * partition and that info names a valid broker id as leader.
     *
     * @param servers   brokers whose metadata caches are inspected
     * @param topic     topic name
     * @param partition partition number
     * @param timeout   retry budget in milliseconds
     * @throws InterruptedException propagated from the retry helper
     */
    private static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers, final String topic, final int partition, final long timeout) throws InterruptedException {
        final String baseReason = String.format("Metadata for topic=%s partition=%d was not propagated to all brokers within %d ms. ", topic, partition, timeout);
        TestUtils.retryOnExceptionWithTimeout(timeout, () -> {
            final List<KafkaServer> emptyPartitionInfos = new ArrayList<>();
            final List<KafkaServer> invalidBrokerIds = new ArrayList<>();
            for (final KafkaServer server : servers) {
                final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache();
                final Option<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionInfo = metadataCache.getPartitionInfo(topic, partition);
                if (partitionInfo.isEmpty()) {
                    emptyPartitionInfos.add(server);
                    continue;
                }
                final UpdateMetadataRequestData.UpdateMetadataPartitionState metadataPartitionState = partitionInfo.get();
                if (!Request.isValidBrokerId(metadataPartitionState.leader())) {
                    invalidBrokerIds.add(server);
                }
            }
            // baseReason already ends with ". ", so the suffix must not start with another period.
            final String reason = baseReason + "Brokers without partition info: " + emptyPartitionInfos + ". Brokers with invalid broker id for partition leader: " + invalidBrokerIds;
            MatcherAssert.assertThat(reason, emptyPartitionInfos.isEmpty() && invalidBrokerIds.isEmpty());
        });
    }

    /**
     * Starts every instance in {@code streamsList} and blocks until all of them have been
     * observed in state {@code RUNNING}, or fails the test once {@code timeout} has elapsed.
     *
     * <p>A state listener is installed on each instance before it is started. If an instance
     * already has a listener (read via {@code getStateListener}), it is kept by wrapping both in
     * a {@code CompositeStateListener}.
     *
     * @param streamsList the instances to start
     * @param timeout     maximum time to wait after the instances have been started
     * @throws Exception declared by the signature; includes interruption while awaiting the condition
     */
    public static void startApplicationAndWaitUntilRunning(List<KafkaStreams> streamsList, Duration timeout) throws Exception {
        // stateLock guards stateMap; stateUpdate is signalled when all instances are RUNNING.
        ReentrantLock stateLock = new ReentrantLock();
        Condition stateUpdate = stateLock.newCondition();
        HashMap<KafkaStreams, KafkaStreams.State> stateMap = new HashMap<KafkaStreams, KafkaStreams.State>();
        for (KafkaStreams streams : streamsList) {
            // Seed the map with the current state before the instance is started.
            stateMap.put(streams, streams.state());
            KafkaStreams.StateListener prevStateListener = IntegrationTestUtils.getStateListener(streams);
            KafkaStreams.StateListener newStateListener = (newState, oldState) -> {
                stateLock.lock();
                try {
                    stateMap.put(streams, newState);
                    // Only wake the waiter when this transition makes every instance RUNNING.
                    if (newState == KafkaStreams.State.RUNNING && stateMap.values().stream().allMatch(state -> state == KafkaStreams.State.RUNNING)) {
                        stateUpdate.signalAll();
                    }
                }
                finally {
                    stateLock.unlock();
                }
            };
            // Preserve any listener that was already registered on the instance.
            streams.setStateListener((KafkaStreams.StateListener)(prevStateListener != null ? new CompositeStateListener(prevStateListener, newStateListener) : newStateListener));
        }
        for (KafkaStreams streams : streamsList) {
            streams.start();
        }
        long expectedEnd = System.currentTimeMillis() + timeout.toMillis();
        stateLock.lock();
        try {
            while (true) {
                // Snapshot of the instances that are not RUNNING yet (also used in the failure message).
                HashMap nonRunningStreams = new HashMap();
                for (Map.Entry entry : stateMap.entrySet()) {
                    if (entry.getValue() == KafkaStreams.State.RUNNING) continue;
                    nonRunningStreams.put(entry.getKey(), entry.getValue());
                }
                if (nonRunningStreams.isEmpty()) {
                    return;
                }
                long millisRemaining = expectedEnd - System.currentTimeMillis();
                if (millisRemaining <= 0L) {
                    Assert.fail((String)("Application did not reach a RUNNING state for all streams instances. Non-running instances: " + nonRunningStreams));
                }
                // The return value is ignored: the loop re-checks both the states and the deadline.
                stateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS);
            }
        }
        finally {
            stateLock.unlock();
        }
    }

    /**
     * Retries until every instance in {@code streamsList} reports {@code state}, or the timeout
     * elapses. The assertion message lists the instances that are in a different state.
     *
     * @throws InterruptedException propagated from the retry helper
     */
    public static void waitForApplicationState(final List<KafkaStreams> streamsList, final KafkaStreams.State state, final Duration timeout) throws InterruptedException {
        TestUtils.retryOnExceptionWithTimeout(timeout.toMillis(), () -> {
            final Map<KafkaStreams, KafkaStreams.State> currentStates = streamsList.stream()
                .collect(Collectors.toMap(instance -> instance, KafkaStreams::state));
            final Map<KafkaStreams, KafkaStreams.State> wrongStateMap = currentStates.entrySet().stream()
                .filter(entry -> entry.getValue() != state)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            final String reason = String.format("Expected all streams instances in %s to be %s within %d ms, but the following were not: %s", streamsList, state, timeout.toMillis(), wrongStateMap);
            MatcherAssert.assertThat(reason, wrongStateMap.isEmpty());
        });
    }

    /**
     * Waits up to {@code timeoutMs} for a {@code ConsumerGroupInactiveCondition} built from
     * {@code adminClient} and {@code applicationId} to hold.
     *
     * @throws Exception whatever the wait helper throws, including the failure when the condition never holds
     */
    public static void waitForEmptyConsumerGroup(final Admin adminClient, final String applicationId, final long timeoutMs) throws Exception {
        final TestCondition groupInactive = new ConsumerGroupInactiveCondition(adminClient, applicationId);
        final String failureMessage = "Test consumer group " + applicationId + " still active even after waiting " + timeoutMs + " ms.";
        TestUtils.waitForCondition(groupInactive, timeoutMs, failureMessage);
    }

    /**
     * Describes the consumer group named {@code applicationId} and reports whether it has no
     * members.
     *
     * @param adminClient   client used to describe the group
     * @param applicationId the consumer group id
     * @return {@code true} if the described group has no members; {@code false} if it has members
     *         or if describing the group failed or was interrupted (the interrupt flag is restored
     *         in the latter case)
     */
    public static boolean isEmptyConsumerGroup(final Admin adminClient, final String applicationId) {
        try {
            final ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(applicationId)).describedGroups().get(applicationId).get();
            return groupDescription.members().isEmpty();
        } catch (final InterruptedException e) {
            // Report "not empty" but keep the interrupt visible so enclosing wait loops can stop.
            Thread.currentThread().interrupt();
            return false;
        } catch (final ExecutionException e) {
            return false;
        }
    }

    /**
     * Reads the private {@code stateListener} field of {@code streams} via reflection. For a
     * {@code KafkaStreamsNamedTopologyWrapper} the field is looked up on the direct superclass;
     * otherwise on the runtime class itself.
     *
     * @return the current value of the field (may be {@code null})
     * @throws RuntimeException if the field is missing or cannot be accessed
     */
    private static KafkaStreams.StateListener getStateListener(final KafkaStreams streams) {
        final Class<?> owner = streams instanceof KafkaStreamsNamedTopologyWrapper
            ? streams.getClass().getSuperclass()
            : streams.getClass();
        try {
            final Field listenerField = owner.getDeclaredField("stateListener");
            listenerField.setAccessible(true);
            return (KafkaStreams.StateListener) listenerField.get(streams);
        } catch (final NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Failed to get StateListener through reflection", e);
        }
    }

    /**
     * Reads at least {@code expected.size()} records from {@code topic} and checks that exactly
     * that many were received and that they match {@code expected} position by position on key,
     * value and timestamp.
     *
     * @throws RuntimeException wrapping any exception raised while reading
     * @throws AssertionError   on a count mismatch or the first element mismatch; the message shows
     *                          the received records and the expected list
     */
    public static <K, V> void verifyKeyValueTimestamps(final Properties consumerConfig, final String topic, final List<KeyValueTimestamp<K, V>> expected) {
        final List<ConsumerRecord<K, V>> results;
        try {
            results = IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }

        if (results.size() != expected.size()) {
            throw new AssertionError(IntegrationTestUtils.printRecords(results) + " != " + expected);
        }

        final Iterator<ConsumerRecord<K, V>> actualIterator = results.iterator();
        final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expected.iterator();
        while (actualIterator.hasNext()) {
            final ConsumerRecord<K, V> actual = actualIterator.next();
            final KeyValueTimestamp<K, V> wanted = expectedIterator.next();
            try {
                IntegrationTestUtils.compareKeyValueTimestamp(actual, wanted.key(), wanted.value(), wanted.timestamp());
            } catch (final AssertionError mismatch) {
                throw new AssertionError(IntegrationTestUtils.printRecords(results) + " != " + expected, mismatch);
            }
        }
    }

    /**
     * Reads at least {@code expected.size()} records from {@code topic} and checks that exactly
     * that many were received and that, ignoring order, they equal {@code expected}.
     *
     * @throws RuntimeException wrapping any exception raised while reading
     * @throws AssertionError   on a count mismatch or when the set of received records differs
     */
    public static void verifyKeyValueTimestamps(final Properties consumerConfig, final String topic, final Set<KeyValueTimestamp<String, Long>> expected) {
        // Typed (not raw) so that the stream below sees ConsumerRecord elements.
        final List<ConsumerRecord<String, Long>> results;
        try {
            results = IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
        if (results.size() != expected.size()) {
            throw new AssertionError(IntegrationTestUtils.printRecords(results) + " != " + expected);
        }
        final Set<KeyValueTimestamp<String, Long>> actual = results.stream()
            .map(result -> new KeyValueTimestamp<String, Long>(result.key(), result.value(), result.timestamp()))
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(actual, Matchers.equalTo(expected));
    }

    /**
     * Asserts that {@code record} carries exactly the expected key, value and timestamp.
     * Keys and values are compared null-safely with {@code equals}.
     *
     * @param record            the consumed record to check; must not be {@code null}
     * @param expectedKey       the expected key, may be {@code null}
     * @param expectedValue     the expected value, may be {@code null}
     * @param expectedTimestamp the expected record timestamp
     * @throws NullPointerException if {@code record} is {@code null}
     * @throws AssertionError       if key, value or timestamp differs from the expectation
     */
    private static <K, V> void compareKeyValueTimestamp(ConsumerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) {
        Objects.requireNonNull(record);
        final K recordKey = record.key();
        final V recordValue = record.value();
        final long recordTimestamp = record.timestamp();

        final boolean matches = Objects.equals(recordKey, expectedKey)
            && Objects.equals(recordValue, expectedValue)
            && recordTimestamp == expectedTimestamp;
        if (!matches) {
            // Build the error (message concatenation + stack trace) only on the failure path.
            throw new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
        }
    }

    /**
     * Renders the given records as a bracketed, multi-line string: an opening {@code [} line, then
     * one line per record indented by two spaces, then a closing {@code ]}.
     *
     * @param result the records to render
     * @return the rendered text
     */
    private static <K, V> String printRecords(List<ConsumerRecord<K, V>> result) {
        return result.stream()
            .map(record -> "  " + record.toString() + "\n")
            .collect(Collectors.joining("", "[\n", "]"));
    }

    /**
     * Reads key-value pairs from {@code topic} via {@code readKeyValues} and returns only the values,
     * in the order they were read.
     *
     * @param topic       topic to read from
     * @param consumer    consumer used for reading
     * @param waitTime    maximum time to spend polling, in milliseconds
     * @param maxMessages maximum number of messages to read
     * @return the values of the records that were read
     */
    private static <V> List<V> readValues(String topic, Consumer<Object, V> consumer, long waitTime, int maxMessages) {
        return readKeyValues(topic, consumer, waitTime, maxMessages).stream()
            .map(keyValue -> keyValue.value)
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Reads records from {@code topic} via {@code readRecords} and converts each one into a
     * {@code KeyValue} of its key and value, preserving the read order.
     *
     * @param topic       topic to read from
     * @param consumer    consumer used for reading
     * @param waitTime    maximum time to spend polling, in milliseconds
     * @param maxMessages maximum number of messages to read
     * @return the key-value pairs of the records that were read
     */
    private static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        return readRecords(topic, consumer, waitTime, maxMessages).stream()
            .map(record -> new KeyValue<K, V>(record.key(), record.value()))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Reads records from {@code topic} via {@code readRecords} and converts each one into a
     * {@code KeyValueTimestamp} of its key, value and timestamp, preserving the read order.
     *
     * @param topic       topic to read from
     * @param consumer    consumer used for reading
     * @param waitTime    maximum time to spend polling, in milliseconds
     * @param maxMessages maximum number of messages to read
     * @return key, value and timestamp of each record that was read
     */
    private static <K, V> List<KeyValueTimestamp<K, V>> readKeyValuesWithTimestamp(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        return readRecords(topic, consumer, waitTime, maxMessages).stream()
            .map(record -> new KeyValueTimestamp<K, V>(record.key(), record.value(), record.timestamp()))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Subscribes {@code consumer} to {@code topic} and polls in 100 ms steps, collecting every
     * returned record, until either the accumulated nominal poll time reaches {@code waitTime} or
     * {@code continueConsuming} reports that enough messages were collected.
     *
     * <p>The time budget is counted in nominal poll intervals (100 ms per poll call), not measured
     * wall-clock time.
     *
     * @param topic       topic to subscribe to
     * @param consumer    consumer used for polling
     * @param waitTime    nominal polling budget in milliseconds
     * @param maxMessages number of messages after which polling stops; a value {@code <= 0} means
     *                    nothing is polled at all
     * @return all records collected, in poll order
     */
    private static <K, V> List<ConsumerRecord<K, V>> readRecords(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        consumer.subscribe(Collections.singletonList(topic));

        final int pollIntervalMs = 100;
        final List<ConsumerRecord<K, V>> collected = new ArrayList<>();
        for (int nominalElapsedMs = 0;
             nominalElapsedMs < waitTime && continueConsuming(collected.size(), maxMessages);
             nominalElapsedMs += pollIntervalMs) {
            consumer.poll(Duration.ofMillis(pollIntervalMs)).forEach(collected::add);
        }
        return collected;
    }

    /**
     * Decides whether more messages should be consumed.
     *
     * @param messagesConsumed number of messages consumed so far
     * @param maxMessages      upper bound on messages to consume
     * @return {@code true} only if {@code maxMessages} is positive and fewer than
     *         {@code maxMessages} messages have been consumed
     */
    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
        if (maxMessages <= 0) {
            return false;
        }
        return messagesConsumed < maxMessages;
    }

    /**
     * Creates a {@code KafkaConsumer} from a copy of {@code consumerConfig} in which
     * {@code auto.offset.reset} is forced to {@code earliest} and {@code enable.auto.commit} to
     * {@code true}. The passed-in {@code Properties} object itself is not modified.
     *
     * @param consumerConfig base consumer configuration
     * @return a new consumer built from the adjusted configuration
     */
    private static <K, V> KafkaConsumer<K, V> createConsumer(Properties consumerConfig) {
        final Properties effectiveConfig = new Properties();
        effectiveConfig.putAll(consumerConfig);
        // These two settings always override whatever the caller supplied.
        effectiveConfig.setProperty("auto.offset.reset", "earliest");
        effectiveConfig.setProperty("enable.auto.commit", "true");
        return new KafkaConsumer<>(effectiveConfig);
    }

    /**
     * Builds a {@code KafkaStreams} instance from {@code builder} and {@code streamsConfig},
     * optionally calls {@code cleanUp()} on it, and starts it. Returns right after {@code start()}
     * without waiting for any particular state.
     *
     * @param streamsConfig configuration for the streams instance
     * @param builder       builder whose topology is run
     * @param clean         whether to call {@code cleanUp()} before starting
     * @return the started instance
     */
    public static KafkaStreams getStartedStreams(Properties streamsConfig, StreamsBuilder builder, boolean clean) {
        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        if (clean) {
            streams.cleanUp();
        }
        streams.start();
        return streams;
    }

    /**
     * Builds a {@code KafkaStreams} instance from {@code builder} and {@code streamsConfig},
     * optionally calls {@code cleanUp()} on it, starts it, and waits up to 60 seconds for it to
     * report the {@code RUNNING} state.
     *
     * <p>If the instance cannot be handed back (timeout or interruption) a non-blocking close is
     * requested on it first, because the caller never receives a reference it could close itself.
     *
     * @param streamsConfig configuration for the streams instance
     * @param builder       builder whose topology is run
     * @param clean         whether to call {@code cleanUp()} before starting
     * @return the instance, after it has reported {@code RUNNING}
     * @throws RuntimeException if {@code RUNNING} is not reported within the timeout, or if the
     *                          waiting thread is interrupted (the interrupt flag is restored)
     */
    public static KafkaStreams getRunningStreams(Properties streamsConfig, StreamsBuilder builder, boolean clean) {
        final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
        if (clean) {
            driver.cleanUp();
        }
        final CountDownLatch running = new CountDownLatch(1);
        driver.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                running.countDown();
            }
        });
        driver.start();

        final boolean reachedRunning;
        try {
            // await() returns false on timeout; that result must not be ignored.
            reachedRunning = running.await(60000L, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            driver.close(Duration.ZERO);
            throw new RuntimeException("Interrupted while waiting for Streams to start.", e);
        }
        if (!reachedRunning) {
            driver.close(Duration.ZERO);
            throw new RuntimeException("Streams didn't start in time.");
        }
        return driver;
    }

    /**
     * Looks up a state store by name and type, using a wait time of 60000 ms.
     *
     * @param storeName name of the store to look up
     * @param streams   streams instance that owns the store
     * @param storeType queryable type of the store
     * @return the store handle
     * @throws Exception whatever the delegated {@code getStore} overload throws
     */
    public static <S> S getStore(String storeName, KafkaStreams streams, QueryableStoreType<S> storeType) throws Exception {
        final long defaultWaitMs = 60000L;
        return getStore(defaultWaitMs, storeName, streams, storeType);
    }

    /**
     * Looks up a state store by name and type, optionally allowing stale stores, using a wait time
     * of 60000 ms.
     *
     * @param storeName        name of the store to look up
     * @param streams          streams instance that owns the store
     * @param enableStaleQuery whether stale stores are enabled on the query parameters
     * @param storeType        queryable type of the store
     * @return the store handle
     * @throws Exception whatever the delegated {@code getStore} overload throws
     */
    public static <S> S getStore(String storeName, KafkaStreams streams, boolean enableStaleQuery, QueryableStoreType<S> storeType) throws Exception {
        final long defaultWaitMs = 60000L;
        return getStore(defaultWaitMs, storeName, streams, enableStaleQuery, storeType);
    }

    /**
     * Looks up a state store by name and type with stale stores disabled.
     *
     * @param waitTime  wait time in milliseconds, passed on to the delegated overload
     * @param storeName name of the store to look up
     * @param streams   streams instance that owns the store
     * @param storeType queryable type of the store
     * @return the store handle
     * @throws Exception whatever the delegated {@code getStore} overload throws
     */
    public static <S> S getStore(long waitTime, String storeName, KafkaStreams streams, QueryableStoreType<S> storeType) throws Exception {
        final boolean enableStaleQuery = false;
        return getStore(waitTime, storeName, streams, enableStaleQuery, storeType);
    }

    /**
     * Builds {@code StoreQueryParameters} from {@code storeName} and {@code storeType}, enabling
     * stale stores when requested, and looks the store up with them.
     *
     * @param waitTime         wait time in milliseconds, passed on to the delegated overload
     * @param storeName        name of the store to look up
     * @param streams          streams instance that owns the store
     * @param enableStaleQuery whether to call {@code enableStaleStores()} on the parameters
     * @param storeType        queryable type of the store
     * @return the store handle
     * @throws Exception whatever the delegated {@code getStore} overload throws
     */
    public static <S> S getStore(long waitTime, String storeName, KafkaStreams streams, boolean enableStaleQuery, QueryableStoreType<S> storeType) throws Exception {
        final StoreQueryParameters<S> byNameAndType = StoreQueryParameters.fromNameAndType(storeName, storeType);
        final StoreQueryParameters<S> param;
        if (enableStaleQuery) {
            param = byNameAndType.enableStaleStores();
        } else {
            param = byNameAndType;
        }
        return getStore(waitTime, streams, param);
    }

    /**
     * Looks up a state store with the given query parameters, using a wait time of 60000 ms.
     *
     * @param streams streams instance that owns the store
     * @param param   query parameters identifying the store
     * @return the store handle
     * @throws Exception whatever the delegated {@code getStore} overload throws
     */
    public static <S> S getStore(KafkaStreams streams, StoreQueryParameters<S> param) throws Exception {
        final long defaultWaitMs = 60000L;
        return getStore(defaultWaitMs, streams, param);
    }

    /**
     * Repeatedly calls {@code streams.store(param)} until it succeeds, sleeping
     * {@code min(100, waitTime)} milliseconds between attempts.
     *
     * <p>A failed attempt is retried as long as the current time has not passed
     * {@code start + waitTime}. Once it has, an {@code InvalidStateStoreException} is rethrown
     * as is, and any other exception is wrapped in an {@code AssertionError}.
     *
     * @param waitTime retry window in milliseconds
     * @param streams  streams instance to query
     * @param param    query parameters identifying the store
     * @return the store handle returned by {@code streams.store(param)}
     * @throws Exception an {@code InvalidStateStoreException} after the deadline, or an
     *                   {@code InterruptedException} from the sleep between attempts
     */
    public static <S> S getStore(long waitTime, KafkaStreams streams, StoreQueryParameters<S> param) throws Exception {
        final long deadline = System.currentTimeMillis() + waitTime;
        for (;;) {
            try {
                return streams.store(param);
            } catch (final InvalidStateStoreException notReadyYet) {
                if (System.currentTimeMillis() > deadline) {
                    throw notReadyYet;
                }
            } catch (final Exception failure) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError(failure);
                }
            }
            // Still inside the retry window: pause briefly, then try again.
            Thread.sleep(Math.min(100L, waitTime));
        }
    }

    /**
     * Computes, over all partitions of {@code topicName}, the sum of (end offset - beginning
     * offset) using a temporary consumer that is closed before returning.
     *
     * @param consumerConfig configuration for the temporary consumer
     * @param topicName      topic to measure
     * @return the summed offset range of all partitions of the topic
     */
    public static long getTopicSize(Properties consumerConfig, String topicName) {
        long totalOffsetRange = 0L;
        try (final KafkaConsumer<Object, Object> consumer = createConsumer(consumerConfig)) {
            final List<TopicPartition> partitions = consumer.partitionsFor(topicName).stream()
                .map(info -> new TopicPartition(topicName, info.partition()))
                .collect(Collectors.toList());
            final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (final Map.Entry<TopicPartition, Long> beginning : beginningOffsets.entrySet()) {
                totalOffsetRange += endOffsets.get(beginning.getKey()) - beginning.getValue();
            }
        }
        return totalOffsetRange;
    }

    /**
     * Returns the value of the first metric of {@code kafkaStreams} whose name is
     * {@code poll-total}, cast to {@code Double}.
     *
     * @param kafkaStreams streams instance whose metrics are inspected
     * @return the current value of the {@code poll-total} metric
     * @throws java.util.NoSuchElementException if no metric with that name exists
     */
    private static Double getStreamsPollNumber(KafkaStreams kafkaStreams) {
        final Metric pollTotal = kafkaStreams.metrics().entrySet().stream()
            .filter(entry -> entry.getKey().name().equals("poll-total"))
            .findFirst()
            .get()
            .getValue();
        return (Double) pollTotal.metricValue();
    }

    /**
     * Waits until the {@code poll-total} metric of {@code kafkaStreams} has grown by at least
     * {@code pollNumber} compared to its value when this method was entered. The check is retried
     * through {@code TestUtils.retryOnExceptionWithTimeout} with a timeout argument of 1000.
     *
     * @param kafkaStreams streams instance to observe
     * @param pollNumber   number of additional polls to wait for
     * @throws InterruptedException if the retry helper is interrupted
     */
    public static void waitUntilStreamsHasPolled(KafkaStreams kafkaStreams, int pollNumber) throws InterruptedException {
        final Double pollsAtStart = getStreamsPollNumber(kafkaStreams);
        TestUtils.retryOnExceptionWithTimeout(1000L, () -> {
            final Double pollsNow = getStreamsPollNumber(kafkaStreams);
            final Double pollsWanted = pollsAtStart + (double) pollNumber;
            MatcherAssert.assertThat(pollsNow, Matchers.is(Matchers.greaterThanOrEqualTo(pollsWanted)));
        });
    }

    public static class TrackingStateRestoreListener
    implements StateRestoreListener {
        public final Map<TopicPartition, AtomicLong> changelogToStartOffset = new ConcurrentHashMap<TopicPartition, AtomicLong>();
        public final Map<TopicPartition, AtomicLong> changelogToEndOffset = new ConcurrentHashMap<TopicPartition, AtomicLong>();
        public final Map<TopicPartition, AtomicLong> changelogToTotalNumRestored = new ConcurrentHashMap<TopicPartition, AtomicLong>();

        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            this.changelogToStartOffset.put(topicPartition, new AtomicLong(startingOffset));
            this.changelogToEndOffset.put(topicPartition, new AtomicLong(endingOffset));
            this.changelogToTotalNumRestored.put(topicPartition, new AtomicLong(0L));
        }

        public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
            this.changelogToTotalNumRestored.get(topicPartition).addAndGet(numRestored);
        }

        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        }

        public long totalNumRestored() {
            long totalNumRestored = 0L;
            for (AtomicLong numRestored : this.changelogToTotalNumRestored.values()) {
                totalNumRestored += numRestored.get();
            }
            return totalNumRestored;
        }
    }

    public static class StableAssignmentListener
    implements AssignorConfiguration.AssignmentListener {
        final AtomicInteger numStableAssignments = new AtomicInteger(0);
        int nextExpectedNumStableAssignments;

        public void onAssignmentComplete(boolean stable) {
            if (stable) {
                this.numStableAssignments.incrementAndGet();
            }
        }

        public int numStableAssignments() {
            return this.numStableAssignments.get();
        }

        public void prepareForRebalance() {
            this.nextExpectedNumStableAssignments = this.numStableAssignments.get() + 1;
        }

        public void waitForNextStableAssignment(long maxWaitMs) throws InterruptedException {
            TestUtils.waitForCondition(() -> this.numStableAssignments() >= this.nextExpectedNumStableAssignments, (long)maxWaitMs, () -> "Client did not reach " + this.nextExpectedNumStableAssignments + " stable assignments on time, numStableAssignments was " + this.numStableAssignments());
        }
    }

    private static class ConsumerGroupInactiveCondition
    implements TestCondition {
        private final Admin adminClient;
        private final String applicationId;

        private ConsumerGroupInactiveCondition(Admin adminClient, String applicationId) {
            this.adminClient = adminClient;
            this.applicationId = applicationId;
        }

        public boolean conditionMet() {
            return IntegrationTestUtils.isEmptyConsumerGroup(this.adminClient, this.applicationId);
        }
    }

    public static class StateListenerStub
    implements StreamThread.StateListener {
        boolean toPendingShutdownSeen = false;

        public void onChange(Thread thread, ThreadStateTransitionValidator newState, ThreadStateTransitionValidator oldState) {
            if (newState == StreamThread.State.PENDING_SHUTDOWN) {
                this.toPendingShutdownSeen = true;
            }
        }

        public boolean transitToPendingShutdownSeen() {
            return this.toPendingShutdownSeen;
        }
    }
}

