/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.ThreadMetadataProvider;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamThreadTest {
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    private final MockTime mockTime = new MockTime();
    private final Metrics metrics = new Metrics();
    private MockClientSupplier clientSupplier = new MockClientSupplier();
    private UUID processId = UUID.randomUUID();
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
    private InternalTopologyBuilder internalTopologyBuilder;
    private final StreamsConfig config = new StreamsConfig((Map)this.configProps(false));
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final StateDirectory stateDirectory = new StateDirectory("applicationId", this.stateDir, (Time)this.mockTime);
    private StreamsMetadataState streamsMetadataState;
    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal();
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(1, 1);
    private final TaskId task4 = new TaskId(1, 2);
    private static final String TOPIC = "topic";
    private final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition("topic", 0));
    private final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition("topic", 1));

    @Before
    public void setUp() {
        this.processId = UUID.randomUUID();
        this.internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(this.internalStreamsBuilder);
        this.internalTopologyBuilder.setApplicationId("stream-thread-test");
        this.streamsMetadataState = new StreamsMetadataState(this.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
    }

    private Properties configProps(final boolean enableEos) {
        return new Properties(){
            {
                this.setProperty("application.id", "stream-thread-test");
                this.setProperty("bootstrap.servers", "localhost:2171");
                this.setProperty("buffered.records.per.partition", "3");
                this.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
                this.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
                if (enableEos) {
                    this.setProperty("processing.guarantee", "exactly_once");
                }
            }
        };
    }

    @Test
    public void testPartitionAssignmentChangeForSingleGroup() {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        StreamThread thread = this.getStreamThread();
        final HashMap<TaskId, HashSet<TopicPartition>> activeTasks = new HashMap<TaskId, HashSet<TopicPartition>>();
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new StreamPartitionAssignor(){

            public Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.RUNNING);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        List<Object> revokedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        List<Object> assignedPartitions = Collections.singletonList(this.t1p1);
        HashSet<TopicPartition> expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        activeTasks.put(new TaskId(0, 1), expectedGroup1);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.RUNNING);
        Assert.assertEquals((long)4L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)StreamThread.State.PARTITIONS_ASSIGNED, (Object)stateListener.oldState);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertEquals(expectedGroup1, (Object)((Task)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals((long)1L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        activeTasks.clear();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertFalse((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertEquals((long)0L, (long)thread.tasks().size());
        assignedPartitions = Collections.singletonList(this.t1p2);
        HashSet<TopicPartition> expectedGroup2 = new HashSet<TopicPartition>(Collections.singleton(this.t1p2));
        activeTasks.put(new TaskId(0, 2), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedGroup2, (Object)((Task)thread.tasks().get(this.task2)).partitions());
        Assert.assertEquals((long)1L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        activeTasks.clear();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Arrays.asList(this.t1p1, this.t1p2);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Collections.singleton(this.t1p2));
        activeTasks.put(new TaskId(0, 1), expectedGroup1);
        activeTasks.put(new TaskId(0, 2), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedGroup1, (Object)((Task)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((Task)thread.tasks().get(this.task2)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        thread.shutdown();
        Assert.assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN ? 1 : 0) != 0);
    }

    @Test
    public void testPartitionAssignmentChangeForMultipleGroups() {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.internalTopologyBuilder.addSource(null, "source3", null, null, null, new String[]{"topic3"});
        this.internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source2", "source3"});
        StreamThread thread = this.getStreamThread();
        final HashMap<TaskId, HashSet<TopicPartition>> activeTasks = new HashMap<TaskId, HashSet<TopicPartition>>();
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new StreamPartitionAssignor(){

            public Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.RUNNING);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        List<Object> revokedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        List<Object> assignedPartitions = Arrays.asList(this.t2p1, this.t2p2, this.t3p1, this.t3p2);
        HashSet<TopicPartition> expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        HashSet<TopicPartition> expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p2, this.t3p2));
        activeTasks.put(new TaskId(1, 1), expectedGroup1);
        activeTasks.put(new TaskId(1, 2), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task3));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task4));
        Assert.assertEquals(expectedGroup1, (Object)((Task)thread.tasks().get(this.task3)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((Task)thread.tasks().get(this.task4)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        activeTasks.put(new TaskId(0, 1), expectedGroup1);
        activeTasks.put(new TaskId(1, 1), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task3));
        Assert.assertEquals(expectedGroup1, (Object)((Task)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((Task)thread.tasks().get(this.task3)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task3));
        Assert.assertEquals(expectedGroup1, (Object)((Task)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((Task)thread.tasks().get(this.task3)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        thread.shutdown();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PENDING_SHUTDOWN);
    }

    @Test
    public void testStateChangeStartClose() throws InterruptedException {
        final StreamThread thread = this.createStreamThread("clientId", this.config, false);
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        thread.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return thread.state() == StreamThread.State.RUNNING;
            }
        }, (long)10000L, (String)"Thread never started.");
        thread.shutdown();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PENDING_SHUTDOWN);
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return thread.state() == StreamThread.State.DEAD;
            }
        }, (long)10000L, (String)"Thread never shut down.");
        thread.shutdown();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.DEAD);
    }

    private StreamThread createStreamThread(String clientId, StreamsConfig config, boolean eosEnabled) {
        if (eosEnabled) {
            this.clientSupplier = new MockClientSupplier("stream-thread-test");
        }
        return StreamThread.create((InternalTopologyBuilder)this.internalTopologyBuilder, (StreamsConfig)config, (KafkaClientSupplier)this.clientSupplier, (UUID)this.processId, (String)clientId, (Metrics)this.metrics, (Time)this.mockTime, (StreamsMetadataState)this.streamsMetadataState, (long)0L, (StateDirectory)this.stateDirectory, (StateRestoreListener)new MockStateRestoreListener());
    }

    @Test
    public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException {
        this.internalTopologyBuilder.addStateStore(Stores.create((String)"store").withByteArrayKeys().withByteArrayValues().persistent().build(), new String[0]);
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{TOPIC});
        TopicPartition tp0 = new TopicPartition(TOPIC, 0);
        TopicPartition tp1 = new TopicPartition(TOPIC, 1);
        this.clientSupplier.consumer.assign(Arrays.asList(tp0, tp1));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(tp0, 0L);
        offsets.put(tp1, 0L);
        this.clientSupplier.consumer.updateBeginningOffsets(offsets);
        final StreamThread thread1 = this.createStreamThread("clientId1", this.config, false);
        StreamThread thread2 = this.createStreamThread("clientId2", this.config, false);
        Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), this.task0Assignment);
        Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), this.task1Assignment);
        HashMap<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<TaskId, Set<TopicPartition>>(task0);
        HashMap<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<TaskId, Set<TopicPartition>>(task1);
        thread1.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(thread1Assignment));
        thread2.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(thread2Assignment));
        thread1.setState(StreamThread.State.RUNNING);
        thread2.setState(StreamThread.State.RUNNING);
        thread1.rebalanceListener.onPartitionsRevoked((Collection)Collections.EMPTY_SET);
        thread2.rebalanceListener.onPartitionsRevoked((Collection)Collections.EMPTY_SET);
        thread1.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread1.runOnce(-1L);
        thread2.rebalanceListener.onPartitionsAssigned(this.task1Assignment);
        thread2.runOnce(-1L);
        HashSet originalTaskAssignmentThread1 = new HashSet();
        originalTaskAssignmentThread1.addAll(thread1.tasks().keySet());
        HashSet originalTaskAssignmentThread2 = new HashSet();
        originalTaskAssignmentThread2.addAll(thread2.tasks().keySet());
        thread1.rebalanceListener.onPartitionsRevoked(this.task0Assignment);
        thread2.rebalanceListener.onPartitionsRevoked(this.task1Assignment);
        Assert.assertThat((Object)thread1.prevActiveTasks(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread1));
        Assert.assertThat((Object)thread2.prevActiveTasks(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread2));
        thread1Assignment.clear();
        thread1Assignment.putAll(task1);
        thread2Assignment.clear();
        thread2Assignment.putAll(task0);
        Thread runIt = new Thread(new Runnable(){

            @Override
            public void run() {
                thread1.rebalanceListener.onPartitionsAssigned((Collection)StreamThreadTest.this.task1Assignment);
                thread1.runOnce(-1L);
            }
        });
        runIt.start();
        thread2.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread2.runOnce(-1L);
        runIt.join();
        Assert.assertThat(thread1.tasks().keySet(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread2));
        Assert.assertThat(thread2.tasks().keySet(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread1));
    }

    @Test
    public void testMetrics() {
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        String defaultGroupName = "stream-metrics";
        String defaultPrefix = "thread." + thread.threadClientId();
        Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".commit-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".poll-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".process-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".punctuate-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".task-created"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".task-closed"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".skipped-records"));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-metrics", "The average commit time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-metrics", "The maximum commit time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-metrics", "The average per-second number of commit calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", "stream-metrics", "The average poll time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", "stream-metrics", "The maximum poll time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", "stream-metrics", "The average per-second number of record-poll calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", "stream-metrics", "The average process time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", "stream-metrics", "The maximum process time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", "stream-metrics", "The average per-second number of process calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", "stream-metrics", "The average punctuate time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", "stream-metrics", "The maximum punctuate time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", "stream-metrics", "The average per-second number of punctuate calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", "stream-metrics", "The average per-second number of closed tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records.", defaultTags)));
    }

    @Test
    public void shouldNotCommitBeforeTheCommitInterval() {
        long commitInterval = 1000L;
        Properties props = this.configProps(false);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 1, 1);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap());
        StreamThread thread = new StreamThread(this.internalTopologyBuilder, "clientId", "", config, this.processId, (Time)this.mockTime, this.streamsMetadataState, taskManager, streamsMetrics, (KafkaClientSupplier)this.clientSupplier, consumer, this.stateDirectory);
        thread.maybeCommit(this.mockTime.milliseconds());
        this.mockTime.sleep(990L);
        thread.maybeCommit(this.mockTime.milliseconds());
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldNotCauseExceptionIfNothingCommited() {
        long commitInterval = 1000L;
        Properties props = this.configProps(false);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 1, 0);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap());
        StreamThread thread = new StreamThread(this.internalTopologyBuilder, "clientId", "", config, this.processId, (Time)this.mockTime, this.streamsMetadataState, taskManager, streamsMetrics, (KafkaClientSupplier)this.clientSupplier, consumer, this.stateDirectory);
        thread.maybeCommit(this.mockTime.milliseconds());
        this.mockTime.sleep(990L);
        thread.maybeCommit(this.mockTime.milliseconds());
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldCommitAfterTheCommitInterval() {
        long commitInterval = 1000L;
        Properties props = this.configProps(false);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 2, 1);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap());
        StreamThread thread = new StreamThread(this.internalTopologyBuilder, "clientId", "", config, this.processId, (Time)this.mockTime, this.streamsMetadataState, taskManager, streamsMetrics, (KafkaClientSupplier)this.clientSupplier, consumer, this.stateDirectory);
        thread.maybeCommit(this.mockTime.milliseconds());
        this.mockTime.sleep(1001L);
        thread.maybeCommit(this.mockTime.milliseconds());
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    private TaskManager mockTaskManagerCommit(Consumer<byte[], byte[]> consumer, int numberOfCommits, int commits) {
        TaskManager taskManager = (TaskManager)EasyMock.createMock(TaskManager.class);
        taskManager.setConsumer((Consumer)EasyMock.anyObject(Consumer.class));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)taskManager.commitAll()).andReturn((Object)commits).times(numberOfCommits);
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        return taskManager;
    }

    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"someTopic"});
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(assignment));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
        Assert.assertEquals((long)1L, (long)this.clientSupplier.producers.size());
        Producer globalProducer = (Producer)this.clientSupplier.producers.get(0);
        for (Task task : thread.tasks().values()) {
            Assert.assertSame((Object)globalProducer, (Object)((RecordCollectorImpl)((StreamTask)task).recordCollector()).producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"someTopic"});
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        assignment.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2)));
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(assignment));
        HashSet assignedPartitions = new HashSet();
        Collections.addAll(assignedPartitions, new TopicPartition("someTopic", 0), new TopicPartition("someTopic", 2));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertEquals((long)thread.tasks().size(), (long)this.clientSupplier.producers.size());
        Iterator<MockProducer> it = this.clientSupplier.producers.iterator();
        for (Task task : thread.tasks().values()) {
            Assert.assertSame((Object)it.next(), (Object)((RecordCollectorImpl)((StreamTask)task).recordCollector()).producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"someTopic"});
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(assignment));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
        thread.shutdown();
        thread.run();
        for (Task task : thread.tasks().values()) {
            Assert.assertTrue((boolean)((MockProducer)((RecordCollectorImpl)((StreamTask)task).recordCollector()).producer()).closed());
        }
    }

    @Test
    public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        taskManager.setConsumer((Consumer)EasyMock.anyObject(Consumer.class));
        EasyMock.expectLastCall();
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap());
        StreamThread thread = new StreamThread(this.internalTopologyBuilder, "clientId", "", this.config, this.processId, (Time)this.mockTime, this.streamsMetadataState, taskManager, streamsMetrics, (KafkaClientSupplier)this.clientSupplier, consumer, this.stateDirectory);
        thread.setState(StreamThread.State.RUNNING);
        thread.shutdown();
        thread.run();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{TOPIC});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[0]);
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new StreamPartitionAssignor(){

            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return Collections.singletonMap(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(StreamThreadTest.TOPIC, 0)}));
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() {
        this.internalStreamsBuilder.stream(Collections.singleton("t1"), this.consumed).groupByKey().count("count-one");
        this.internalStreamsBuilder.stream(Collections.singleton("t2"), this.consumed).groupByKey().count("count-two");
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        final HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        TopicPartition t1 = new TopicPartition("t1", 0);
        Set partitionsT1 = Utils.mkSet((Object[])new TopicPartition[]{t1});
        standbyTasks.put(new TaskId(0, 0), partitionsT1);
        final HashMap<TaskId, Set> activeTasks = new HashMap<TaskId, Set>();
        TopicPartition t2 = new TopicPartition("t2", 0);
        Set partitionsT2 = Utils.mkSet((Object[])new TopicPartition[]{t2});
        activeTasks.put(new TaskId(1, 0), partitionsT2);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t2, 0L));
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new StreamPartitionAssignor(){

            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return standbyTasks;
            }

            public Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        this.clientSupplier.consumer.assign((Collection)partitionsT2);
        thread.rebalanceListener.onPartitionsAssigned((Collection)Utils.mkSet((Object[])new TopicPartition[]{t2}));
        thread.runOnce(-1L);
        standbyTasks.clear();
        activeTasks.clear();
        standbyTasks.put(new TaskId(1, 0), Utils.mkSet((Object[])new TopicPartition[]{t2}));
        activeTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1}));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        this.clientSupplier.consumer.assign((Collection)partitionsT1);
        thread.rebalanceListener.onPartitionsAssigned((Collection)Utils.mkSet((Object[])new TopicPartition[]{t1}));
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{TOPIC});
        this.internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, new String[]{"source"});
        final StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        MockConsumer<byte[], byte[]> consumer = this.clientSupplier.consumer;
        consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(this.task1, this.task0Assignment);
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread.runOnce(-1L);
        Assert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        final MockProducer producer = this.clientSupplier.producers.get(0);
        consumer.updateBeginningOffsets(Collections.singletonMap(this.task0Assignment.iterator().next(), 0L));
        consumer.unsubscribe();
        consumer.assign(this.task0Assignment);
        consumer.addRecord(new ConsumerRecord(TOPIC, 0, 0L, (Object)new byte[0], (Object)new byte[0]));
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        thread.runOnce(-1L);
        Assert.assertThat((Object)producer.history().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        Assert.assertFalse((boolean)producer.transactionCommitted());
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return producer.commitCount() == 2L;
            }
        }, (String)"StreamsThread did not commit transaction.");
        producer.fenceProducer();
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        consumer.addRecord(new ConsumerRecord(TOPIC, 0, 0L, (Object)new byte[0], (Object)new byte[0]));
        try {
            thread.runOnce(-1L);
            Assert.fail((String)"Should have thrown TaskMigratedException");
        }
        catch (TaskMigratedException expected) {
            // empty catch block
        }
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return thread.tasks().isEmpty();
            }
        }, (String)"StreamsThread did not remove fenced zombie task.");
        Assert.assertThat((Object)producer.commitCount(), (Matcher)CoreMatchers.equalTo((Object)2L));
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{TOPIC});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[0]);
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(this.task1, this.task0Assignment);
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread.runOnce(-1L);
        Assert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        thread.rebalanceListener.onPartitionsRevoked(null);
        this.clientSupplier.producers.get(0).fenceProducer();
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        try {
            thread.runOnce(-1L);
            Assert.fail((String)"Should have thrown TaskMigratedException");
        }
        catch (TaskMigratedException taskMigratedException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
    }

    @Test
    public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, Pattern.compile("t.*"));
        this.internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        HashMap<String, Object> configurationMap = new HashMap<String, Object>();
        configurationMap.put("__stream.thread.instance__", thread);
        configurationMap.put("num.standby.replicas", 0);
        partitionAssignor.configure(configurationMap);
        thread.setThreadMetadataProvider((ThreadMetadataProvider)partitionAssignor);
        Field nodeToSourceTopicsField = this.internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
        nodeToSourceTopicsField.setAccessible(true);
        Map nodeToSourceTopics = (Map)nodeToSourceTopicsField.get(this.internalTopologyBuilder);
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic-2", 0);
        TopicPartition topicPartition3 = new TopicPartition("topic-3", 0);
        TaskId taskId1 = new TaskId(0, 0);
        TaskId taskId2 = new TaskId(0, 0);
        TaskId taskId3 = new TaskId(0, 0);
        List<TaskId> activeTasks = Utils.mkList((Object[])new TaskId[]{taskId1});
        HashMap standbyTasks = new HashMap();
        AssignmentInfo info = new AssignmentInfo((List)activeTasks, standbyTasks, new HashMap());
        topicPartitions.addAll(Utils.mkList((Object[])new TopicPartition[]{topicPartition1}));
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
        partitionAssignor.onAssignment(assignment);
        Assert.assertTrue((((List)nodeToSourceTopics.get("source")).size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-1"));
        topicPartitions.clear();
        activeTasks = Arrays.asList(taskId1, taskId2);
        info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap());
        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2));
        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
        partitionAssignor.onAssignment(assignment);
        Assert.assertTrue((((List)nodeToSourceTopics.get("source")).size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-1"));
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-2"));
        topicPartitions.clear();
        activeTasks = Arrays.asList(taskId1, taskId2, taskId3);
        info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap());
        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3));
        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
        partitionAssignor.onAssignment(assignment);
        Assert.assertTrue((((List)nodeToSourceTopics.get("source")).size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-1"));
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-2"));
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-3"));
    }

    private StreamThread getStreamThread() {
        return this.createStreamThread("clientId", this.config, false);
    }

    @Test
    public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{TOPIC});
        TaskId taskId = new TaskId(0, 0);
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(taskId, this.task0Assignment);
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(assignment));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread.runOnce(-1L);
        ThreadMetadata threadMetadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)threadMetadata.threadState());
        Assert.assertTrue((boolean)threadMetadata.activeTasks().contains(new TaskMetadata(taskId.toString(), this.task0Assignment)));
        Assert.assertTrue((boolean)threadMetadata.standbyTasks().isEmpty());
    }

    @Test
    public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
        this.internalStreamsBuilder.stream(Collections.singleton("t1"), this.consumed).groupByKey().count("count-one");
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        TaskId taskId = new TaskId(0, 0);
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        TopicPartition t1 = new TopicPartition("t1", 0);
        Set partitionsT1 = Utils.mkSet((Object[])new TopicPartition[]{t1});
        standbyTasks.put(taskId, partitionsT1);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new MockStreamsPartitionAssignor(activeTasks, standbyTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(this.task0Assignment);
        thread.rebalanceListener.onPartitionsAssigned(null);
        thread.runOnce(-1L);
        ThreadMetadata threadMetadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)threadMetadata.threadState());
        Assert.assertTrue((boolean)threadMetadata.standbyTasks().contains(new TaskMetadata(taskId.toString(), partitionsT1)));
        Assert.assertTrue((boolean)threadMetadata.activeTasks().isEmpty());
    }

    @Test
    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        ThreadMetadata metadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.CREATED.name(), (Object)metadata.threadState());
        thread.setState(StreamThread.State.RUNNING);
        metadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)metadata.threadState());
    }

    @Test
    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
        this.internalStreamsBuilder.stream(Collections.singleton("t1"), this.consumed).groupByKey().count("count-one");
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Utils.mkList((Object[])new PartitionInfo[]{new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0]), new PartitionInfo("stream-thread-test-count-one-changelog", 1, null, new Node[0], new Node[0])}));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        final HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        TopicPartition t1p0 = new TopicPartition("t1", 0);
        Set partitionsT1P0 = Utils.mkSet((Object[])new TopicPartition[]{t1p0});
        standbyTasks.put(new TaskId(0, 0), partitionsT1P0);
        final HashMap<TaskId, Set> activeTasks = new HashMap<TaskId, Set>();
        TopicPartition t1p1 = new TopicPartition("t1", 1);
        Set partitionsT1P1 = Utils.mkSet((Object[])new TopicPartition[]{t1p1});
        activeTasks.put(new TaskId(0, 1), partitionsT1P1);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
        thread.setThreadMetadataProvider((ThreadMetadataProvider)new StreamPartitionAssignor(){

            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return standbyTasks;
            }

            public Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked((Collection)partitionsT1P0);
        this.assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
        this.clientSupplier.consumer.assign((Collection)partitionsT1P1);
        thread.rebalanceListener.onPartitionsAssigned((Collection)partitionsT1P1);
        this.assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
        thread.runOnce(-1L);
        standbyTasks.clear();
        activeTasks.clear();
        standbyTasks.put(new TaskId(0, 1), Utils.mkSet((Object[])new TopicPartition[]{t1p1}));
        activeTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1p0}));
        Assert.assertFalse((boolean)thread.threadMetadata().activeTasks().isEmpty());
        Assert.assertFalse((boolean)thread.threadMetadata().standbyTasks().isEmpty());
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        this.assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
    }

    private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {
        Assert.assertEquals((Object)state.name(), (Object)metadata.threadState());
        Assert.assertTrue((boolean)metadata.activeTasks().isEmpty());
        Assert.assertTrue((boolean)metadata.standbyTasks().isEmpty());
    }

    private static class StateListenerStub
    implements StreamThread.StateListener {
        int numChanges = 0;
        ThreadStateTransitionValidator oldState = null;
        ThreadStateTransitionValidator newState = null;

        private StateListenerStub() {
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator newState, ThreadStateTransitionValidator oldState) {
            ++this.numChanges;
            if (this.newState != null && this.newState != oldState) {
                throw new RuntimeException("State mismatch " + oldState + " different from " + this.newState);
            }
            this.oldState = oldState;
            this.newState = newState;
        }
    }

    private class MockStreamsPartitionAssignor
    extends StreamPartitionAssignor {
        private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
        private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;

        MockStreamsPartitionAssignor(Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
            this(activeTaskAssignment, Collections.emptyMap());
        }

        MockStreamsPartitionAssignor(Map<TaskId, Set<TopicPartition>> activeTaskAssignment, Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) {
            this.activeTaskAssignment = activeTaskAssignment;
            this.standbyTaskAssignment = standbyTaskAssignment;
        }

        public Map<TaskId, Set<TopicPartition>> activeTasks() {
            return this.activeTaskAssignment;
        }

        public Map<TaskId, Set<TopicPartition>> standbyTasks() {
            return this.standbyTaskAssignment;
        }

        public void close() {
        }
    }
}

