/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamThreadTest {
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    private final MockTime mockTime = new MockTime();
    private final Metrics metrics = new Metrics();
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    private UUID processId = UUID.randomUUID();
    private final KStreamBuilder builder = new KStreamBuilder();
    private final StreamsConfig config = new StreamsConfig((Map)this.configProps(false));
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final StateDirectory stateDirectory = new StateDirectory("applicationId", this.stateDir, (Time)this.mockTime);
    private final ChangelogReader changelogReader;
    private final TopicPartition t1p1;
    private final TopicPartition t1p2;
    private final TopicPartition t2p1;
    private final TopicPartition t2p2;
    private final TopicPartition t3p1;
    private final TopicPartition t3p2;
    private final List<PartitionInfo> infos;
    private final Cluster metadata;
    private final PartitionAssignor.Subscription subscription;
    private final TaskId task1;
    private final TaskId task2;
    private final TaskId task3;
    private final TaskId task4;
    private static final String TOPIC = "topic";
    private final Set<TopicPartition> task0Assignment;
    private final Set<TopicPartition> task1Assignment;

    public StreamThreadTest() {
        this.changelogReader = new StoreChangelogReader(this.clientSupplier.restoreConsumer);
        this.t1p1 = new TopicPartition("topic1", 1);
        this.t1p2 = new TopicPartition("topic1", 2);
        this.t2p1 = new TopicPartition("topic2", 1);
        this.t2p2 = new TopicPartition("topic2", 2);
        this.t3p1 = new TopicPartition("topic3", 1);
        this.t3p2 = new TopicPartition("topic3", 2);
        this.infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]));
        this.metadata = new Cluster("cluster", Collections.singleton(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
        this.subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), this.subscriptionUserData());
        this.task1 = new TaskId(0, 1);
        this.task2 = new TaskId(0, 2);
        this.task3 = new TaskId(1, 1);
        this.task4 = new TaskId(1, 2);
        this.task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
        this.task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
    }

    @Before
    public void setUp() throws Exception {
        this.processId = UUID.randomUUID();
    }

    private ByteBuffer subscriptionUserData() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(28);
        buf.putInt(1);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        buf.putInt(0);
        buf.putInt(0);
        buf.rewind();
        return buf;
    }

    private Properties configProps(final boolean enableEos) {
        return new Properties(){
            {
                this.setProperty("application.id", "stream-thread-test");
                this.setProperty("bootstrap.servers", "localhost:2171");
                this.setProperty("buffered.records.per.partition", "3");
                this.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
                this.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
                if (enableEos) {
                    this.setProperty("processing.guarantee", "exactly_once");
                }
            }
        };
    }

    @Test
    public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
        this.builder.addSource("source1", new String[]{"topic1"});
        StreamThread thread = this.getStreamThread();
        final HashMap<TaskId, HashSet<TopicPartition>> activeTasks = new HashMap<TaskId, HashSet<TopicPartition>>();
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.RUNNING);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        List<Object> revokedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        List<Object> assignedPartitions = Collections.singletonList(this.t1p1);
        HashSet<TopicPartition> expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        activeTasks.put(new TaskId(0, 1), expectedGroup1);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertEquals((long)stateListener.numChanges, (long)3L);
        Assert.assertEquals((Object)StreamThread.State.PARTITIONS_REVOKED, (Object)stateListener.oldState);
        thread.runOnce(-1L);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.RUNNING);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals((long)1L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        activeTasks.clear();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertFalse((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertEquals((long)0L, (long)thread.tasks().size());
        assignedPartitions = Collections.singletonList(this.t1p2);
        HashSet<TopicPartition> expectedGroup2 = new HashSet<TopicPartition>(Collections.singleton(this.t1p2));
        activeTasks.put(new TaskId(0, 2), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task2)).partitions());
        Assert.assertEquals((long)1L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        activeTasks.clear();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Arrays.asList(this.t1p1, this.t1p2);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Collections.singleton(this.t1p2));
        activeTasks.put(new TaskId(0, 1), expectedGroup1);
        activeTasks.put(new TaskId(0, 2), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task2)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        thread.close();
        Assert.assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN ? 1 : 0) != 0);
    }

    @Test
    public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
        this.builder.addSource("source1", new String[]{"topic1"});
        this.builder.addSource("source2", new String[]{"topic2"});
        this.builder.addSource("source3", new String[]{"topic3"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source2", "source3"});
        StreamThread thread = this.getStreamThread();
        final HashMap<TaskId, HashSet<TopicPartition>> activeTasks = new HashMap<TaskId, HashSet<TopicPartition>>();
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.RUNNING);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        List<Object> revokedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        List<Object> assignedPartitions = Arrays.asList(this.t2p1, this.t2p2, this.t3p1, this.t3p2);
        HashSet<TopicPartition> expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        HashSet<TopicPartition> expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p2, this.t3p2));
        activeTasks.put(new TaskId(1, 1), expectedGroup1);
        activeTasks.put(new TaskId(1, 2), expectedGroup2);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task3));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task4));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task3)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task4)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        activeTasks.put(new TaskId(0, 1), expectedGroup1);
        activeTasks.remove(this.task4);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task3));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task3)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task3));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task3)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        assignedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        thread.close();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PENDING_SHUTDOWN);
    }

    @Test
    public void testStateChangeStartClose() throws InterruptedException {
        final StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        thread.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return thread.state() == StreamThread.State.RUNNING;
            }
        }, (long)10000L, (String)"Thread never started.");
        thread.close();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PENDING_SHUTDOWN);
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return thread.state() == StreamThread.State.DEAD;
            }
        }, (long)10000L, (String)"Thread never shut down.");
        thread.close();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.DEAD);
    }

    @Test
    public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException {
        this.builder.addStateStore(Stores.create((String)"store").withByteArrayKeys().withByteArrayValues().persistent().build(), new String[0]);
        this.builder.addSource("source", new String[]{TOPIC});
        TopicPartition tp0 = new TopicPartition(TOPIC, 0);
        TopicPartition tp1 = new TopicPartition(TOPIC, 1);
        this.clientSupplier.consumer.assign(Arrays.asList(tp0, tp1));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(tp0, 0L);
        offsets.put(tp1, 0L);
        this.clientSupplier.consumer.updateBeginningOffsets(offsets);
        final StreamThread thread1 = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId1", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        StreamThread thread2 = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId2", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), this.task0Assignment);
        Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), this.task1Assignment);
        HashMap<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<TaskId, Set<TopicPartition>>(task0);
        HashMap<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<TaskId, Set<TopicPartition>>(task1);
        thread1.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(thread1Assignment));
        thread2.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(thread2Assignment));
        thread1.setState(StreamThread.State.RUNNING);
        thread2.setState(StreamThread.State.RUNNING);
        thread1.rebalanceListener.onPartitionsRevoked((Collection)Collections.EMPTY_SET);
        thread2.rebalanceListener.onPartitionsRevoked((Collection)Collections.EMPTY_SET);
        thread1.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread1.runOnce(-1L);
        thread2.rebalanceListener.onPartitionsAssigned(this.task1Assignment);
        thread2.runOnce(-1L);
        HashSet originalTaskAssignmentThread1 = new HashSet();
        originalTaskAssignmentThread1.addAll(thread1.tasks().keySet());
        HashSet originalTaskAssignmentThread2 = new HashSet();
        originalTaskAssignmentThread2.addAll(thread2.tasks().keySet());
        thread1.rebalanceListener.onPartitionsRevoked(this.task0Assignment);
        thread2.rebalanceListener.onPartitionsRevoked(this.task1Assignment);
        Assert.assertThat((Object)thread1.prevActiveTasks(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread1));
        Assert.assertThat((Object)thread2.prevActiveTasks(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread2));
        thread1Assignment.clear();
        thread1Assignment.putAll(task1);
        thread2Assignment.clear();
        thread2Assignment.putAll(task0);
        Thread runIt = new Thread(new Runnable(){

            @Override
            public void run() {
                thread1.rebalanceListener.onPartitionsAssigned((Collection)StreamThreadTest.this.task1Assignment);
                thread1.runOnce(-1L);
            }
        });
        runIt.start();
        thread2.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread2.runOnce(-1L);
        runIt.join();
        Assert.assertThat(thread1.tasks().keySet(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread2));
        Assert.assertThat(thread2.tasks().keySet(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread1));
    }

    @Test
    public void testMetrics() {
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        String defaultGroupName = "stream-metrics";
        String defaultPrefix = "thread." + thread.threadClientId();
        Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".commit-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".poll-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".process-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".punctuate-latency"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".task-created"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".task-closed"));
        Assert.assertNotNull((Object)this.metrics.getSensor(defaultPrefix + ".skipped-records"));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-metrics", "The average commit time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-metrics", "The maximum commit time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-metrics", "The average per-second number of commit calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", "stream-metrics", "The average poll time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", "stream-metrics", "The maximum poll time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", "stream-metrics", "The average per-second number of record-poll calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", "stream-metrics", "The average process time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", "stream-metrics", "The maximum process time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", "stream-metrics", "The average per-second number of process calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", "stream-metrics", "The average punctuate time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", "stream-metrics", "The maximum punctuate time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", "stream-metrics", "The average per-second number of punctuate calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", "stream-metrics", "The average per-second number of closed tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records.", defaultTags)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMaybeCommit() throws IOException, InterruptedException {
        File baseDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            long commitInterval = 1000L;
            Properties props = this.configProps(false);
            props.setProperty("state.dir", baseDir.getCanonicalPath());
            props.setProperty("commit.interval.ms", Long.toString(1000L));
            StreamsConfig config = new StreamsConfig((Map)props);
            this.builder.addSource("source1", new String[]{"topic1"});
            StreamThread thread = new StreamThread((TopologyBuilder)this.builder, config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

                public void maybeCommit(long now) {
                    super.maybeCommit(now);
                }

                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                    ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                    return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, (Consumer<byte[], byte[]>)this.consumer, StreamThreadTest.this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ChangelogReader)this.storeChangelogReader);
                }
            };
            this.initPartitionGrouper(config, thread, this.clientSupplier);
            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
            List revokedPartitions = Collections.emptyList();
            List<TopicPartition> assignedPartitions = Arrays.asList(this.t1p1, this.t1p2);
            thread.setState(StreamThread.State.RUNNING);
            thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);
            thread.runOnce(-1L);
            Assert.assertEquals((long)2L, (long)thread.tasks().size());
            this.mockTime.sleep(990L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertFalse((boolean)((TestStreamTask)task).committed);
            }
            this.mockTime.sleep(11L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertTrue((boolean)((TestStreamTask)task).committed);
                ((TestStreamTask)task).committed = false;
            }
            this.mockTime.sleep(990L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertFalse((boolean)((TestStreamTask)task).committed);
            }
            this.mockTime.sleep(11L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertTrue((boolean)((TestStreamTask)task).committed);
                ((TestStreamTask)task).committed = false;
            }
        }
        finally {
            Utils.delete((File)baseDir);
        }
    }

    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
        this.builder.addSource("source1", new String[]{"someTopic"});
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(assignment));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
        Assert.assertEquals((long)1L, (long)this.clientSupplier.producers.size());
        Producer globalProducer = (Producer)this.clientSupplier.producers.get(0);
        Assert.assertSame((Object)globalProducer, (Object)thread.threadProducer);
        for (StreamTask task : thread.tasks().values()) {
            Assert.assertSame((Object)globalProducer, (Object)((RecordCollectorImpl)task.recordCollector()).producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
        this.builder.addSource("source1", new String[]{"someTopic"});
        MockClientSupplier clientSupplier = new MockClientSupplier("stream-thread-test");
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, new StreamsConfig((Map)this.configProps(true)), (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        assignment.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2)));
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(assignment));
        HashSet assignedPartitions = new HashSet();
        Collections.addAll(assignedPartitions, new TopicPartition("someTopic", 0), new TopicPartition("someTopic", 2));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce(-1L);
        Assert.assertNull((Object)thread.threadProducer);
        Assert.assertEquals((long)thread.tasks().size(), (long)clientSupplier.producers.size());
        Iterator<MockProducer> it = clientSupplier.producers.iterator();
        for (StreamTask task : thread.tasks().values()) {
            Assert.assertSame((Object)it.next(), (Object)((RecordCollectorImpl)task.recordCollector()).producer());
        }
        Assert.assertSame(clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
        this.builder.addSource("source1", new String[]{"someTopic"});
        MockClientSupplier clientSupplier = new MockClientSupplier("stream-thread-test");
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, new StreamsConfig((Map)this.configProps(true)), (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(assignment));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
        thread.close();
        thread.run();
        for (StreamTask task : thread.tasks().values()) {
            Assert.assertTrue((boolean)((MockProducer)((RecordCollectorImpl)task.recordCollector()).producer()).closed());
        }
    }

    @Test
    public void shouldCloseThreadProducerOnCloseIfEosDisabled() {
        this.builder.addSource("source1", new String[]{"someTopic"});
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(assignment));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
        thread.close();
        thread.run();
        Assert.assertTrue((boolean)((MockProducer)thread.threadProducer).closed());
    }

    @Test
    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
        this.builder.addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return Collections.singletonMap(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(StreamThreadTest.TOPIC, 0)}));
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldNotCloseSuspendedTaskswice() throws Exception {
        this.builder.addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(TOPIC, 0)}), this.builder.build(Integer.valueOf(0)), (Consumer<byte[], byte[]>)this.clientSupplier.consumer, this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", this.config.getString("state.dir"), (Time)this.mockTime), this.changelogReader);
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                return testStreamTask;
            }
        };
        final HashSet<TopicPartition> activeTasks = new HashSet<TopicPartition>();
        activeTasks.add(new TopicPartition(TOPIC, 0));
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return new HashMap<TaskId, Set<TopicPartition>>(){
                    {
                        this.put(new TaskId(0, 0), activeTasks);
                    }
                };
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        thread.rebalanceListener.onPartitionsAssigned(activeTasks);
        thread.runOnce(-1L);
        thread.rebalanceListener.onPartitionsRevoked(activeTasks);
        Assert.assertTrue((boolean)testStreamTask.suspended);
        Assert.assertFalse((boolean)testStreamTask.closed);
        activeTasks.clear();
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        Assert.assertTrue((boolean)testStreamTask.closed);
    }

    @Test
    public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey().count("count-one");
        builder.stream(new String[]{"t2"}).groupByKey().count("count-two");
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0])));
        TopicPartition tp1 = new TopicPartition("stream-thread-test-count-one-changelog", 0);
        TopicPartition tp2 = new TopicPartition("stream-thread-test-count-two-changelog", 0);
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<TopicPartition, Long>();
        beginningOffsets.put(tp1, 0L);
        beginningOffsets.put(tp2, 0L);
        restoreConsumer.updateBeginningOffsets(beginningOffsets);
        final HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        TopicPartition t1 = new TopicPartition("t1", 0);
        standbyTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1}));
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return standbyTasks;
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        thread.runOnce(-1L);
        Assert.assertThat((Object)restoreConsumer.assignment(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new TopicPartition[]{tp1})));
        standbyTasks.put(new TaskId(1, 0), Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t2", 0)}));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        thread.runOnce(-1L);
        Assert.assertThat((Object)restoreConsumer.assignment(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new TopicPartition[]{tp1, tp2})));
    }

    @Test
    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey().count("count-one");
        builder.stream(new String[]{"t2"}).groupByKey().count("count-two");
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        final HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        TopicPartition t1 = new TopicPartition("t1", 0);
        Set partitionsT1 = Utils.mkSet((Object[])new TopicPartition[]{t1});
        standbyTasks.put(new TaskId(0, 0), partitionsT1);
        final HashMap<TaskId, Set> activeTasks = new HashMap<TaskId, Set>();
        TopicPartition t2 = new TopicPartition("t2", 0);
        Set partitionsT2 = Utils.mkSet((Object[])new TopicPartition[]{t2});
        activeTasks.put(new TaskId(1, 0), partitionsT2);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t2, 0L));
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return standbyTasks;
            }

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        this.clientSupplier.consumer.assign((Collection)partitionsT2);
        thread.rebalanceListener.onPartitionsAssigned((Collection)Utils.mkSet((Object[])new TopicPartition[]{t2}));
        thread.runOnce(-1L);
        standbyTasks.clear();
        activeTasks.clear();
        standbyTasks.put(new TaskId(1, 0), Utils.mkSet((Object[])new TopicPartition[]{t2}));
        activeTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1}));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        this.clientSupplier.consumer.assign((Collection)partitionsT1);
        thread.rebalanceListener.onPartitionsAssigned((Collection)Utils.mkSet((Object[])new TopicPartition[]{t1}));
    }

    @Test
    public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(Pattern.compile("t.*")).to("out");
        final HashMap createdTasks = new HashMap();
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                TestStreamTask task = new TestStreamTask(id, this.applicationId, partitions, topology, (Consumer<byte[], byte[]>)this.consumer, StreamThreadTest.this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ChangelogReader)this.storeChangelogReader);
                createdTasks.put(partitions, task);
                return task;
            }
        };
        final HashMap activeTasks = new HashMap();
        TopicPartition t1 = new TopicPartition("t1", 0);
        HashSet<TopicPartition> task00Partitions = new HashSet<TopicPartition>();
        task00Partitions.add(t1);
        TaskId taskId = new TaskId(0, 0);
        activeTasks.put(taskId, task00Partitions);
        thread.setPartitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        updatedTopicsField.setAccessible(true);
        Set updatedTopics = (Set)updatedTopicsField.get(subscriptionUpdates);
        updatedTopics.add(t1.topic());
        builder.updateSubscriptions(subscriptionUpdates, null);
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
        thread.runOnce(-1L);
        TestStreamTask firstTask = (TestStreamTask)((Object)createdTasks.get(task00Partitions));
        Assert.assertThat((Object)firstTask.id(), (Matcher)CoreMatchers.is((Object)taskId));
        task00Partitions.add(new TopicPartition("t2", 0));
        updatedTopics.add("t2");
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
        Assert.assertTrue((String)"task should have been closed as assignment has changed", (boolean)firstTask.closed);
        Assert.assertTrue((String)"tasks state manager should have been closed as assignment has changed", (boolean)firstTask.closedStateManager);
        Assert.assertThat((Object)((TestStreamTask)((Object)createdTasks.get(task00Partitions))).id(), (Matcher)CoreMatchers.is((Object)taskId));
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
        this.builder.addSource("source", new String[]{TOPIC}).addSink("sink", "dummyTopic", new String[]{"source"});
        MockClientSupplier clientSupplier = new MockClientSupplier("stream-thread-test");
        final StreamThread thread = new StreamThread((TopologyBuilder)this.builder, new StreamsConfig((Map)this.configProps(true)), (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        MockConsumer<byte[], byte[]> consumer = clientSupplier.consumer;
        consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(this.task1, this.task0Assignment);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread.runOnce(-1L);
        Assert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        final MockProducer producer = clientSupplier.producers.get(0);
        consumer.updateBeginningOffsets(Collections.singletonMap(this.task0Assignment.iterator().next(), 0L));
        consumer.unsubscribe();
        consumer.assign(this.task0Assignment);
        consumer.addRecord(new ConsumerRecord(TOPIC, 0, 0L, (Object)new byte[0], (Object)new byte[0]));
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        thread.runOnce(-1L);
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return producer.history().size() == 1;
            }
        }, (String)"StreamsThread did not produce output record.");
        Assert.assertFalse((boolean)producer.transactionCommitted());
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return producer.commitCount() == 1L;
            }
        }, (String)"StreamsThread did not commit transaction.");
        producer.fenceProducer();
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        consumer.addRecord(new ConsumerRecord(TOPIC, 0, 0L, (Object)new byte[0], (Object)new byte[0]));
        thread.runOnce(-1L);
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return thread.tasks().isEmpty();
            }
        }, (String)"StreamsThread did not remove fenced zombie task.");
        Assert.assertThat((Object)producer.commitCount(), (Matcher)CoreMatchers.equalTo((Object)1L));
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
        this.builder.addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        MockClientSupplier clientSupplier = new MockClientSupplier("stream-thread-test");
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, new StreamsConfig((Map)this.configProps(true)), (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(this.task1, this.task0Assignment);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread.runOnce(-1L);
        Assert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        thread.rebalanceListener.onPartitionsRevoked(null);
        clientSupplier.producers.get(0).fenceProducer();
        thread.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", (Collection)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)this.clientSupplier.consumer, (Producer)this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", this.config.getString("state.dir"), (Time)this.mockTime), this.changelogReader){

            @Override
            public void close(boolean clean, boolean isZombie) {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        thread.close();
        thread.join();
        Assert.assertFalse((String)"task shouldn't have been committed as there was an exception during shutdown", (boolean)testStreamTask.committed);
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
        MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
        this.builder.stream(new String[]{"t1"}).groupByKey().count((StateStoreSupplier)new MockStateStoreSupplier(stateStore));
        TopicPartition t1 = new TopicPartition("t1", 0);
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", (Collection)Utils.mkSet((Object[])new TopicPartition[]{t1}), this.builder.build(Integer.valueOf(0)), (Consumer)this.clientSupplier.consumer, (Producer)this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(this.metrics), new StateDirectory("stream-thread-test", this.config.getString("state.dir"), (Time)this.mockTime), this.changelogReader){

            public void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        this.clientSupplier.consumer.assign((Collection)testStreamTask.partitions);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1, 0L));
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        thread.runOnce(-1L);
        Assert.assertTrue((boolean)stateStore.isOpen());
        thread.shutdown(true);
        Assert.assertFalse((String)"task shouldn't have been committed as there was an exception during shutdown", (boolean)testStreamTask.committed);
        Assert.assertFalse((boolean)stateStore.isOpen());
    }

    @Test
    public void shouldCaptureCommitFailedExceptionOnTaskSuspension() throws Exception {
        this.builder.stream(new String[]{"t1"});
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", (Collection)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), this.builder.build(Integer.valueOf(0)), (Consumer)this.clientSupplier.consumer, (Producer)this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", this.config.getString("state.dir"), (Time)this.mockTime), this.changelogReader){

            public void suspend() {
                throw new CommitFailedException();
            }
        };
        StreamThread thread = new StreamThread((TopologyBuilder)this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        thread.runOnce(-1L);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        Assert.assertFalse((boolean)testStreamTask.committed);
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", (Collection)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)this.clientSupplier.consumer, (Producer)this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", this.config.getString("state.dir"), (Time)this.mockTime), this.changelogReader){

            public void suspend() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        thread.runOnce(-1L);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        Assert.assertFalse((boolean)testStreamTask.committed);
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", (Collection)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)this.clientSupplier.consumer, (Producer)this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", this.config.getString("state.dir"), (Time)this.mockTime), this.changelogReader){

            protected void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        thread.runOnce(-1L);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        Assert.assertFalse((boolean)testStreamTask.committed);
    }

    @Test
    public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", Pattern.compile("t.*"));
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        StreamThread thread = new StreamThread(topologyBuilder, this.config, (KafkaClientSupplier)this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, (Time)this.mockTime, new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        HashMap<String, Object> configurationMap = new HashMap<String, Object>();
        configurationMap.put("__stream.thread.instance__", thread);
        configurationMap.put("num.standby.replicas", 0);
        partitionAssignor.configure(configurationMap);
        thread.setPartitionAssignor(partitionAssignor);
        Field nodeToSourceTopicsField = topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
        nodeToSourceTopicsField.setAccessible(true);
        Map nodeToSourceTopics = (Map)nodeToSourceTopicsField.get(topologyBuilder);
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic-2", 0);
        TopicPartition topicPartition3 = new TopicPartition("topic-3", 0);
        TaskId taskId1 = new TaskId(0, 0);
        TaskId taskId2 = new TaskId(0, 0);
        TaskId taskId3 = new TaskId(0, 0);
        List<TaskId> activeTasks = Utils.mkList((Object[])new TaskId[]{taskId1});
        HashMap standbyTasks = new HashMap();
        AssignmentInfo info = new AssignmentInfo((List)activeTasks, standbyTasks, new HashMap());
        topicPartitions.addAll(Utils.mkList((Object[])new TopicPartition[]{topicPartition1}));
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
        partitionAssignor.onAssignment(assignment);
        Assert.assertTrue((((List)nodeToSourceTopics.get("source")).size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-1"));
        topicPartitions.clear();
        activeTasks = Arrays.asList(taskId1, taskId2);
        info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap());
        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2));
        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
        partitionAssignor.onAssignment(assignment);
        Assert.assertTrue((((List)nodeToSourceTopics.get("source")).size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-1"));
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-2"));
        topicPartitions.clear();
        activeTasks = Arrays.asList(taskId1, taskId2, taskId3);
        info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap());
        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3));
        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
        partitionAssignor.onAssignment(assignment);
        Assert.assertTrue((((List)nodeToSourceTopics.get("source")).size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-1"));
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-2"));
        Assert.assertTrue((boolean)((List)nodeToSourceTopics.get("source")).contains("topic-3"));
    }

    @Test
    public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        StateDirectory stateDirMock = this.mockStateDirInteractions(taskId);
        StreamThread thread = this.setupTest(taskId, stateDirMock);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        EasyMock.verify((Object[])new Object[]{stateDirMock});
    }

    @Test
    public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        StateDirectory stateDirMock = this.mockStateDirInteractions(taskId);
        StreamThread thread = this.setupTest(taskId, stateDirMock);
        thread.close();
        thread.shutdown(true);
        EasyMock.verify((Object[])new Object[]{stateDirMock});
    }

    private StreamThread setupTest(final TaskId taskId, StateDirectory stateDirectory) {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.table(TOPIC, TOPIC);
        final MockClientSupplier clientSupplier = new MockClientSupplier();
        TopicPartition topic = new TopicPartition(TOPIC, 0);
        Set partitions = Utils.mkSet((Object[])new TopicPartition[]{topic});
        Map<TopicPartition, Long> offsets = Collections.singletonMap(topic, 0L);
        clientSupplier.restoreConsumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
        clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
        clientSupplier.restoreConsumer.updateEndOffsets(offsets);
        clientSupplier.consumer.assign((Collection)partitions);
        clientSupplier.consumer.updateBeginningOffsets(offsets);
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return new TestStreamTask(taskId, this.applicationId, (Collection)partitions, this.builder.build(Integer.valueOf(0)), (Consumer)clientSupplier.consumer, (Producer)clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ChangelogReader)this.storeChangelogReader){

                    public void suspend() {
                        throw new RuntimeException("KABOOM!!!");
                    }
                };
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(taskId, partitions);
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
        thread.rebalanceListener.onPartitionsAssigned((Collection)partitions);
        thread.runOnce(-1L);
        return thread;
    }

    @Test
    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        StateDirectory stateDirMock = this.mockStateDirInteractions(taskId);
        StreamThread thread = this.setupStandbyTest(taskId, stateDirMock);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        EasyMock.verify((Object[])new Object[]{stateDirMock});
    }

    @Test
    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
        TaskId taskId = new TaskId(0, 0);
        StateDirectory stateDirMock = this.mockStateDirInteractions(taskId);
        StreamThread thread = this.setupStandbyTest(taskId, stateDirMock);
        thread.close();
        thread.shutdown(true);
        EasyMock.verify((Object[])new Object[]{stateDirMock});
    }

    private StateDirectory mockStateDirInteractions(TaskId taskId) throws IOException {
        StateDirectory stateDirMock = (StateDirectory)EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect((Object)stateDirMock.lock((TaskId)EasyMock.eq((Object)taskId), EasyMock.anyInt())).andReturn((Object)true);
        EasyMock.expect((Object)stateDirMock.directoryForTask(taskId)).andReturn((Object)new File(this.stateDir));
        EasyMock.expect((Object)stateDirMock.unlock(taskId)).andReturn((Object)true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{stateDirMock});
        return stateDirMock;
    }

    private StreamThread setupStandbyTest(final TaskId taskId, StateDirectory stateDirectory) {
        String storeName = "store";
        String changelogTopic = "stream-thread-test-store-changelog";
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"topic1"}).groupByKey().count("store");
        final MockClientSupplier clientSupplier = new MockClientSupplier();
        clientSupplier.restoreConsumer.updatePartitions("stream-thread-test-store-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-store-changelog", 0, null, null, null)));
        clientSupplier.restoreConsumer.updateBeginningOffsets((Map)new HashMap<TopicPartition, Long>(){
            {
                this.put(new TopicPartition("stream-thread-test-store-changelog", 0), 0L);
            }
        });
        clientSupplier.restoreConsumer.updateEndOffsets((Map)new HashMap<TopicPartition, Long>(){
            {
                this.put(new TopicPartition("stream-thread-test-store-changelog", 0), 0L);
            }
        });
        StreamThread thread = new StreamThread((TopologyBuilder)builder, this.config, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L, stateDirectory){

            protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
                return new StandbyTask(taskId, this.applicationId, partitions, this.builder.build(Integer.valueOf(0)), (Consumer)clientSupplier.consumer, (ChangelogReader)new StoreChangelogReader(this.getName(), clientSupplier.restoreConsumer), StreamThreadTest.this.config, (StreamsMetrics)new StreamsMetricsImpl(new Metrics(), "groupName", Collections.emptyMap()), this.stateDirectory){

                    public void suspend() {
                        throw new RuntimeException("KABOOM!!!");
                    }

                    public void commit() {
                        throw new RuntimeException("KABOOM!!!");
                    }
                };
            }
        };
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(taskId, Collections.singleton(new TopicPartition(TOPIC, 0)));
        thread.setPartitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(Collections.emptyMap(), standbyTasks));
        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptySet());
        thread.runOnce(-1L);
        return thread;
    }

    private void initPartitionGrouper(StreamsConfig config, StreamThread thread, MockClientSupplier clientSupplier) {
        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread.config, clientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager((InternalTopicManager)internalTopicManager);
        Map assignments = partitionAssignor.assign(this.metadata, Collections.singletonMap("client", this.subscription));
        partitionAssignor.onAssignment((PartitionAssignor.Assignment)assignments.get("client"));
    }

    private StreamThread getStreamThread() {
        return new StreamThread((TopologyBuilder)this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState((TopologyBuilder)this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, (Consumer<byte[], byte[]>)this.consumer, StreamThreadTest.this.clientSupplier.getProducer(new HashMap<String, Object>()), this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ChangelogReader)this.storeChangelogReader);
            }
        };
    }

    private static class StateListenerStub
    implements StreamThread.StateListener {
        int numChanges = 0;
        ThreadStateTransitionValidator oldState = null;
        ThreadStateTransitionValidator newState = null;

        private StateListenerStub() {
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator newState, ThreadStateTransitionValidator oldState) {
            ++this.numChanges;
            if (this.newState != null && this.newState != oldState) {
                throw new RuntimeException("State mismatch " + oldState + " different from " + this.newState);
            }
            this.oldState = oldState;
            this.newState = newState;
        }
    }

    private class MockStreamsPartitionAssignor
    extends StreamPartitionAssignor {
        private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
        private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;

        MockStreamsPartitionAssignor(Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
            this(activeTaskAssignment, Collections.emptyMap());
        }

        MockStreamsPartitionAssignor(Map<TaskId, Set<TopicPartition>> activeTaskAssignment, Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) {
            this.activeTaskAssignment = activeTaskAssignment;
            this.standbyTaskAssignment = standbyTaskAssignment;
        }

        Map<TaskId, Set<TopicPartition>> activeTasks() {
            return this.activeTaskAssignment;
        }

        Map<TaskId, Set<TopicPartition>> standbyTasks() {
            return this.standbyTaskAssignment;
        }

        public void close() {
        }
    }

    private class TestStreamTask
    extends StreamTask {
        boolean committed;
        private boolean suspended;
        private boolean closed;
        private boolean closedStateManager;

        TestStreamTask(TaskId id, String applicationId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, StreamsConfig config, StreamsMetrics metrics, StateDirectory stateDirectory, ChangelogReader storeChangelogReader) {
            super(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, metrics, stateDirectory, new ThreadCache("", 0L, metrics), (Time)new MockTime(), producer);
            this.committed = false;
        }

        void commit(boolean startNewTransaction) {
            super.commit(startNewTransaction);
            this.committed = true;
        }

        protected void updateOffsetLimits() {
        }

        public void resume() {
            if (!this.suspended || this.closed) {
                throw new IllegalStateException("Should not resume task that is not suspended or already closed.");
            }
            super.resume();
            this.suspended = false;
        }

        void suspend(boolean clean) {
            if (this.suspended || this.closed) {
                throw new IllegalStateException("Should not suspend task that is already suspended or closed.");
            }
            super.suspend(clean);
            this.suspended = true;
        }

        public void close(boolean clean, boolean isZombie) {
            if (this.closed && clean) {
                throw new IllegalStateException("Should not close task that is already closed.");
            }
            super.close(clean, isZombie);
            this.closed = true;
        }

        public void closeSuspended(boolean clean, boolean isZombie, RuntimeException firstException) {
            if (this.closed && clean) {
                throw new IllegalStateException("Should not close task that is not suspended or already closed.");
            }
            super.closeSuspended(clean, isZombie, firstException);
            this.closed = true;
        }

        void closeStateManager(boolean writeCheckpoint) {
            super.closeStateManager(writeCheckpoint);
            this.closedStateManager = true;
        }
    }
}

