/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.AssignedStandbyTasks;
import org.apache.kafka.streams.processor.internals.AssignedStreamsTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockChangelogReader;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

public class StreamThreadTest {
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    private final int threadIdx = 1;
    private final MockTime mockTime = new MockTime();
    private final Metrics metrics = new Metrics();
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
    private final StreamsConfig config = new StreamsConfig((Map)this.configProps(false));
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final StateDirectory stateDirectory = new StateDirectory(this.config, (Time)this.mockTime, true);
    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal();
    private UUID processId = UUID.randomUUID();
    private InternalTopologyBuilder internalTopologyBuilder;
    private StreamsMetadataState streamsMetadataState;
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(1, 1);

    @Before
    public void setUp() {
        this.processId = UUID.randomUUID();
        this.internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(this.internalStreamsBuilder);
        this.internalTopologyBuilder.setApplicationId("stream-thread-test");
        this.streamsMetadataState = new StreamsMetadataState(this.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
    }

    private Properties configProps(boolean enableEoS) {
        return Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"stream-thread-test"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"buffered.records.per.partition", (Object)"3"), Utils.mkEntry((Object)"default.timestamp.extractor", (Object)MockTimestampExtractor.class.getName()), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getAbsolutePath()), Utils.mkEntry((Object)"processing.guarantee", (Object)(enableEoS ? "exactly_once" : "at_least_once"))}));
    }

    @Test
    public void testPartitionAssignmentChangeForSingleGroup() {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.STARTING);
        List revokedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        List<TopicPartition> assignedPartitions = Collections.singletonList(this.t1p1);
        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.RUNNING);
        Assert.assertEquals((long)4L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)StreamThread.State.PARTITIONS_ASSIGNED, (Object)stateListener.oldState);
        thread.shutdown();
        Assert.assertSame((Object)StreamThread.State.PENDING_SHUTDOWN, (Object)thread.state());
    }

    @Test
    public void testStateChangeStartClose() throws Exception {
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener((StreamThread.StateListener)stateListener);
        thread.start();
        TestUtils.waitForCondition(() -> thread.state() == StreamThread.State.STARTING, (long)10000L, (String)"Thread never started.");
        thread.shutdown();
        TestUtils.waitForCondition(() -> thread.state() == StreamThread.State.DEAD, (long)10000L, (String)"Thread never shut down.");
        thread.shutdown();
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.DEAD);
    }

    private Cluster createCluster() {
        Node node = new Node(0, "localhost", 8121);
        return new Cluster("mockClusterId", Collections.singletonList(node), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), node);
    }

    private StreamThread createStreamThread(String clientId, StreamsConfig config, boolean eosEnabled) {
        if (eosEnabled) {
            this.clientSupplier.setApplicationIdForProducer("stream-thread-test");
        }
        this.clientSupplier.setClusterForAdminClient(this.createCluster());
        return StreamThread.create((InternalTopologyBuilder)this.internalTopologyBuilder, (StreamsConfig)config, (KafkaClientSupplier)this.clientSupplier, (AdminClient)this.clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), (UUID)this.processId, (String)clientId, (Metrics)this.metrics, (Time)this.mockTime, (StreamsMetadataState)this.streamsMetadataState, (long)0L, (StateDirectory)this.stateDirectory, (StateRestoreListener)new MockStateRestoreListener(), (int)1);
    }

    @Test
    public void testMetricsCreatedAtStartup() {
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        String defaultGroupName = "stream-metrics";
        Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-metrics", "The average commit time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-metrics", "The maximum commit time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-metrics", "The average per-second number of commit calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-total", "stream-metrics", "The total number of commit calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", "stream-metrics", "The average poll time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", "stream-metrics", "The maximum poll time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", "stream-metrics", "The average per-second number of record-poll calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-total", "stream-metrics", "The total number of record-poll calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", "stream-metrics", "The average process time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", "stream-metrics", "The maximum process time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", "stream-metrics", "The average per-second number of process calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-total", "stream-metrics", "The total number of process calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", "stream-metrics", "The average punctuate time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", "stream-metrics", "The maximum punctuate time in ms", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", "stream-metrics", "The average per-second number of punctuate calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-total", "stream-metrics", "The total number of punctuate calls", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", "stream-metrics", "The average per-second number of closed tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-total", "stream-metrics", "The total number of closed tasks", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records.", defaultTags)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records.", defaultTags)));
        JmxReporter reporter = new JmxReporter("kafka.streams");
        this.metrics.addReporter((MetricsReporter)reporter);
        Assert.assertEquals((Object)"clientId-StreamThread-1", (Object)thread.getName());
        Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s", "stream-metrics", thread.getName())));
    }

    @Test
    public void shouldNotCommitBeforeTheCommitInterval() {
        long commitInterval = 1000L;
        Properties props = this.configProps(false);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 1, 1);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger());
        thread.setNow(this.mockTime.milliseconds());
        thread.maybeCommit();
        this.mockTime.sleep(990L);
        thread.setNow(this.mockTime.milliseconds());
        thread.maybeCommit();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldRespectNumIterationsInMainLoop() {
        MockProcessor mockProcessor = new MockProcessor(PunctuationType.WALL_CLOCK_TIME, 10L);
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addProcessor("processor1", () -> mockProcessor, new String[]{"source1"});
        this.internalTopologyBuilder.addProcessor("processor2", () -> new MockProcessor(PunctuationType.STREAM_TIME, 10L), new String[]{"source1"});
        Properties properties = new Properties();
        properties.put("commit.interval.ms", (Object)100L);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("stream-thread-test", "localhost:2171", Serdes.ByteArraySerde.class.getName(), Serdes.ByteArraySerde.class.getName(), properties));
        StreamThread thread = this.createStreamThread("clientId", config, false);
        thread.setState(StreamThread.State.STARTING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        thread.taskManager().setAssignmentMetadata(Collections.singletonMap(new TaskId(0, this.t1p1.partition()), assignedPartitions), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        long offset = -1L;
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 0L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 1L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)2));
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)2));
        this.mockTime.sleep(11L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 5L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 6L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)2));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 11L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 12L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 13L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 14L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)3));
        mockProcessor.requestCommit();
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 15L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)3));
        this.mockTime.sleep(90L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
    }

    @Test
    public void shouldNotCauseExceptionIfNothingCommitted() {
        long commitInterval = 1000L;
        Properties props = this.configProps(false);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 1, 0);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger());
        thread.setNow(this.mockTime.milliseconds());
        thread.maybeCommit();
        this.mockTime.sleep(990L);
        thread.setNow(this.mockTime.milliseconds());
        thread.maybeCommit();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldCommitAfterTheCommitInterval() {
        long commitInterval = 1000L;
        Properties props = this.configProps(false);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 2, 1);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger());
        thread.setNow(this.mockTime.milliseconds());
        thread.maybeCommit();
        this.mockTime.sleep(1001L);
        thread.setNow(this.mockTime.milliseconds());
        thread.maybeCommit();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    private TaskManager mockTaskManagerCommit(Consumer<byte[], byte[]> consumer, int numberOfCommits, int commits) {
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        EasyMock.expect((Object)taskManager.commitAll()).andReturn((Object)commits).times(numberOfCommits);
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        return taskManager;
    }

    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        HashMap<TopicPartition, Long> beginOffsets = new HashMap<TopicPartition, Long>();
        beginOffsets.put(this.t1p1, 0L);
        beginOffsets.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(beginOffsets);
        thread.rebalanceListener.onPartitionsAssigned(new HashSet(assignedPartitions));
        Assert.assertEquals((long)1L, (long)this.clientSupplier.producers.size());
        Producer globalProducer = (Producer)this.clientSupplier.producers.get(0);
        for (Task task : thread.tasks().values()) {
            Assert.assertSame((Object)globalProducer, (Object)((RecordCollectorImpl)((StreamTask)task).recordCollector()).producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        HashMap<TopicPartition, Long> beginOffsets = new HashMap<TopicPartition, Long>();
        beginOffsets.put(this.t1p1, 0L);
        beginOffsets.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(beginOffsets);
        thread.rebalanceListener.onPartitionsAssigned(new HashSet(assignedPartitions));
        thread.runOnce();
        Assert.assertEquals((long)thread.tasks().size(), (long)this.clientSupplier.producers.size());
        Assert.assertSame(this.clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        HashMap<TopicPartition, Long> beginOffsets = new HashMap<TopicPartition, Long>();
        beginOffsets.put(this.t1p1, 0L);
        beginOffsets.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(beginOffsets);
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.shutdown();
        thread.run();
        for (Task task : thread.tasks().values()) {
            Assert.assertTrue((boolean)((MockProducer)((RecordCollectorImpl)((StreamTask)task).recordCollector()).producer()).closed());
        }
    }

    @Test
    public void shouldShutdownTaskManagerOnClose() {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, this.config, null, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger()).updateThreadMetadata(StreamThread.getSharedAdminClientId((String)"clientId"));
        thread.setStateListener((t, newState, oldState) -> {
            if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
                thread.shutdown();
            }
        });
        thread.run();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, this.config, null, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger()).updateThreadMetadata(StreamThread.getSharedAdminClientId((String)"clientId"));
        thread.shutdown();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldNotThrowWhenPendingShutdownInRunOnce() {
        this.mockRunOnce(true);
    }

    @Test
    public void shouldNotThrowWithoutPendingShutdownInRunOnce() {
        this.mockRunOnce(false);
    }

    private void mockRunOnce(final boolean shutdownOnPoll) {
        final List<TopicPartition> assignedPartitions = Collections.singletonList(this.t1p1);
        class MockStreamThreadConsumer<K, V>
        extends MockConsumer<K, V> {
            private StreamThread streamThread;

            MockStreamThreadConsumer(OffsetResetStrategy offsetResetStrategy) {
                super(offsetResetStrategy);
            }

            public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
                Assert.assertNotNull((Object)this.streamThread);
                if (shutdownOnPoll) {
                    this.streamThread.shutdown();
                }
                this.streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
                return super.poll(timeout);
            }

            private void setStreamThread(StreamThread streamThread) {
                this.streamThread = streamThread;
            }
        }
        MockStreamThreadConsumer<byte[], byte[]> mockStreamThreadConsumer = new MockStreamThreadConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
        TaskManager taskManager = new TaskManager((ChangelogReader)new MockChangelogReader(), this.processId, "log-prefix", mockStreamThreadConsumer, this.streamsMetadataState, null, null, null, new AssignedStreamsTasks(new LogContext()), new AssignedStandbyTasks(new LogContext()));
        taskManager.setConsumer(mockStreamThreadConsumer);
        taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "clientId");
        StreamThread thread = new StreamThread((Time)this.mockTime, this.config, null, mockStreamThreadConsumer, mockStreamThreadConsumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger()).updateThreadMetadata(StreamThread.getSharedAdminClientId((String)"clientId"));
        ((MockStreamThreadConsumer)mockStreamThreadConsumer).setStreamThread(thread);
        mockStreamThreadConsumer.assign(assignedPartitions);
        mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.addRecord(mockStreamThreadConsumer, 1L, 0L);
        thread.setState(StreamThread.State.STARTING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        thread.runOnce();
    }

    @Test
    public void shouldOnlyShutdownOnce() {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, this.config, null, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger()).updateThreadMetadata(StreamThread.getSharedAdminClientId((String)"clientId"));
        thread.shutdown();
        thread.run();
        EasyMock.verify((Object[])new Object[]{taskManager});
    }

    @Test
    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic"});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[]{"name"});
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
        thread.taskManager().createTasks(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, new String[]{"source"});
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        MockConsumer<byte[], byte[]> consumer = this.clientSupplier.consumer;
        consumer.updatePartitions("topic1", Collections.singletonList(new PartitionInfo("topic1", 1, null, null, null)));
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        MockProducer producer = this.clientSupplier.producers.get(0);
        consumer.updateBeginningOffsets(Collections.singletonMap(assignedPartitions.iterator().next(), 0L));
        consumer.unsubscribe();
        consumer.assign(new HashSet(assignedPartitions));
        consumer.addRecord(new ConsumerRecord("topic1", 1, 0L, (Object)new byte[0], (Object)new byte[0]));
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        thread.runOnce();
        MatcherAssert.assertThat((Object)producer.history().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        Assert.assertFalse((boolean)producer.transactionCommitted());
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        TestUtils.waitForCondition(() -> producer.commitCount() == 1L, (String)"StreamsThread did not commit transaction.");
        producer.fenceProducer();
        this.mockTime.sleep(this.config.getLong("commit.interval.ms") + 1L);
        consumer.addRecord(new ConsumerRecord("topic1", 1, 1L, (Object)new byte[0], (Object)new byte[0]));
        try {
            thread.runOnce();
            Assert.fail((String)"Should have thrown TaskMigratedException");
        }
        catch (TaskMigratedException taskMigratedException) {
            // empty catch block
        }
        TestUtils.waitForCondition(() -> thread.tasks().isEmpty(), (String)"StreamsThread did not remove fenced zombie task.");
        MatcherAssert.assertThat((Object)producer.commitCount(), (Matcher)CoreMatchers.equalTo((Object)1L));
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks() {
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[]{"name"});
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.clientSupplier.producers.get(0).fenceProducer();
        thread.rebalanceListener.onPartitionsRevoked(null);
        Assert.assertTrue((boolean)this.clientSupplier.producers.get(0).transactionInFlight());
        Assert.assertFalse((boolean)this.clientSupplier.producers.get(0).transactionCommitted());
        Assert.assertTrue((boolean)this.clientSupplier.producers.get(0).closed());
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCloseTransactionWhenSuspendingTasks() {
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)this.configProps(true)), true);
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[]{"name"});
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        MatcherAssert.assertThat((Object)thread.tasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.clientSupplier.producers.get(0).fenceProducerOnClose();
        thread.rebalanceListener.onPartitionsRevoked(null);
        Assert.assertFalse((boolean)this.clientSupplier.producers.get(0).transactionInFlight());
        Assert.assertTrue((boolean)this.clientSupplier.producers.get(0).transactionCommitted());
        Assert.assertFalse((boolean)this.clientSupplier.producers.get(0).closed());
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
    }

    @Test
    public void shouldReturnActiveTaskMetadataWhileRunningState() {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        ThreadMetadata threadMetadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)threadMetadata.threadState());
        Assert.assertTrue((boolean)threadMetadata.activeTasks().contains(new TaskMetadata(this.task1.toString(), Utils.mkSet((Object[])new TopicPartition[]{this.t1p1}))));
        Assert.assertTrue((boolean)threadMetadata.standbyTasks().isEmpty());
    }

    @Test
    public void shouldReturnStandbyTaskMetadataWhileRunningState() {
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as((String)"count-one"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        thread.runOnce();
        ThreadMetadata threadMetadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)threadMetadata.threadState());
        Assert.assertTrue((boolean)threadMetadata.standbyTasks().contains(new TaskMetadata(this.task1.toString(), Utils.mkSet((Object[])new TopicPartition[]{this.t1p1}))));
        Assert.assertTrue((boolean)threadMetadata.activeTasks().isEmpty());
    }

    @Test
    public void shouldUpdateStandbyTask() throws Exception {
        String storeName1 = "count-one";
        String storeName2 = "table-two";
        String changelogName1 = "stream-thread-test-count-one-changelog";
        String changelogName2 = "stream-thread-test-table-two-changelog";
        TopicPartition partition1 = new TopicPartition("stream-thread-test-count-one-changelog", 1);
        TopicPartition partition2 = new TopicPartition("stream-thread-test-table-two-changelog", 1);
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as((String)"count-one"));
        MaterializedInternal materialized = new MaterializedInternal(Materialized.as((String)"table-two"), (InternalNameProvider)this.internalStreamsBuilder, "");
        this.internalStreamsBuilder.table("topic2", new ConsumedInternal(), materialized);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 1, null, new Node[0], new Node[0])));
        restoreConsumer.assign((Collection)Utils.mkSet((Object[])new TopicPartition[]{partition1, partition2}));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(this.task3), ".checkpoint"));
        checkpoint.write(Collections.singletonMap(partition2, 5L));
        for (long i = 0L; i < 10L; ++i) {
            restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-one-changelog", 1, i, (Object)("K" + i).getBytes(), (Object)("V" + i).getBytes()));
            restoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-table-two-changelog", 1, i, (Object)("K" + i).getBytes(), (Object)("V" + i).getBytes()));
        }
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(this.task1, Collections.singleton(this.t1p1));
        standbyTasks.put(this.task3, Collections.singleton(this.t2p1));
        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        thread.runOnce();
        StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1);
        StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2);
        KeyValueStore store1 = (KeyValueStore)standbyTask1.getStore("count-one");
        KeyValueStore store2 = (KeyValueStore)standbyTask2.getStore("table-two");
        Assert.assertEquals((long)10L, (long)store1.approximateNumEntries());
        Assert.assertEquals((long)5L, (long)store2.approximateNumEntries());
        Assert.assertEquals((long)0L, (long)thread.standbyRecords().size());
    }

    @Test
    public void shouldCreateStandbyTask() {
        this.setupInternalTopologyWithoutState();
        this.internalTopologyBuilder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("myStore", true), new String[]{"processor1"});
        StandbyTask standbyTask = this.createStandbyTask();
        MatcherAssert.assertThat((Object)standbyTask, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.nullValue()));
    }

    @Test
    public void shouldNotCreateStandbyTaskWithoutStateStores() {
        this.setupInternalTopologyWithoutState();
        StandbyTask standbyTask = this.createStandbyTask();
        MatcherAssert.assertThat((Object)standbyTask, (Matcher)CoreMatchers.nullValue());
    }

    @Test
    public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() {
        this.setupInternalTopologyWithoutState();
        MockKeyValueStoreBuilder storeBuilder = new MockKeyValueStoreBuilder("myStore", true);
        storeBuilder.withLoggingDisabled();
        this.internalTopologyBuilder.addStateStore((StoreBuilder)storeBuilder, new String[]{"processor1"});
        StandbyTask standbyTask = this.createStandbyTask();
        MatcherAssert.assertThat((Object)standbyTask, (Matcher)CoreMatchers.nullValue());
    }

    private void setupInternalTopologyWithoutState() {
        MockProcessor mockProcessor = new MockProcessor();
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addProcessor("processor1", () -> mockProcessor, new String[]{"source1"});
    }

    private StandbyTask createStandbyTask() {
        LogContext logContext = new LogContext("test");
        Logger log = logContext.logger(StreamThreadTest.class);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "clientId");
        StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator(this.internalTopologyBuilder, this.config, streamsMetrics, this.stateDirectory, (ChangelogReader)new MockChangelogReader(), (Time)this.mockTime, log);
        return standbyTaskCreator.createTask((Consumer)new MockConsumer(OffsetResetStrategy.EARLIEST), new TaskId(1, 2), Collections.emptySet());
    }

    @Test
    public void shouldPunctuateActiveTask() {
        final ArrayList punctuatedStreamTime = new ArrayList();
        final ArrayList punctuatedWallClockTime = new ArrayList();
        ProcessorSupplier punctuateProcessor = () -> new Processor<Object, Object>(){

            public void init(ProcessorContext context) {
                context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
                context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
            }

            public void process(Object key, Object value) {
            }

            public void close() {
            }
        };
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).process(punctuateProcessor, new String[0]);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(null);
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        this.clientSupplier.consumer.assign(assignedPartitions);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        Assert.assertEquals((long)0L, (long)punctuatedStreamTime.size());
        Assert.assertEquals((long)0L, (long)punctuatedWallClockTime.size());
        this.mockTime.sleep(100L);
        for (long i = 0L; i < 10L; ++i) {
            this.clientSupplier.consumer.addRecord(new ConsumerRecord("topic1", 1, i, i * 100L, TimestampType.CREATE_TIME, -1L, ("K" + i).getBytes().length, ("V" + i).getBytes().length, (Object)("K" + i).getBytes(), (Object)("V" + i).getBytes()));
        }
        thread.runOnce();
        Assert.assertEquals((long)1L, (long)punctuatedStreamTime.size());
        Assert.assertEquals((long)1L, (long)punctuatedWallClockTime.size());
        this.mockTime.sleep(100L);
        thread.runOnce();
        Assert.assertEquals((long)1L, (long)punctuatedStreamTime.size());
        Assert.assertEquals((long)2L, (long)punctuatedWallClockTime.size());
    }

    @Test
    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        ThreadMetadata metadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.CREATED.name(), (Object)metadata.threadState());
        thread.setState(StreamThread.State.STARTING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        thread.setState(StreamThread.State.RUNNING);
        metadata = thread.threadMetadata();
        Assert.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)metadata.threadState());
    }

    @Test
    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as((String)"count-one"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Arrays.asList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0]), new PartitionInfo("stream-thread-test-count-one-changelog", 1, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        thread.setState(StreamThread.State.STARTING);
        thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
        this.assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        standbyTasks.put(this.task2, Collections.singleton(this.t1p2));
        thread.taskManager().setAssignmentMetadata(activeTasks, standbyTasks);
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        this.assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
        this.internalStreamsBuilder.stream(Collections.singleton("topic"), this.consumed).groupByKey().count(Materialized.as((String)"count"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamThread thread = this.createStreamThread("clientId", this.config, false);
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        MockConsumer mockRestoreConsumer = (MockConsumer)thread.restoreConsumer;
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(new TaskId(0, 0), topicPartitionSet);
        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
        mockConsumer.updatePartitions("topic", Collections.singletonList(new PartitionInfo("topic", 0, null, new Node[0], new Node[0])));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        mockRestoreConsumer.updatePartitions("stream-thread-test-count-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-changelog", 0, null, new Node[0], new Node[0])));
        TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0);
        final Set<TopicPartition> changelogPartitionSet = Collections.singleton(changelogPartition);
        mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L));
        mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
        mockConsumer.schedulePollTask(() -> {
            thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            thread.rebalanceListener.onPartitionsAssigned((Collection)topicPartitionSet);
        });
        try {
            thread.start();
            TestUtils.waitForCondition(() -> mockRestoreConsumer.assignment().size() == 1, (String)"Never restore first record");
            mockRestoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, (Object)"K1".getBytes(), (Object)"V1".getBytes()));
            TestUtils.waitForCondition(() -> mockRestoreConsumer.position(changelogPartition) == 1L, (String)"Never restore first record");
            mockRestoreConsumer.setPollException((KafkaException)new InvalidOffsetException("Try Again!"){

                public Set<TopicPartition> partitions() {
                    return changelogPartitionSet;
                }
            });
            mockRestoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, (Object)"K1".getBytes(), (Object)"V1".getBytes()));
            mockRestoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 1L, (Object)"K2".getBytes(), (Object)"V2".getBytes()));
            TestUtils.waitForCondition(() -> {
                mockRestoreConsumer.assign((Collection)changelogPartitionSet);
                return mockRestoreConsumer.position(changelogPartition) == 2L;
            }, (String)"Never finished restore");
        }
        finally {
            thread.shutdown();
            thread.join(10000L);
        }
    }

    @Test
    public void shouldRecordSkippedMetricForDeserializationException() {
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties config = this.configProps(false);
        config.setProperty("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());
        config.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)config), false);
        thread.setState(StreamThread.State.STARTING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        thread.taskManager().setAssignmentMetadata(Collections.singletonMap(new TaskId(0, this.t1p1.partition()), assignedPartitions), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        MetricName skippedTotalMetric = this.metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
        MetricName skippedRateMetric = this.metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(skippedTotalMetric).metricValue());
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(skippedRateMetric).metricValue());
        long offset = -1L;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), ++offset, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, (Object)new byte[0], (Object)"I am not an integer.".getBytes()));
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), ++offset, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, (Object)new byte[0], (Object)"I am not an integer.".getBytes()));
        thread.runOnce();
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(skippedTotalMetric).metricValue());
        Assert.assertNotEquals((Object)0.0, (Object)this.metrics.metric(skippedRateMetric).metricValue());
        LogCaptureAppender.unregister(appender);
        List<String> strings = appender.getMessages();
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]"));
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]"));
    }

    @Test
    public void shouldReportSkippedRecordsForInvalidTimestamps() {
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties config = this.configProps(false);
        config.setProperty("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class.getName());
        StreamThread thread = this.createStreamThread("clientId", new StreamsConfig((Map)config), false);
        thread.setState(StreamThread.State.STARTING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        thread.taskManager().setAssignmentMetadata(Collections.singletonMap(new TaskId(0, this.t1p1.partition()), assignedPartitions), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)thread.consumer;
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        thread.runOnce();
        MetricName skippedTotalMetric = this.metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
        MetricName skippedRateMetric = this.metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(skippedTotalMetric).metricValue());
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(skippedRateMetric).metricValue());
        long offset = -1L;
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
        thread.runOnce();
        Assert.assertEquals((Object)2.0, (Object)this.metrics.metric(skippedTotalMetric).metricValue());
        Assert.assertNotEquals((Object)0.0, (Object)this.metrics.metric(skippedRateMetric).metricValue());
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
        thread.runOnce();
        Assert.assertEquals((Object)6.0, (Object)this.metrics.metric(skippedTotalMetric).metricValue());
        Assert.assertNotEquals((Object)0.0, (Object)this.metrics.metric(skippedRateMetric).metricValue());
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 1L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 1L);
        thread.runOnce();
        Assert.assertEquals((Object)6.0, (Object)this.metrics.metric(skippedTotalMetric).metricValue());
        Assert.assertNotEquals((Object)0.0, (Object)this.metrics.metric(skippedRateMetric).metricValue());
        LogCaptureAppender.unregister(appender);
        List<String> strings = appender.getMessages();
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
        Assert.assertTrue((boolean)strings.contains("task [0_1] Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
    }

    private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {
        Assert.assertEquals((Object)state.name(), (Object)metadata.threadState());
        Assert.assertTrue((boolean)metadata.activeTasks().isEmpty());
        Assert.assertTrue((boolean)metadata.standbyTasks().isEmpty());
    }

    @Test
    public void producerMetricsVerificationWithoutEOS() {
        MockProducer producer = new MockProducer();
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = this.mockTaskManagerCommit((Consumer<byte[], byte[]>)consumer, 1, 0);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, this.config, (Producer)producer, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger());
        MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap());
        KafkaMetric testMetric = new KafkaMetric(new Object(), testMetricName, (MetricValueProvider)((Measurable)(config, now) -> 0.0), null, (Time)new MockTime());
        producer.setMockMetrics(testMetricName, (Metric)testMetric);
        Map producerMetrics = thread.producerMetrics();
        Assert.assertEquals((Object)testMetricName, (Object)((Metric)producerMetrics.get(testMetricName)).metricName());
    }

    @Test
    public void adminClientMetricsVerification() {
        Node broker1 = new Node(0, "dummyHost-1", 1234);
        Node broker2 = new Node(1, "dummyHost-2", 1234);
        List<Node> cluster = Arrays.asList(broker1, broker2);
        MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null);
        MockProducer producer = new MockProducer();
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(this.metrics, "");
        StreamThread thread = new StreamThread((Time)this.mockTime, this.config, (Producer)producer, consumer, consumer, null, taskManager, streamsMetrics, this.internalTopologyBuilder, "clientId", new LogContext(""), new AtomicInteger());
        MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap());
        KafkaMetric testMetric = new KafkaMetric(new Object(), testMetricName, (MetricValueProvider)((Measurable)(config, now) -> 0.0), null, (Time)new MockTime());
        EasyMock.expect((Object)taskManager.getAdminClient()).andReturn((Object)adminClient);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{taskManager, consumer});
        adminClient.setMockMetrics(testMetricName, (Metric)testMetric);
        Map adminClientMetrics = thread.adminClientMetrics();
        Assert.assertEquals((Object)testMetricName, (Object)((Metric)adminClientMetrics.get(testMetricName)).metricName());
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long offset) {
        this.addRecord(mockConsumer, offset, -1L);
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long offset, long timestamp) {
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), offset, timestamp, TimestampType.CREATE_TIME, -1L, -1, -1, (Object)new byte[0], (Object)new byte[0]));
    }

    private static class StateListenerStub
    implements StreamThread.StateListener {
        int numChanges = 0;
        ThreadStateTransitionValidator oldState = null;
        ThreadStateTransitionValidator newState = null;

        private StateListenerStub() {
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator newState, ThreadStateTransitionValidator oldState) {
            ++this.numChanges;
            if (this.newState != null && this.newState != oldState) {
                throw new RuntimeException("State mismatch " + oldState + " different from " + this.newState);
            }
            this.oldState = oldState;
            this.newState = newState;
        }
    }
}

