/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class GlobalStreamThreadTest {
    private final KStreamBuilder builder = new KStreamBuilder();
    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private GlobalStreamThread globalStreamThread;

    @Before
    public void before() {
        this.builder.globalTable("foo", "bar");
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("bootstrap.servers", "blah");
        properties.put("application.id", "blah");
        StreamsConfig config = new StreamsConfig(properties);
        this.globalStreamThread = new GlobalStreamThread(this.builder.buildGlobalStateTopology(), config, this.mockConsumer, new StateDirectory("appId", TestUtils.tempDirectory().getPath()), new Metrics(), (Time)new MockTime(), "client");
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfThereIsAnException() throws Exception {
        try {
            this.globalStreamThread.start();
            Assert.fail((String)"Should have thrown StreamsException if start up failed");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
        Assert.assertFalse((boolean)this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldBeRunningAfterSuccesulStart() throws Exception {
        this.initializeConsumer();
        this.globalStreamThread.start();
        Assert.assertTrue((boolean)this.globalStreamThread.stillRunning());
    }

    @Test(timeout=30000L)
    public void shouldStopRunningWhenClosedByUser() throws Exception {
        this.initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.close();
        this.globalStreamThread.join();
    }

    @Test
    public void shouldCloseStateStoresOnClose() throws Exception {
        this.initializeConsumer();
        this.globalStreamThread.start();
        StateStore globalStore = (StateStore)this.builder.globalStateStores().get("bar");
        Assert.assertTrue((boolean)globalStore.isOpen());
        this.globalStreamThread.close();
        this.globalStreamThread.join();
        Assert.assertFalse((boolean)globalStore.isOpen());
    }

    private void initializeConsumer() {
        this.mockConsumer.updatePartitions("foo", Collections.singletonList(new PartitionInfo("foo", 0, null, new Node[0], new Node[0])));
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        this.mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
    }
}

