/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class KafkaStreamsTest {
    private static final int NUM_BROKERS = 1;
    private static final int NUM_THREADS = 2;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final KStreamBuilder builder = new KStreamBuilder();
    private KafkaStreams streams;
    private Properties props;

    @Before
    public void before() {
        this.props = new Properties();
        this.props.setProperty("application.id", "appId");
        this.props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        this.props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        this.props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        this.props.put("num.stream.threads", (Object)2);
        this.streams = new KafkaStreams((TopologyBuilder)this.builder, this.props);
    }

    @Test
    public void testStateChanges() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        final KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, this.props);
        StateListenerStub stateListener = new StateListenerStub();
        streams.setStateListener((KafkaStreams.StateListener)stateListener);
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.CREATED);
        Assert.assertEquals((long)stateListener.numChanges, (long)0L);
        streams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return streams.state() == KafkaStreams.State.RUNNING;
            }
        }, (long)10000L, (String)"Streams never started.");
        streams.close();
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.NOT_RUNNING);
    }

    @Test
    public void testStateCloseAfterCreate() {
        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, this.props);
        StateListenerStub stateListener = new StateListenerStub();
        streams.setStateListener((KafkaStreams.StateListener)stateListener);
        streams.close();
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.NOT_RUNNING);
    }

    private void testStateThreadCloseHelper(int numThreads) throws Exception {
        Field threadsField = this.streams.getClass().getDeclaredField("threads");
        threadsField.setAccessible(true);
        StreamThread[] threads = (StreamThread[])threadsField.get(this.streams);
        Assert.assertEquals((long)numThreads, (long)threads.length);
        Assert.assertEquals((Object)this.streams.state(), (Object)KafkaStreams.State.CREATED);
        this.streams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return KafkaStreamsTest.this.streams.state() == KafkaStreams.State.RUNNING;
            }
        }, (long)10000L, (String)"Streams never started.");
        for (int i = 0; i < numThreads; ++i) {
            final StreamThread tmpThread = threads[i];
            tmpThread.close();
            TestUtils.waitForCondition((TestCondition)new TestCondition(){

                public boolean conditionMet() {
                    return tmpThread.state() == StreamThread.State.DEAD;
                }
            }, (long)10000L, (String)"Thread never stopped.");
            threads[i].join();
        }
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return KafkaStreamsTest.this.streams.state() == KafkaStreams.State.ERROR;
            }
        }, (long)10000L, (String)"Streams never stopped.");
        this.streams.close();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return KafkaStreamsTest.this.streams.state() == KafkaStreams.State.NOT_RUNNING;
            }
        }, (long)10000L, (String)"Streams never stopped.");
        Field globalThreadField = this.streams.getClass().getDeclaredField("globalStreamThread");
        globalThreadField.setAccessible(true);
        GlobalStreamThread globalStreamThread = (GlobalStreamThread)globalThreadField.get(this.streams);
        Assert.assertEquals((Object)globalStreamThread, null);
    }

    @Test
    public void testStateThreadClose() throws Exception {
        int numThreads = 2;
        KStreamBuilder builder = new KStreamBuilder();
        builder.globalTable("anyTopic", "anyStore");
        this.props.put("num.stream.threads", (Object)2);
        new KafkaStreams((TopologyBuilder)builder, this.props);
        this.testStateThreadCloseHelper(2);
    }

    @Test
    public void testStateGlobalThreadClose() throws Exception {
        int numThreads = 2;
        KStreamBuilder builder = new KStreamBuilder();
        builder.globalTable("anyTopic", "anyStoreName");
        this.props.put("num.stream.threads", (Object)2);
        final KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, this.props);
        streams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return streams.state() == KafkaStreams.State.RUNNING;
            }
        }, (long)10000L, (String)"Streams never started.");
        Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
        globalThreadField.setAccessible(true);
        final GlobalStreamThread globalStreamThread = (GlobalStreamThread)globalThreadField.get(streams);
        globalStreamThread.close();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return globalStreamThread.state() == GlobalStreamThread.State.DEAD;
            }
        }, (long)10000L, (String)"Thread never stopped.");
        globalStreamThread.join();
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.ERROR);
        streams.close();
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.NOT_RUNNING);
    }

    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, this.props);
        int newInitCount = MockMetricsReporter.INIT_COUNT.get();
        int initDiff = newInitCount - oldInitCount;
        Assert.assertTrue((String)"some reporters should be initialized by calling on construction", (initDiff > 0 ? 1 : 0) != 0);
        streams.start();
        int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
        streams.close();
        Assert.assertEquals((long)(oldCloseCount + initDiff), (long)MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCloseIsIdempotent() {
        this.streams.close();
        int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
        this.streams.close();
        Assert.assertEquals((String)"subsequent close() calls should do nothing", (long)closeCount, (long)MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test(expected=IllegalStateException.class)
    public void testCannotStartOnceClosed() {
        this.streams.start();
        this.streams.close();
        try {
            this.streams.start();
        }
        catch (IllegalStateException e) {
            Assert.assertEquals((Object)"Cannot start again.", (Object)e.getMessage());
            throw e;
        }
        finally {
            this.streams.close();
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testCannotStartTwice() {
        this.streams.start();
        try {
            this.streams.start();
        }
        catch (IllegalStateException e) {
            Assert.assertEquals((Object)"Cannot start again.", (Object)e.getMessage());
            throw e;
        }
        finally {
            this.streams.close();
        }
    }

    @Test
    public void testNumberDefaultMetrics() {
        KafkaStreams streams = this.createKafkaStreams();
        Map metrics = streams.metrics();
        Assert.assertEquals((long)metrics.size(), (long)16L);
    }

    @Test(expected=ConfigException.class)
    public void testIllegalMetricsConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        props.setProperty("metrics.recording.level", "illegalConfig");
        KStreamBuilder builder = new KStreamBuilder();
        new KafkaStreams((TopologyBuilder)builder, props);
    }

    @Test
    public void testLegalMetricsConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        KStreamBuilder builder1 = new KStreamBuilder();
        KafkaStreams streams1 = new KafkaStreams((TopologyBuilder)builder1, props);
        streams1.close();
        props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        KStreamBuilder builder2 = new KStreamBuilder();
        new KafkaStreams((TopologyBuilder)builder2, props);
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        this.streams.allMetadata();
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        this.streams.allMetadataForStore("store");
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
        this.streams.metadataForKey("store", (Object)"key", Serdes.String().serializer());
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        this.streams.metadataForKey("store", (Object)"key", (StreamPartitioner)new StreamPartitioner<String, Object>(){

            public Integer partition(String key, Object value, int numPartitions) {
                return 0;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        try {
            Properties props = new Properties();
            props.setProperty("application.id", "appId");
            props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
            props.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
            props.setProperty("auto.offset.reset", "earliest");
            KStreamBuilder builder = new KStreamBuilder();
            final CountDownLatch latch = new CountDownLatch(1);
            String topic = "input";
            CLUSTER.createTopic("input");
            builder.stream(Serdes.String(), Serdes.String(), new String[]{"input"}).foreach((ForeachAction)new ForeachAction<String, String>(){

                public void apply(String key, String value) {
                    try {
                        latch.countDown();
                        while (keepRunning.get()) {
                            Thread.sleep(10L);
                        }
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
            });
            KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);
            streams.start();
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp("input", Collections.singletonList(new KeyValue((Object)"A", (Object)"A")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), System.currentTimeMillis());
            Assert.assertTrue((String)"Timed out waiting to receive single message", (boolean)latch.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse((boolean)streams.close(10L, TimeUnit.MILLISECONDS));
        }
        finally {
            keepRunning.set(false);
        }
    }

    private KafkaStreams createKafkaStreams() {
        Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        KStreamBuilder builder = new KStreamBuilder();
        return new KafkaStreams((TopologyBuilder)builder, props);
    }

    @Test
    public void testCleanup() {
        Properties props = new Properties();
        props.setProperty("application.id", "testLocalCleanup");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);
        streams.cleanUp();
        streams.start();
        streams.close();
        streams.cleanUp();
    }

    @Test(expected=IllegalStateException.class)
    public void testCannotCleanupWhileRunning() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testCannotCleanupWhileRunning");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        KStreamBuilder builder = new KStreamBuilder();
        final KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);
        streams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return streams.state() == KafkaStreams.State.RUNNING;
            }
        }, (long)10000L, (String)"Streams never started.");
        try {
            streams.cleanUp();
        }
        catch (IllegalStateException e) {
            Assert.assertEquals((Object)"Cannot clean up while running.", (Object)e.getMessage());
            throw e;
        }
        finally {
            streams.close();
        }
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        public int numChanges = 0;
        public KafkaStreams.State oldState;
        public KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();
        private final boolean closeOnChange;
        private final KafkaStreams streams;

        public StateListenerStub() {
            this.closeOnChange = false;
            this.streams = null;
        }

        public StateListenerStub(boolean closeOnChange, KafkaStreams streams) {
            this.closeOnChange = closeOnChange;
            this.streams = streams;
        }

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
            if (this.closeOnChange && (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR)) {
                this.streams.close();
            }
        }
    }
}

