/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={KafkaStreams.class, StreamThread.class, ClientMetrics.class})
public class KafkaStreamsTest {
    // Stream-thread count the fixture is built around; before() sets "num.stream.threads" to the same value.
    private static final int NUM_THREADS = 2;
    // Value stored under "application.id" in before() and matched when the application-id metric is registered.
    private static final String APPLICATION_ID = "appId";
    // Value stored under "client.id" in before().
    private static final String CLIENT_ID = "test-client";
    @Rule
    public TestName testName = new TestName();
    // Client supplier handed to the KafkaStreams instances under test; its consumers/producers are closed by the mocked thread shutdowns.
    private MockClientSupplier supplier;
    // Mock clock passed to most KafkaStreams constructors in this class.
    private MockTime time;
    // Streams configuration rebuilt for every test in before().
    private Properties props;
    @Mock
    private StateDirectory stateDirectory;
    // First mocked stream thread returned by the stubbed StreamThread.create(...).
    @Mock
    private StreamThread streamThreadOne;
    // Second mocked stream thread returned by the stubbed StreamThread.create(...); its join() is stubbed to sleep.
    @Mock
    private StreamThread streamThreadTwo;
    // Mock returned whenever a GlobalStreamThread is constructed.
    @Mock
    private GlobalStreamThread globalStreamThread;
    // Mock returned whenever a Metrics object is constructed.
    @Mock
    private Metrics metrics;
    // Listener whose numChanges counter the state-transition tests assert on.
    private StateListenerStub streamsStateListener;
    // Captures the reporter list passed to the Metrics constructor so init()/close() can be forwarded to it.
    private Capture<List<MetricsReporter>> metricsReportersCapture;
    // Captures the state listener installed on the mocked threads so tests can fire thread transitions by hand.
    private Capture<StreamThread.StateListener> threadStatelistenerCapture;

    /**
     * Rebuilds the whole fixture before each test: mock clock, client supplier with a
     * single-node bootstrap cluster, listener stub, EasyMock captures and the streams
     * configuration, and finally records/replays all mock expectations.
     *
     * @throws Exception if recording the mock expectations fails
     */
    @Before
    public void before() throws Exception {
        this.time = new MockTime();

        // client supplier backed by a one-node bootstrap cluster
        final MockClientSupplier clientSupplier = new MockClientSupplier();
        final InetSocketAddress bootstrapNode = new InetSocketAddress("localhost", 9999);
        clientSupplier.setCluster(Cluster.bootstrap(Collections.singletonList(bootstrapNode)));
        this.supplier = clientSupplier;

        this.streamsStateListener = new StateListenerStub();
        this.threadStatelistenerCapture = EasyMock.newCapture();
        this.metricsReportersCapture = EasyMock.newCapture();

        // streams configuration shared by every test
        final Properties config = new Properties();
        config.put("application.id", APPLICATION_ID);
        config.put("client.id", CLIENT_ID);
        config.put("bootstrap.servers", "localhost:2018");
        config.put("metric.reporters", MockMetricsReporter.class.getName());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("num.stream.threads", 2);
        this.props = config;

        this.prepareStreams();
    }

