/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockChangelogReader;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.RecordDeserializer;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TaskMetadataImpl;
import org.apache.kafka.streams.processor.internals.Tasks;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockStandbyUpdateListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class StreamThreadTest {
    private static final String APPLICATION_ID = "stream-thread-test";
    private static final UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d");
    private static final String CLIENT_ID = "stream-thread-test-" + PROCESS_ID;
    public static final String STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG = "stream-thread-test-count-one-changelog";
    public static final String STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG = "stream-thread-test-table-two-changelog";
    private final int threadIdx = 1;
    private final Metrics metrics = new Metrics();
    private final MockTime mockTime = new MockTime();
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal();
    private final ChangelogReader changelogReader = new MockChangelogReader();
    private StateDirectory stateDirectory = null;
    private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(this.internalTopologyBuilder);
    private StreamThread thread = null;
    private final Consumer<byte[], byte[]> mainConsumer = (Consumer)Mockito.mock(Consumer.class);
    @Mock
    private Consumer<byte[], byte[]> consumer;
    private static final BiConsumer<Throwable, Boolean> HANDLER = (e, b) -> {
        if (e instanceof RuntimeException) {
            throw (RuntimeException)e;
        }
        if (e instanceof Error) {
            throw (Error)e;
        }
        throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", (Throwable)e);
    };
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(1, 1);

    static Stream<Arguments> data() {
        return Stream.of(Arguments.of((Object[])new Object[]{false, false}), Arguments.of((Object[])new Object[]{true, false}), Arguments.of((Object[])new Object[]{true, true}));
    }

    @BeforeEach
    public void setUp() {
        Thread.currentThread().setName(CLIENT_ID + "-StreamThread-" + 1);
        this.internalTopologyBuilder.setApplicationId(APPLICATION_ID);
    }

    @AfterEach
    public void tearDown() {
        if (this.thread != null) {
            this.thread.shutdown();
            this.thread = null;
        }
        this.stateDirectory = null;
    }

    private Properties configProps(boolean enableEoS, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        return Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)APPLICATION_ID), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"buffered.records.per.partition", (Object)"3"), Utils.mkEntry((Object)"default.timestamp.extractor", (Object)MockTimestampExtractor.class.getName()), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getAbsolutePath()), Utils.mkEntry((Object)"processing.guarantee", (Object)(enableEoS ? "exactly_once_v2" : "at_least_once")), Utils.mkEntry((Object)"default.key.serde", (Object)Serdes.ByteArraySerde.class.getName()), Utils.mkEntry((Object)"default.value.serde", (Object)Serdes.ByteArraySerde.class.getName()), Utils.mkEntry((Object)"__state.updater.enabled__", (Object)Boolean.toString(stateUpdaterEnabled)), Utils.mkEntry((Object)"__processing.threads.enabled__", (Object)Boolean.toString(processingThreadsEnabled))}));
    }

    private Cluster createCluster() {
        Node node = new Node(-1, "localhost", 8121);
        return new Cluster("mockClusterId", Collections.singletonList(node), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), node);
    }

    private StreamThread createStreamThread(String clientId, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        return this.createStreamThread(clientId, (Time)this.mockTime, stateUpdaterEnabled, processingThreadsEnabled);
    }

    private StreamThread createStreamThread(String clientId, Time time, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        return this.createStreamThread(clientId, config, time);
    }

    private StreamThread createStreamThread(String clientId, StreamsConfig config) {
        return this.createStreamThread(clientId, config, (Time)this.mockTime);
    }

    private StreamThread createStreamThread(String clientId, StreamsConfig config, Time time) {
        if (!"at_least_once".equals(config.getString("processing.guarantee"))) {
            this.clientSupplier.setApplicationIdForProducer(APPLICATION_ID);
        }
        this.clientSupplier.setCluster(this.createCluster());
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, config.getString("built.in.metrics.version"), time);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(config, (Time)this.mockTime, true, false);
        StreamsMetadataState streamsMetadataState = new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, config), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID)));
        return StreamThread.create((TopologyMetadata)topologyMetadata, (StreamsConfig)config, (KafkaClientSupplier)this.clientSupplier, (Admin)this.clientSupplier.getAdmin(config.getAdminConfigs(clientId)), (UUID)PROCESS_ID, (String)clientId, (StreamsMetricsImpl)streamsMetrics, (Time)time, (StreamsMetadataState)streamsMetadataState, (long)0L, (StateDirectory)this.stateDirectory, (StateRestoreListener)new MockStateRestoreListener(), (StandbyUpdateListener)new MockStandbyUpdateListener(), (int)1, null, HANDLER);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldChangeStateInRebalanceListener(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.thread = this.createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled);
        StateListenerStub stateListener = new StateListenerStub();
        this.thread.setStateListener((StreamThread.StateListener)stateListener);
        Assertions.assertEquals((Object)this.thread.state(), (Object)StreamThread.State.CREATED);
        ConsumerRebalanceListener rebalanceListener = this.thread.rebalanceListener();
        this.thread.setState(StreamThread.State.STARTING);
        List revokedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assertions.assertEquals((Object)this.thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        List<TopicPartition> assignedPartitions = Collections.singletonList(this.t1p1);
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((Object)this.thread.state(), (Object)StreamThread.State.RUNNING);
        Assertions.assertEquals((int)4, (int)stateListener.numChanges);
        Assertions.assertEquals((Object)StreamThread.State.PARTITIONS_ASSIGNED, (Object)stateListener.oldState);
        this.thread.shutdown();
        Assertions.assertSame((Object)StreamThread.State.PENDING_SHUTDOWN, (Object)this.thread.state());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldChangeStateAtStartClose(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.thread = this.createStreamThread(CLIENT_ID, (Time)new MockTime(1L), stateUpdaterEnabled, processingThreadsEnabled);
        StateListenerStub stateListener = new StateListenerStub();
        this.thread.setStateListener((StreamThread.StateListener)stateListener);
        this.thread.start();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.STARTING, (long)10000L, (String)"Thread never started.");
        this.thread.shutdown();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.DEAD, (long)10000L, (String)"Thread never shut down.");
        this.thread.shutdown();
        Assertions.assertEquals((Object)this.thread.state(), (Object)StreamThread.State.DEAD);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCreateMetricsAtStartup(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.thread = this.createStreamThread(CLIENT_ID, (Time)new MockTime(1L), stateUpdaterEnabled, processingThreadsEnabled);
        String defaultGroupName = "stream-thread-metrics";
        Map<String, String> defaultTags = Collections.singletonMap("thread-id", this.thread.getName());
        String descriptionIsNotVerified = "";
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-total", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-ratio", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-total", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-ratio", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-records-avg", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-records-max", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-total", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-ratio", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-records-avg", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-records-max", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-total", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-ratio", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-total", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-total", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", "stream-thread-metrics", "", defaultTags)));
        Assertions.assertNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-total", "stream-thread-metrics", "", defaultTags)));
        String taskGroupName = "stream-task-metrics";
        Map taskTags = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"task-id", (Object)"all"), Utils.mkEntry((Object)"thread-id", (Object)this.thread.getName())});
        Assertions.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-task-metrics", "", taskTags)));
        Assertions.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-task-metrics", "", taskTags)));
        Assertions.assertNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-task-metrics", "", taskTags)));
        JmxReporter reporter = new JmxReporter();
        KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
        reporter.contextChange((MetricsContext)metricsContext);
        this.metrics.addReporter((MetricsReporter)reporter);
        Assertions.assertEquals((Object)(CLIENT_ID + "-StreamThread-1"), (Object)this.thread.getName());
        Assertions.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s", "stream-thread-metrics", "thread-id", this.thread.getName())));
        Assertions.assertFalse((boolean)reporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", "thread-id", this.thread.getName())));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCommitBeforeTheCommitInterval(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        long commitInterval = 1000L;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Task runningTask = (Task)Mockito.mock(Task.class);
        TaskManager taskManager = this.mockTaskManagerCommit(runningTask, 1);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(990L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager)Mockito.verify((Object)taskManager)).commit((Collection)Utils.mkSet((Object[])new Task[]{runningTask}));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotPurgeBeforeThePurgeInterval(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        long commitInterval = 1000L;
        long purgeInterval = 2000L;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        props.setProperty("repartition.purge.interval.ms", Long.toString(2000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = this.mockTaskManagerPurge();
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(1990L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager)Mockito.verify((Object)taskManager)).maybePurgeCommittedRecords();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldAlsoPurgeWhenNothingGetsCommitted(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        long purgeInterval = 1000L;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        props.setProperty("repartition.purge.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId = new TaskId(0, 0);
        StreamTask runningTask = StreamsTestUtils.TaskBuilder.statelessTask(taskId).inState(Task.State.RUNNING).build();
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        Mockito.when((Object)taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
        Mockito.when((Object)taskManager.commit(Collections.singleton(runningTask))).thenReturn((Object)0);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread((Consumer<byte[], byte[]>)consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.mockTime.sleep(1010L);
        this.thread.maybeCommit();
        ((TaskManager)Mockito.verify((Object)taskManager)).maybePurgeCommittedRecords();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotProcessWhenPartitionRevoked(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        StreamsConfig config = new StreamsConfig((Map)props);
        Mockito.when((Object)this.mainConsumer.poll((Duration)Mockito.any())).thenReturn((Object)ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.mainConsumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.mainConsumer, taskManager, config, topologyMetadata);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        this.thread.runOnceWithoutProcessingThreads();
        ((TaskManager)Mockito.verify((Object)taskManager, (VerificationMode)Mockito.never())).process(Mockito.anyInt(), (Time)Mockito.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldProcessWhenRunning(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        StreamsConfig config = new StreamsConfig((Map)props);
        Mockito.when((Object)this.mainConsumer.poll((Duration)Mockito.any())).thenReturn((Object)ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.mainConsumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.mainConsumer, taskManager, config, topologyMetadata);
        this.thread.updateThreadMetadata("admin");
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.thread.setState(StreamThread.State.RUNNING);
        this.runOnce(processingThreadsEnabled);
        ((TaskManager)Mockito.verify((Object)taskManager)).process(Mockito.anyInt(), (Time)Mockito.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldProcessWhenPartitionAssigned(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeTrue((boolean)stateUpdaterEnabled);
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        StreamsConfig config = new StreamsConfig((Map)props);
        Mockito.when((Object)this.mainConsumer.poll((Duration)Mockito.any())).thenReturn((Object)ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.mainConsumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.mainConsumer, taskManager, config, topologyMetadata);
        this.thread.updateThreadMetadata("admin");
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.runOnce(processingThreadsEnabled);
        ((TaskManager)Mockito.verify((Object)taskManager)).process(Mockito.anyInt(), (Time)Mockito.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldProcessWhenStarting(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeTrue((boolean)stateUpdaterEnabled);
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("__state.updater.enabled__", Boolean.toString(true));
        StreamsConfig config = new StreamsConfig((Map)props);
        Mockito.when((Object)this.mainConsumer.poll((Duration)Mockito.any())).thenReturn((Object)ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.mainConsumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.mainConsumer, taskManager, config, topologyMetadata);
        this.thread.updateThreadMetadata("admin");
        this.thread.setState(StreamThread.State.STARTING);
        this.runOnce(processingThreadsEnabled);
        ((TaskManager)Mockito.verify((Object)taskManager)).process(Mockito.anyInt(), (Time)Mockito.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws InterruptedException {
        MockTime mockTime = new MockTime(1L);
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, config.getString("built.in.metrics.version"), (Time)mockTime);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.lenient().when((Object)this.consumer.poll((Duration)ArgumentMatchers.any())).thenReturn((Object)ConsumerRecords.empty());
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        MockConsumerClientSupplier mockClientSupplier = new MockConsumerClientSupplier(this.consumer);
        mockClientSupplier.setCluster(this.createCluster());
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(config, (Time)mockTime, true, false);
        StreamsMetadataState streamsMetadataState = new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, config), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID)));
        BiConsumer mockExceptionHandler = (BiConsumer)Mockito.mock(BiConsumer.class);
        this.thread = StreamThread.create((TopologyMetadata)topologyMetadata, (StreamsConfig)config, (KafkaClientSupplier)mockClientSupplier, (Admin)mockClientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), (UUID)PROCESS_ID, (String)CLIENT_ID, (StreamsMetricsImpl)streamsMetrics, (Time)mockTime, (StreamsMetadataState)streamsMetadataState, (long)0L, (StateDirectory)this.stateDirectory, (StateRestoreListener)new MockStateRestoreListener(), (StandbyUpdateListener)new MockStandbyUpdateListener(), (int)1, null, (BiConsumer)mockExceptionHandler);
        mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
        this.thread.start();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.STARTING, (long)10000L, (String)"Thread never started.");
        this.thread.shutdown();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.DEAD, (long)10000L, (String)"Thread never shut down.");
        ((Consumer)Mockito.verify(this.consumer, (VerificationMode)Mockito.atMostOnce())).enforceRebalance(ArgumentMatchers.anyString());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws InterruptedException {
        MockTime mockTime = new MockTime(1L);
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, config.getString("built.in.metrics.version"), (Time)mockTime);
        Mockito.lenient().when((Object)this.consumer.poll((Duration)ArgumentMatchers.any())).thenReturn((Object)ConsumerRecords.empty());
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        MockConsumerClientSupplier mockClientSupplier = new MockConsumerClientSupplier(this.consumer);
        mockClientSupplier.setCluster(this.createCluster());
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(config, (Time)mockTime, true, false);
        StreamsMetadataState streamsMetadataState = new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, config), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID)));
        this.thread = StreamThread.create((TopologyMetadata)topologyMetadata, (StreamsConfig)config, (KafkaClientSupplier)mockClientSupplier, (Admin)mockClientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), (UUID)PROCESS_ID, (String)CLIENT_ID, (StreamsMetricsImpl)streamsMetrics, (Time)mockTime, (StreamsMetadataState)streamsMetadataState, (long)0L, (StateDirectory)this.stateDirectory, (StateRestoreListener)new MockStateRestoreListener(), (StandbyUpdateListener)new MockStandbyUpdateListener(), (int)1, null, null);
        mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
        this.thread.taskManager().handleRebalanceStart(Collections.emptySet());
        this.thread.start();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.STARTING, (long)10000L, (String)"Thread never started.");
        TestUtils.retryOnExceptionWithTimeout(() -> {});
        this.thread.shutdown();
        MatcherAssert.assertThat((Object)mockClientSupplier.nextRebalanceMs().get(), (Matcher)CoreMatchers.not((Object)0L));
        this.thread.taskManager().handleRebalanceComplete();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.DEAD, (long)10000L, (String)"Thread never shut down.");
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        LinkedList mockProcessors = new LinkedList();
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addProcessor("processor1", () -> {
            MockApiProcessor processor = new MockApiProcessor(PunctuationType.WALL_CLOCK_TIME, 10L);
            mockProcessors.add(processor);
            return processor;
        }, new String[]{"source1"});
        this.internalTopologyBuilder.addProcessor("processor2", () -> new MockApiProcessor(PunctuationType.STREAM_TIME, 10L), new String[]{"source1"});
        Properties properties = new Properties();
        properties.put("__state.updater.enabled__", (Object)stateUpdaterEnabled);
        properties.put("__processing.threads.enabled__", (Object)processingThreadsEnabled);
        properties.put("commit.interval.ms", (Object)100L);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig(APPLICATION_ID, "localhost:2171", Serdes.ByteArraySerde.class.getName(), Serdes.ByteArraySerde.class.getName(), properties));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId task1 = new TaskId(0, this.t1p1.partition());
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        this.thread.taskManager().handleAssignment(Collections.singletonMap(task1, assignedPartitions), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        long offset = -1L;
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 0L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 1L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)2));
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)2));
        this.mockTime.sleep(11L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)2));
        this.mockTime.sleep(11L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 5L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 5L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 6L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)3));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 11L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 12L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 13L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 14L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)3));
        mockProcessors.forEach(MockApiProcessor::requestCommit);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 15L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 15L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 16L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 17L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)3));
        this.mockTime.sleep(90L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)3));
        this.mockTime.sleep(90L);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 18L);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.currentNumIterations(), (Matcher)CoreMatchers.equalTo((Object)1));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCauseExceptionIfNothingCommitted(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        long commitInterval = 1000L;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig config = new StreamsConfig((Map)props);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Task runningTask = (Task)Mockito.mock(Task.class);
        TaskManager taskManager = this.mockTaskManagerCommit(runningTask, 0);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(990L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager)Mockito.verify((Object)taskManager)).commit((Collection)Utils.mkSet((Object[])new Task[]{runningTask}));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCommitAfterCommitInterval(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        long commitInterval = 100L;
        long commitLatency = 10L;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(100L));
        StreamsConfig config = new StreamsConfig((Map)props);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        final AtomicBoolean committed = new AtomicBoolean(false);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        TaskManager taskManager = new TaskManager(null, null, null, null, null, null, (TasksRegistry)new Tasks(new LogContext()), topologyMetadata, null, null, null, null){

            int commit(Collection<Task> tasksToCommit) {
                committed.set(true);
                StreamThreadTest.this.mockTime.sleep(10L);
                return 1;
            }
        };
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        Assertions.assertTrue((boolean)committed.get());
        this.mockTime.sleep(100L);
        committed.set(false);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        Assertions.assertFalse((boolean)committed.get());
        this.mockTime.sleep(1L);
        committed.set(false);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        Assertions.assertTrue((boolean)committed.get());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldPurgeAfterPurgeInterval(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        long commitInterval = 100L;
        long purgeInterval = 200L;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        props.setProperty("state.dir", this.stateDir);
        props.setProperty("commit.interval.ms", Long.toString(100L));
        props.setProperty("repartition.purge.interval.ms", Long.toString(200L));
        StreamsConfig config = new StreamsConfig((Map)props);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = this.mockTaskManagerPurge();
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        this.mockTime.sleep(201L);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager)Mockito.verify((Object)taskManager, (VerificationMode)Mockito.times((int)2))).maybePurgeCommittedRecords();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldRecordCommitLatency(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        DefaultTaskManager schedulingTaskManager;
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Mockito.when((Object)this.consumer.poll((Duration)ArgumentMatchers.any())).thenReturn((Object)ConsumerRecords.empty());
        Task task = (Task)Mockito.mock(Task.class);
        ActiveTaskCreator activeTaskCreator = (ActiveTaskCreator)Mockito.mock(ActiveTaskCreator.class);
        Mockito.when((Object)activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.any())).thenReturn(Collections.singleton(task));
        Mockito.when((Object)activeTaskCreator.producerClientIds()).thenReturn(Collections.singleton("producerClientId"));
        StandbyTaskCreator standbyTaskCreator = (StandbyTaskCreator)Mockito.mock(StandbyTaskCreator.class);
        StateUpdater stateUpdater = (StateUpdater)Mockito.mock(StateUpdater.class);
        if (processingThreadsEnabled) {
            schedulingTaskManager = (DefaultTaskManager)Mockito.mock(DefaultTaskManager.class);
            KafkaFutureImpl future = new KafkaFutureImpl();
            future.complete(null);
        } else {
            schedulingTaskManager = null;
        }
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        TaskManager taskManager = new TaskManager(null, this.changelogReader, null, null, activeTaskCreator, standbyTaskCreator, (TasksRegistry)new Tasks(new LogContext()), topologyMetadata, null, null, stateUpdater, schedulingTaskManager){

            int commit(Collection<Task> tasksToCommit) {
                StreamThreadTest.this.mockTime.sleep(10L);
                return 1;
            }
        };
        taskManager.setMainConsumer(this.consumer);
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.updateThreadMetadata("adminClientId");
        this.thread.setState(StreamThread.State.STARTING);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.singleton(this.t1p1));
        Assertions.assertTrue((boolean)Double.isNaN((Double)((Metric)streamsMetrics.metrics().get(new MetricName("commit-latency-max", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue()));
        Assertions.assertTrue((boolean)Double.isNaN((Double)((Metric)streamsMetrics.metrics().get(new MetricName("commit-latency-avg", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue()));
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)((Metric)streamsMetrics.metrics().get(new MetricName("commit-latency-max", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)((Metric)streamsMetrics.metrics().get(new MetricName("commit-latency-avg", "stream-thread-metrics", "", Collections.singletonMap("thread-id", CLIENT_ID)))).metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        HashMap<TopicPartition, Long> beginOffsets = new HashMap<TopicPartition, Long>();
        beginOffsets.put(this.t1p1, 0L);
        beginOffsets.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(beginOffsets);
        this.thread.rebalanceListener().onPartitionsAssigned(new HashSet(assignedPartitions));
        Assertions.assertEquals((int)1, (int)this.clientSupplier.producers.size());
        Producer globalProducer = (Producer)this.clientSupplier.producers.get(0);
        for (Task task : this.thread.readOnlyActiveTasks()) {
            Assertions.assertSame((Object)globalProducer, (Object)((RecordCollectorImpl)((StreamTask)task).recordCollector()).producer());
        }
        Assertions.assertSame(this.clientSupplier.consumer, (Object)this.thread.mainConsumer());
        Assertions.assertSame(this.clientSupplier.restoreConsumer, (Object)this.thread.restoreConsumer());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosAlphaEnabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties props = this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled);
        props.put("processing.guarantee", "exactly_once");
        this.thread = this.createStreamThread(CLIENT_ID, new StreamsConfig((Map)props));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        HashMap<TopicPartition, Long> beginOffsets = new HashMap<TopicPartition, Long>();
        beginOffsets.put(this.t1p1, 0L);
        beginOffsets.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(beginOffsets);
        this.thread.rebalanceListener().onPartitionsAssigned(new HashSet(assignedPartitions));
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)this.thread.readOnlyActiveTasks().size(), (int)this.clientSupplier.producers.size());
        Assertions.assertSame(this.clientSupplier.consumer, (Object)this.thread.mainConsumer());
        Assertions.assertSame(this.clientSupplier.restoreConsumer, (Object)this.thread.restoreConsumer());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties props = this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.createStreamThread(CLIENT_ID, new StreamsConfig((Map)props));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        HashMap<TopicPartition, Long> beginOffsets = new HashMap<TopicPartition, Long>();
        beginOffsets.put(this.t1p1, 0L);
        beginOffsets.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(beginOffsets);
        this.thread.rebalanceListener().onPartitionsAssigned(new HashSet(assignedPartitions));
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.clientSupplier.producers.size(), (Matcher)Matchers.is((Object)1));
        Assertions.assertSame(this.clientSupplier.consumer, (Object)this.thread.mainConsumer());
        Assertions.assertSame(this.clientSupplier.restoreConsumer, (Object)this.thread.restoreConsumer());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws InterruptedException {
        Assumptions.assumeFalse((boolean)stateUpdaterEnabled);
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties props = this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.createStreamThread(CLIENT_ID, new StreamsConfig((Map)props), (Time)new MockTime(1L));
        this.thread.taskManager().handleRebalanceStart(Collections.singleton("topic1"));
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        assignedPartitions.add(this.t1p2);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        this.thread.start();
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.STARTING, (long)10000L, (String)"Thread never started.");
        this.thread.shutdown();
        Assertions.assertFalse((boolean)this.thread.isRunning());
        Assertions.assertTrue((boolean)this.thread.isAlive());
        Thread.sleep(1000L);
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{this.task1, this.task2}), (Object)this.thread.taskManager().activeTaskIds());
        Assertions.assertEquals((Object)StreamThread.State.PENDING_SHUTDOWN, (Object)this.thread.state());
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        TestUtils.waitForCondition(() -> this.thread.state() == StreamThread.State.DEAD, (long)10000L, (String)"Thread never shut down.");
        Assertions.assertEquals(Collections.emptySet(), (Object)this.thread.taskManager().activeTaskIds());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldShutdownTaskManagerOnClose(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.setStateListener((t, newState, oldState) -> {
            if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
                this.thread.shutdown();
            }
        });
        this.thread.run();
        ((TaskManager)Mockito.verify((Object)taskManager)).shutdown(true);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotReturnDataAfterTaskMigrated(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        InternalTopologyBuilder internalTopologyBuilder = (InternalTopologyBuilder)Mockito.mock(InternalTopologyBuilder.class);
        Mockito.when((Object)internalTopologyBuilder.fullSourceTopicNames()).thenReturn(Collections.singletonList("topic1"));
        final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST);
        MockConsumer restoreConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        consumer.subscribe(Collections.singletonList("topic1"), (ConsumerRebalanceListener)new MockRebalanceListener());
        consumer.rebalance(Collections.singletonList(this.t1p1));
        consumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        consumer.seekToEnd(Collections.singletonList(this.t1p1));
        final TaskMigratedException taskMigratedException = new TaskMigratedException("Changelog restore found task migrated", (Throwable)new RuntimeException("restore task migrated"));
        ChangelogReader changelogReader = this.changelogReader;
        if (stateUpdaterEnabled) {
            Mockito.when((Object)taskManager.checkStateUpdater(ArgumentMatchers.anyLong(), (java.util.function.Consumer)ArgumentMatchers.any())).thenAnswer(answer -> {
                consumer.addRecord(new ConsumerRecord("topic1", 1, 11L, (Object)new byte[0], (Object)new byte[0]));
                consumer.addRecord(new ConsumerRecord("topic1", 1, 12L, (Object)new byte[1], (Object)new byte[0]));
                throw taskMigratedException;
            });
        } else {
            changelogReader = new MockChangelogReader(){

                @Override
                public long restore(Map<TaskId, Task> tasks) {
                    consumer.addRecord(new ConsumerRecord("topic1", 1, 11L, (Object)new byte[0], (Object)new byte[0]));
                    consumer.addRecord(new ConsumerRecord("topic1", 1, 12L, (Object)new byte[1], (Object)new byte[0]));
                    throw taskMigratedException;
                }
            };
        }
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        StreamsConfig config = new StreamsConfig((Map)props);
        this.thread = new StreamThread((Time)new MockTime(1L), config, null, (Consumer)consumer, (Consumer)restoreConsumer, changelogReader, null, taskManager, null, streamsMetrics, new TopologyMetadata(internalTopologyBuilder, config), CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null).updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        StreamsException thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> ((StreamThread)this.thread).run());
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.isA(IllegalStateException.class));
        Assertions.assertEquals((Object)"No current assignment for partition topic1-1", (Object)thrown.getCause().getMessage());
        Assertions.assertFalse((boolean)consumer.shouldRebalance());
        ((TaskManager)Mockito.verify((Object)taskManager)).handleLostAll();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldShutdownTaskManagerOnCloseWithoutStart(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.shutdown();
        ((TaskManager)Mockito.verify((Object)taskManager)).shutdown(true);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldOnlyShutdownOnce(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.shutdown();
        this.thread.run();
        ((TaskManager)Mockito.verify((Object)taskManager)).shutdown(true);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic"});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[]{"name"});
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(Collections.emptyMap(), standbyTasks);
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, new String[]{"source"});
        StreamsConfig config = new StreamsConfig((Map)this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        MockConsumer<byte[], byte[]> consumer = this.clientSupplier.consumer;
        consumer.updatePartitions("topic1", Collections.singletonList(new PartitionInfo("topic1", 1, null, null, null)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.readOnlyActiveTasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        MockProducer<byte[], byte[]> producer = this.clientSupplier.producers.get(0);
        consumer.updateBeginningOffsets(Collections.singletonMap(assignedPartitions.iterator().next(), 0L));
        consumer.unsubscribe();
        consumer.assign(new HashSet(assignedPartitions));
        consumer.addRecord(new ConsumerRecord("topic1", 1, 0L, (Object)new byte[0], (Object)new byte[0]));
        if (processingThreadsEnabled) {
            Assertions.assertTrue((boolean)this.runUntilTimeoutOrCondition(() -> this.runOnce(processingThreadsEnabled), () -> !producer.history().isEmpty()));
        } else {
            this.mockTime.sleep(config.getLong("commit.interval.ms") + 1L);
            this.runOnce(processingThreadsEnabled);
            MatcherAssert.assertThat((Object)producer.history().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        }
        this.mockTime.sleep(config.getLong("commit.interval.ms") + 1L);
        TestUtils.waitForCondition(() -> producer.commitCount() == 1L, (String)"StreamsThread did not commit transaction.");
        producer.fenceProducer();
        this.mockTime.sleep(config.getLong("commit.interval.ms") + 1L);
        consumer.addRecord(new ConsumerRecord("topic1", 1, 1L, (Object)new byte[0], (Object)new byte[0]));
        Assertions.assertThrows(TaskMigratedException.class, () -> {
            if (processingThreadsEnabled) {
                this.runUntilTimeoutOrException(() -> this.runOnce(processingThreadsEnabled));
            } else {
                this.runOnce(processingThreadsEnabled);
            }
        });
        Assertions.assertTrue((boolean)this.thread.readOnlyActiveTasks().stream().anyMatch(task -> task.id().equals((Object)this.task1)), (String)"StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.");
        MatcherAssert.assertThat((Object)producer.commitCount(), (Matcher)CoreMatchers.equalTo((Object)1L));
    }

    private void testThrowingDurringCommitTransactionException(RuntimeException e, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws InterruptedException {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[]{"name"});
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.readOnlyActiveTasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, 0L);
        MockProducer<byte[], byte[]> producer = this.clientSupplier.producers.get(0);
        this.runOnce(processingThreadsEnabled);
        if (processingThreadsEnabled) {
            TestUtils.waitForCondition(() -> !producer.uncommittedRecords().isEmpty(), (String)"Processing threads to process record");
        }
        producer.commitTransactionException = e;
        Assertions.assertThrows(TaskMigratedException.class, () -> this.thread.rebalanceListener().onPartitionsRevoked((Collection)assignedPartitions));
        Assertions.assertFalse((boolean)producer.transactionCommitted());
        Assertions.assertFalse((boolean)producer.closed());
        Assertions.assertEquals((int)1, (int)this.thread.readOnlyActiveTasks().size());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.testThrowingDurringCommitTransactionException((RuntimeException)new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.testThrowingDurringCommitTransactionException((RuntimeException)new InvalidPidMappingException("PidMapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReinitializeRevivedTasksInAnyState(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config, (Time)new MockTime(1L));
        String storeName = "store";
        String storeChangelog = "stream-thread-test-store-changelog";
        TopicPartition storeChangelogTopicPartition = new TopicPartition("stream-thread-test-store-changelog", 1);
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic1"});
        AtomicBoolean shouldThrow = new AtomicBoolean(false);
        AtomicBoolean processed = new AtomicBoolean(false);
        this.internalTopologyBuilder.addProcessor("proc", () -> record -> {
            if (shouldThrow.get()) {
                throw new TaskCorruptedException(Collections.singleton(this.task1));
            }
            processed.set(true);
        }, new String[]{"name"});
        this.internalTopologyBuilder.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"), (Serde)Serdes.String(), (Serde)Serdes.String()), new String[]{"proc"});
        this.internalTopologyBuilder.buildTopology();
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p1, (Object)0L)}));
        MockConsumer restoreConsumer = (MockConsumer)this.thread.restoreConsumer();
        restoreConsumer.updateBeginningOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)storeChangelogTopicPartition, (Object)0L)}));
        MockAdminClient admin = (MockAdminClient)this.thread.adminClient();
        admin.updateEndOffsets(Collections.singletonMap(storeChangelogTopicPartition, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.readOnlyActiveTasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.runOnce(processingThreadsEnabled);
        if (stateUpdaterEnabled) {
            TestUtils.waitForCondition(() -> this.thread.taskManager().checkStateUpdater(this.mockTime.milliseconds(), topicPartitions -> mockConsumer.seekToBeginning(Collections.singleton(this.t1p1))), (long)10000L, (String)"State updater never returned tasks.");
        }
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, 0L);
        shouldThrow.set(true);
        TaskCorruptedException taskCorruptedException = processingThreadsEnabled ? (TaskCorruptedException)Assertions.assertThrows(TaskCorruptedException.class, () -> this.runUntilTimeoutOrException(() -> this.runOnce(processingThreadsEnabled))) : (TaskCorruptedException)Assertions.assertThrows(TaskCorruptedException.class, () -> this.runOnce(processingThreadsEnabled));
        this.thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
        if (stateUpdaterEnabled) {
            TestUtils.waitForCondition(() -> this.thread.taskManager().checkStateUpdater(this.mockTime.milliseconds(), topicPartitions -> mockConsumer.seekToBeginning(Collections.singleton(this.t1p1))), (long)10000L, (String)"State updater never returned tasks.");
        }
        this.runOnce(processingThreadsEnabled);
        this.runOnce(processingThreadsEnabled);
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, 0L);
        shouldThrow.set(false);
        MatcherAssert.assertThat((Object)processed.get(), (Matcher)Matchers.is((Object)false));
        if (processingThreadsEnabled) {
            Assertions.assertTrue((boolean)this.runUntilTimeoutOrCondition(() -> this.runOnce(processingThreadsEnabled), processed::get));
        } else {
            this.runOnce(processingThreadsEnabled);
            this.runOnce(processingThreadsEnabled);
            MatcherAssert.assertThat((Object)processed.get(), (Matcher)Matchers.is((Object)true));
        }
    }

    private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(RuntimeException e, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        StreamsConfig config = new StreamsConfig((Map)this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        MockConsumer<byte[], byte[]> consumer = this.clientSupplier.consumer;
        consumer.updatePartitions("topic1", Collections.singletonList(new PartitionInfo("topic1", 1, null, null, null)));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.readOnlyActiveTasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        MockProducer<byte[], byte[]> producer = this.clientSupplier.producers.get(0);
        producer.commitTransactionException = e;
        this.mockTime.sleep(config.getLong("commit.interval.ms") + 1L);
        consumer.addRecord(new ConsumerRecord("topic1", 1, 1L, (Object)new byte[0], (Object)new byte[0]));
        Assertions.assertThrows(TaskMigratedException.class, () -> {
            if (processingThreadsEnabled) {
                this.runUntilTimeoutOrException(() -> this.runOnce(processingThreadsEnabled));
            } else {
                this.runOnce(processingThreadsEnabled);
            }
        });
        Assertions.assertTrue((boolean)this.thread.readOnlyActiveTasks().stream().anyMatch(task -> task.id().equals((Object)this.task1)), (String)"StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.");
        MatcherAssert.assertThat((Object)producer.commitCount(), (Matcher)CoreMatchers.equalTo((Object)0L));
        Assertions.assertTrue((boolean)this.clientSupplier.producers.get(0).transactionInFlight());
        Assertions.assertFalse((boolean)this.clientSupplier.producers.get(0).transactionCommitted());
        Assertions.assertFalse((boolean)this.clientSupplier.producers.get(0).closed());
        Assertions.assertEquals((int)1, (int)this.thread.readOnlyActiveTasks().size());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting((RuntimeException)new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting((RuntimeException)new InvalidPidMappingException("PID Mapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCloseTaskProducerWhenSuspending(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.internalTopologyBuilder.addSource(null, "name", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", null, null, null, new String[]{"name"});
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        MatcherAssert.assertThat((Object)this.thread.readOnlyActiveTasks().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, 0L);
        MockProducer<byte[], byte[]> producer = this.clientSupplier.producers.get(0);
        if (processingThreadsEnabled) {
            Assertions.assertTrue((boolean)this.runUntilTimeoutOrCondition(() -> this.runOnce(processingThreadsEnabled), () -> !producer.history().isEmpty()));
        } else {
            this.runOnce(processingThreadsEnabled);
        }
        this.thread.rebalanceListener().onPartitionsRevoked(assignedPartitions);
        Assertions.assertTrue((boolean)producer.transactionCommitted());
        Assertions.assertTrue((boolean)producer.transactionCommitted());
        Assertions.assertFalse((boolean)producer.closed());
        Assertions.assertEquals((int)1, (int)this.thread.readOnlyActiveTasks().size());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnActiveTaskMetadataWhileRunningState(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.internalTopologyBuilder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        this.clientSupplier.setCluster(this.createCluster());
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, APPLICATION_ID, config.getString("built.in.metrics.version"), (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.stateDirectory = new StateDirectory(config, (Time)this.mockTime, true, false);
        StreamsMetadataState streamsMetadataState = new StreamsMetadataState(new TopologyMetadata(this.internalTopologyBuilder, config), StreamsMetadataState.UNKNOWN_HOST, new LogContext(String.format("stream-client [%s] ", CLIENT_ID)));
        this.thread = StreamThread.create((TopologyMetadata)topologyMetadata, (StreamsConfig)config, (KafkaClientSupplier)this.clientSupplier, (Admin)this.clientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), (UUID)PROCESS_ID, (String)CLIENT_ID, (StreamsMetricsImpl)streamsMetrics, (Time)this.mockTime, (StreamsMetadataState)streamsMetadataState, (long)0L, (StateDirectory)this.stateDirectory, (StateRestoreListener)new MockStateRestoreListener(), (StandbyUpdateListener)new MockStandbyUpdateListener(), (int)1, null, HANDLER);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        ThreadMetadata metadata = this.thread.threadMetadata();
        Assertions.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)metadata.threadState());
        Assertions.assertTrue((boolean)metadata.activeTasks().contains(new TaskMetadataImpl(this.task1, Utils.mkSet((Object[])new TopicPartition[]{this.t1p1}), new HashMap(), new HashMap(), Optional.empty())));
        Assertions.assertTrue((boolean)metadata.standbyTasks().isEmpty());
        Assertions.assertTrue((boolean)Arrays.asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()), (String)("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED"));
        String threadName = metadata.threadName();
        MatcherAssert.assertThat((Object)threadName, (Matcher)CoreMatchers.startsWith((String)(CLIENT_ID + "-StreamThread-" + 1)));
        Assertions.assertEquals((Object)(threadName + "-consumer"), (Object)metadata.consumerClientId());
        Assertions.assertEquals((Object)(threadName + "-restore-consumer"), (Object)metadata.restoreConsumerClientId());
        Assertions.assertEquals(Collections.singleton(threadName + "-producer"), (Object)metadata.producerClientIds());
        Assertions.assertEquals((Object)(CLIENT_ID + "-admin"), (Object)metadata.adminClientId());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnStandbyTaskMetadataWhileRunningState(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as((String)"count-one"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        this.thread = this.createStreamThread(CLIENT_ID, config, (Time)new MockTime(1L));
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, Collections.singletonList(new PartitionInfo(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 0, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 1), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(Collections.emptyMap(), standbyTasks);
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
        this.runOnce(processingThreadsEnabled);
        ThreadMetadata threadMetadata = this.thread.threadMetadata();
        Assertions.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)threadMetadata.threadState());
        Assertions.assertTrue((boolean)threadMetadata.standbyTasks().contains(new TaskMetadataImpl(this.task1, Utils.mkSet((Object[])new TopicPartition[]{this.t1p1}), new HashMap(), new HashMap(), Optional.empty())));
        Assertions.assertTrue((boolean)threadMetadata.activeTasks().isEmpty());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldUpdateStandbyTask(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        Assumptions.assumeFalse((boolean)stateUpdaterEnabled);
        String storeName1 = "count-one";
        String storeName2 = "table-two";
        String changelogName1 = STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG;
        String changelogName2 = STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG;
        Properties props = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        StreamsConfig config = new StreamsConfig((Map)props);
        this.thread = this.createStreamThread(CLIENT_ID, config);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        this.setupThread("count-one", "table-two", STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, restoreConsumer, false);
        this.runOnce(processingThreadsEnabled);
        StandbyTask standbyTask1 = this.standbyTask(this.thread.taskManager(), this.t1p1);
        StandbyTask standbyTask2 = this.standbyTask(this.thread.taskManager(), this.t2p1);
        Assertions.assertEquals((Object)this.task1, (Object)standbyTask1.id());
        Assertions.assertEquals((Object)this.task3, (Object)standbyTask2.id());
        KeyValueStore store1 = (KeyValueStore)standbyTask1.getStore("count-one");
        KeyValueStore store2 = (KeyValueStore)standbyTask2.getStore("table-two");
        Assertions.assertEquals((long)0L, (long)store1.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store2.approximateNumEntries());
        this.addStandbyRecordsToRestoreConsumer(restoreConsumer);
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((long)10L, (long)store1.approximateNumEntries());
        Assertions.assertEquals((long)4L, (long)store2.approximateNumEntries());
    }

    private void addActiveRecordsToRestoreConsumer(MockConsumer<byte[], byte[]> restoreConsumer) {
        for (long i = 0L; i < 10L; ++i) {
            restoreConsumer.addRecord(new ConsumerRecord(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 2, i, (Object)("K" + i).getBytes(), (Object)("V" + i).getBytes()));
        }
    }

    private void addStandbyRecordsToRestoreConsumer(MockConsumer<byte[], byte[]> restoreConsumer) {
        for (long i = 0L; i < 10L; ++i) {
            restoreConsumer.addRecord(new ConsumerRecord(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 1, i, (Object)("K" + i).getBytes(), (Object)("V" + i).getBytes()));
            restoreConsumer.addRecord(new ConsumerRecord(STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, 1, i, (Object)("K" + i).getBytes(), (Object)("V" + i).getBytes()));
        }
    }

    private void setupThread(String storeName1, String storeName2, String changelogName1, String changelogName2, MockConsumer<byte[], byte[]> restoreConsumer, boolean addActiveTask) throws IOException {
        TopicPartition activePartition = new TopicPartition(changelogName1, 2);
        TopicPartition partition1 = new TopicPartition(changelogName1, 1);
        TopicPartition partition2 = new TopicPartition(changelogName2, 1);
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as((String)storeName1));
        MaterializedInternal materialized = new MaterializedInternal(Materialized.as((String)storeName2), (InternalNameProvider)this.internalStreamsBuilder, "");
        this.internalStreamsBuilder.table("topic2", new ConsumedInternal(), materialized);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        restoreConsumer.updatePartitions(changelogName1, Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0])));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(activePartition, 10L));
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(activePartition, 0L));
        ((MockAdminClient)this.thread.adminClient()).updateBeginningOffsets(Collections.singletonMap(activePartition, 0L));
        ((MockAdminClient)this.thread.adminClient()).updateEndOffsets(Collections.singletonMap(activePartition, 10L));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.getOrCreateDirectoryForTask(this.task3), ".checkpoint"));
        checkpoint.write(Collections.singletonMap(partition2, 5L));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        if (addActiveTask) {
            activeTasks.put(this.task2, Collections.singleton(this.t1p2));
        }
        standbyTasks.put(this.task1, Collections.singleton(this.t1p1));
        standbyTasks.put(this.task3, Collections.singleton(this.t2p1));
        this.thread.taskManager().handleAssignment(activeTasks, standbyTasks);
        this.thread.taskManager().tryToCompleteRestoration(this.mockTime.milliseconds(), null);
        this.thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotUpdateStandbyTaskWhenPaused(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        Assumptions.assumeFalse((boolean)stateUpdaterEnabled);
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        String storeName1 = "count-one";
        String storeName2 = "table-two";
        String changelogName1 = STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG;
        String changelogName2 = STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG;
        this.thread = this.createStreamThread(CLIENT_ID, config);
        MockConsumer<byte[], byte[]> restoreConsumer = this.clientSupplier.restoreConsumer;
        this.setupThread("count-one", "table-two", STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, restoreConsumer, true);
        this.runOnce(processingThreadsEnabled);
        StreamTask activeTask1 = this.activeTask(this.thread.taskManager(), this.t1p2);
        StandbyTask standbyTask1 = this.standbyTask(this.thread.taskManager(), this.t1p1);
        StandbyTask standbyTask2 = this.standbyTask(this.thread.taskManager(), this.t2p1);
        Assertions.assertEquals((Object)this.task1, (Object)standbyTask1.id());
        Assertions.assertEquals((Object)this.task3, (Object)standbyTask2.id());
        KeyValueStore activeStore = (KeyValueStore)activeTask1.getStore("count-one");
        KeyValueStore store1 = (KeyValueStore)standbyTask1.getStore("count-one");
        KeyValueStore store2 = (KeyValueStore)standbyTask2.getStore("table-two");
        Assertions.assertEquals((long)0L, (long)activeStore.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store1.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store2.approximateNumEntries());
        this.addActiveRecordsToRestoreConsumer(restoreConsumer);
        this.addStandbyRecordsToRestoreConsumer(restoreConsumer);
        this.thread.taskManager().topologyMetadata().pauseTopology("__UNNAMED_TOPOLOGY__");
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((long)0L, (long)activeStore.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store1.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store2.approximateNumEntries());
        this.thread.taskManager().topologyMetadata().resumeTopology("__UNNAMED_TOPOLOGY__");
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((long)10L, (long)activeStore.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store1.approximateNumEntries());
        Assertions.assertEquals((long)0L, (long)store2.approximateNumEntries());
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((long)10L, (long)activeStore.approximateNumEntries());
        Assertions.assertEquals((long)10L, (long)store1.approximateNumEntries());
        Assertions.assertEquals((long)4L, (long)store2.approximateNumEntries());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCreateStandbyTask(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.setupInternalTopologyWithoutState(config);
        this.internalTopologyBuilder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("myStore", true), new String[]{"processor1"});
        MatcherAssert.assertThat(this.createStandbyTask(config), (Matcher)CoreMatchers.not((Matcher)Matchers.empty()));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCreateStandbyTaskWithoutStateStores(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.setupInternalTopologyWithoutState(config);
        MatcherAssert.assertThat(this.createStandbyTask(config), (Matcher)Matchers.empty());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.setupInternalTopologyWithoutState(config);
        MockKeyValueStoreBuilder storeBuilder = new MockKeyValueStoreBuilder("myStore", true);
        storeBuilder.withLoggingDisabled();
        this.internalTopologyBuilder.addStateStore((StoreBuilder)storeBuilder, new String[]{"processor1"});
        MatcherAssert.assertThat(this.createStandbyTask(config), (Matcher)Matchers.empty());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldPunctuateActiveTask(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        final ArrayList punctuatedStreamTime = new ArrayList();
        final ArrayList punctuatedWallClockTime = new ArrayList();
        ProcessorSupplier punctuateProcessor = () -> new ContextualProcessor<Object, Object, Void, Void>(){

            public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
                context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
                context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
            }

            public void process(Record<Object, Object> record) {
            }
        };
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).process(punctuateProcessor, new String[0]);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        this.clientSupplier.consumer.assign(assignedPartitions);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)0, (int)punctuatedStreamTime.size());
        Assertions.assertEquals((int)0, (int)punctuatedWallClockTime.size());
        this.mockTime.sleep(100L);
        this.clientSupplier.consumer.addRecord(new ConsumerRecord("topic1", 1, 100L, 100L, TimestampType.CREATE_TIME, "K".getBytes().length, "V".getBytes().length, (Object)"K".getBytes(), (Object)"V".getBytes(), (Headers)new RecordHeaders(), Optional.empty()));
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)1, (int)punctuatedStreamTime.size());
        Assertions.assertEquals((int)1, (int)punctuatedWallClockTime.size());
        this.mockTime.sleep(100L);
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)1, (int)punctuatedStreamTime.size());
        Assertions.assertEquals((int)2, (int)punctuatedWallClockTime.size());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldPunctuateWithTimestampPreservedInProcessorContext(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        TransformerSupplier punctuateProcessor = () -> new Transformer<Object, Object, KeyValue<Object, Object>>(){

            public void init(ProcessorContext context) {
                context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward((Object)"key", (Object)"value"));
                context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, timestamp -> context.forward((Object)"key", (Object)"value"));
            }

            public KeyValue<Object, Object> transform(Object key, Object value) {
                return null;
            }

            public void close() {
            }
        };
        ArrayList peekedContextTime = new ArrayList();
        ProcessorSupplier peekProcessor = () -> record -> peekedContextTime.add(record.timestamp());
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).transform(punctuateProcessor, new String[0]).process(peekProcessor, new String[0]);
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        long currTime = this.mockTime.milliseconds();
        this.thread = this.createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        assignedPartitions.add(this.t1p1);
        activeTasks.put(this.task1, Collections.singleton(this.t1p1));
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        this.clientSupplier.consumer.assign(assignedPartitions);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)0, (int)peekedContextTime.size());
        this.mockTime.sleep(100L);
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)1, (int)peekedContextTime.size());
        Assertions.assertEquals((long)(currTime + 100L), (long)((Long)peekedContextTime.get(0)));
        this.clientSupplier.consumer.addRecord(new ConsumerRecord("topic1", 1, 110L, 110L, TimestampType.CREATE_TIME, "K".getBytes().length, "V".getBytes().length, (Object)"K".getBytes(), (Object)"V".getBytes(), (Headers)new RecordHeaders(), Optional.empty()));
        this.runOnce(processingThreadsEnabled);
        Assertions.assertEquals((int)2, (int)peekedContextTime.size());
        Assertions.assertEquals((long)110L, (long)((Long)peekedContextTime.get(1)));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldAlwaysUpdateTasksMetadataAfterChangingState(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        this.thread = this.createStreamThread(CLIENT_ID, config);
        ThreadMetadata metadata = this.thread.threadMetadata();
        Assertions.assertEquals((Object)StreamThread.State.CREATED.name(), (Object)metadata.threadState());
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.thread.setState(StreamThread.State.RUNNING);
        metadata = this.thread.threadMetadata();
        Assertions.assertEquals((Object)StreamThread.State.RUNNING.name(), (Object)metadata.threadState());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.internalStreamsBuilder.stream(Collections.singleton("topic"), this.consumed).groupByKey().count(Materialized.as((String)"count"));
        this.internalStreamsBuilder.buildAndOptimizeTopology();
        this.thread = this.createStreamThread("clientId", (Time)new MockTime(1L), stateUpdaterEnabled, processingThreadsEnabled);
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        MockConsumer mockRestoreConsumer = (MockConsumer)this.thread.restoreConsumer();
        MockAdminClient mockAdminClient = (MockAdminClient)this.thread.adminClient();
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        TaskId task0 = new TaskId(0, 0);
        activeTasks.put(task0, topicPartitionSet);
        this.thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
        mockConsumer.updatePartitions("topic", Collections.singletonList(new PartitionInfo("topic", 0, null, new Node[0], new Node[0])));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        mockConsumer.subscribe((Collection)Utils.mkSet((Object[])new String[]{topicPartition.topic()}));
        mockConsumer.rebalance(Collections.singleton(topicPartition));
        mockRestoreConsumer.updatePartitions("stream-thread-test-count-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-changelog", 0, null, new Node[0], new Node[0])));
        TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0);
        final Set<TopicPartition> changelogPartitionSet = Collections.singleton(changelogPartition);
        mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L));
        mockAdminClient.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
        mockConsumer.schedulePollTask(() -> {
            this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            this.thread.rebalanceListener().onPartitionsAssigned((Collection)topicPartitionSet);
        });
        this.thread.start();
        TestUtils.waitForCondition(() -> mockRestoreConsumer.assignment().size() == 1, (String)"Never get the assignment");
        mockRestoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, (Object)"K1".getBytes(), (Object)"V1".getBytes()));
        TestUtils.waitForCondition(() -> mockRestoreConsumer.position(changelogPartition) == 1L, (String)"Never restore first record");
        mockRestoreConsumer.setPollException((KafkaException)new InvalidOffsetException("Try Again!"){

            public Set<TopicPartition> partitions() {
                return changelogPartitionSet;
            }
        });
        TestUtils.waitForCondition(() -> mockRestoreConsumer.assignment().size() == 1, (String)"Never get the assignment");
        TestUtils.waitForCondition(() -> mockRestoreConsumer.position(changelogPartition) == 0L, (String)"Never restore first record");
        mockRestoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, (Object)"K1".getBytes(), (Object)"V1".getBytes()));
        mockRestoreConsumer.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 1L, (Object)"K2".getBytes(), (Object)"V2".getBytes()));
        if (stateUpdaterEnabled) {
            TestUtils.waitForCondition(() -> mockRestoreConsumer.assignment().size() == 0, (String)"Never get the assignment");
        } else {
            TestUtils.waitForCondition(() -> {
                mockRestoreConsumer.assign((Collection)changelogPartitionSet);
                return mockRestoreConsumer.position(changelogPartition) == 2L;
            }, (String)"Never finished restore");
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldLogAndRecordSkippedMetricForDeserializationException(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties properties = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        properties.setProperty("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());
        properties.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        this.thread = this.createStreamThread(CLIENT_ID, new StreamsConfig((Map)properties));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId task1 = new TaskId(0, this.t1p1.partition());
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        this.thread.taskManager().handleAssignment(Collections.singletonMap(task1, assignedPartitions), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        long offset = -1L;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), ++offset, -1L, TimestampType.CREATE_TIME, -1, -1, (Object)new byte[0], (Object)"I am not an integer.".getBytes(), (Headers)new RecordHeaders(), Optional.empty()));
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), ++offset, -1L, TimestampType.CREATE_TIME, -1, -1, (Object)new byte[0], (Object)"I am not an integer.".getBytes(), (Headers)new RecordHeaders(), Optional.empty()));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RecordDeserializer.class);){
            this.runOnce(processingThreadsEnabled);
            List strings = appender.getMessages();
            Assertions.assertTrue((boolean)strings.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]"));
            Assertions.assertTrue((boolean)strings.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]"));
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldThrowTaskMigratedExceptionHandlingTaskLost(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST);
        consumer.assign(assignedPartitions);
        consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        consumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        ((TaskManager)Mockito.doThrow((Throwable[])new Throwable[]{new TaskMigratedException("Task lost exception", (Throwable)new RuntimeException())}).when((Object)taskManager)).handleLostAll();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread((Consumer<byte[], byte[]>)consumer, taskManager, config, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        consumer.schedulePollTask(() -> {
            this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            this.thread.rebalanceListener().onPartitionsLost((Collection)assignedPartitions);
        });
        this.thread.setState(StreamThread.State.STARTING);
        Assertions.assertThrows(TaskMigratedException.class, () -> this.runOnce(processingThreadsEnabled));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldThrowTaskMigratedExceptionHandlingRevocation(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST);
        consumer.assign(assignedPartitions);
        consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        consumer.updateEndOffsets(Collections.singletonMap(this.t1p1, 10L));
        ((TaskManager)Mockito.doThrow((Throwable[])new Throwable[]{new TaskMigratedException("Revocation non fatal exception", (Throwable)new RuntimeException())}).when((Object)taskManager)).handleRevocation(assignedPartitions);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread((Consumer<byte[], byte[]>)consumer, taskManager, config, topologyMetadata).updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        consumer.schedulePollTask(() -> {
            this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
            this.thread.rebalanceListener().onPartitionsRevoked((Collection)assignedPartitions);
        });
        this.thread.setState(StreamThread.State.STARTING);
        Assertions.assertThrows(TaskMigratedException.class, () -> this.runOnce(processingThreadsEnabled));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId1 = new TaskId(0, 0);
        final Set<TaskId> corruptedTasks = Collections.singleton(taskId1);
        Mockito.when((Object)taskManager.handleCorruption(corruptedTasks)).thenReturn((Object)true);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null){

            void runOnceWithProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }

            void runOnceWithoutProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.run();
        ((Consumer)Mockito.verify((Object)consumer)).subscribe((Collection)ArgumentMatchers.any(), (ConsumerRebalanceListener)ArgumentMatchers.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId1 = new TaskId(0, 0);
        final Set<TaskId> corruptedTasks = Collections.singleton(taskId1);
        ((TaskManager)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException()}).when((Object)taskManager)).handleCorruption(corruptedTasks);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null){

            void runOnceWithProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }

            void runOnceWithoutProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false);
        this.thread.setStreamsUncaughtExceptionHandler((e, b) -> exceptionHandlerInvoked.set(true));
        this.thread.run();
        MatcherAssert.assertThat((Object)exceptionHandlerInvoked.get(), (Matcher)Matchers.is((Object)true));
        ((Consumer)Mockito.verify((Object)consumer)).subscribe((Collection)ArgumentMatchers.any(), (ConsumerRebalanceListener)ArgumentMatchers.any());
        ((Consumer)Mockito.verify((Object)consumer)).unsubscribe();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId1 = new TaskId(0, 0);
        final Set<TaskId> corruptedTasks = Collections.singleton(taskId1);
        ((TaskManager)Mockito.doThrow((Throwable[])new Throwable[]{new TaskMigratedException("Task migrated", (Throwable)new RuntimeException("non-corrupted task migrated"))}).when((Object)taskManager)).handleCorruption(corruptedTasks);
        ((TaskManager)Mockito.doNothing().when((Object)taskManager)).handleLostAll();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null){

            void runOnceWithProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }

            void runOnceWithoutProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.runLoop();
        ((Consumer)Mockito.verify((Object)consumer, (VerificationMode)Mockito.times((int)2))).subscribe((Collection)ArgumentMatchers.any(), (ConsumerRebalanceListener)ArgumentMatchers.any());
        ((Consumer)Mockito.verify((Object)consumer)).unsubscribe();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId1 = new TaskId(0, 0);
        final Set<TaskId> corruptedTasks = Collections.singleton(taskId1);
        Mockito.when((Object)taskManager.handleCorruption(corruptedTasks)).thenReturn((Object)true);
        ((Consumer)Mockito.doNothing().when((Object)consumer)).enforceRebalance("Active tasks corrupted");
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null){

            void runOnceWithProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }

            void runOnceWithoutProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.runLoop();
        ((Consumer)Mockito.verify((Object)consumer)).subscribe((Collection)ArgumentMatchers.any(), (ConsumerRebalanceListener)ArgumentMatchers.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskId taskId1 = new TaskId(0, 0);
        final Set<TaskId> corruptedTasks = Collections.singleton(taskId1);
        Mockito.when((Object)taskManager.handleCorruption(corruptedTasks)).thenReturn((Object)false);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null){

            void runOnceWithProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }

            void runOnceWithoutProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                throw new TaskCorruptedException(corruptedTasks);
            }
        }.updateThreadMetadata(ClientUtils.getSharedAdminClientId((String)CLIENT_ID));
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.runLoop();
        ((Consumer)Mockito.verify((Object)consumer)).subscribe((Collection)ArgumentMatchers.any(), (ConsumerRebalanceListener)ArgumentMatchers.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotCommitNonRunningNonRestoringTasks(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Task task1 = (Task)Mockito.mock(Task.class);
        Task task2 = (Task)Mockito.mock(Task.class);
        Task task3 = (Task)Mockito.mock(Task.class);
        TaskId taskId1 = new TaskId(0, 1);
        TaskId taskId2 = new TaskId(0, 2);
        TaskId taskId3 = new TaskId(0, 3);
        Mockito.when((Object)task1.state()).thenReturn((Object)Task.State.RUNNING);
        Mockito.when((Object)task2.state()).thenReturn((Object)Task.State.RESTORING);
        Mockito.when((Object)task3.state()).thenReturn((Object)Task.State.CREATED);
        Mockito.when((Object)taskManager.allOwnedTasks()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)taskId1, (Object)task1), Utils.mkEntry((Object)taskId2, (Object)task2), Utils.mkEntry((Object)taskId3, (Object)task3)}));
        Mockito.when((Object)taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{task1, task2}))).thenReturn((Object)2);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        this.thread.setNow(this.mockTime.milliseconds());
        this.thread.maybeCommit();
        ((TaskManager)Mockito.verify((Object)taskManager)).commit((Collection)Utils.mkSet((Object[])new Task[]{task1, task2}));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        Properties properties = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        properties.setProperty("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class.getName());
        StreamsConfig config = new StreamsConfig((Map)properties);
        this.thread = this.createStreamThread(CLIENT_ID, config);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        TaskId task1 = new TaskId(0, this.t1p1.partition());
        Set<TopicPartition> assignedPartitions = Collections.singleton(this.t1p1);
        this.thread.taskManager().handleAssignment(Collections.singletonMap(task1, assignedPartitions), Collections.emptyMap());
        MockConsumer mockConsumer = (MockConsumer)this.thread.mainConsumer();
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        this.thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        this.runOnce(processingThreadsEnabled);
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RecordQueue.class);){
            long offset = -1L;
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
            this.runOnce(processingThreadsEnabled);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset);
            this.runOnce(processingThreadsEnabled);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 1L);
            this.addRecord((MockConsumer<byte[], byte[]>)mockConsumer, ++offset, 1L);
            this.runOnce(processingThreadsEnabled);
            List strings = appender.getMessages();
            String threadTaskPrefix = "stream-thread [" + Thread.currentThread().getName() + "] task [0_1] ";
            Assertions.assertTrue((boolean)strings.contains(threadTaskPrefix + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assertions.assertTrue((boolean)strings.contains(threadTaskPrefix + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assertions.assertTrue((boolean)strings.contains(threadTaskPrefix + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assertions.assertTrue((boolean)strings.contains(threadTaskPrefix + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assertions.assertTrue((boolean)strings.contains(threadTaskPrefix + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
            Assertions.assertTrue((boolean)strings.contains(threadTaskPrefix + "Skipping record due to negative extracted timestamp. topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"));
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldTransmitTaskManagerMetrics(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap());
        KafkaMetric testMetric = new KafkaMetric(new Object(), testMetricName, (MetricValueProvider)((Measurable)(c, now) -> 0.0), null, (Time)new MockTime());
        Map<MetricName, KafkaMetric> dummyProducerMetrics = Collections.singletonMap(testMetricName, testMetric);
        Mockito.when((Object)taskManager.producerMetrics()).thenReturn(dummyProducerMetrics);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = this.buildStreamThread(this.consumer, taskManager, config, topologyMetadata);
        MatcherAssert.assertThat(dummyProducerMetrics, (Matcher)Matchers.is((Object)this.thread.producerMetrics()));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldConstructAdminMetrics(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        Node broker1 = new Node(0, "dummyHost-1", 1234);
        Node broker2 = new Node(1, "dummyHost-2", 1234);
        List<Node> cluster = Arrays.asList(broker1, broker2);
        MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).clusterId(null).build();
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, (Admin)adminClient, this.consumer, this.consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null);
        MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap());
        KafkaMetric testMetric = new KafkaMetric(new Object(), testMetricName, (MetricValueProvider)((Measurable)(c, now) -> 0.0), null, (Time)new MockTime());
        adminClient.setMockMetrics(testMetricName, (Metric)testMetric);
        Map adminClientMetrics = this.thread.adminClientMetrics();
        Assertions.assertEquals((Object)testMetricName, (Object)((Metric)adminClientMetrics.get(testMetricName)).metricName());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotRecordFailedStreamThread(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.runAndVerifyFailedStreamThreadRecording(false, stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldRecordFailedStreamThread(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.runAndVerifyFailedStreamThreadRecording(true, stateUpdaterEnabled, processingThreadsEnabled);
    }

    public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        StreamsConfig config = new StreamsConfig((Map)this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        this.thread = new StreamThread((Time)this.mockTime, config, null, this.consumer, this.consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, (e, b) -> {}, null){

            void runOnceWithProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                if (shouldFail) {
                    throw new StreamsException(Thread.currentThread().getName());
                }
            }

            void runOnceWithoutProcessingThreads() {
                this.setState(StreamThread.State.PENDING_SHUTDOWN);
                if (shouldFail) {
                    throw new StreamsException(Thread.currentThread().getName());
                }
            }
        };
        this.thread.updateThreadMetadata("metadata");
        this.thread.run();
        Metric failedThreads = StreamsTestUtils.getMetricByName(this.metrics.metrics(), "failed-stream-threads", "stream-metrics");
        MatcherAssert.assertThat((Object)failedThreads.metricValue(), (Matcher)Matchers.is((Object)(shouldFail ? 1.0 : 0.0)));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCheckStateUpdater(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeTrue((boolean)stateUpdaterEnabled);
        Properties streamsConfigProps = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.setUpThread(streamsConfigProps);
        TaskManager taskManager = this.thread.taskManager();
        this.thread.setState(StreamThread.State.STARTING);
        this.runOnce(processingThreadsEnabled);
        ((TaskManager)Mockito.verify((Object)taskManager)).checkStateUpdater(ArgumentMatchers.anyLong(), (java.util.function.Consumer)Mockito.any());
        if (!processingThreadsEnabled) {
            ((TaskManager)Mockito.verify((Object)taskManager)).process(Mockito.anyInt(), (Time)Mockito.any());
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldCheckStateUpdaterInBetweenProcessCalls(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeTrue((boolean)stateUpdaterEnabled);
        Assumptions.assumeFalse((boolean)processingThreadsEnabled);
        Properties streamsConfigProps = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.setUpThread(streamsConfigProps);
        TaskManager taskManager = this.thread.taskManager();
        this.thread.setState(StreamThread.State.STARTING);
        Mockito.when((Object)taskManager.process(Mockito.anyInt(), (Time)Mockito.any())).thenReturn((Object)1).thenReturn((Object)0);
        this.runOnce(processingThreadsEnabled);
        ((TaskManager)Mockito.verify((Object)taskManager, (VerificationMode)Mockito.times((int)2))).checkStateUpdater(ArgumentMatchers.anyLong(), (java.util.function.Consumer)Mockito.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldUpdateLagsAfterPolling(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Properties streamsConfigProps = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.setUpThread(streamsConfigProps);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.thread.updateThreadMetadata("metadata");
        this.thread.setState(StreamThread.State.RUNNING);
        this.runOnce(processingThreadsEnabled);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.mainConsumer, this.thread.taskManager()});
        ((Consumer)inOrder.verify(this.mainConsumer)).poll((Duration)Mockito.any());
        ((TaskManager)inOrder.verify((Object)this.thread.taskManager())).updateLags();
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Properties streamsConfigProps = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.setUpThread(streamsConfigProps);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.runOnce(processingThreadsEnabled);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.thread.taskManager(), this.mainConsumer});
        ((TaskManager)inOrder.verify((Object)this.thread.taskManager())).resumePollingForPartitionsWithAvailableSpace();
        ((Consumer)inOrder.verify(this.mainConsumer)).poll((Duration)Mockito.any());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeTrue((boolean)stateUpdaterEnabled);
        Properties streamsConfigProps = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        StreamsConfig config = new StreamsConfig((Map)streamsConfigProps);
        Duration pollTime = Duration.ofMillis(config.getLong("poll.ms"));
        this.thread = this.setUpThread(streamsConfigProps);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.runOnce(processingThreadsEnabled);
        ((Consumer)Mockito.verify(this.mainConsumer)).poll(pollTime);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        Assumptions.assumeFalse((boolean)stateUpdaterEnabled);
        Properties streamsConfigProps = this.configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        this.thread = this.setUpThread(streamsConfigProps);
        this.thread.setState(StreamThread.State.STARTING);
        this.thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.runOnce(processingThreadsEnabled);
        ((Consumer)Mockito.verify(this.mainConsumer)).poll(Duration.ZERO);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldGetMainAndRestoreConsumerInstanceId(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.getMainAndRestoreConsumerInstanceId(false, stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.getMainAndRestoreConsumerInstanceId(true, stateUpdaterEnabled, processingThreadsEnabled);
    }

    private void getMainAndRestoreConsumerInstanceId(boolean injectTimeException, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        Uuid consumerInstanceId = Uuid.randomUuid();
        this.clientSupplier.consumer.setClientInstanceId(consumerInstanceId);
        if (injectTimeException) {
            this.clientSupplier.consumer.injectTimeoutException(1);
        }
        Uuid restoreInstanceId = Uuid.randomUuid();
        this.clientSupplier.restoreConsumer.setClientInstanceId(restoreInstanceId);
        if (injectTimeException) {
            this.clientSupplier.restoreConsumer.injectTimeoutException(1);
        }
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerInstanceIdFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture mainConsumerFuture = (KafkaFuture)consumerInstanceIdFutures.get("clientId-StreamThread-1-consumer");
        Uuid mainConsumerUuid = (Uuid)mainConsumerFuture.get();
        MatcherAssert.assertThat((Object)mainConsumerUuid, (Matcher)CoreMatchers.equalTo((Object)consumerInstanceId));
        KafkaFuture restoreConsumerFuture = (KafkaFuture)consumerInstanceIdFutures.get("clientId-StreamThread-1-restore-consumer");
        Uuid restoreConsumerUuid = (Uuid)restoreConsumerFuture.get();
        MatcherAssert.assertThat((Object)restoreConsumerUuid, (Matcher)CoreMatchers.equalTo((Object)restoreInstanceId));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldGetProducerInstanceId(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.getProducerInstanceId(false, stateUpdaterEnabled, processingThreadsEnabled);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldProducerInstanceIdAndInternalTimeout(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.getProducerInstanceId(true, stateUpdaterEnabled, processingThreadsEnabled);
    }

    private void getProducerInstanceId(boolean injectTimeException, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        Uuid producerInstanceId = Uuid.randomUuid();
        MockProducer producer = new MockProducer();
        producer.setClientInstanceId(producerInstanceId);
        if (injectTimeException) {
            producer.injectTimeoutException(1);
        }
        this.clientSupplier.prepareProducer((MockProducer<byte[], byte[]>)producer);
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        KafkaFuture producerInstanceIdFutures = this.thread.producersClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture producerFuture = (KafkaFuture)((Map)producerInstanceIdFutures.get()).get("clientId-StreamThread-1-producer");
        Uuid producerUuid = (Uuid)producerFuture.get();
        MatcherAssert.assertThat((Object)producerUuid, (Matcher)CoreMatchers.equalTo((Object)producerInstanceId));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)consumerFutures.get("clientId-StreamThread-1-consumer");
        ExecutionException error = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((KafkaFuture)future).get());
        MatcherAssert.assertThat((Object)error.getCause(), (Matcher)IsInstanceOf.instanceOf(UnsupportedOperationException.class));
        MatcherAssert.assertThat((Object)error.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"clientInstanceId not set"));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)consumerFutures.get("clientId-StreamThread-1-restore-consumer");
        ExecutionException error = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((KafkaFuture)future).get());
        MatcherAssert.assertThat((Object)error.getCause(), (Matcher)IsInstanceOf.instanceOf(UnsupportedOperationException.class));
        MatcherAssert.assertThat((Object)error.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"clientInstanceId not set"));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map producerFutures = (Map)this.thread.producersClientInstanceIds(Duration.ZERO).get();
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)producerFutures.get("clientId-StreamThread-1-producer");
        ExecutionException error = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((KafkaFuture)future).get());
        MatcherAssert.assertThat((Object)error.getCause(), (Matcher)IsInstanceOf.instanceOf(UnsupportedOperationException.class));
        MatcherAssert.assertThat((Object)error.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"clientInstanceId not set"));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnNullIfMainConsumerTelemetryDisabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.clientSupplier.consumer.disableTelemetry();
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)consumerFutures.get("clientId-StreamThread-1-consumer");
        Uuid clientInstanceId = (Uuid)future.get();
        MatcherAssert.assertThat((Object)clientInstanceId, (Matcher)CoreMatchers.equalTo(null));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        this.clientSupplier.restoreConsumer.disableTelemetry();
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)consumerFutures.get("clientId-StreamThread-1-restore-consumer");
        Uuid clientInstanceId = (Uuid)future.get();
        MatcherAssert.assertThat((Object)clientInstanceId, (Matcher)CoreMatchers.equalTo(null));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldReturnNullIfProducerTelemetryDisabled(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        MockProducer producer = new MockProducer();
        producer.disableTelemetry();
        this.clientSupplier.prepareProducer((MockProducer<byte[], byte[]>)producer);
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map producerFutures = (Map)this.thread.producersClientInstanceIds(Duration.ZERO).get();
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)producerFutures.get("clientId-StreamThread-1-producer");
        Uuid clientInstanceId = (Uuid)future.get();
        MatcherAssert.assertThat((Object)clientInstanceId, (Matcher)CoreMatchers.equalTo(null));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldTimeOutOnMainConsumerInstanceId(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid());
        this.clientSupplier.consumer.injectTimeoutException(-1);
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.mockTime.sleep(1L);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)consumerFutures.get("clientId-StreamThread-1-consumer");
        ExecutionException error = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((KafkaFuture)future).get());
        MatcherAssert.assertThat((Object)error.getCause(), (Matcher)IsInstanceOf.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat((Object)error.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Could not retrieve main consumer client instance id."));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldTimeOutOnRestoreConsumerInstanceId(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        this.clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid());
        this.clientSupplier.restoreConsumer.injectTimeoutException(-1);
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map consumerFutures = this.thread.consumerClientInstanceIds(Duration.ZERO);
        this.mockTime.sleep(1L);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)consumerFutures.get("clientId-StreamThread-1-restore-consumer");
        ExecutionException error = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((KafkaFuture)future).get());
        MatcherAssert.assertThat((Object)error.getCause(), (Matcher)IsInstanceOf.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat((Object)error.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Could not retrieve restore consumer client instance id."));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void shouldTimeOutOnProducerInstanceId(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws Exception {
        MockProducer producer = new MockProducer();
        producer.setClientInstanceId(Uuid.randomUuid());
        producer.injectTimeoutException(-1);
        this.clientSupplier.prepareProducer((MockProducer<byte[], byte[]>)producer);
        this.thread = this.createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
        this.thread.setState(StreamThread.State.STARTING);
        Map producerFutures = (Map)this.thread.producersClientInstanceIds(Duration.ZERO).get();
        this.mockTime.sleep(1L);
        this.thread.maybeGetClientInstanceIds();
        KafkaFuture future = (KafkaFuture)producerFutures.get("clientId-StreamThread-1-producer");
        ExecutionException error = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((KafkaFuture)future).get());
        MatcherAssert.assertThat((Object)error.getCause(), (Matcher)IsInstanceOf.instanceOf(TimeoutException.class));
        MatcherAssert.assertThat((Object)error.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Could not retrieve thread producer client instance id."));
    }

    private StreamThread setUpThread(Properties streamsConfigProps) {
        StreamsConfig config = new StreamsConfig((Map)streamsConfigProps);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
        Mockito.when((Object)this.mainConsumer.poll((Duration)Mockito.any(Duration.class))).thenReturn((Object)new ConsumerRecords(Collections.emptyMap()));
        Mockito.when((Object)this.mainConsumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.internalTopologyBuilder, config);
        topologyMetadata.buildAndRewriteTopology();
        return new StreamThread((Time)this.mockTime, new StreamsConfig(streamsConfigProps.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), null, this.mainConsumer, null, this.changelogReader, "", taskManager, null, new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime), topologyMetadata, "thread-id", new LogContext(), null, null, new LinkedList(), null, null, null);
    }

    private TaskManager mockTaskManager(Task runningTask) {
        TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)runningTask.state()).thenReturn((Object)Task.State.RUNNING);
        Mockito.when((Object)taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
        return taskManager;
    }

    private TaskManager mockTaskManagerPurge() {
        Task runningTask = (Task)Mockito.mock(Task.class);
        return this.mockTaskManager(runningTask);
    }

    private TaskManager mockTaskManagerCommit(Task runningTask, int commits) {
        TaskManager taskManager = this.mockTaskManager(runningTask);
        Mockito.when((Object)taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{runningTask}))).thenReturn((Object)commits);
        return taskManager;
    }

    private void setupInternalTopologyWithoutState(StreamsConfig config) {
        this.stateDirectory = new StateDirectory(config, (Time)this.mockTime, true, false);
        this.internalTopologyBuilder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.internalTopologyBuilder.addProcessor("processor1", MockApiProcessor::new, new String[]{"source1"});
        this.internalTopologyBuilder.setStreamsConfig(config);
    }

    private Collection<Task> createStandbyTask(StreamsConfig config) {
        LogContext logContext = new LogContext("test");
        Logger log = logContext.logger(StreamThreadTest.class);
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(new TopologyMetadata(this.internalTopologyBuilder, config), config, streamsMetrics, this.stateDirectory, (ChangelogReader)new MockChangelogReader(), CLIENT_ID, log, false);
        return standbyTaskCreator.createTasks(Collections.singletonMap(new TaskId(1, 2), Collections.emptySet()));
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long offset) {
        this.addRecord(mockConsumer, offset, -1L);
    }

    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, long offset, long timestamp) {
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), offset, timestamp, TimestampType.CREATE_TIME, -1, -1, (Object)new byte[0], (Object)new byte[0], (Headers)new RecordHeaders(), Optional.empty()));
    }

    StreamTask activeTask(TaskManager taskManager, TopicPartition partition) {
        Stream<Task> standbys = taskManager.allTasks().values().stream().filter(Task::isActive);
        for (Task task : standbys::iterator) {
            if (!task.inputPartitions().contains(partition)) continue;
            return (StreamTask)task;
        }
        return null;
    }

    StandbyTask standbyTask(TaskManager taskManager, TopicPartition partition) {
        Stream standbys = taskManager.standbyTaskMap().values().stream();
        for (Task task : standbys::iterator) {
            if (!task.inputPartitions().contains(partition)) continue;
            return (StandbyTask)task;
        }
        return null;
    }

    private StreamThread buildStreamThread(Consumer<byte[], byte[]> consumer, TaskManager taskManager, StreamsConfig config, TopologyMetadata topologyMetadata) {
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, "latest", (Time)this.mockTime);
        return new StreamThread((Time)this.mockTime, config, null, consumer, consumer, this.changelogReader, null, taskManager, null, streamsMetrics, topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), new AtomicLong(Long.MAX_VALUE), new LinkedList(), null, HANDLER, null);
    }

    private void runOnce(boolean processingThreadsEnabled) {
        if (processingThreadsEnabled) {
            this.thread.runOnceWithProcessingThreads();
        } else {
            this.thread.runOnceWithoutProcessingThreads();
        }
    }

    private void runUntilTimeoutOrException(Runnable action) {
        long expectedEnd = System.currentTimeMillis() + 15000L;
        while (System.currentTimeMillis() < expectedEnd) {
            action.run();
            this.mockTime.sleep(10L);
        }
    }

    private boolean runUntilTimeoutOrCondition(Runnable action, TestCondition testCondition) throws Exception {
        long expectedEnd = System.currentTimeMillis() + 15000L;
        while (System.currentTimeMillis() < expectedEnd) {
            action.run();
            if (testCondition.conditionMet()) {
                return true;
            }
            this.mockTime.sleep(10L);
        }
        return false;
    }

    private static class MockConsumerClientSupplier
    extends MockClientSupplier {
        final Consumer<byte[], byte[]> mockConsumer;
        final Map<String, Object> consumerConfigs = new HashMap<String, Object>();

        MockConsumerClientSupplier(Consumer<byte[], byte[]> mockConsumer) {
            this.mockConsumer = mockConsumer;
        }

        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            this.consumerConfigs.putAll(config);
            return this.mockConsumer;
        }

        AtomicLong nextRebalanceMs() {
            return ((ReferenceContainer)this.consumerConfigs.get((Object)"__reference.container.instance__")).nextScheduledRebalanceMs;
        }
    }

    private static class StateListenerStub
    implements StreamThread.StateListener {
        int numChanges = 0;
        ThreadStateTransitionValidator oldState = null;
        ThreadStateTransitionValidator newState = null;

        private StateListenerStub() {
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator newState, ThreadStateTransitionValidator oldState) {
            ++this.numChanges;
            if (this.newState != null && !this.newState.equals(oldState)) {
                throw new RuntimeException("State mismatch " + oldState + " different from " + this.newState);
            }
            this.oldState = oldState;
            this.newState = newState;
        }
    }
}

