/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class KafkaStreamsTest {
    /** Broker count handed to the embedded cluster shared by all tests in this class. */
    private static final int NUM_BROKERS = 1;

    /** Embedded Kafka cluster, registered with JUnit as a class-level rule. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /** Empty topology builder used for the instance created in {@link #before()}. */
    private final KStreamBuilder builder = new KStreamBuilder();

    /** Instance under test; re-created (but not started) by {@link #before()}. */
    private KafkaStreams streams;

    /** Configuration assembled by {@link #before()} and used to construct {@link #streams}. */
    private Properties props;

    /**
     * Builds a fresh configuration and an unstarted {@link KafkaStreams} instance before each test.
     */
    @Before
    public void before() {
        final Properties config = new Properties();
        this.props = config;
        config.setProperty("application.id", "appId");
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        config.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        this.streams = new KafkaStreams((TopologyBuilder) this.builder, config);
    }

    /**
     * Checks that constructing a {@link KafkaStreams} instance initializes metrics reporters, that
     * {@code close()} closes as many reporters as were initialized, and that a registered state
     * listener records RUNNING once after {@code start()} and NOT_RUNNING once after {@code close()}.
     *
     * @throws Exception if the instance under test fails unexpectedly
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() throws Exception {
        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        final KStreamBuilder builder = new KStreamBuilder();
        final KafkaStreams streams = new KafkaStreams((TopologyBuilder) builder, this.props);
        try {
            final int initDiff = MockMetricsReporter.INIT_COUNT.get() - oldInitCount;
            Assert.assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);

            final StateListenerStub stateListener = new StateListenerStub();
            streams.setStateListener(stateListener);
            // JUnit's contract is assertEquals(expected, actual); keep that order so failure
            // messages report the right side as "expected".
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());
            Assert.assertEquals(0L, stateListener.numChanges);

            streams.start();
            Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());
            Assert.assertEquals(1L, stateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.CREATED, stateListener.oldState);
            Assert.assertEquals(KafkaStreams.State.RUNNING, stateListener.newState);
            Assert.assertEquals(1L, stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue());

            final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();
            Assert.assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
            Assert.assertEquals(1L, stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue());
            Assert.assertEquals(1L, stateListener.mapStates.get(KafkaStreams.State.NOT_RUNNING).longValue());
        } finally {
            // Do not leave a running instance behind when an assertion above fails; a repeated
            // close() is expected to be harmless (see testCloseIsIdempotent).
            streams.close();
        }
    }

    /**
     * Closing an already closed instance must not change the metrics reporter close count.
     *
     * @throws Exception if closing fails unexpectedly
     */
    @Test
    public void testCloseIsIdempotent() throws Exception {
        this.streams.close();
        final int countAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

        this.streams.close();
        final int countAfterSecondClose = MockMetricsReporter.CLOSE_COUNT.get();

        Assert.assertEquals("subsequent close() calls should do nothing", (long) countAfterFirstClose, (long) countAfterSecondClose);
    }

    /**
     * Calling {@code start()} on an instance that was started and then closed must fail with an
     * {@link IllegalStateException} whose message is "Cannot start again.".
     *
     * @throws Exception the expected {@link IllegalStateException}, rethrown for JUnit to match
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotStartOnceClosed() throws Exception {
        this.streams.start();
        this.streams.close();
        try {
            try {
                this.streams.start();
            } catch (final IllegalStateException expected) {
                final String message = expected.getMessage();
                Assert.assertEquals((Object) "Cannot start again.", (Object) message);
                // Rethrow so the @Test(expected = ...) check still sees the exception.
                throw expected;
            }
        } finally {
            this.streams.close();
        }
    }

    /**
     * Calling {@code start()} a second time on a started instance must fail with an
     * {@link IllegalStateException} whose message is "Cannot start again.".
     *
     * @throws Exception the expected {@link IllegalStateException}, rethrown for JUnit to match
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotStartTwice() throws Exception {
        this.streams.start();
        try {
            try {
                this.streams.start();
            } catch (final IllegalStateException expected) {
                final String message = expected.getMessage();
                Assert.assertEquals((Object) "Cannot start again.", (Object) message);
                // Rethrow so the @Test(expected = ...) check still sees the exception.
                throw expected;
            }
        } finally {
            this.streams.close();
        }
    }

    /**
     * A freshly constructed instance with the minimal configuration is expected to expose
     * exactly 16 metrics.
     */
    @Test
    public void testNumberDefaultMetrics() {
        final KafkaStreams streams = this.createKafkaStreams();
        try {
            final Map<?, ?> metrics = streams.metrics();
            Assert.assertEquals(16L, metrics.size());
        } finally {
            // Release the clients and reporters created by the constructor.
            streams.close();
        }
    }

    /**
     * Constructing an instance with an unknown {@code metrics.recording.level} value is expected
     * to fail with a {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void testIllegalMetricsConfig() {
        final Properties config = new Properties();
        config.setProperty("application.id", "appId");
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("metrics.recording.level", "illegalConfig");
        final KStreamBuilder topology = new KStreamBuilder();
        // The constructor call itself is the assertion: it must throw.
        new KafkaStreams((TopologyBuilder) topology, config);
    }

    /**
     * Constructing an instance must succeed for both the INFO and the DEBUG value of
     * {@code metrics.recording.level}. Each instance is closed again so none outlives the test.
     */
    @Test
    public void testLegalMetricsConfig() {
        final Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());

        props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        final KStreamBuilder builder1 = new KStreamBuilder();
        final KafkaStreams streams1 = new KafkaStreams((TopologyBuilder) builder1, props);
        streams1.close();

        props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        final KStreamBuilder builder2 = new KStreamBuilder();
        final KafkaStreams streams2 = new KafkaStreams((TopologyBuilder) builder2, props);
        // Previously left open; close it like the first instance.
        streams2.close();
    }

    /**
     * {@code allMetadata()} on the never-started instance from {@link #before()} is expected to
     * throw an {@link IllegalStateException}.
     *
     * @throws Exception the expected {@link IllegalStateException}
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
        final KafkaStreams notStarted = this.streams;
        notStarted.allMetadata();
    }

    /**
     * {@code allMetadataForStore(...)} on the never-started instance from {@link #before()} is
     * expected to throw an {@link IllegalStateException}.
     *
     * @throws Exception the expected {@link IllegalStateException}
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
        final String storeName = "store";
        this.streams.allMetadataForStore(storeName);
    }

    /**
     * {@code metadataForKey(store, key, serializer)} on the never-started instance from
     * {@link #before()} is expected to throw an {@link IllegalStateException}.
     *
     * @throws Exception the expected {@link IllegalStateException}
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
        // The key is passed as a String (no Object cast) so its type agrees with the
        // String serializer's key type.
        this.streams.metadataForKey("store", "key", Serdes.String().serializer());
    }

    /**
     * {@code metadataForKey(store, key, partitioner)} on the never-started instance from
     * {@link #before()} is expected to throw an {@link IllegalStateException}.
     *
     * @throws Exception the expected {@link IllegalStateException}
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
        // Keep the partitioner fully typed instead of casting it to the raw type, so the
        // compiler checks the key type against it.
        final StreamPartitioner<String, Object> partitioner = new StreamPartitioner<String, Object>() {
            @Override
            public Integer partition(final String key, final Object value, final int numPartitions) {
                return 0;
            }
        };
        this.streams.metadataForKey("store", "key", partitioner);
    }

    /**
     * Checks that {@code close(timeout, unit)} returns {@code false} while the processing code is
     * still blocked when the timeout expires.
     *
     * <p>The topology's {@code foreach} action counts down a latch on the first record and then
     * spins until {@code keepRunning} is cleared, so the 10 ms close below cannot complete.
     *
     * @throws Exception if topic creation, producing, or waiting on the latch fails
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);

        final Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("auto.offset.reset", "earliest");

        final KStreamBuilder builder = new KStreamBuilder();
        final CountDownLatch latch = new CountDownLatch(1);
        final String topic = "input";
        CLUSTER.createTopic(topic);

        builder.stream(Serdes.String(), Serdes.String(), new String[]{topic}).foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(final String key, final String value) {
                try {
                    // Signal that a record arrived, then block until the test releases us.
                    latch.countDown();
                    while (keepRunning.get()) {
                        Thread.sleep(10L);
                    }
                } catch (final InterruptedException e) {
                    // Stop blocking, but keep the interrupt visible to the calling thread.
                    Thread.currentThread().interrupt();
                }
            }
        });

        final KafkaStreams streams = new KafkaStreams((TopologyBuilder) builder, props);
        try {
            streams.start();
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                topic,
                Collections.singletonList(new KeyValue<String, String>("A", "A")),
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
                System.currentTimeMillis());
            Assert.assertTrue("Timed out waiting to receive single message", latch.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse(streams.close(10L, TimeUnit.MILLISECONDS));
        } finally {
            // Release the blocked action first, then close so nothing keeps running after the test.
            keepRunning.set(false);
            streams.close();
        }
    }

    /**
     * Creates an unstarted {@link KafkaStreams} instance over an empty topology, configured with
     * only the application id and the embedded cluster's bootstrap servers.
     *
     * @return the new instance; the caller is responsible for it from here on
     */
    private KafkaStreams createKafkaStreams() {
        final Properties minimalConfig = new Properties();
        minimalConfig.setProperty("application.id", "appId");
        minimalConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        final KStreamBuilder emptyTopology = new KStreamBuilder();
        final KafkaStreams created = new KafkaStreams((TopologyBuilder) emptyTopology, minimalConfig);
        return created;
    }

    /**
     * {@code cleanUp()} must be callable both before {@code start()} and after {@code close()}
     * without throwing.
     *
     * @throws Exception if any lifecycle call fails unexpectedly
     */
    @Test
    public void testCleanup() throws Exception {
        final Properties config = new Properties();
        config.setProperty("application.id", "testLocalCleanup");
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        final KStreamBuilder topology = new KStreamBuilder();
        final KafkaStreams instance = new KafkaStreams((TopologyBuilder) topology, config);

        // Before the instance has ever run.
        instance.cleanUp();

        instance.start();
        instance.close();

        // After it has been shut down again.
        instance.cleanUp();
    }

    /**
     * {@code cleanUp()} on a started instance is expected to throw an
     * {@link IllegalStateException}.
     *
     * <p>The instance created here is the one that is started, cleaned up and closed. The
     * previous version called {@code start()} before the local was declared, which started the
     * field instance from {@link #before()} and left the local one unstarted.
     *
     * @throws Exception the expected {@link IllegalStateException}
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotCleanupWhileRunning() throws Exception {
        final Properties props = new Properties();
        props.setProperty("application.id", "testCannotCleanupWhileRunning");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        final KStreamBuilder builder = new KStreamBuilder();
        final KafkaStreams streams = new KafkaStreams((TopologyBuilder) builder, props);

        streams.start();
        try {
            streams.cleanUp();
        } finally {
            // Always shut the started instance down, whether or not cleanUp() threw.
            streams.close();
        }
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        public int numChanges = 0;
        public KafkaStreams.State oldState;
        public KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
        }
    }
}