    /**
     * Records and then replays every mock expectation the tests rely on: construction of
     * {@link Metrics}, the static {@link ClientMetrics} helpers, the static {@link StreamThread}
     * factory and its two mocked threads, and construction/lifecycle of the mocked
     * {@link GlobalStreamThread}.
     *
     * @throws Exception if recording an expectation fails
     */
    private void prepareStreams() throws Exception {
        // --- metrics: any "new Metrics(...)" yields the mock and initialises the captured reporters ---
        PowerMock.expectNew(Metrics.class, (Object[])new Object[]{EasyMock.anyObject(MetricConfig.class), EasyMock.capture(this.metricsReportersCapture), EasyMock.anyObject(Time.class), EasyMock.anyObject(MetricsContext.class)}).andAnswer(() -> {
            for (MetricsReporter reporter : (List)this.metricsReportersCapture.getValue()) {
                reporter.init(Collections.emptyList());
            }
            return this.metrics;
        }).anyTimes();
        // closing the mocked Metrics forwards close() to every captured reporter
        this.metrics.close();
        EasyMock.expectLastCall().andAnswer(() -> {
            for (MetricsReporter reporter : (List)this.metricsReportersCapture.getValue()) {
                reporter.close();
            }
            return null;
        }).anyTimes();
        // --- client-level metrics: static ClientMetrics calls are mocked out ---
        PowerMock.mockStatic(ClientMetrics.class);
        EasyMock.expect((Object)ClientMetrics.version()).andReturn((Object)"1.56");
        EasyMock.expect((Object)ClientMetrics.commitId()).andReturn((Object)"1a2b3c4d5e");
        ClientMetrics.addVersionMetric((StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)));
        ClientMetrics.addCommitIdMetric((StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)));
        ClientMetrics.addApplicationIdMetric((StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)), (String)((String)EasyMock.eq((Object)APPLICATION_ID)));
        ClientMetrics.addTopologyDescriptionMetric((StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)), (String)EasyMock.anyString());
        ClientMetrics.addStateMetric((StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)), (Gauge)((Gauge)EasyMock.anyObject()));
        ClientMetrics.addNumAliveStreamThreadMetric((StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)), (Gauge)((Gauge)EasyMock.anyObject()));
        // --- stream threads: the static factory hands out streamThreadOne first, then streamThreadTwo ---
        PowerMock.mockStatic(StreamThread.class);
        EasyMock.expect((Object)StreamThread.create((InternalTopologyBuilder)((InternalTopologyBuilder)EasyMock.anyObject(InternalTopologyBuilder.class)), (StreamsConfig)((StreamsConfig)EasyMock.anyObject(StreamsConfig.class)), (KafkaClientSupplier)((KafkaClientSupplier)EasyMock.anyObject(KafkaClientSupplier.class)), (Admin)((Admin)EasyMock.anyObject(Admin.class)), (UUID)((UUID)EasyMock.anyObject(UUID.class)), (String)((String)EasyMock.anyObject(String.class)), (StreamsMetricsImpl)((StreamsMetricsImpl)EasyMock.anyObject(StreamsMetricsImpl.class)), (Time)((Time)EasyMock.anyObject(Time.class)), (StreamsMetadataState)((StreamsMetadataState)EasyMock.anyObject(StreamsMetadataState.class)), (long)EasyMock.anyLong(), (StateDirectory)((StateDirectory)EasyMock.anyObject(StateDirectory.class)), (StateRestoreListener)((StateRestoreListener)EasyMock.anyObject(StateRestoreListener.class)), (int)EasyMock.anyInt())).andReturn((Object)this.streamThreadOne).andReturn((Object)this.streamThreadTwo);
        EasyMock.expect((Object)StreamThread.eosEnabled((StreamsConfig)((StreamsConfig)EasyMock.anyObject(StreamsConfig.class)))).andReturn((Object)false).anyTimes();
        EasyMock.expect((Object)StreamThread.processingMode((StreamsConfig)((StreamsConfig)EasyMock.anyObject(StreamsConfig.class)))).andReturn((Object)StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
        EasyMock.expect((Object)this.streamThreadOne.getId()).andReturn((Object)0L).anyTimes();
        EasyMock.expect((Object)this.streamThreadTwo.getId()).andReturn((Object)1L).anyTimes();
        // thread one's join() returns immediately, thread two's join() sleeps (see prepareStreamThread)
        this.prepareStreamThread(this.streamThreadOne, true);
        this.prepareStreamThread(this.streamThreadTwo, false);
        // --- global thread: state is tracked in a local reference that the stubbed start()/shutdown() update ---
        AtomicReference<GlobalStreamThread.State> globalThreadState = new AtomicReference<GlobalStreamThread.State>(GlobalStreamThread.State.CREATED);
        PowerMock.expectNew(GlobalStreamThread.class, (Object[])new Object[]{EasyMock.anyObject(ProcessorTopology.class), EasyMock.anyObject(StreamsConfig.class), EasyMock.anyObject(Consumer.class), EasyMock.anyObject(StateDirectory.class), EasyMock.anyLong(), EasyMock.anyObject(StreamsMetricsImpl.class), EasyMock.anyObject(Time.class), EasyMock.anyString(), EasyMock.anyObject(StateRestoreListener.class)}).andReturn((Object)this.globalStreamThread).anyTimes();
        EasyMock.expect((Object)this.globalStreamThread.state()).andAnswer(globalThreadState::get).anyTimes();
        this.globalStreamThread.setStateListener((StreamThread.StateListener)EasyMock.capture(this.threadStatelistenerCapture));
        EasyMock.expectLastCall().anyTimes();
        // start(): flip to RUNNING and report the transition to the captured listener
        this.globalStreamThread.start();
        EasyMock.expectLastCall().andAnswer(() -> {
            globalThreadState.set(GlobalStreamThread.State.RUNNING);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)this.globalStreamThread, (ThreadStateTransitionValidator)GlobalStreamThread.State.RUNNING, (ThreadStateTransitionValidator)GlobalStreamThread.State.CREATED);
            return null;
        }).anyTimes();
        // shutdown(): close the restore consumer and producers, flip to DEAD and report both transitions
        this.globalStreamThread.shutdown();
        EasyMock.expectLastCall().andAnswer(() -> {
            this.supplier.restoreConsumer.close();
            for (MockProducer<byte[], byte[]> producer : this.supplier.producers) {
                producer.close();
            }
            globalThreadState.set(GlobalStreamThread.State.DEAD);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)this.globalStreamThread, (ThreadStateTransitionValidator)GlobalStreamThread.State.PENDING_SHUTDOWN, (ThreadStateTransitionValidator)GlobalStreamThread.State.RUNNING);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)this.globalStreamThread, (ThreadStateTransitionValidator)GlobalStreamThread.State.DEAD, (ThreadStateTransitionValidator)GlobalStreamThread.State.PENDING_SHUTDOWN);
            return null;
        }).anyTimes();
        // NOTE(review): the argument to andReturn is evaluated once, right here, while the state is still
        // CREATED, so this stub returns a fixed value rather than tracking later state changes. The
        // "? 1 : 0" form is presumably the decompiler's rendering of a boolean - confirm against the original source.
        EasyMock.expect((Object)this.globalStreamThread.stillRunning()).andReturn((Object)(globalThreadState.get() == GlobalStreamThread.State.RUNNING ? 1 : 0)).anyTimes();
        this.globalStreamThread.join();
        EasyMock.expectLastCall().anyTimes();
        // switch every mocked class and instance from record mode to replay mode
        PowerMock.replay((Object[])new Object[]{StreamThread.class, Metrics.class, this.metrics, ClientMetrics.class, this.streamThreadOne, this.streamThreadTwo, GlobalStreamThread.class, this.globalStreamThread});
    }

    /**
     * Records the lifecycle expectations for one mocked stream thread. The thread's state is
     * held in a local {@link AtomicReference}; the stubbed {@code start()} and {@code shutdown()}
     * update it and replay the corresponding transitions on the captured state listener.
     *
     * @param thread     the mocked stream thread to record expectations on
     * @param terminable when {@code true}, {@code join()} returns immediately; otherwise every
     *                   {@code join()} call sleeps for 50 ms before returning
     * @throws Exception if recording an expectation fails
     */
    private void prepareStreamThread(StreamThread thread, boolean terminable) throws Exception {
        AtomicReference<StreamThread.State> state = new AtomicReference<StreamThread.State>(StreamThread.State.CREATED);
        EasyMock.expect((Object)thread.state()).andAnswer(state::get).anyTimes();
        thread.setStateListener((StreamThread.StateListener)EasyMock.capture(this.threadStatelistenerCapture));
        EasyMock.expectLastCall().anyTimes();
        // start(): the tracked state is set to STARTING, then the listener is walked through
        // CREATED -> STARTING -> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED -> RUNNING
        thread.start();
        EasyMock.expectLastCall().andAnswer(() -> {
            state.set(StreamThread.State.STARTING);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.STARTING, (ThreadStateTransitionValidator)StreamThread.State.CREATED);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED, (ThreadStateTransitionValidator)StreamThread.State.STARTING);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_ASSIGNED, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.RUNNING, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_ASSIGNED);
            return null;
        }).anyTimes();
        // shutdown(): close the supplier's clients, mark the thread DEAD and report
        // RUNNING -> PENDING_SHUTDOWN -> DEAD to the listener
        thread.shutdown();
        EasyMock.expectLastCall().andAnswer(() -> {
            this.supplier.consumer.close();
            this.supplier.restoreConsumer.close();
            for (MockProducer<byte[], byte[]> producer : this.supplier.producers) {
                producer.close();
            }
            state.set(StreamThread.State.DEAD);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN, (ThreadStateTransitionValidator)StreamThread.State.RUNNING);
            ((StreamThread.StateListener)this.threadStatelistenerCapture.getValue()).onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.DEAD, (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN);
            return null;
        }).anyTimes();
        // NOTE(review): the argument to andReturn is evaluated once at record time, while the state is
        // still CREATED, so this stub returns a fixed value. The "? 1 : 0" form is presumably the
        // decompiler's rendering of a boolean - confirm against the original source.
        EasyMock.expect((Object)thread.isRunning()).andReturn((Object)(state.get() == StreamThread.State.RUNNING ? 1 : 0)).anyTimes();
        thread.join();
        if (terminable) {
            EasyMock.expectLastCall().anyTimes();
        } else {
            // a non-terminable thread makes every join() call take 50 ms
            EasyMock.expectLastCall().andAnswer(() -> {
                Thread.sleep(50L);
                return null;
            }).anyTimes();
        }
        EasyMock.expect((Object)thread.activeTasks()).andStubReturn(Collections.emptyList());
        EasyMock.expect((Object)thread.allTasks()).andStubReturn(Collections.emptyMap());
    }

    /**
     * Closing an instance that was created but never started must leave it in
     * {@code NOT_RUNNING}.
     */
    @Test
    public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, this.supplier, this.time);

        kafkaStreams.close();

        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, kafkaStreams.state());
    }

    /**
     * Drives the thread state listener by hand through a rebalance during which the second
     * stream thread dies, and checks that the instance returns to {@code RUNNING} once the
     * surviving thread is running again, then reaches {@code NOT_RUNNING} on close.
     *
     * @throws InterruptedException if waiting for a state change is interrupted
     */
    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
        final KafkaStreams kafkaStreams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        kafkaStreams.setStateListener(this.streamsStateListener);

        // nothing reported before start()
        Assert.assertEquals(0L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, kafkaStreams.state());

        kafkaStreams.start();
        TestUtils.waitForCondition(() -> 2 == this.streamsStateListener.numChanges, "Streams never started.");
        Assert.assertEquals(KafkaStreams.State.RUNNING, kafkaStreams.state());

        // every thread loses its partitions -> one more change, instance is REBALANCING
        for (final StreamThread streamThread : kafkaStreams.threads) {
            final StreamThread.StateListener listener = this.threadStatelistenerCapture.getValue();
            listener.onChange(streamThread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
        }
        Assert.assertEquals(3L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, kafkaStreams.state());

        // partitions assigned again -> no additional change is reported
        for (final StreamThread streamThread : kafkaStreams.threads) {
            final StreamThread.StateListener listener = this.threadStatelistenerCapture.getValue();
            listener.onChange(streamThread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
        }
        Assert.assertEquals(3L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, kafkaStreams.state());

        // the second thread shuts down and dies -> still no additional change
        final StreamThread dyingThread = kafkaStreams.threads[1];
        this.threadStatelistenerCapture.getValue().onChange(dyingThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_ASSIGNED);
        this.threadStatelistenerCapture.getValue().onChange(dyingThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals(3L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, kafkaStreams.state());

        // the remaining thread goes back to RUNNING -> instance is RUNNING again
        for (final StreamThread streamThread : kafkaStreams.threads) {
            if (streamThread != kafkaStreams.threads[1]) {
                final StreamThread.StateListener listener = this.threadStatelistenerCapture.getValue();
                listener.onChange(streamThread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
            }
        }
        Assert.assertEquals(4L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.RUNNING, kafkaStreams.state());

        kafkaStreams.close();
        TestUtils.waitForCondition(() -> 6 == this.streamsStateListener.numChanges, "Streams never closed.");
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, kafkaStreams.state());
    }

    /**
     * Drives the thread state listener by hand until every stream thread is {@code DEAD} and
     * checks that the instance ends up in {@code ERROR}, and that a subsequent close still
     * brings it to {@code NOT_RUNNING}.
     *
     * @throws InterruptedException if waiting for a state change is interrupted
     */
    @Test
    public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
        final KafkaStreams kafkaStreams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        kafkaStreams.setStateListener(this.streamsStateListener);

        // nothing reported before start()
        Assert.assertEquals(0L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, kafkaStreams.state());

        kafkaStreams.start();
        TestUtils.waitForCondition(() -> 2 == this.streamsStateListener.numChanges, "Streams never started.");
        Assert.assertEquals(KafkaStreams.State.RUNNING, kafkaStreams.state());

        // every thread loses its partitions -> instance is REBALANCING
        for (final StreamThread streamThread : kafkaStreams.threads) {
            final StreamThread.StateListener listener = this.threadStatelistenerCapture.getValue();
            listener.onChange(streamThread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
        }
        Assert.assertEquals(3L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, kafkaStreams.state());

        // the second thread dies first -> no additional change while another thread is alive
        final StreamThread firstToDie = kafkaStreams.threads[1];
        this.threadStatelistenerCapture.getValue().onChange(firstToDie, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_REVOKED);
        this.threadStatelistenerCapture.getValue().onChange(firstToDie, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals(3L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, kafkaStreams.state());

        // the remaining thread dies as well -> instance moves to ERROR
        for (final StreamThread streamThread : kafkaStreams.threads) {
            if (streamThread != kafkaStreams.threads[1]) {
                this.threadStatelistenerCapture.getValue().onChange(streamThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_REVOKED);
                this.threadStatelistenerCapture.getValue().onChange(streamThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            }
        }
        Assert.assertEquals(4L, (long) this.streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.ERROR, kafkaStreams.state());

        // close must still take the instance out of ERROR and into NOT_RUNNING
        kafkaStreams.close();
        TestUtils.waitForCondition(() -> 6 == this.streamsStateListener.numChanges, "Streams never closed.");
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, kafkaStreams.state());
    }

    /**
     * Closing an instance that was never started must still close the supplier's consumer,
     * restore consumer and every producer.
     *
     * @throws Exception if waiting for {@code NOT_RUNNING} fails
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        final StreamsBuilder streamsBuilder = this.getBuilderWithSource();
        streamsBuilder.globalTable("anyTopic");
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props, this.supplier, this.time);

        kafkaStreams.close();
        TestUtils.waitForCondition(() -> KafkaStreams.State.NOT_RUNNING == kafkaStreams.state(), "Streams never stopped.");

        Assert.assertTrue(this.supplier.consumer.closed());
        Assert.assertTrue(this.supplier.restoreConsumer.closed());
        for (final MockProducer<byte[], byte[]> mockProducer : this.supplier.producers) {
            Assert.assertTrue(mockProducer.closed());
        }
    }

    /**
     * Shuts down every stream thread of a running instance and checks that the instance moves
     * to {@code ERROR}, that closing it afterwards brings it to {@code NOT_RUNNING}, and that
     * the global stream thread reference is cleared.
     *
     * <p>The instance is declared outside the {@code try} block and closed in {@code finally}
     * because it is still inspected after being closed; a try-with-resources variable would be
     * out of scope at that point.
     *
     * @throws Exception if waiting for a state change or joining a thread fails
     */
    @Test
    public void testStateThreadClose() throws Exception {
        // a global table is registered too (presumably so the global stream thread is created as well)
        final StreamsBuilder builder = this.getBuilderWithSource();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time);

        try {
            Assert.assertEquals(NUM_THREADS, streams.threads.length);
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            // take down the stream threads one by one
            for (int i = 0; i < NUM_THREADS; ++i) {
                final StreamThread tmpThread = streams.threads[i];
                tmpThread.shutdown();
                TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, "Thread never stopped.");
                streams.threads[i].join();
            }
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.ERROR, "Streams never stopped.");
        } finally {
            streams.close();
        }

        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped.");
        Assert.assertNull(streams.globalStreamThread);
    }

    /**
     * Shuts down the global stream thread of a running instance and checks that the instance
     * moves to {@code ERROR}, and that closing it afterwards brings it to {@code NOT_RUNNING}.
     *
     * <p>The instance is declared outside the {@code try} block and closed in {@code finally}
     * because its state is still asserted after being closed; a try-with-resources variable
     * would be out of scope at that point.
     *
     * @throws Exception if waiting for a state change or joining the thread fails
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        // the global table is what gives the instance a global stream thread to shut down
        final StreamsBuilder builder = this.getBuilderWithSource();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time);

        try {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            final GlobalStreamThread globalThread = streams.globalStreamThread;
            globalThread.shutdown();
            TestUtils.waitForCondition(() -> globalThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped.");
            globalThread.join();

            Assert.assertEquals(KafkaStreams.State.ERROR, streams.state());
        } finally {
            streams.close();
        }

        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
    }

    /**
     * Constructing an instance must initialise at least one metrics reporter, and closing the
     * started instance must close exactly as many reporters as were initialised.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();

        try (KafkaStreams kafkaStreams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            final int initialized = MockMetricsReporter.INIT_COUNT.get() - initCountBefore;
            Assert.assertTrue("some reporters should be initialized by calling on construction", initialized > 0);

            kafkaStreams.start();
            final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();
            kafkaStreams.close();

            final int expectedCloseCount = closeCountBefore + initialized;
            Assert.assertEquals(expectedCloseCount, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /**
     * A second {@code close()} must not close the metrics reporters again: the reporter close
     * counter has to stay where the first {@code close()} left it.
     */
    @Test
    public void testCloseIsIdempotent() {
        final KafkaStreams kafkaStreams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);

        kafkaStreams.close();
        final int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

        kafkaStreams.close();
        final int closeCountAfterSecondClose = MockMetricsReporter.CLOSE_COUNT.get();

        Assert.assertEquals("subsequent close() calls should do nothing", closeCountAfterFirstClose, closeCountAfterSecondClose);
    }

    /**
     * Once an instance has been started and closed, calling {@code start()} again must be
     * rejected with an {@link IllegalStateException}.
     */
    @Test
    public void testCannotStartOnceClosed() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, this.supplier, this.time);
        kafkaStreams.start();
        kafkaStreams.close();

        try {
            kafkaStreams.start();
            Assert.fail("Should have throw IllegalStateException");
        } catch (final IllegalStateException expected) {
            // restarting a closed instance is exactly what must be refused
        } finally {
            kafkaStreams.close();
        }
    }

    /**
     * Setting the global state restore listener on an instance that has already been started
     * must be rejected with an {@link IllegalStateException}.
     *
     * <p>The instance is created before it is started, the expected exception is caught
     * explicitly, and the instance is always closed in {@code finally}.
     */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        final KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        try {
            streams.setGlobalStateRestoreListener(null);
            Assert.fail("Should throw an IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: the listener may no longer be changed once the instance has been started
        } finally {
            streams.close();
        }
    }

    /**
     * Setting the uncaught exception handler on an instance that has already been started must
     * be rejected with an {@link IllegalStateException}.
     *
     * <p>The started instance is closed in {@code finally} so that it does not outlive the test.
     */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        final KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        try {
            streams.setUncaughtExceptionHandler(null);
            Assert.fail("Should throw IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: the handler may no longer be changed once the instance has been started
        } finally {
            streams.close();
        }
    }

    /**
     * Setting the state listener on an instance that has already been started must be rejected
     * with an {@link IllegalStateException}.
     *
     * <p>The started instance is closed in {@code finally} so that it does not outlive the test.
     */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        final KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        try {
            streams.setStateListener(null);
            Assert.fail("Should throw IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: the listener may no longer be changed once the instance has been started
        } finally {
            streams.close();
        }
    }

    /**
     * {@code cleanUp()} must succeed both before the instance is started and after it has been
     * closed; the test passes when neither call throws.
     */
    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, this.supplier, this.time);
        try {
            // first clean-up happens while the instance is still only created
            kafkaStreams.cleanUp();
            kafkaStreams.start();
        } finally {
            // second clean-up happens once the instance has been closed
            kafkaStreams.close();
            kafkaStreams.cleanUp();
        }
    }

    /**
     * Calling {@code cleanUp()} on a running instance must fail with an
     * {@link IllegalStateException} carrying the message "Cannot clean up while running.".
     *
     * <p>The started instance is closed in {@code finally} so that it does not outlive the test.
     *
     * @throws InterruptedException if waiting for the instance to reach {@code RUNNING} is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        try {
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");
            try {
                streams.cleanUp();
                Assert.fail("Should have thrown IllegalStateException");
            } catch (final IllegalStateException expected) {
                Assert.assertEquals("Cannot clean up while running.", expected.getMessage());
            }
        } finally {
            streams.close();
        }
    }

    /**
     * Asking for all metadata on an instance that was never started must throw an
     * {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams notStarted = new KafkaStreams(topology, this.props, this.supplier, this.time);

        notStarted.allMetadata();
    }

    /**
     * Asking for the metadata of a store on an instance that was never started must throw an
     * {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams notStarted = new KafkaStreams(topology, this.props, this.supplier, this.time);

        notStarted.allMetadataForStore("store");
    }

    /**
     * Querying key metadata with a key serializer on an instance that was never started must
     * throw an {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams notStarted = new KafkaStreams(topology, this.props, this.supplier, this.time);

        notStarted.queryMetadataForKey("store", "key", Serdes.String().serializer());
    }

    /**
     * Querying key metadata with a key serializer on a started instance must not throw; in
     * this fixture the lookup is expected to yield {@link KeyQueryMetadata#NOT_AVAILABLE}.
     *
     * <p>The started instance is closed in {@code finally} so that it does not outlive the test.
     */
    @Test
    public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() {
        final KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        try {
            Assert.assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
        } finally {
            streams.close();
        }
    }

    /**
     * Querying key metadata with a custom partitioner on an instance that was never started
     * must throw an {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() {
        final Topology topology = this.getBuilderWithSource().build();
        final KafkaStreams notStarted = new KafkaStreams(topology, this.props, this.supplier, this.time);

        // the partitioner sends every key to partition 0; it is only here to select the overload
        notStarted.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
    }

    /**
     * With an admin client whose {@code listOffsets} result completes with an empty map, a
     * started instance must report no local store partition lags.
     */
    @Test
    public void shouldReturnEmptyLocalStorePartitionLags() {
        // listOffsets(...) result whose all() future is already completed with an empty map
        ListOffsetsResult emptyOffsets = (ListOffsetsResult)EasyMock.mock(ListOffsetsResult.class);
        KafkaFutureImpl completedFuture = new KafkaFutureImpl();
        completedFuture.complete(Collections.emptyMap());
        EasyMock.expect((Object)emptyOffsets.all()).andReturn((Object)completedFuture);

        // admin client that only has listOffsets(Map) mocked
        MockAdminClient adminClient = (MockAdminClient)EasyMock.partialMockBuilder(MockAdminClient.class)
            .addMockedMethod("listOffsets", new Class[]{Map.class})
            .createMock();
        EasyMock.expect((Object)adminClient.listOffsets((Map)EasyMock.anyObject())).andStubReturn((Object)emptyOffsets);

        // client supplier that only has getAdmin mocked and hands out the admin client above
        MockClientSupplier adminSupplier = (MockClientSupplier)EasyMock.partialMockBuilder(MockClientSupplier.class)
            .addMockedMethod("getAdmin")
            .createMock();
        EasyMock.expect((Object)adminSupplier.getAdmin((Map)EasyMock.anyObject())).andReturn((Object)adminClient);

        EasyMock.replay((Object[])new Object[]{emptyOffsets, adminClient, adminSupplier});

        KafkaStreams kafkaStreams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)adminSupplier, (Time)this.time);
        kafkaStreams.start();

        Assert.assertEquals((long)0L, (long)kafkaStreams.allLocalStorePartitionLags().size());
    }

    /**
     * A bounded {@code close} with a 10 ms timeout must report {@code false}.
     * The client is built with the three-argument constructor (no explicit {@code Time}).
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
        Topology sourceOnlyTopology = this.getBuilderWithSource().build();
        try (KafkaStreams client = new KafkaStreams(sourceOnlyTopology, this.props, (KafkaClientSupplier)this.supplier);){
            boolean closedWithinTimeout = client.close(Duration.ofMillis(10L));
            Assert.assertFalse((boolean)closedWithinTimeout);
        }
    }

    /**
     * Passing a negative timeout to {@code close(Duration)} must raise
     * {@link IllegalArgumentException}.
     */
    @Test(expected=IllegalArgumentException.class)
    public void shouldThrowOnNegativeTimeoutForClose() {
        Topology sourceOnlyTopology = this.getBuilderWithSource().build();
        try (KafkaStreams client = new KafkaStreams(sourceOnlyTopology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);){
            Duration negativeTimeout = Duration.ofMillis(-1L);
            client.close(negativeTimeout);
        }
    }

    /**
     * {@code close(Duration.ZERO)} must return {@code false} rather than wait
     * for shutdown to finish.
     */
    @Test
    public void shouldNotBlockInCloseForZeroDuration() {
        Topology sourceOnlyTopology = this.getBuilderWithSource().build();
        try (KafkaStreams client = new KafkaStreams(sourceOnlyTopology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);){
            boolean closedImmediately = client.close(Duration.ZERO);
            Assert.assertFalse((boolean)closedImmediately);
        }
    }

    /**
     * With {@code metrics.recording.level} set to DEBUG, starting and closing the client
     * must schedule a {@code RocksDBMetricsRecordingTrigger} at a fixed rate (initial delay 0,
     * period 1 minute) on the second executor obtained from {@code Executors} and then shut
     * that executor down.
     */
    @Test
    public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
        PowerMock.mockStatic(Executors.class);
        ScheduledExecutorService cleanupSchedule = (ScheduledExecutorService)EasyMock.niceMock(ScheduledExecutorService.class);
        ScheduledExecutorService rocksDBMetricsRecordingTriggerThread = (ScheduledExecutorService)EasyMock.mock(ScheduledExecutorService.class);
        // Expectation order matters: the first factory call yields the cleanup executor,
        // the second yields the executor whose interactions are verified below.
        EasyMock.expect((Object)Executors.newSingleThreadScheduledExecutor((ThreadFactory)EasyMock.anyObject(ThreadFactory.class))).andReturn((Object)cleanupSchedule);
        EasyMock.expect((Object)Executors.newSingleThreadScheduledExecutor((ThreadFactory)EasyMock.anyObject(ThreadFactory.class))).andReturn((Object)rocksDBMetricsRecordingTriggerThread);
        EasyMock.expect(rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate((Runnable)EasyMock.anyObject(RocksDBMetricsRecordingTrigger.class), EasyMock.eq((long)0L), EasyMock.eq((long)1L), (TimeUnit)((Object)EasyMock.eq((Object)((Object)TimeUnit.MINUTES))))).andReturn(null);
        EasyMock.expect(rocksDBMetricsRecordingTriggerThread.shutdownNow()).andReturn(null);
        PowerMock.replay((Object[])new Object[]{Executors.class});
        PowerMock.replay((Object[])new Object[]{rocksDBMetricsRecordingTriggerThread});
        PowerMock.replay((Object[])new Object[]{cleanupSchedule});
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name());
        // The topology under test comes from getBuilderWithSource(); start, then close via try-with-resources.
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);){
            streams.start();
        }
        PowerMock.verify((Object[])new Object[]{Executors.class});
        PowerMock.verify((Object[])new Object[]{rocksDBMetricsRecordingTriggerThread});
    }

    /**
     * With {@code metrics.recording.level} set to INFO, starting and closing the client must
     * request exactly one scheduled executor from {@code Executors} and must never touch the
     * separate metrics-trigger executor mock (which has no recorded expectations).
     */
    @Test
    public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
        PowerMock.mockStatic(Executors.class);
        ScheduledExecutorService cleanupSchedule = (ScheduledExecutorService)EasyMock.niceMock(ScheduledExecutorService.class);
        // No expectations are recorded on this mock, so any call on it fails the test.
        ScheduledExecutorService rocksDBMetricsRecordingTriggerThread = (ScheduledExecutorService)EasyMock.mock(ScheduledExecutorService.class);
        EasyMock.expect((Object)Executors.newSingleThreadScheduledExecutor((ThreadFactory)EasyMock.anyObject(ThreadFactory.class))).andReturn((Object)cleanupSchedule);
        PowerMock.replay((Object[])new Object[]{Executors.class, rocksDBMetricsRecordingTriggerThread, cleanupSchedule});
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.name());
        // The topology under test comes from getBuilderWithSource(); start, then close via try-with-resources.
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);){
            streams.start();
        }
        PowerMock.verify((Object[])new Object[]{Executors.class, rocksDBMetricsRecordingTriggerThread});
    }

    /**
     * When {@code rocksdb.config.setter} is configured, constructing the client must log the
     * RocksDB version-bump warning captured by the log appender.
     */
    @Test
    public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
        String expectedWarning = "stream-client [test-client] RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` (or `org.rocksdb.ColumnFamilyOptions`).";
        String configSetterClass = MockRocksDbConfigSetter.class.getName();
        this.props.setProperty("rocksdb.config.setter", configSetterClass);
        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister();){
            // Construction alone is expected to emit the warning; the instance itself is not used.
            new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
            MatcherAssert.assertThat(logCapture.getMessages(), (Matcher)CoreMatchers.hasItem((Object)expectedWarning));
        }
    }

    /**
     * Verifies that, with {@code state.cleanup.delay.ms} set to 1, starting and closing the client
     * schedules a fixed-rate task (initial delay 1, period 1, milliseconds) on the executor obtained
     * from {@code Executors} and calls {@code shutdownNow()} on it.
     *
     * @throws Exception declared because recording the {@code PowerMock.expectNew} expectation may throw
     */
    @Test
    public void shouldCleanupOldStateDirs() throws Exception {
        PowerMock.mockStatic(Executors.class);
        ScheduledExecutorService cleanupSchedule = (ScheduledExecutorService)EasyMock.mock(ScheduledExecutorService.class);
        // Every request for a single-thread scheduled executor returns the same mock.
        EasyMock.expect((Object)Executors.newSingleThreadScheduledExecutor((ThreadFactory)EasyMock.anyObject(ThreadFactory.class))).andReturn((Object)cleanupSchedule).anyTimes();
        EasyMock.expect(cleanupSchedule.scheduleAtFixedRate((Runnable)EasyMock.anyObject(Runnable.class), EasyMock.eq((long)1L), EasyMock.eq((long)1L), (TimeUnit)((Object)EasyMock.eq((Object)((Object)TimeUnit.MILLISECONDS))))).andReturn(null);
        EasyMock.expect(cleanupSchedule.shutdownNow()).andReturn(null);
        // Intercept construction of StateDirectory (third constructor argument must be true)
        // and hand back the mock held by this test class.
        PowerMock.expectNew(StateDirectory.class, (Object[])new Object[]{EasyMock.anyObject(StreamsConfig.class), EasyMock.anyObject(Time.class), EasyMock.eq((boolean)true)}).andReturn((Object)this.stateDirectory);
        EasyMock.expect((Object)this.stateDirectory.initializeProcessId()).andReturn((Object)UUID.randomUUID());
        // Still in record mode: this call records an expected close() on the mock, it does not close anything.
        this.stateDirectory.close();
        PowerMock.replayAll((Object[])new Object[]{Executors.class, cleanupSchedule, this.stateDirectory});
        this.props.setProperty("state.cleanup.delay.ms", "1");
        StreamsBuilder builder = new StreamsBuilder();
        builder.table("topic", Materialized.as((String)"store"));
        // Start the client, then let try-with-resources close it.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);){
            streams.start();
        }
        PowerMock.verify((Object[])new Object[]{Executors.class, cleanupSchedule});
    }

    /**
     * Builds a source -> processor -> sink topology that registers no state store and checks,
     * via {@code startStreamsAndCheckDirExists(topology, false)}, the state-directory expectation
     * for that case.
     */
    @Test
    public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
        String testPrefix = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        String sourceTopic = testPrefix + "-input";
        String sinkTopic = testPrefix + "-output";
        Topology stateless = new Topology();
        stateless.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), new String[]{sourceTopic});
        stateless.addProcessor("process", () -> new Processor<String, String, String, String>(){
            private ProcessorContext context;

            public void init(ProcessorContext<String, String> context) {
                this.context = context;
            }

            public void process(Record<String, String> record) {
                // Only records whose value has even length are forwarded.
                boolean evenLengthValue = ((String)record.value()).length() % 2 == 0;
                if (!evenLengthValue) {
                    return;
                }
                // The forwarded value is the key concatenated with the original value.
                this.context.forward(record.withValue((Object)((String)record.key() + (String)record.value())));
            }
        }, new String[]{"source"});
        stateless.addSink("sink", sinkTopic, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"process"});
        this.startStreamsAndCheckDirExists(stateless, false);
    }

    /**
     * Builds the stateful topology with non-persistent stores ({@code isPersistentStore = false})
     * and runs the state-directory check with {@code shouldFilesExist = false}.
     */
    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
        String prefix = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        // All topic and store names are derived from the unique per-test prefix.
        Topology inMemoryTopology = this.getStatefulTopology(
            prefix + "-input",
            prefix + "-output",
            prefix + "-global",
            prefix + "-counts",
            prefix + "-globalStore",
            false);
        this.startStreamsAndCheckDirExists(inMemoryTopology, false);
    }

    /**
     * Builds the stateful topology with persistent stores ({@code isPersistentStore = true})
     * and runs the state-directory check with {@code shouldFilesExist = true}.
     */
    @Test
    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
        String prefix = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        // All topic and store names are derived from the unique per-test prefix.
        Topology persistentTopology = this.getStatefulTopology(
            prefix + "-input",
            prefix + "-output",
            prefix + "-global",
            prefix + "-counts",
            prefix + "-globalStore",
            true);
        this.startStreamsAndCheckDirExists(persistentTopology, true);
    }

    /**
     * Constructing the client from an empty {@code StreamsBuilder} must throw a
     * {@link TopologyException} carrying the exact message asserted below.
     */
    @Test
    public void shouldThrowTopologyExceptionOnEmptyTopology() {
        String expectedMessage = "Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.";
        try {
            new KafkaStreams(new StreamsBuilder().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
            // Reaching this line means construction unexpectedly succeeded.
            Assert.fail((String)"Should have thrown TopologyException");
        }
        catch (TopologyException expected) {
            MatcherAssert.assertThat((Object)expected.getMessage(), (Matcher)Matchers.equalTo((Object)expectedMessage));
        }
    }

    /**
     * A topology consisting solely of a global table must yield a client whose
     * {@code threads} array is empty.
     */
    @Test
    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
        StreamsBuilder globalOnlyBuilder = new StreamsBuilder();
        globalOnlyBuilder.globalTable("anyTopic");
        Topology globalOnlyTopology = globalOnlyBuilder.build();
        KafkaStreams client = new KafkaStreams(globalOnlyTopology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
        int streamThreadCount = client.threads.length;
        MatcherAssert.assertThat((Object)streamThreadCount, (Matcher)Matchers.equalTo((Object)0));
    }

    /**
     * A client built from a global-table-only topology has no stream threads, starts in
     * {@code CREATED}, reaches {@code RUNNING} after {@code start()} and {@code NOT_RUNNING}
     * after {@code close()}.
     *
     * @throws InterruptedException if waiting for a state transition is interrupted
     */
    @Test
    public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedException {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        KafkaStreams streams = new KafkaStreams(builder.build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
        MatcherAssert.assertThat((Object)streams.threads.length, (Matcher)Matchers.equalTo((Object)0));
        // JUnit's contract is assertEquals(expected, actual); keep that order so failure messages read correctly.
        Assert.assertEquals((Object)KafkaStreams.State.CREATED, (Object)streams.state());
        streams.start();
        // The message is supplied lazily so that, on timeout, it reports the state at failure time
        // rather than the state captured before waiting began.
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, () -> "Streams never started, state is " + streams.state());
        streams.close();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, (String)"Streams never stopped.");
    }

    /**
     * Assembles a topology with a source, a processor backed by a key-value store, a sink,
     * and a global store with logging disabled.
     *
     * @param inputTopic topic the source node reads from
     * @param outputTopic topic the sink node writes to
     * @param globalTopicName topic feeding the global store; also the prefix of its processor name
     * @param storeName name of the key-value store attached to the processor
     * @param globalStoreName name of the global key-value store
     * @param isPersistentStore {@code true} to back both stores with persistent suppliers,
     *                          {@code false} for in-memory suppliers
     * @return the assembled topology
     */
    @Deprecated
    private Topology getStatefulTopology(String inputTopic, String outputTopic, String globalTopicName, final String storeName, String globalStoreName, boolean isPersistentStore) {
        KeyValueBytesStoreSupplier localSupplier;
        if (isPersistentStore) {
            localSupplier = Stores.persistentKeyValueStore((String)storeName);
        } else {
            localSupplier = Stores.inMemoryKeyValueStore((String)storeName);
        }
        StoreBuilder localStoreBuilder = Stores.keyValueStoreBuilder(localSupplier, (Serde)Serdes.String(), (Serde)Serdes.Long());

        Topology stateful = new Topology();
        stateful.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), new String[]{inputTopic});
        stateful.addProcessor("process", () -> new Processor<String, String, String, String>(){
            private ProcessorContext context;

            public void init(ProcessorContext<String, String> context) {
                this.context = context;
            }

            public void process(Record<String, String> record) {
                // Store a constant under the record key, forward a constant value, then request a commit.
                KeyValueStore counts = (KeyValueStore)this.context.getStateStore(storeName);
                counts.put(record.key(), (Object)5L);
                this.context.forward(record.withValue((Object)"5"));
                this.context.commit();
            }
        }, new String[]{"source"});
        stateful.addStateStore(localStoreBuilder, new String[]{"process"});
        stateful.addSink("sink", outputTopic, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"process"});

        KeyValueBytesStoreSupplier globalSupplier;
        if (isPersistentStore) {
            globalSupplier = Stores.persistentKeyValueStore((String)globalStoreName);
        } else {
            globalSupplier = Stores.inMemoryKeyValueStore((String)globalStoreName);
        }
        StoreBuilder globalStoreBuilder = Stores.keyValueStoreBuilder(globalSupplier, (Serde)Serdes.String(), (Serde)Serdes.String()).withLoggingDisabled();
        stateful.addGlobalStore(globalStoreBuilder, "global", Serdes.String().deserializer(), Serdes.String().deserializer(), globalTopicName, globalTopicName + "-processor", new MockProcessorSupplier());
        return stateful;
    }

    /**
     * Creates a fresh {@code StreamsBuilder} with a single stream registered on
     * the topic {@code "source-topic"}.
     *
     * @return the builder, ready for {@code build()}
     */
    private StreamsBuilder getBuilderWithSource() {
        StreamsBuilder sourceOnly = new StreamsBuilder();
        // The returned stream handle is not needed; registering the source is enough.
        sourceOnly.stream("source-topic");
        return sourceOnly;
    }

    /**
     * Constructs a {@code KafkaStreams} instance for {@code topology} and verifies that a
     * {@code StateDirectory} was created with {@code shouldFilesExist} as its third
     * constructor argument.
     *
     * <p>NOTE: despite the method name, the instance is only constructed here; {@code start()}
     * is not called.
     *
     * @param topology topology passed to the {@code KafkaStreams} constructor
     * @param shouldFilesExist value the third {@code StateDirectory} constructor argument must equal
     * @throws Exception declared because recording the {@code PowerMock.expectNew} expectation may throw
     */
    private void startStreamsAndCheckDirExists(Topology topology, boolean shouldFilesExist) throws Exception {
        // Intercept construction of StateDirectory and return the mock held by this test class.
        PowerMock.expectNew(StateDirectory.class, (Object[])new Object[]{EasyMock.anyObject(StreamsConfig.class), EasyMock.anyObject(Time.class), EasyMock.eq((boolean)shouldFilesExist)}).andReturn((Object)this.stateDirectory);
        EasyMock.expect((Object)this.stateDirectory.initializeProcessId()).andReturn((Object)UUID.randomUUID());
        PowerMock.replayAll((Object[])new Object[0]);
        // Construction alone must satisfy the recorded expectations.
        new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
        PowerMock.verifyAll();
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        int numChanges = 0;
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
        }
    }
}

