/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

@RunWith(value=EasyMockRunner.class)
public class TaskManagerTest {
    // Topic names. The partition fixtures below spell the same names out as string literals.
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Fixture for task (0, 0): id, input partition, changelog partition, and a single-entry assignment map.
    private final TaskId taskId00 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p0changelog = new TopicPartition("changelog", 0);
    private final Set<TopicPartition> taskId00Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0});
    private final Set<TopicPartition> taskId00ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0changelog});
    private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
    // Fixture for task (0, 1), laid out like the one for task (0, 0).
    private final TaskId taskId01 = new TaskId(0, 1);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1);
    private final Set<TopicPartition> taskId01Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1});
    private final Set<TopicPartition> taskId01ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1changelog});
    private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = Collections.singletonMap(this.taskId01, this.taskId01Partitions);
    // Tasks (0, 2) through (0, 5): id, one "topic1" partition each, and the partition set; no changelog fixture.
    private final TaskId taskId02 = new TaskId(0, 2);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final Set<TopicPartition> taskId02Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p2});
    private final TaskId taskId03 = new TaskId(0, 3);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final Set<TopicPartition> taskId03Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p3});
    private final TaskId taskId04 = new TaskId(0, 4);
    private final TopicPartition t1p4 = new TopicPartition("topic1", 4);
    private final Set<TopicPartition> taskId04Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p4});
    private final TaskId taskId05 = new TaskId(0, 5);
    private final TopicPartition t1p5 = new TopicPartition("topic1", 5);
    private final Set<TopicPartition> taskId05Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p5});
    // Task (1, 0) is the only fixture that reads from "topic2".
    private final TaskId taskId10 = new TaskId(1, 0);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final Set<TopicPartition> taskId10Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t2p0});
    // Mocked collaborators of the TaskManager under test; the @Mock fields are injected by the
    // EasyMockRunner declared on the class. Each field picks its own mock type (STRICT, DEFAULT or NICE).
    @Mock(type=MockType.STRICT)
    private InternalTopologyBuilder topologyBuilder;
    @Mock(type=MockType.DEFAULT)
    private StateDirectory stateDirectory;
    @Mock(type=MockType.NICE)
    private ChangelogReader changeLogReader;
    @Mock(type=MockType.STRICT)
    private Consumer<byte[], byte[]> consumer;
    @Mock(type=MockType.STRICT)
    private ActiveTaskCreator activeTaskCreator;
    @Mock(type=MockType.NICE)
    private StandbyTaskCreator standbyTaskCreator;
    @Mock(type=MockType.NICE)
    private Admin adminClient;
    // Object under test; (re)built by setUpTaskManager.
    private TaskManager taskManager;
    // Mock clock passed to the TaskManager and its metrics.
    private final Time time = new MockTime();
    // JUnit-managed temporary folder made available to the tests.
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();

    /**
     * Prepares the {@link TaskManager} under test before each test method, using the
     * at-least-once processing mode.
     */
    @Before
    public void setUp() {
        StreamsConfigUtils.ProcessingMode defaultMode = StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
        this.setUpTaskManager(defaultMode);
    }

    /**
     * Creates the {@link TaskManager} under test, hands it the mocked main consumer and records
     * the stub expectations that every test relies on.
     *
     * @param processingMode processing mode passed to the {@link DummyStreamsConfig} that backs
     *                       the topology metadata
     */
    private void setUpTaskManager(StreamsConfigUtils.ProcessingMode processingMode) {
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "latest", this.time);
        StreamsConfig streamsConfig = (StreamsConfig)new DummyStreamsConfig(processingMode);
        TopologyMetadata topologyMetadata = new TopologyMetadata(this.topologyBuilder, streamsConfig);
        UUID processId = UUID.randomUUID();

        this.taskManager = new TaskManager(this.time, this.changeLogReader, processId, "taskManagerTest", streamsMetrics, this.activeTaskCreator, this.standbyTaskCreator, topologyMetadata, this.adminClient, this.stateDirectory);
        this.taskManager.setMainConsumer(this.consumer);

        // Reset the topology builder mock before stubbing the named-topology query.
        EasyMock.reset((Object[])new Object[]{this.topologyBuilder});
        EasyMock.expect((Object)this.topologyBuilder.hasNamedTopology()).andStubReturn((Object)false);

        // Both task creators accept removeRevokedUnknownTasks with any argument, any number of times.
        this.activeTaskCreator.removeRevokedUnknownTasks((Set)EasyMock.anyObject());
        EasyMock.expectLastCall().asStub();
        this.standbyTaskCreator.removeRevokedUnknownTasks((Set)EasyMock.anyObject());
        EasyMock.expectLastCall().asStub();
    }

    /**
     * Handling an active assignment must pass the assigned partitions on to the topology
     * builder via {@code addSubscribedTopicsFromAssignment}.
     */
    @Test
    public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() {
        TopicPartition extraPartition = new TopicPartition("topic2", 1);
        Set assignedPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, extraPartition});
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId01, (Object)assignedPartitions)});
        List<TopicPartition> expectedSubscription = Arrays.asList(this.t1p1, extraPartition);

        // Record: task creation is stubbed out, the subscription update is expected exactly once.
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andStubReturn(Collections.emptyList());
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.eq(expectedSubscription), EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.topologyBuilder});

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());

        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.topologyBuilder});
    }

    /**
     * When the state directory lists no non-empty task directories, a rebalance start must
     * leave the set of locked task directories empty.
     */
    @Test
    public void shouldNotLockAnythingIfStateDirIsEmpty() {
        // The directory listing is expected exactly once and yields an empty list.
        EasyMock.expect((Object)this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList()).once();
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        EasyMock.verify((Object[])new Object[]{this.stateDirectory});
        boolean nothingLocked = this.taskManager.lockedTaskDirectories().isEmpty();
        Assert.assertTrue(nothingLocked);
    }

    /**
     * With folders prepared for taskId01, taskId10 and a "dummy" entry, and only the lock for
     * taskId01 expected to succeed, the locked task directories must contain taskId01 alone.
     */
    @Test
    public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
        this.expectLockObtainedFor(this.taskId01);
        this.expectLockFailedFor(this.taskId10);
        String lockableFolder = this.taskId01.toString();
        String contendedFolder = this.taskId10.toString();
        this.makeTaskFolders(lockableFolder, contendedFolder, "dummy");
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        EasyMock.verify((Object[])new Object[]{this.stateDirectory});
        Set<TaskId> expectedLocked = Collections.singleton(this.taskId01);
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(expectedLocked));
    }

    /**
     * Locks taken for three tasks at rebalance start must shrink to the two tasks that are
     * still part of the assignment once the rebalance completes; the lock of taskId02 is
     * expected to be released.
     */
    @Test
    public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
        this.expectLockObtainedFor(this.taskId00, this.taskId01, this.taskId02);
        this.expectUnlockFor(this.taskId02);
        this.makeTaskFolders(this.taskId00.toString(), this.taskId01.toString(), this.taskId02.toString());
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        // Rebalance start: all three directories are locked.
        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);
        Object lockedAtStart = Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01, this.taskId02});
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(lockedAtStart));

        // Assignment keeps taskId00 and taskId01 only.
        this.handleAssignment(this.taskId00Assignment, this.taskId01Assignment, Collections.emptyMap());

        // The consumer mock is re-recorded for the rebalance-complete phase.
        EasyMock.reset((Object[])new Object[]{this.consumer});
        TaskManagerTest.expectConsumerAssignmentPaused(this.consumer);
        EasyMock.replay((Object[])new Object[]{this.consumer});
        this.taskManager.handleRebalanceComplete();

        Object lockedAfterRebalance = Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01});
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(lockedAfterRebalance));
        EasyMock.verify((Object[])new Object[]{this.stateDirectory});
    }

    /**
     * When every changelog offset of the task is -2, the offset sum reported for the task must
     * also be -2 rather than an arithmetic sum.
     */
    @Test
    public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
        // NOTE(review): -2 is presumably the "latest offset" sentinel (Task.LATEST_OFFSET) — confirm.
        long latestOffsetMarker = -2L;
        Map.Entry partition0Offset = Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)latestOffsetMarker);
        Map.Entry partition1Offset = Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)latestOffsetMarker);
        Map offsetsByChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{partition0Offset, partition1Offset});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)latestOffsetMarker)});

        this.computeOffsetSumAndVerify(offsetsByChangelog, expectedSums);
    }

    /**
     * Changelog offsets 5 and 10 of the same task must be reported as an offset sum of 15.
     */
    @Test
    public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception {
        TopicPartition changelog0 = new TopicPartition("changelog", 0);
        TopicPartition changelog1 = new TopicPartition("changelog", 1);
        Map offsetsByChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)changelog0, (Object)5L), Utils.mkEntry((Object)changelog1, (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});

        this.computeOffsetSumAndVerify(offsetsByChangelog, expectedSums);
    }

    /**
     * A changelog offset of -4 must not contribute to the task's offset sum: with offsets -4
     * and 10 the expected sum is 10.
     */
    @Test
    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
        // NOTE(review): -4 is presumably the "unknown offset" sentinel (OffsetCheckpoint.OFFSET_UNKNOWN) — confirm.
        long unknownOffsetMarker = -4L;
        long knownOffset = 10L;
        Map.Entry skippedEntry = Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)unknownOffsetMarker);
        Map.Entry countedEntry = Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)knownOffset);
        Map offsetsByChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{skippedEntry, countedEntry});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)knownOffset)});

        this.computeOffsetSumAndVerify(offsetsByChangelog, expectedSums);
    }

    /**
     * Shared body of the offset-sum tests: locks the directory of taskId00, starts a rebalance,
     * assigns taskId00 through the three-argument {@code handleAssignment} helper (third map),
     * sets the given changelog offsets on the resulting task and compares the task manager's
     * offset sums with the expectation.
     *
     * @param changelogOffsets   offsets to set on the task, keyed by changelog partition
     * @param expectedOffsetSums offset sums the task manager is expected to report, keyed by task id
     * @throws Exception if one of the fixture helpers fails
     */
    private void computeOffsetSumAndVerify(Map<TopicPartition, Long> changelogOffsets, Map<TaskId, Long> expectedOffsetSums) throws Exception {
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        StateMachineTask assignedTask = this.handleAssignment(Collections.emptyMap(), Collections.emptyMap(), this.taskId00Assignment).get(this.taskId00);
        assignedTask.setChangelogOffsets(changelogOffsets);

        Object reportedSums = this.taskManager.getTaskOffsetSums();
        MatcherAssert.assertThat(reportedSums, (Matcher)Matchers.is(expectedOffsetSums));
    }

    /**
     * A task assigned through the second map of the {@code handleAssignment} helper, with
     * changelog offsets 5 and 10, must be reported with an offset sum of 15.
     */
    @Test
    public void shouldComputeOffsetSumForStandbyTask() throws Exception {
        Map.Entry partition0Offset = Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L);
        Map.Entry partition1Offset = Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L);
        Map standbyOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{partition0Offset, partition1Offset});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});

        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        StateMachineTask standbyTask = this.handleAssignment(Collections.emptyMap(), this.taskId00Assignment, Collections.emptyMap()).get(this.taskId00);
        standbyTask.setChangelogOffsets(standbyOffsets);

        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * Without any assignment, a lockable task directory with a checkpoint file holding
     * offsets 5 and 10 must yield an offset sum of 15 after rebalance start.
     */
    @Test
    public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception {
        TopicPartition changelog0 = new TopicPartition("changelog", 0);
        TopicPartition changelog1 = new TopicPartition("changelog", 1);
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)changelog0, (Object)5L), Utils.mkEntry((Object)changelog1, (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});

        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * A task that was assigned but is still in state CREATED must have its offset sum (15)
     * taken from the checkpoint file written for it.
     */
    @Test
    public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception {
        Map.Entry partition0Offset = Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L);
        Map.Entry partition1Offset = Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L);
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{partition0Offset, partition1Offset});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});

        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        // The active task creator hands back a task that is never initialized by this test.
        StateMachineTask createdOnlyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singleton(createdOnlyTask));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());

        MatcherAssert.assertThat((Object)createdOnlyTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * A task that was assigned and afterwards suspended and closed cleanly must have its offset
     * sum (15) taken from the checkpoint file written for it.
     */
    @Test
    public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception {
        TopicPartition changelog0 = new TopicPartition("changelog", 0);
        TopicPartition changelog1 = new TopicPartition("changelog", 1);
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)changelog0, (Object)5L), Utils.mkEntry((Object)changelog1, (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});

        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        StateMachineTask taskToClose = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singleton(taskToClose));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());

        // Close the task directly on the task object rather than through the task manager.
        taskToClose.suspend();
        taskToClose.closeClean();

        MatcherAssert.assertThat((Object)taskToClose.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * If the lock for the only task directory cannot be obtained, neither a locked directory
     * nor an offset sum may be reported.
     */
    @Test
    public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
        this.expectLockFailedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        boolean noLockedDirectories = this.taskManager.lockedTaskDirectories().isEmpty();
        Assert.assertTrue(noLockedDirectories);
        boolean noOffsetSums = this.taskManager.getTaskOffsetSums().isEmpty();
        Assert.assertTrue(noOffsetSums);
    }

    /**
     * A lockable task directory for which no checkpoint file was written must not contribute
     * an offset sum; the state directory expectations are verified at the end.
     */
    @Test
    public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
        this.expectLockObtainedFor(this.taskId00);
        String taskFolder = this.taskId00.toString();
        this.makeTaskFolders(taskFolder);
        // The state directory answers the checkpoint lookup with the test's checkpoint file handle;
        // this test never calls writeCheckpointFile for it.
        EasyMock.expect((Object)this.stateDirectory.checkpointFileFor(this.taskId00)).andReturn((Object)this.getCheckpointFile(this.taskId00));
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});

        Set<String> subscribedTopics = Collections.singleton("topic");
        this.taskManager.handleRebalanceStart(subscribedTopics);

        boolean noOffsetSums = this.taskManager.getTaskOffsetSums().isEmpty();
        Assert.assertTrue(noOffsetSums);
        EasyMock.verify((Object[])new Object[]{this.stateDirectory});
    }

    /**
     * Three checkpointed offsets of {@code Long.MAX_VALUE / 2} cannot be added without
     * overflowing a {@code long}; the reported offset sum must then be {@code Long.MAX_VALUE}.
     *
     * @throws Exception if one of the fixture helpers fails
     */
    @Test
    public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception {
        // Long.MAX_VALUE / 2 == 0x3FFFFFFFFFFFFFFFL: two of these still fit into a long, three do not.
        long largeOffset = Long.MAX_VALUE / 2;
        Map changelogOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)largeOffset), Utils.mkEntry((Object)new TopicPartition("changelog", 2), (Object)largeOffset), Utils.mkEntry((Object)new TopicPartition("changelog", 3), (Object)largeOffset)});
        Map expectedOffsetSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)Long.MAX_VALUE)});
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, changelogOffsets);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedOffsetSums));
    }

    /**
     * Walks one active task through assignment, restoration, revocation and a final empty
     * assignment, asserting the states RUNNING, SUSPENDED and CLOSED in turn and that both
     * task maps of the task manager are empty at the end.
     */
    @Test
    public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        // The creator returns task00 for the real assignment and nothing for the empty one.
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(task00));
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.anyObject())).andStubReturn(Collections.emptyList());
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        // Two identical commitSync expectations are recorded on the consumer mock.
        // NOTE(review): presumably one for handleRevocation and one for the second handleAssignment — confirm;
        // the mocks are never verified in this test, so an unmet expectation would go unnoticed.
        this.consumer.commitSync(offsets);
        EasyMock.expectLastCall();
        this.consumer.commitSync(offsets);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        // Phase 1: assign and restore, the task must be RUNNING.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // Phase 2: revoking the task's partitions must suspend it.
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        // Phase 3: an empty assignment must close the suspended task and drop it from the task maps.
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
    }

    /**
     * Uses a task whose {@code closeClean} always throws and asserts that dropping it from the
     * assignment still leaves it CLOSED while the failure surfaces as a RuntimeException with
     * the original exception as its cause.
     */
    @Test
    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
        // Task whose clean close always fails.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public void closeClean() {
                throw new RuntimeException("KABOOM!");
            }
        };
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(task00));
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.anyObject())).andStubReturn(Collections.emptyList());
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        // Assign the task, revoke its partitions, then hand in an empty assignment, which must throw.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleRevocation(this.taskId00Partitions);
        RuntimeException thrown = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));
        // Despite the failed clean close the task must end up CLOSED.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)Matchers.is((Object)"Unexpected failure to close 1 task(s) [[0_0]]. First unexpected exception (for task 0_0) follows."));
        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)Matchers.is((Object)"KABOOM!"));
    }

    /**
     * Runs one task from the active creator (task00) and one from the standby creator (task01),
     * then calls {@code handleLostAll} and asserts that only task00 is closed and removed while
     * task01 keeps running; the locked task directories are only cleared by the following
     * {@code handleRebalanceStart}.
     */
    @Test
    public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
        // task00 is handed out by the active task creator, task01 by the standby task creator.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(task00));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId01Assignment))).andStubReturn(Collections.singletonList(task01));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        // First round of folder/lock fixtures covers both tasks.
        this.makeTaskFolders(this.taskId00.toString(), this.taskId01.toString());
        this.expectLockObtainedFor(this.taskId00, this.taskId01);
        // Second round with no folders and no locks.
        // NOTE(review): presumably consumed by the second handleRebalanceStart below — confirm against the helpers.
        this.makeTaskFolders(new String[0]);
        this.expectLockObtainedFor(new TaskId[0]);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01})));
        // Recorded only now, after the first rebalance start, and replayed together with the other mocks.
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // Losing all tasks: task00 is commit-prepared and closed, task01 stays RUNNING and stays registered.
        this.taskManager.handleLostAll();
        MatcherAssert.assertThat((Object)task00.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId01, task01)));
        // The locked directories are unchanged right after handleLostAll ...
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01})));
        // ... and are empty after the next rebalance start.
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(Collections.emptySet()));
    }

    /**
     * With a task manager configured for {@code EXACTLY_ONCE_V2}, {@code handleLostAll} must
     * call {@code reInitializeThreadProducer} on the active task creator.
     */
    @Test
    public void shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
        // Expectation is recorded before the task manager is rebuilt; setUpTaskManager adds further
        // stubs to the same mock and does not reset it.
        this.activeTaskCreator.reInitializeThreadProducer();
        EasyMock.expectLastCall();
        // Replace the at-least-once manager from setUp with an exactly-once-v2 one.
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2);
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator});
        this.taskManager.handleLostAll();
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * Makes {@code closeAndRemoveTaskProducerIfNeeded} throw for task00 and asserts that
     * dropping the task from the assignment surfaces a RuntimeException that carries the
     * original exception as its cause.
     */
    @Test
    public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(task00));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.anyObject())).andStubReturn(Collections.emptyList());
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        // A single commit of the task's committable offsets is recorded on the consumer mock.
        this.consumer.commitSync(offsets);
        EasyMock.expectLastCall();
        // Closing the task's producer is the injected failure.
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("KABOOM!"));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        // The empty assignment must throw.
        RuntimeException thrown = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)Matchers.is((Object)"Unexpected failure to close 1 task(s) [[0_0]]. First unexpected exception (for task 0_0) follows."));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)Matchers.is((Object)"KABOOM!"));
    }

    /**
     * Verifies that {@code handleCorruption} revives a corrupted, running active task.
     * <p>
     * After the call the task must have prepared a commit, be back in {@code CREATED},
     * have its input partitions queued for offset reset, have seen an enforced checkpoint
     * in {@code postCommit}, and still be the only entry in the active task map.
     */
    @Test
    public void shouldReviveCorruptTasks() {
        final AtomicBoolean sawEnforcedCheckpoint = new AtomicBoolean(false);

        // Strict mock expecting a single markChangelogAsCorrupted call for the task's partitions.
        ProcessorStateManager stateMgr = (ProcessorStateManager)EasyMock.createStrictMock(ProcessorStateManager.class);
        stateMgr.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.expectLastCall().once();
        EasyMock.replay(stateMgr);

        // Task double that records whether postCommit was ever asked to enforce a checkpoint.
        StateMachineTask corruptTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateMgr){

            @Override
            public void postCommit(boolean enforceCheckpoint) {
                if (enforceCheckpoint) {
                    sawEnforcedCheckpoint.set(true);
                }
                super.postCommit(enforceCheckpoint);
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(corruptTask));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(this.taskId00Partitions);
        EasyMock.replay(this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader);

        // Bring the task to RUNNING before corrupting it.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(
            this.time.milliseconds(),
            partitions -> MatcherAssert.assertThat((Object)partitions, (Matcher)Matchers.is((Matcher)Matchers.empty())));
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        corruptTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        MatcherAssert.assertThat((Object)corruptTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)corruptTask.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        MatcherAssert.assertThat((Object)sawEnforcedCheckpoint.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId00, corruptTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify(stateMgr, this.consumer);
    }

    /**
     * Verifies that a corrupted active task is still revived when its {@code suspend()}
     * throws after delegating to the base implementation.
     * <p>
     * The task is expected to end up in {@code CREATED} with its commit prepared, its input
     * partitions queued for offset reset, and to remain the only active task.
     */
    @Test
    public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
        ProcessorStateManager stateMgr = (ProcessorStateManager)EasyMock.createStrictMock(ProcessorStateManager.class);
        stateMgr.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(stateMgr);

        // Task double whose suspend() always fails after running the base implementation.
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateMgr){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(failingTask));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(this.taskId00Partitions);
        EasyMock.replay(this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader);

        // Bring the task to RUNNING before corrupting it.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(
            this.time.milliseconds(),
            partitions -> MatcherAssert.assertThat((Object)partitions, (Matcher)Matchers.is((Matcher)Matchers.empty())));
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        failingTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        MatcherAssert.assertThat((Object)failingTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)failingTask.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId00, failingTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify(stateMgr, this.consumer);
    }

    /**
     * Verifies that handling corruption of one active task also prepares a commit for a
     * running, commit-needing task that is not corrupted.
     * <p>
     * Only the corrupted task's partitions may be queued for offset reset, and
     * {@code commitSync} must never be called with an empty offset map (the stubbed
     * expectation throws an {@link AssertionError} if it is).
     */
    @Test
    public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
        ProcessorStateManager stateMgr = (ProcessorStateManager)EasyMock.createStrictMock(ProcessorStateManager.class);
        stateMgr.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(stateMgr);

        StateMachineTask corrupted = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateMgr);
        StateMachineTask healthy = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateMgr);

        // Combined active assignment covering both tasks.
        Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<>(this.taskId00Assignment);
        activeAssignment.putAll(this.taskId01Assignment);

        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(activeAssignment)))
            .andStubReturn(Arrays.<Task>asList(corrupted, healthy));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(this.taskId00Partitions);
        // Any commitSync with an empty map is a test failure.
        this.consumer.commitSync((Map)EasyMock.eq(Collections.emptyMap()));
        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty"));
        EasyMock.replay(this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(
            this.time.milliseconds(),
            partitions -> MatcherAssert.assertThat((Object)partitions, (Matcher)Matchers.is((Matcher)Matchers.empty())));
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthy.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        healthy.setCommitNeeded();
        corrupted.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        Assert.assertTrue(healthy.commitPrepared);
        MatcherAssert.assertThat((Object)healthy.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(Collections.emptySet()));
        MatcherAssert.assertThat((Object)corrupted.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        EasyMock.verify(this.consumer);
    }

    /**
     * Verifies that a non-corrupted task which never left {@code CREATED} is not committed
     * while another task's corruption is handled, even though it has commit-needed set.
     * <p>
     * Restoration is deliberately never completed here, so both tasks stay non-running.
     */
    @Test
    public void shouldNotCommitNonRunningNonCorruptedTasks() {
        ProcessorStateManager stateMgr = (ProcessorStateManager)EasyMock.createStrictMock(ProcessorStateManager.class);
        stateMgr.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(stateMgr);

        StateMachineTask corrupted = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateMgr);
        StateMachineTask idleHealthy = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateMgr);
        idleHealthy.setCommitNeeded();

        // Combined active assignment covering both tasks.
        Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<>(this.taskId00Assignment);
        activeAssignment.putAll(this.taskId01Assignment);

        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(activeAssignment)))
            .andStubReturn(Arrays.<Task>asList(corrupted, idleHealthy));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(this.taskId00Partitions);
        EasyMock.replay(this.activeTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        corrupted.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        MatcherAssert.assertThat((Object)idleHealthy.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)idleHealthy.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(Collections.emptySet()));
        MatcherAssert.assertThat((Object)corrupted.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        EasyMock.verify(this.activeTaskCreator);
        Assert.assertFalse(idleHealthy.commitPrepared);
        EasyMock.verify(this.consumer);
    }

    /**
     * Verifies that a corrupted standby task has already been revived when committing a
     * non-corrupted active task fails.
     * <p>
     * The active task's {@code prepareCommit} throws {@link TaskMigratedException}; the test
     * expects that exception to escape {@code handleCorruption} while the standby task is
     * nevertheless left with its commit prepared and in state {@code CREATED}.
     */
    @Test
    public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
        ProcessorStateManager stateMgr = (ProcessorStateManager)EasyMock.createStrictMock(ProcessorStateManager.class);
        stateMgr.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay(stateMgr);

        StateMachineTask standby = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, stateMgr);
        // Active task double whose commit preparation always fails with a migration error.
        StateMachineTask active = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateMgr){

            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new TaskMigratedException("You dropped out of the group!", (Throwable)new RuntimeException());
            }
        };

        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singleton(standby));
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId01Assignment)))
            .andStubReturn(Collections.singleton(active));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.replay(this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader);

        // Bring both tasks to RUNNING.
        this.taskManager.handleAssignment(this.taskId01Assignment, this.taskId00Assignment);
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)active.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standby.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        active.setCommitNeeded();
        standby.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        Assert.assertThrows(TaskMigratedException.class, () -> this.taskManager.handleCorruption(Collections.singleton(this.taskId00)));

        MatcherAssert.assertThat((Object)standby.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standby.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        EasyMock.verify(this.consumer);
    }

    /**
     * Verifies that {@code handleCorruption} leaves a non-corrupted running task untouched
     * while a rebalance is in progress.
     * <p>
     * The healthy task's commit flags ({@code commitPrepared}, {@code commitNeeded},
     * {@code commitCompleted}) and its {@code RUNNING} state must be the same before and
     * after the corruption of the other task is handled.
     */
    @Test
    public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
        ProcessorStateManager stateMgr = (ProcessorStateManager)EasyMock.createNiceMock(ProcessorStateManager.class);
        EasyMock.expect((Object)this.stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList());

        StateMachineTask corrupted = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateMgr);
        StateMachineTask healthy = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateMgr);
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        healthy.setCommitNeeded();

        // Combined active assignment covering both tasks.
        Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<>();
        activeAssignment.putAll(this.taskId00Assignment);
        activeAssignment.putAll(this.taskId01Assignment);

        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(activeAssignment)))
            .andStubReturn(Arrays.<Task>asList(corrupted, healthy));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        this.topologyBuilder.addSubscribedTopicsFromMetadata((Set)EasyMock.eq(Collections.singleton("topic1")), (String)EasyMock.anyObject());
        EasyMock.expectLastCall().anyTimes();
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.consumer.assignment())
            .andStubReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        EasyMock.replay(this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader, this.stateDirectory, stateMgr);

        healthy.setCommittableOffsetsAndMetadata(committable);
        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));

        // Baseline: running, commit needed but neither prepared nor completed.
        MatcherAssert.assertThat((Object)healthy.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)healthy.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)healthy.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthy.commitCompleted, (Matcher)Matchers.is((Object)false));

        this.taskManager.handleRebalanceStart(Collections.singleton("topic1"));
        MatcherAssert.assertThat((Object)this.taskManager.isRebalanceInProgress(), (Matcher)Matchers.is((Object)true));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        // Nothing about the healthy task may have changed.
        MatcherAssert.assertThat((Object)healthy.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)healthy.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthy.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)healthy.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        EasyMock.verify(this.consumer);
    }

    /**
     * Verifies that when {@code consumer.commitSync} throws {@link TimeoutException} while
     * corruption of one active task is being handled, the non-corrupted active task is
     * closed and revived as well (both tasks end in {@code CREATED}), without the
     * non-corrupted task's changelogs being marked as corrupted.
     * <p>
     * NOTE(review): the task manager is used as set up by the fixture; per the method name
     * this is presumably at-least-once mode -- confirm against the fixture setup.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
        // Strict mock: only the corrupted task's partitions may be marked as corrupted.
        ProcessorStateManager stateManager = (ProcessorStateManager)EasyMock.createStrictMock(ProcessorStateManager.class);
        stateManager.markChangelogAsCorrupted(this.taskId00Partitions);
        EasyMock.replay((Object[])new Object[]{stateManager});
        StateMachineTask corruptedActive = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateManager);
        StateMachineTask uncorruptedActive = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateManager){

            // @Override makes the compiler reject this guard if the signature ever stops
            // matching the base method, instead of letting it silently never run.
            @Override
            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                Assert.fail((String)"Should not try to mark changelogs as corrupted for uncorrupted task");
            }
        };
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.putAll(this.taskId00Assignment);
        assignment.putAll(this.taskId01Assignment);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(assignment))).andStubReturn(Arrays.asList(new Task[]{corruptedActive, uncorruptedActive}));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        // The commit of the uncorrupted task's offsets times out.
        this.consumer.commitSync(offsets);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException());
        EasyMock.expect((Object)this.consumer.assignment()).andStubReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)corruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        uncorruptedActive.setCommitNeeded();
        corruptedActive.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        // Baseline commit flags before corruption is handled.
        MatcherAssert.assertThat((Object)uncorruptedActive.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitCompleted, (Matcher)Matchers.is((Object)false));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        // The uncorrupted task's commit was prepared but never completed; the corrupted one completed.
        MatcherAssert.assertThat((Object)uncorruptedActive.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActive.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)uncorruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        EasyMock.verify((Object[])new Object[]{this.consumer});
    }

    /**
     * Verifies, with the task manager set up for {@code EXACTLY_ONCE_V2}, that a
     * {@link TimeoutException} from {@code producer.commitTransaction} during corruption
     * handling causes the non-corrupted active task to be closed and revived too.
     * <p>
     * Both tasks must end in {@code CREATED}, and both tasks' changelogs must have been
     * marked as corrupted (tracked through the two {@link AtomicBoolean} flags).
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2);
        StreamsProducer producer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        EasyMock.expect((Object)this.activeTaskCreator.threadProducer()).andStubReturn((Object)producer);
        ProcessorStateManager stateManager = (ProcessorStateManager)EasyMock.createMock(ProcessorStateManager.class);
        final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
        StateMachineTask corruptedActiveTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateManager){

            // @Override guarantees this hook really replaces the base method; otherwise the
            // flag asserted on below could never be set.
            @Override
            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                super.markChangelogAsCorrupted(partitions);
                corruptedTaskChangelogMarkedAsCorrupted.set(true);
            }
        };
        // Recorded on the mock (not yet in replay state): expected call for the corrupted task.
        stateManager.markChangelogAsCorrupted(this.taskId00ChangelogPartitions);
        final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
        StateMachineTask uncorruptedActiveTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateManager){

            // @Override guarantees this hook really replaces the base method; otherwise the
            // flag asserted on below could never be set.
            @Override
            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                super.markChangelogAsCorrupted(partitions);
                uncorruptedTaskChangelogMarkedAsCorrupted.set(true);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
        // Recorded expectation: the uncorrupted task's changelogs are marked as corrupted as well.
        stateManager.markChangelogAsCorrupted(this.taskId01ChangelogPartitions);
        HashMap<TaskId, Set<TopicPartition>> assignment = new HashMap<TaskId, Set<TopicPartition>>();
        assignment.putAll(this.taskId00Assignment);
        assignment.putAll(this.taskId01Assignment);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(assignment))).andStubReturn(Arrays.asList(new Task[]{corruptedActiveTask, uncorruptedActiveTask}));
        this.topologyBuilder.addSubscribedTopicsFromAssignment((List)EasyMock.anyObject(), EasyMock.anyString());
        EasyMock.expectLastCall().anyTimes();
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
        EasyMock.expect((Object)this.consumer.groupMetadata()).andReturn((Object)groupMetadata);
        // The transactional commit of the uncorrupted task's offsets times out.
        producer.commitTransaction(offsets, groupMetadata);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException());
        EasyMock.expect((Object)this.consumer.assignment()).andStubReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.topologyBuilder, this.consumer, this.changeLogReader, stateManager, producer});
        this.taskManager.handleAssignment(assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)corruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        uncorruptedActiveTask.setCommitNeeded();
        Map<TopicPartition, Long> corruptedActiveTaskChangelogOffsets = Collections.singletonMap(this.t1p0changelog, 0L);
        corruptedActiveTask.setChangelogOffsets(corruptedActiveTaskChangelogOffsets);
        Map<TopicPartition, Long> uncorruptedActiveTaskChangelogOffsets = Collections.singletonMap(this.t1p1changelog, 0L);
        uncorruptedActiveTask.setChangelogOffsets(uncorruptedActiveTaskChangelogOffsets);
        // Baseline commit flags before corruption is handled.
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)false));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)corruptedTaskChangelogMarkedAsCorrupted.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedTaskChangelogMarkedAsCorrupted.get(), (Matcher)Matchers.is((Object)true));
        EasyMock.verify((Object[])new Object[]{this.consumer});
    }

    /**
     * Verifies the outcome of {@code handleRevocation} when {@code consumer.commitSync}
     * throws {@link TimeoutException} for the combined offsets of the revoked task and a
     * non-revoked task that needs a commit.
     * <p>
     * Expected end states: the revoked task is {@code SUSPENDED}, the non-revoked task that
     * needed a commit is {@code CREATED} (its changelogs must not be marked as corrupted),
     * and the non-revoked task without a pending commit stays {@code RUNNING}.
     * <p>
     * NOTE(review): the task manager is used as set up by the fixture; per the method name
     * this is presumably at-least-once mode -- confirm against the fixture setup.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS() {
        StateMachineTask revokedActiveTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
        revokedActiveTask.setCommitNeeded();
        StateMachineTask unrevokedActiveTaskWithCommitNeeded = new StateMachineTask(this.taskId01, this.taskId01Partitions, true){

            // @Override makes the compiler reject this guard if the signature ever stops
            // matching the base method, instead of letting it silently never run.
            @Override
            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                Assert.fail((String)"Should not try to mark changelogs as corrupted for uncorrupted task");
            }
        };
        Map<TopicPartition, OffsetAndMetadata> offsets01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01);
        unrevokedActiveTaskWithCommitNeeded.setCommitNeeded();
        StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        // The single commit is expected to carry the offsets of both commit-needing tasks.
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(offsets00);
        expectedCommittedOffsets.putAll(offsets01);
        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)assignmentActive))).andReturn(Arrays.asList(new Task[]{revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded}));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        // The commit issued during revocation times out.
        this.consumer.commitSync(expectedCommittedOffsets);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException());
        EasyMock.expect((Object)this.consumer.assignment()).andStubReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions, this.taskId02Partitions}));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)revokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithoutCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)revokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithoutCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
    }

    /**
     * EXACTLY_ONCE_V2 scenario with three running active tasks sharing one mocked state manager.
     * Tasks 0_0 and 0_1 need a commit, task 0_2 does not. The thread producer's
     * {@code commitTransaction} is recorded to throw a {@code TimeoutException}. After revoking the
     * partitions of 0_0 the test expects: 0_0 SUSPENDED, 0_1 marked as corrupted and back in CREATED,
     * 0_2 still RUNNING.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2);
        StreamsProducer threadProducer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        EasyMock.expect((Object)this.activeTaskCreator.threadProducer()).andStubReturn((Object)threadProducer);
        ProcessorStateManager sharedStateManager = (ProcessorStateManager)EasyMock.createMock(ProcessorStateManager.class);

        // task 0_0: will be revoked, has a pending commit
        StateMachineTask revokedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, sharedStateManager);
        Map<TopicPartition, OffsetAndMetadata> revokedTaskOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        revokedTask.setCommittableOffsetsAndMetadata(revokedTaskOffsets);
        revokedTask.setCommitNeeded();

        // task 0_1: stays assigned, has a pending commit; records whether it was marked as corrupted
        final AtomicBoolean keptTaskMarkedCorrupted = new AtomicBoolean(false);
        StateMachineTask keptTaskNeedingCommit = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, sharedStateManager){

            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                super.markChangelogAsCorrupted(partitions);
                keptTaskMarkedCorrupted.set(true);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> keptTaskOffsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        keptTaskNeedingCommit.setCommittableOffsetsAndMetadata(keptTaskOffsets);
        keptTaskNeedingCommit.setCommitNeeded();

        // task 0_2: stays assigned, nothing to commit
        StateMachineTask keptTaskWithoutCommit = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, sharedStateManager);

        // the failing transaction is expected to carry the offsets of both tasks that need a commit
        Map<TopicPartition, OffsetAndMetadata> offsetsInFailedCommit = new HashMap<TopicPartition, OffsetAndMetadata>(revokedTaskOffsets);
        offsetsInFailedCommit.putAll(keptTaskOffsets);

        sharedStateManager.markChangelogAsCorrupted(this.taskId00ChangelogPartitions);
        sharedStateManager.markChangelogAsCorrupted(this.taskId01ChangelogPartitions);
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andReturn(Arrays.asList(new Task[]{revokedTask, keptTaskNeedingCommit, keptTaskWithoutCommit}));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        ConsumerGroupMetadata consumerGroup = new ConsumerGroupMetadata("appId");
        EasyMock.expect((Object)this.consumer.groupMetadata()).andReturn((Object)consumerGroup);
        threadProducer.commitTransaction(offsetsInFailedCommit, consumerGroup);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException());
        EasyMock.expect((Object)this.consumer.assignment()).andStubReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions, this.taskId02Partitions}));
        Object[] mocks = new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader, threadProducer, sharedStateManager};
        EasyMock.replay(mocks);

        // bring all three tasks to RUNNING
        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(revokedTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(keptTaskNeedingCommit.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(keptTaskWithoutCommit.state(), Matchers.is(Task.State.RUNNING));

        Map<TopicPartition, Long> revokedTaskChangelogOffsets = Collections.singletonMap(this.t1p0changelog, 0L);
        revokedTask.setChangelogOffsets(revokedTaskChangelogOffsets);
        Map<TopicPartition, Long> keptTaskChangelogOffsets = Collections.singletonMap(this.t1p1changelog, 0L);
        keptTaskNeedingCommit.setChangelogOffsets(keptTaskChangelogOffsets);

        // the commit issued during revocation hits the recorded TimeoutException
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(keptTaskMarkedCorrupted.get(), Matchers.is(true));
        MatcherAssert.assertThat(revokedTask.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(keptTaskNeedingCommit.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(keptTaskWithoutCommit.state(), Matchers.is(Task.State.RUNNING));
    }

    /**
     * Creates task 0_0 through the standby task creator, lets it reach RUNNING, then hands over an
     * empty assignment and expects the task to be CLOSED and both task maps to be empty.
     */
    @Test
    public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
        StateMachineTask standbyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(standbyTask));
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        Object[] mocks = new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        // first assignment: 0_0 is handed over in the standby map
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        // second assignment: nothing is assigned any more
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
    }

    /**
     * Hands the same active (0_0) and standby (0_1) assignment to the task manager twice and expects
     * both tasks to be RUNNING after each round. Each creator's {@code createTasks} expectation is
     * recorded only once; the active creator is verified at the end.
     */
    @Test
    public void shouldAddNonResumedSuspendedTasks() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        // one restore-completion expectation per assignment round below
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(activeTask));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId01Assignment))).andReturn(Collections.singletonList(standbyTask));
        Object[] mocks = new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        // round one
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(activeTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        // round two: identical assignment
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(activeTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    /**
     * Assigns active task 0_0, lets it reach RUNNING, then re-assigns the same task id with a
     * different partition set ({@code t1p1}) and expects the task to stay RUNNING and report the new
     * set from {@code inputPartitions()}.
     */
    @Test
    public void shouldUpdateInputPartitionsAfterRebalance() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader, false);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(task00));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Fully parameterized types: a raw Set inside Map<TaskId, Set> is not assignable to the
        // Map<TaskId, Set<TopicPartition>> that handleAssignment is given elsewhere in this class.
        Set<TopicPartition> newPartitionsSet = Utils.mkSet(this.t1p1);
        Map<TaskId, Set<TopicPartition>> taskIdSetMap = Collections.singletonMap(this.taskId00, newPartitionsSet);
        this.taskManager.handleAssignment(taskIdSetMap, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        Assert.assertEquals((Object)newPartitionsSet, (Object)task00.inputPartitions());
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
    }

    /**
     * Assigns a single new active task (0_0): it is expected to be CREATED right after
     * {@code handleAssignment} and RUNNING after {@code tryToCompleteRestoration}, and to be the
     * only entry in the active task map while the standby map is empty.
     */
    @Test
    public void shouldAddNewActiveTasks() {
        Map<TaskId, Set<TopicPartition>> activeAssignment = this.taskId00Assignment;
        StateMachineTask newTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);

        // calls recorded on the changelog reader and the consumer before replay
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection)EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.changeLogReader.enforceRestoreActive();
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(activeAssignment))).andStubReturn(Collections.singletonList(newTask));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        Object[] mocks = new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(newTask.state(), Matchers.is(Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), ignoredPartitions -> {});
        MatcherAssert.assertThat(newTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId00, newTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    /**
     * Two active tasks whose {@code initializeIfNeeded()} always throws (a {@code LockException}
     * and a {@code TimeoutException} respectively). {@code tryToCompleteRestoration} is expected to
     * return false, both tasks are expected to remain CREATED, and both stay in the active task map.
     */
    @Test
    public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions)});

        // fails initialization with a lock problem
        StateMachineTask lockedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public void initializeIfNeeded() {
                throw new LockException("can't lock");
            }
        };
        // fails initialization with a timeout
        StateMachineTask timedOutTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true){

            @Override
            public void initializeIfNeeded() {
                throw new TimeoutException("timed out");
            }
        };

        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection)EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.changeLogReader.enforceRestoreActive();
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andStubReturn(Arrays.asList(new Task[]{lockedTask, timedOutTask}));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        Object[] mocks = new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(lockedTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(timedOutTask.state(), Matchers.is(Task.State.CREATED));

        // neither task can initialize, so restoration must not be reported as complete
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(false));
        MatcherAssert.assertThat(lockedTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(timedOutTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)lockedTask)), Utils.mkEntry((Object)this.taskId01, (Object)((Object)timedOutTask))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    /**
     * A single active task whose {@code completeRestoration} always throws a
     * {@code TimeoutException}. {@code tryToCompleteRestoration} is expected to return false and
     * the task is expected to be left in RESTORING while remaining in the active task map.
     */
    @Test
    public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions)});
        StateMachineTask stuckTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
                throw new TimeoutException("timeout!");
            }
        };

        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection)EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.changeLogReader.enforceRestoreActive();
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andStubReturn(Collections.singletonList(stuckTask));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        Object[] mocks = new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(stuckTask.state(), Matchers.is(Task.State.CREATED));

        // initialization succeeds, completing the restoration does not
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(false));
        MatcherAssert.assertThat(stuckTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)stuckTask))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.activeTaskCreator});
    }

    /**
     * Brings active task 0_0 to RUNNING, revokes its partitions and expects the task to be
     * SUSPENDED. A {@code commitSync} with the task's committable offsets is recorded on the
     * consumer mock.
     */
    @Test
    public void shouldSuspendActiveTasksDuringRevocation() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(committableOffsets);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(activeTask));
        this.consumer.commitSync(committableOffsets);
        EasyMock.expectLastCall();
        Object[] mocks = new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(activeTask.state(), Matchers.is(Task.State.RUNNING));

        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(activeTask.state(), Matchers.is(Task.State.SUSPENDED));
    }

    /**
     * EXACTLY_ONCE_V2 scenario: active tasks 0_0 and 0_1 need a commit, active task 0_2 and the
     * task built for id {@code taskId10} (returned by the standby creator) do not. After revoking
     * the partitions of 0_0 the test expects {@code commitNeeded} to be cleared on 0_0 and 0_1 and
     * {@code commitPrepared} to be false on the other two tasks.
     */
    @Test
    public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
        StreamsProducer threadProducer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2);

        StateMachineTask revokedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> revokedTaskOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        revokedTask.setCommittableOffsetsAndMetadata(revokedTaskOffsets);
        revokedTask.setCommitNeeded();

        StateMachineTask keptTaskNeedingCommit = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> keptTaskOffsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        keptTaskNeedingCommit.setCommittableOffsetsAndMetadata(keptTaskOffsets);
        keptTaskNeedingCommit.setCommitNeeded();

        // has committable offsets but is never flagged as needing a commit
        StateMachineTask keptTaskWithoutCommit = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, null));
        keptTaskWithoutCommit.setCommittableOffsetsAndMetadata(uncommittedOffsets);

        StateMachineTask standbyTask = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);

        // only the offsets of the two tasks flagged with setCommitNeeded() go into the transaction
        Map<TopicPartition, OffsetAndMetadata> offsetsInTransaction = new HashMap<TopicPartition, OffsetAndMetadata>(revokedTaskOffsets);
        offsetsInTransaction.putAll(keptTaskOffsets);

        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        Map standbyAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId10, this.taskId10Partitions)});
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andReturn(Arrays.asList(new Task[]{revokedTask, keptTaskNeedingCommit, keptTaskWithoutCommit}));
        EasyMock.expect((Object)this.activeTaskCreator.threadProducer()).andReturn((Object)threadProducer);
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq((Object)standbyAssignment))).andReturn(Collections.singletonList(standbyTask));
        ConsumerGroupMetadata consumerGroup = new ConsumerGroupMetadata("appId");
        EasyMock.expect((Object)this.consumer.groupMetadata()).andReturn((Object)consumerGroup);
        threadProducer.commitTransaction(offsetsInTransaction, consumerGroup);
        EasyMock.expectLastCall();

        // NOTE(review): the four tasks below are built with `new`, not created as mocks in this
        // method, so these expectLastCall() invocations do not appear to record anything for
        // committedOffsets() - confirm whether this section can be dropped.
        revokedTask.committedOffsets();
        EasyMock.expectLastCall();
        keptTaskNeedingCommit.committedOffsets();
        EasyMock.expectLastCall();
        keptTaskWithoutCommit.committedOffsets();
        EasyMock.expectLastCall();
        standbyTask.committedOffsets();
        EasyMock.expectLastCall();

        // NOTE(review): the producer mock is not part of this replay call - confirm that is intended.
        Object[] mocks = new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(revokedTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(keptTaskNeedingCommit.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(keptTaskWithoutCommit.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(revokedTask.commitNeeded, Matchers.is(false));
        MatcherAssert.assertThat(keptTaskNeedingCommit.commitNeeded, Matchers.is(false));
        MatcherAssert.assertThat(keptTaskWithoutCommit.commitPrepared, Matchers.is(false));
        MatcherAssert.assertThat(standbyTask.commitPrepared, Matchers.is(false));
    }

    /**
     * Active tasks 0_0 and 0_1 need a commit, active task 0_2 and the task built for id
     * {@code taskId10} (returned by the standby creator) do not. Revoking the partitions of 0_0 is
     * expected to trigger one {@code commitSync} carrying the offsets of 0_0 and 0_1. Afterwards
     * both of those tasks must have {@code commitPrepared} set and {@code commitNeeded} cleared,
     * while the other two tasks must not have prepared a commit.
     */
    @Test
    public void shouldCommitAllNeededTasksOnHandleRevocation() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets00);
        task00.setCommitNeeded();
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        task01.setCommittableOffsetsAndMetadata(offsets01);
        task01.setCommitNeeded();
        // task02 has committable offsets but is never flagged as needing a commit
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets02 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, null));
        task02.setCommittableOffsetsAndMetadata(offsets02);
        StateMachineTask task10 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);

        // only the offsets of the two tasks flagged with setCommitNeeded() are expected to be committed
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(offsets00);
        expectedCommittedOffsets.putAll(offsets01);

        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        Map assignmentStandby = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId10, this.taskId10Partitions)});
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)assignmentActive))).andReturn(Arrays.asList(new Task[]{task00, task01, task02}));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq((Object)assignmentStandby))).andReturn(Collections.singletonList(task10));
        this.consumer.commitSync(expectedCommittedOffsets);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});

        this.taskManager.handleAssignment(assignmentActive, assignmentStandby);
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task10.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)task00.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task00.commitPrepared, (Matcher)Matchers.is((Object)true));
        // task01 was committed together with the revoked task, so its flag must be cleared as well
        // (this assertion previously re-checked task00 and left task01.commitNeeded unverified)
        MatcherAssert.assertThat((Object)task01.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task01.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task02.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task10.commitPrepared, (Matcher)Matchers.is((Object)false));
    }

    /**
     * Hands the same active/standby assignment over twice. Because no task is closed in the second
     * round, the active task is expected to still have {@code commitNeeded} set and the other task
     * is expected not to have prepared a commit.
     */
    @Test
    public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> activeTaskOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(activeTaskOffsets);
        activeTask.setCommitNeeded();
        StateMachineTask standbyTask = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);

        Map<TaskId, Set<TopicPartition>> activeAssignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map<TaskId, Set<TopicPartition>> standbyAssignment = Collections.singletonMap(this.taskId10, this.taskId10Partitions);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(activeAssignment))).andReturn(Collections.singleton(activeTask));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(standbyAssignment))).andReturn(Collections.singletonList(standbyTask));
        Object[] mocks = new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(activeTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        // identical assignment again: nothing gets closed
        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat(activeTask.commitNeeded, Matchers.is(true));
        MatcherAssert.assertThat(standbyTask.commitPrepared, Matchers.is(false));
    }

    /**
     * Starts with one active and one standby-created task, then re-assigns only the active one.
     * The active task is expected to still have {@code commitNeeded} set afterwards.
     */
    @Test
    public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> activeTaskOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(activeTaskOffsets);
        activeTask.setCommitNeeded();
        StateMachineTask standbyTask = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);

        Map<TaskId, Set<TopicPartition>> activeAssignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map<TaskId, Set<TopicPartition>> standbyAssignment = Collections.singletonMap(this.taskId10, this.taskId10Partitions);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(activeAssignment))).andReturn(Collections.singleton(activeTask));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(standbyAssignment))).andReturn(Collections.singletonList(standbyTask));
        Object[] mocks = new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(activeTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        // the standby assignment is now empty, the active one is unchanged
        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(activeTask.commitNeeded, Matchers.is(true));
    }

    /**
     * Walks an active task that never leaves CREATED through revocation (expected: SUSPENDED) and
     * a subsequent empty assignment (expected: CLOSED). No {@code commitSync} expectation is
     * recorded on the consumer mock in this test.
     */
    @Test
    public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
        StateMachineTask createdTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(createdTask));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId00));
        Object[] mocks = new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        // restoration is never attempted, so the task stays in CREATED
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(createdTask.state(), Matchers.is(Task.State.CREATED));

        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(createdTask.state(), Matchers.is(Task.State.SUSPENDED));

        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(createdTask.state(), Matchers.is(Task.State.CLOSED));
    }

    /**
     * Uses a task whose {@code suspend()} delegates to the superclass and then throws a
     * {@code RuntimeException}. {@code handleRevocation} is expected to let that exception through
     * while the task itself ends up SUSPENDED.
     */
    @Test
    public void shouldPassUpIfExceptionDuringSuspend() {
        StateMachineTask explodingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public void suspend() {
                // perform the real suspension first, then blow up
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(explodingTask));
        Object[] mocks = new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader};
        EasyMock.replay(mocks);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(explodingTask.state(), Matchers.is(Task.State.RUNNING));

        Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleRevocation(this.taskId00Partitions));
        MatcherAssert.assertThat(explodingTask.state(), Matchers.is(Task.State.SUSPENDED));
        EasyMock.verify(new Object[]{this.consumer});
    }

    /**
     * Clean shutdown ({@code shutdown(true)}) with four active tasks, three of which fail in
     * {@code suspend()}. The test asserts that every task ends up CLOSED, that the failing tasks were
     * closed via {@code closeDirty()}, that the task maps are emptied, and that the rethrown
     * exception wraps the {@code TaskMigratedException} raised by task 0_1.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown() {
        final TopicPartition changelog = new TopicPartition("changelog", 0);
        Map assignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions), Utils.mkEntry((Object)this.taskId03, this.taskId03Partitions)});
        // task00 reports a changelog partition; it is asserted below to stay in RESTORING.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(changelog);
            }
        };
        // Flags flipped by the closeDirty() overrides of the three failing tasks.
        final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false);
        final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false);
        final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
        // task01: suspend() throws a TaskMigratedException carrying a nested cause.
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true){

            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", (Throwable)new RuntimeException("cause"));
            }

            @Override
            public void closeDirty() {
                super.closeDirty();
                closedDirtyTask01.set(true);
            }
        };
        // task02 and task03: suspend() throws a plain RuntimeException.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override
            public void closeDirty() {
                super.closeDirty();
                closedDirtyTask02.set(true);
            }
        };
        StateMachineTask task03 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override
            public void closeDirty() {
                super.closeDirty();
                closedDirtyTask03.set(true);
            }
        };
        // The changelog reader becomes a strict mock: only the calls recorded here are accepted.
        EasyMock.resetToStrict((Object[])new Object[]{this.changeLogReader});
        // No changelog is reported as completed.
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)assignment))).andStubReturn(Arrays.asList(new Task[]{task00, task01, task02, task03}));
        // One producer close is expected per task, followed by the thread producer close.
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId00));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId01));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId02));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId03));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        // After assignment all four tasks exist in CREATED.
        this.taskManager.handleAssignment(assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task03.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        // One restoration attempt: task00 remains RESTORING, the other three reach RUNNING.
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task03.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)task00)), Utils.mkEntry((Object)this.taskId01, (Object)((Object)task01)), Utils.mkEntry((Object)this.taskId02, (Object)((Object)task02)), Utils.mkEntry((Object)this.taskId03, (Object)((Object)task03))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        // Clean shutdown must rethrow; the wrapped cause chain is the one raised by task01.
        RuntimeException exception = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.shutdown(true));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)IsEqual.equalTo((Object)"Unexpected exception while closing task"));
        MatcherAssert.assertThat((Object)exception.getCause().getMessage(), (Matcher)Matchers.is((Object)"migrated; it means all tasks belonging to this thread should be migrated."));
        MatcherAssert.assertThat((Object)exception.getCause().getCause().getMessage(), (Matcher)Matchers.is((Object)"cause"));
        // Every task whose suspend() failed was closed dirty.
        MatcherAssert.assertThat((Object)closedDirtyTask01.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)closedDirtyTask02.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)closedDirtyTask03.get(), (Matcher)Matchers.is((Object)true));
        // Despite the failure, all tasks are CLOSED and the manager no longer tracks any of them.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task03.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    /**
     * Clean shutdown where closing the per-task producer fails: the failure must be rethrown
     * wrapped in "Unexpected exception while closing task", and the task must still be CLOSED and
     * removed from the manager.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
        final TopicPartition changelogPartition = new TopicPartition("changelog", 0);
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions)});

        // The task exposes one changelog partition; it is asserted to be RESTORING further down.
        StateMachineTask restoringTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {

            @Override
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(changelogPartition);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        restoringTask.setCommittableOffsetsAndMetadata(committable);

        // Strict changelog reader that reports nothing as completed.
        EasyMock.resetToStrict((Object[])new Object[]{this.changeLogReader});
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andStubReturn(Collections.singletonList(restoringTask));

        // Closing the task producer is the step that blows up.
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId00));
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("whatever"));
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)restoringTask))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());

        RuntimeException shutdownFailure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.shutdown(true));

        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)shutdownFailure.getMessage(), (Matcher)Matchers.is((Object)"Unexpected exception while closing task"));
        MatcherAssert.assertThat((Object)shutdownFailure.getCause().getMessage(), (Matcher)Matchers.is((Object)"whatever"));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    /**
     * Clean shutdown where closing the thread-level producer fails: the failure must be rethrown
     * wrapped in "Unexpected exception while closing task", and the single active task must still
     * be CLOSED and removed from the manager.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
        final TopicPartition changelogPartition = new TopicPartition("changelog", 0);
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions)});

        // The task exposes one changelog partition; it is asserted to be RESTORING further down.
        StateMachineTask restoringTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {

            @Override
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(changelogPartition);
            }
        };

        // Strict changelog reader that reports nothing as completed.
        EasyMock.resetToStrict((Object[])new Object[]{this.changeLogReader});
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andStubReturn(Collections.singletonList(restoringTask));

        // The task producer closes fine; the thread producer close is the step that blows up.
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId00));
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("whatever"));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)restoringTask))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());

        RuntimeException shutdownFailure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.shutdown(true));

        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)shutdownFailure.getMessage(), (Matcher)Matchers.is((Object)"Unexpected exception while closing task"));
        MatcherAssert.assertThat((Object)shutdownFailure.getCause().getMessage(), (Matcher)Matchers.is((Object)"whatever"));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    /**
     * With two standby tasks registered under EXACTLY_ONCE_ALPHA, a new assignment keeps only task
     * 0_0. Task 0_1 fails in {@code prepareCommit()}; the test expects that failure as the cause of
     * the exception thrown by {@code handleAssignment}, task 0_1 to be CLOSED, and task 0_0 to be
     * the only task still tracked, untouched in CREATED.
     */
    @Test
    public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA);

        StateMachineTask retainedStandby = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        // Standby task whose commit preparation always fails.
        StateMachineTask failingStandby = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) {

            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("task 0_1 prepare commit boom!");
            }
        };
        failingStandby.setCommitNeeded();

        this.taskManager.addTask((Task)retainedStandby);
        this.taskManager.addTask((Task)failingStandby);

        // The new assignment contains no active tasks and only task 0_0 as standby.
        RuntimeException assignmentFailure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.singletonMap(this.taskId00, this.taskId00Partitions)));
        MatcherAssert.assertThat((Object)assignmentFailure.getCause().getMessage(), (Matcher)Matchers.is((Object)"task 0_1 prepare commit boom!"));

        MatcherAssert.assertThat((Object)retainedStandby.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)failingStandby.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.tasks(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId00, retainedStandby)));
    }

    /**
     * Revokes the partitions of tasks 0_1 and 0_2 while task 0_1 throws from {@code suspend()}.
     * Both revoked tasks must still end up SUSPENDED, the non-revoked task 0_0 must stay CREATED,
     * and the suspend failure must be the cause of the exception thrown by
     * {@code handleRevocation}.
     */
    @Test
    public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
        StateMachineTask untouchedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        // suspend() delegates to the base implementation first and only then throws.
        StateMachineTask failingTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) {

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("task 0_1 suspend boom!");
            }
        };
        StateMachineTask healthyRevokedTask = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);

        this.taskManager.addTask((Task)untouchedTask);
        this.taskManager.addTask((Task)failingTask);
        this.taskManager.addTask((Task)healthyRevokedTask);
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator});

        // Only the partitions of tasks 0_1 and 0_2 are revoked.
        Collection revokedPartitions = (Collection)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId01Partitions, this.taskId02Partitions});
        RuntimeException revocationFailure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleRevocation(revokedPartitions));
        MatcherAssert.assertThat((Object)revocationFailure.getCause().getMessage(), (Matcher)Matchers.is((Object)"task 0_1 suspend boom!"));

        MatcherAssert.assertThat((Object)untouchedTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        MatcherAssert.assertThat((Object)healthyRevokedTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * Unclean shutdown ({@code shutdown(false)}) with three active tasks where two tasks throw from
     * {@code suspend()} and every producer-close call on the task creator throws. The call is made
     * outside any {@code assertThrows}, so the test fails if an exception escapes; afterwards all
     * tasks must be CLOSED and the task maps empty.
     */
    @Test
    public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() {
        final TopicPartition changelog = new TopicPartition("changelog", 0);
        Map assignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        // task00 reports a changelog partition; it is asserted below to stay in RESTORING.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(changelog);
            }
        };
        // task01: suspend() throws a TaskMigratedException carrying a nested cause.
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true){

            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", (Throwable)new RuntimeException("cause"));
            }
        };
        // task02: suspend() throws a plain RuntimeException.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        // The changelog reader becomes a strict mock: only the calls recorded here are accepted.
        EasyMock.resetToStrict((Object[])new Object[]{this.changeLogReader});
        // No changelog is reported as completed.
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)assignment))).andStubReturn(Arrays.asList(new Task[]{task00, task01, task02}));
        // Every producer close (per task and per thread) is set up to throw.
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId00));
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("whatever 0"));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId01));
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("whatever 1"));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded((TaskId)EasyMock.eq((Object)this.taskId02));
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("whatever 2"));
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("whatever all"));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(Collections.emptyMap()))).andStubReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});
        // After assignment all three tasks exist in CREATED.
        this.taskManager.handleAssignment(assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        // One restoration attempt: task00 remains RESTORING, the other two reach RUNNING.
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)task00)), Utils.mkEntry((Object)this.taskId01, (Object)((Object)task01)), Utils.mkEntry((Object)this.taskId02, (Object)((Object)task02))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        // Unclean shutdown: no exception may escape this call.
        this.taskManager.shutdown(false);
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.changeLogReader});
    }

    /**
     * Assigns a single standby task, brings it to RUNNING, and checks that a clean shutdown leaves
     * it CLOSED with both task maps empty.
     */
    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        Map<TaskId, Set<TopicPartition>> standbyAssignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);

        // Expectations recorded before replay; the empty-offsets commit and the thread producer
        // close are recorded explicitly as void-call expectations.
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(standbyAssignment))).andStubReturn(Collections.singletonList(standbyTask));
        EasyMock.expect((Object)this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection)EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.changeLogReader});

        this.taskManager.handleAssignment(Collections.emptyMap(), standbyAssignment);
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId00, standbyTask)));

        this.taskManager.shutdown(true);

        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * A newly assigned active task must reach RUNNING after one successful restoration pass and be
     * listed in the active task map only.
     */
    @Test
    public void shouldInitializeNewActiveTasks() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(activeTask));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationCompleted = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);

        MatcherAssert.assertThat((Object)restorationCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId00, activeTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        EasyMock.verify((Object[])new Object[]{this.consumer});
    }

    /**
     * A newly assigned standby task must reach RUNNING after one successful restoration pass and
     * be listed in the standby task map only.
     */
    @Test
    public void shouldInitializeNewStandbyTasks() {
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId01Assignment))).andStubReturn(Collections.singletonList(standbyTask));
        EasyMock.replay((Object[])new Object[]{this.standbyTaskCreator, this.consumer, this.changeLogReader});

        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        boolean restorationCompleted = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);

        MatcherAssert.assertThat((Object)restorationCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId01, standbyTask)));
    }

    /**
     * Checks that {@code isRebalanceInProgress()} is false initially, true after
     * {@code handleRebalanceStart}, and false again after {@code handleRebalanceComplete}.
     */
    @Test
    public void shouldHandleRebalanceEvents() {
        Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition("assignment", 0));

        // The consumer reports its assignment and is expected to be paused on exactly that set.
        EasyMock.expect((Object)this.consumer.assignment()).andReturn(assignedPartitions);
        this.consumer.pause(assignedPartitions);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList());
        EasyMock.replay((Object[])new Object[]{this.consumer, this.stateDirectory});

        boolean beforeStart = this.taskManager.isRebalanceInProgress();
        MatcherAssert.assertThat((Object)beforeStart, (Matcher)Matchers.is((Object)false));

        this.taskManager.handleRebalanceStart(Collections.emptySet());
        boolean duringRebalance = this.taskManager.isRebalanceInProgress();
        MatcherAssert.assertThat((Object)duringRebalance, (Matcher)Matchers.is((Object)true));

        this.taskManager.handleRebalanceComplete();
        boolean afterComplete = this.taskManager.isRebalanceInProgress();
        MatcherAssert.assertThat((Object)afterComplete, (Matcher)Matchers.is((Object)false));
    }

    /**
     * With one active and one standby task both flagged as needing a commit, {@code commitAll()}
     * must report two committed tasks, clear both flags, and pass the active task's committable
     * offsets to {@code consumer.commitSync}.
     */
    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> activeOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(activeOffsets);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andStubReturn(Collections.singletonList(activeTask));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId01Assignment))).andStubReturn(Collections.singletonList(standbyTask));
        // The commit is expected with exactly the active task's offsets.
        this.consumer.commitSync(activeOffsets);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});

        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        boolean restorationCompleted = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        activeTask.setCommitNeeded();
        standbyTask.setCommitNeeded();

        int committedTasks = this.taskManager.commitAll();
        MatcherAssert.assertThat((Object)committedTasks, (Matcher)IsEqual.equalTo((Object)2));
        MatcherAssert.assertThat((Object)activeTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)standbyTask.commitNeeded, (Matcher)Matchers.is((Object)false));
    }

    /**
     * Passes a subset of tasks (0_0, 0_2, 0_3, 0_5) to {@code commit(...)} while 0_0, 0_1, 0_3 and
     * 0_4 are flagged as needing a commit. The test expects a count of two, cleared flags on the
     * tasks that were passed in, and flags still set on the flagged tasks that were left out
     * (0_1 and 0_4).
     */
    @Test
    public void shouldCommitProvidedTasksIfNeeded() {
        // Three active tasks (0_0..0_2) and three standby tasks (0_3..0_5).
        StateMachineTask active00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask active01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        StateMachineTask active02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        StateMachineTask standby03 = new StateMachineTask(this.taskId03, this.taskId03Partitions, false);
        StateMachineTask standby04 = new StateMachineTask(this.taskId04, this.taskId04Partitions, false);
        StateMachineTask standby05 = new StateMachineTask(this.taskId05, this.taskId05Partitions, false);

        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        Map standbyAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId03, this.taskId03Partitions), Utils.mkEntry((Object)this.taskId04, this.taskId04Partitions), Utils.mkEntry((Object)this.taskId05, this.taskId05Partitions)});

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq((Object)activeAssignment))).andStubReturn(Arrays.asList(new Task[]{active00, active01, active02}));
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq((Object)standbyAssignment))).andStubReturn(Arrays.asList(new Task[]{standby03, standby04, standby05}));
        // No committable offsets were set on any task, so the commit is expected with an empty map.
        this.consumer.commitSync((Map)EasyMock.eq(Collections.emptyMap()));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});

        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        boolean restorationCompleted = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)active00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)active01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Flag two active and two standby tasks.
        active00.setCommitNeeded();
        active01.setCommitNeeded();
        standby03.setCommitNeeded();
        standby04.setCommitNeeded();

        // Only 0_0 and 0_3 are both flagged and passed in.
        Collection tasksToCommit = (Collection)Utils.mkSet((Object[])new Task[]{active00, active02, standby03, standby05});
        int committedTasks = this.taskManager.commit(tasksToCommit);
        MatcherAssert.assertThat((Object)committedTasks, (Matcher)IsEqual.equalTo((Object)2));

        MatcherAssert.assertThat((Object)active00.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)active01.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)active02.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)standby03.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)standby04.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standby05.commitNeeded, (Matcher)Matchers.is((Object)false));
    }

    /**
     * Assigns a single task through the standby task creator (and no active tasks), flags it
     * as needing a commit, and checks that {@code commitAll()} reports one committed task and
     * clears the flag. No {@code commitSync} expectation is recorded on the consumer mock here.
     */
    @Test
    public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() {
        StateMachineTask standbyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.standbyTaskCreator.createTasks(EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(standbyTask));
        // NOTE(review): this call follows a value-returning expectation and looks redundant;
        // confirm it has no effect before removing it.
        EasyMock.expectLastCall();
        EasyMock.replay(this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(standbyTask.state(), Matchers.is(Task.State.RUNNING));

        standbyTask.setCommitNeeded();

        int committed = this.taskManager.commitAll();
        MatcherAssert.assertThat(committed, IsEqual.equalTo(1));
        MatcherAssert.assertThat(standbyTask.commitNeeded, Matchers.is(false));
    }

    /**
     * Verifies that nothing is committed once a rebalance has started: after
     * {@code handleRebalanceStart} is called, both {@code commitAll()} and
     * {@code maybeCommitActiveTasksPerUserRequested()} must return {@code -1}, even though one
     * active and one standby task are running and flagged as needing a commit.
     *
     * @throws Exception if preparing the task folders or the lock expectations fails
     */
    @Test
    public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws Exception {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);

        // Name both folders after the task ids so they line up with the locks expected on the
        // next line (previously the second folder was named after the task object itself).
        this.makeTaskFolders(this.taskId00.toString(), this.taskId01.toString());
        this.expectLockObtainedFor(this.taskId00, this.taskId01);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(task00));
        EasyMock.expect(this.standbyTaskCreator.createTasks(EasyMock.eq(this.taskId01Assignment)))
            .andStubReturn(Collections.singletonList(task01));
        EasyMock.replay(this.activeTaskCreator, this.standbyTaskCreator, this.stateDirectory, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(task00.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(task01.state(), Matchers.is(Task.State.RUNNING));

        task00.setCommitNeeded();
        task01.setCommitNeeded();

        this.taskManager.handleRebalanceStart(Collections.emptySet());

        MatcherAssert.assertThat(this.taskManager.commitAll(), IsEqual.equalTo(-1));
        MatcherAssert.assertThat(this.taskManager.maybeCommitActiveTasksPerUserRequested(), IsEqual.equalTo(-1));
    }

    /**
     * Registers one task with committable offsets directly on the task manager and verifies
     * that {@code commitAll()} passes exactly those offsets to {@code consumer.commitSync}.
     * The test uses the task manager as provided by the fixture, without selecting a
     * processing mode.
     */
    @Test
    public void shouldCommitViaConsumerIfEosDisabled() {
        Map<TopicPartition, OffsetAndMetadata> committableOffsets =
            Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));

        StateMachineTask activeTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        activeTask.setCommittableOffsetsAndMetadata(committableOffsets);
        activeTask.setCommitNeeded();
        this.taskManager.addTask(activeTask);

        // Record the single consumer interaction the commit is expected to produce.
        this.consumer.commitSync(committableOffsets);
        EasyMock.expectLastCall();
        EasyMock.replay(this.consumer);

        this.taskManager.commitAll();

        EasyMock.verify(this.consumer);
    }

    /**
     * Under {@code EXACTLY_ONCE_ALPHA}, expects the producer obtained via
     * {@code streamsProducerForTask} to receive one {@code commitTransaction} call per task,
     * each carrying only that task's offsets. The shared scenario is driven by
     * {@code shouldCommitViaProducerIfEosEnabled}.
     */
    @Test
    public void shouldCommitViaProducerIfEosAlphaEnabled() {
        StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
        // Two lookups are expected, both answered with the same mock producer.
        EasyMock.expect(this.activeTaskCreator.streamsProducerForTask(EasyMock.anyObject(TaskId.class)))
            .andReturn(producer)
            .andReturn(producer);

        Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
            Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
            Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, null));

        producer.commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();
        producer.commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();

        this.shouldCommitViaProducerIfEosEnabled(
            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, producer, offsetsT01, offsetsT02);
    }

    /**
     * Under {@code EXACTLY_ONCE_V2}, expects the producer obtained via {@code threadProducer}
     * to receive a single {@code commitTransaction} call carrying the union of both tasks'
     * offsets. The shared scenario is driven by {@code shouldCommitViaProducerIfEosEnabled}.
     */
    @Test
    public void shouldCommitViaProducerIfEosV2Enabled() {
        StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(this.activeTaskCreator.threadProducer()).andReturn(producer);

        Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
            Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
            Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, null));

        // Merge both per-task maps into the one map the single transaction should carry.
        Map<TopicPartition, OffsetAndMetadata> mergedOffsets = new HashMap<>(offsetsT01);
        mergedOffsets.putAll(offsetsT02);

        producer.commitTransaction(mergedOffsets, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();

        this.shouldCommitViaProducerIfEosEnabled(
            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, producer, offsetsT01, offsetsT02);
    }

    /**
     * Shared scenario for the producer-commit tests: rebuilds the task manager for the given
     * processing mode, registers two tasks that need a commit, runs {@code commitAll()}, and
     * verifies the producer and consumer mocks.
     *
     * @param processingMode mode passed to {@code setUpTaskManager}
     * @param producer       mock producer whose expectations the caller has already recorded
     * @param offsetsT01     committable offsets assigned to the task created for {@code taskId01}
     * @param offsetsT02     committable offsets assigned to the task created for {@code taskId02}
     */
    private void shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode processingMode, StreamsProducer producer, Map<TopicPartition, OffsetAndMetadata> offsetsT01, Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
        this.setUpTaskManager(processingMode);

        StateMachineTask firstTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        firstTask.setCommittableOffsetsAndMetadata(offsetsT01);
        firstTask.setCommitNeeded();
        this.taskManager.addTask(firstTask);

        StateMachineTask secondTask = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        secondTask.setCommittableOffsetsAndMetadata(offsetsT02);
        secondTask.setCommitNeeded();
        this.taskManager.addTask(secondTask);

        // Start from a clean consumer mock that only serves the group metadata.
        EasyMock.reset(this.consumer);
        EasyMock.expect(this.consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId"));
        EasyMock.replay(this.activeTaskCreator, this.consumer, producer);

        this.taskManager.commitAll();

        EasyMock.verify(producer, this.consumer);
    }

    /**
     * A task handed out by the active task creator throws from {@code prepareCommit()};
     * {@code commitAll()} must surface that {@link RuntimeException} with its message intact.
     */
    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(failingTask));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.RUNNING));

        failingTask.setCommitNeeded();

        RuntimeException failure = Assert.assertThrows(RuntimeException.class, () -> this.taskManager.commitAll());
        MatcherAssert.assertThat(failure.getMessage(), IsEqual.equalTo("opsh."));
    }

    /**
     * A task handed out by the standby task creator throws from {@code prepareCommit()};
     * {@code commitAll()} must surface that {@link RuntimeException} with its message intact.
     */
    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        StateMachineTask failingTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) {
            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.standbyTaskCreator.createTasks(EasyMock.eq(this.taskId01Assignment)))
            .andStubReturn(Collections.singletonList(failingTask));
        EasyMock.replay(this.standbyTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.RUNNING));

        failingTask.setCommitNeeded();

        RuntimeException failure = Assert.assertThrows(RuntimeException.class, () -> this.taskManager.commitAll());
        MatcherAssert.assertThat(failure.getMessage(), IsEqual.equalTo("opsh."));
    }

    /**
     * With the admin client reset to strict mode, expects two {@code deleteRecords} calls in
     * order (before offset 5, then before offset 17), each answered with a result built from
     * {@code completedFuture()}. The task's purgeable offsets are changed between the two
     * {@code maybePurgeCommittedRecords()} invocations.
     */
    @Test
    public void shouldSendPurgeData() {
        EasyMock.resetToStrict(this.adminClient);
        EasyMock.expect(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L))))
            .andReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, TaskManagerTest.completedFuture())));
        EasyMock.expect(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(17L))))
            .andReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, TaskManagerTest.completedFuture())));
        EasyMock.replay(this.adminClient);

        // Mutable map the task exposes as its purgeable offsets; the test updates it in place.
        final Map<TopicPartition, Long> purgeableOffsets = new HashMap<>();
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public Map<TopicPartition, Long> purgeableOffsets() {
                return purgeableOffsets;
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(task00));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(task00.state(), Matchers.is(Task.State.RUNNING));

        purgeableOffsets.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();

        purgeableOffsets.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();

        EasyMock.verify(this.adminClient);
    }

    /**
     * With the admin client reset to strict mode, only one {@code deleteRecords} call (before
     * offset 5) is expected, and its result wraps a future that is never completed. A second
     * {@code maybePurgeCommittedRecords()} with a higher purgeable offset must therefore not
     * reach the admin client, which the final {@code verify} checks.
     */
    @Test
    public void shouldNotSendPurgeDataIfPreviousNotDone() {
        EasyMock.resetToStrict(this.adminClient);
        // Never completed in this test, so the first purge request stays outstanding.
        KafkaFutureImpl<DeletedRecords> pendingDeletion = new KafkaFutureImpl<>();
        EasyMock.expect(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L))))
            .andReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, pendingDeletion)));
        EasyMock.replay(this.adminClient);

        final Map<TopicPartition, Long> purgeableOffsets = new HashMap<>();
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public Map<TopicPartition, Long> purgeableOffsets() {
                return purgeableOffsets;
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(task00));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(task00.state(), Matchers.is(Task.State.RUNNING));

        purgeableOffsets.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();

        purgeableOffsets.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();

        EasyMock.verify(this.adminClient);
    }

    /**
     * The admin client answers {@code deleteRecords} with a result whose future has been
     * completed exceptionally. Two consecutive {@code maybePurgeCommittedRecords()} calls must
     * both complete without throwing and both reach the admin client (expected twice).
     */
    @Test
    public void shouldIgnorePurgeDataErrors() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(task00));

        KafkaFutureImpl<DeletedRecords> failedDeletion = new KafkaFutureImpl<>();
        DeleteRecordsResult failedResult = new DeleteRecordsResult(Collections.singletonMap(this.t1p1, failedDeletion));
        failedDeletion.completeExceptionally(new Exception("KABOOM!"));

        EasyMock.expect(this.adminClient.deleteRecords(EasyMock.anyObject()))
            .andReturn(failedResult)
            .times(2);
        EasyMock.replay(this.activeTaskCreator, this.adminClient, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(task00.state(), Matchers.is(Task.State.RUNNING));

        task00.setPurgeableOffsets(Collections.singletonMap(this.t1p1, 5L));

        this.taskManager.maybePurgeCommittedRecords();
        this.taskManager.maybePurgeCommittedRecords();

        EasyMock.verify(this.adminClient);
    }

    /**
     * Four tasks come from the active task creator and one from the standby task creator,
     * with differing combinations of the commit-needed and commit-requested flags.
     * {@code maybeCommitActiveTasksPerUserRequested()} is expected to return 3, and the only
     * recorded {@code commitSync} call carries the offsets of the first two tasks.
     */
    @Test
    public void shouldMaybeCommitAllActiveTasksThatNeedCommit() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets0 =
            Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets0);

        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets1 =
            Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        task01.setCommittableOffsetsAndMetadata(offsets1);

        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets2 =
            Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, null));
        task02.setCommittableOffsetsAndMetadata(offsets2);

        // task03 is given no committable offsets.
        StateMachineTask task03 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true);
        StateMachineTask standbyTask10 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);

        // Only the offsets of task00 and task01 are expected to reach the consumer.
        Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>(offsets0);
        expectedCommittedOffsets.putAll(offsets1);

        Map<TaskId, Set<TopicPartition>> assignmentActive = Utils.mkMap(
            Utils.mkEntry(this.taskId00, this.taskId00Partitions),
            Utils.mkEntry(this.taskId01, this.taskId01Partitions),
            Utils.mkEntry(this.taskId02, this.taskId02Partitions),
            Utils.mkEntry(this.taskId03, this.taskId03Partitions));
        Map<TaskId, Set<TopicPartition>> assignmentStandby = Utils.mkMap(
            Utils.mkEntry(this.taskId10, this.taskId10Partitions));

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(assignmentActive)))
            .andStubReturn(Arrays.<Task>asList(task00, task01, task02, task03));
        EasyMock.expect(this.standbyTaskCreator.createTasks(EasyMock.eq(assignmentStandby)))
            .andStubReturn(Collections.singletonList(standbyTask10));
        this.consumer.commitSync(expectedCommittedOffsets);
        EasyMock.expectLastCall();
        EasyMock.replay(this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(assignmentActive, assignmentStandby);
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(task00.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(task01.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(task02.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(task03.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(standbyTask10.state(), Matchers.is(Task.State.RUNNING));

        // task00: needed + requested; task01: needed only; task02: requested only;
        // task03 and the standby task: needed + requested.
        task00.setCommitNeeded();
        task00.setCommitRequested();
        task01.setCommitNeeded();
        task02.setCommitRequested();
        task03.setCommitNeeded();
        task03.setCommitRequested();
        standbyTask10.setCommitNeeded();
        standbyTask10.setCommitRequested();

        MatcherAssert.assertThat(this.taskManager.maybeCommitActiveTasksPerUserRequested(), IsEqual.equalTo(3));
    }

    /**
     * Two running tasks are loaded with six and five records respectively. Three successive
     * {@code process(3, time)} calls are expected to return 6, 5 and 0.
     */
    @Test
    public void shouldProcessActiveTasks() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);

        Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<>();
        activeAssignment.put(this.taskId00, this.taskId00Partitions);
        activeAssignment.put(this.taskId01, this.taskId01Partitions);

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(activeAssignment)))
            .andStubReturn(Arrays.<Task>asList(task00, task01));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(task00.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(task01.state(), Matchers.is(Task.State.RUNNING));

        // Six records for task00 (offsets 0..5).
        task00.addRecords(this.t1p0, Arrays.asList(
            TaskManagerTest.getConsumerRecord(this.t1p0, 0L),
            TaskManagerTest.getConsumerRecord(this.t1p0, 1L),
            TaskManagerTest.getConsumerRecord(this.t1p0, 2L),
            TaskManagerTest.getConsumerRecord(this.t1p0, 3L),
            TaskManagerTest.getConsumerRecord(this.t1p0, 4L),
            TaskManagerTest.getConsumerRecord(this.t1p0, 5L)));
        // Five records for task01 (offsets 0..4).
        task01.addRecords(this.t1p1, Arrays.asList(
            TaskManagerTest.getConsumerRecord(this.t1p1, 0L),
            TaskManagerTest.getConsumerRecord(this.t1p1, 1L),
            TaskManagerTest.getConsumerRecord(this.t1p1, 2L),
            TaskManagerTest.getConsumerRecord(this.t1p1, 3L),
            TaskManagerTest.getConsumerRecord(this.t1p1, 4L)));

        // The totals are consistent with at most three records per task per call: 3 + 3, then 3 + 2, then none.
        MatcherAssert.assertThat(this.taskManager.process(3, this.time), Matchers.is(6));
        MatcherAssert.assertThat(this.taskManager.process(3, this.time), Matchers.is(5));
        MatcherAssert.assertThat(this.taskManager.process(3, this.time), Matchers.is(0));
    }

    /**
     * Three running tasks are registered directly; the middle one throws a
     * {@link TimeoutException} from {@code process} for as long as the shared reference holds
     * one. {@code taskManager.process} must not fail: the first call returns 2 and records a
     * timeout on the throwing task, and after the exception is cleared the next call returns 3
     * and the task's timeout is back to {@code null}.
     */
    @Test
    public void shouldNotFailOnTimeoutException() {
        // While non-null, task01.process throws the held exception.
        final AtomicReference<TimeoutException> pendingTimeout = new AtomicReference<>();
        pendingTimeout.set(new TimeoutException("Skip me!"));

        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        task00.transitionTo(Task.State.RESTORING);
        task00.transitionTo(Task.State.RUNNING);

        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) {
            @Override
            public boolean process(long wallClockTime) {
                TimeoutException toThrow = pendingTimeout.get();
                if (toThrow == null) {
                    return true;
                }
                throw toThrow;
            }
        };
        task01.transitionTo(Task.State.RESTORING);
        task01.transitionTo(Task.State.RUNNING);

        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        task02.transitionTo(Task.State.RESTORING);
        task02.transitionTo(Task.State.RUNNING);

        this.taskManager.addTask(task00);
        this.taskManager.addTask(task01);
        this.taskManager.addTask(task02);

        task00.addRecords(this.t1p0, Arrays.asList(
            TaskManagerTest.getConsumerRecord(this.t1p0, 0L),
            TaskManagerTest.getConsumerRecord(this.t1p0, 1L)));
        task01.addRecords(this.t1p1, Arrays.asList(
            TaskManagerTest.getConsumerRecord(this.t1p1, 0L),
            TaskManagerTest.getConsumerRecord(this.t1p1, 1L)));
        task02.addRecords(this.t1p2, Arrays.asList(
            TaskManagerTest.getConsumerRecord(this.t1p2, 0L),
            TaskManagerTest.getConsumerRecord(this.t1p2, 1L)));

        // First round: task01 throws, so only two records are counted and a timeout is noted.
        MatcherAssert.assertThat(this.taskManager.process(1, this.time), Matchers.is(2));
        MatcherAssert.assertThat(task01.timeout, IsEqual.equalTo(this.time.milliseconds()));

        // Second round: the exception is cleared, all three tasks count and the timeout resets.
        pendingTimeout.set(null);
        MatcherAssert.assertThat(this.taskManager.process(1, this.time), Matchers.is(3));
        MatcherAssert.assertThat(task01.timeout, IsEqual.equalTo(null));

        // Third round: only one task still reports a processed record.
        MatcherAssert.assertThat(this.taskManager.process(1, this.time), Matchers.is(1));
    }

    /**
     * A running task throws {@code TaskMigratedException} from {@code process};
     * {@code taskManager.process} must let that exception type escape to the caller.
     */
    @Test
    public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() {
        StateMachineTask migratedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public boolean process(long wallClockTime) {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(migratedTask));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(migratedTask.state(), Matchers.is(Task.State.RUNNING));

        // Give the task one record to process.
        TopicPartition inputPartition = this.taskId00Partitions.iterator().next();
        migratedTask.addRecords(inputPartition, Collections.singletonList(TaskManagerTest.getConsumerRecord(inputPartition, 0L)));

        Assert.assertThrows(TaskMigratedException.class, () -> this.taskManager.process(1, this.time));
    }

    /**
     * A running task throws a plain {@link RuntimeException} from {@code process};
     * {@code taskManager.process} must throw a {@code StreamsException} that carries the
     * task's id and has the original exception (message {@code "oops"}) as its cause.
     */
    @Test
    public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() {
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public boolean process(long wallClockTime) {
                throw new RuntimeException("oops");
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(failingTask));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.RUNNING));

        // Give the task one record to process.
        TopicPartition inputPartition = this.taskId00Partitions.iterator().next();
        failingTask.addRecords(inputPartition, Collections.singletonList(TaskManagerTest.getConsumerRecord(inputPartition, 0L)));

        StreamsException wrapped = Assert.assertThrows(StreamsException.class, () -> this.taskManager.process(1, this.time));
        MatcherAssert.assertThat(wrapped.taskId().isPresent(), Matchers.is(true));
        MatcherAssert.assertThat(wrapped.taskId().get(), Matchers.is(this.taskId00));
        MatcherAssert.assertThat(wrapped.getCause().getMessage(), Matchers.is("oops"));
    }

    /**
     * A running task throws {@code TaskMigratedException} from
     * {@code maybePunctuateStreamTime()}; {@code taskManager.punctuate()} must let that
     * exception type escape to the caller.
     */
    @Test
    public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() {
        StateMachineTask migratedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public boolean maybePunctuateStreamTime() {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(migratedTask));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(migratedTask.state(), Matchers.is(Task.State.RUNNING));

        Assert.assertThrows(TaskMigratedException.class, () -> this.taskManager.punctuate());
    }

    /**
     * A running task throws {@link KafkaException} from {@code maybePunctuateStreamTime()};
     * {@code taskManager.punctuate()} must let that exception type escape to the caller.
     */
    @Test
    public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() {
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public boolean maybePunctuateStreamTime() {
                throw new KafkaException("oops");
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(failingTask));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.RUNNING));

        Assert.assertThrows(KafkaException.class, () -> this.taskManager.punctuate());
    }

    /**
     * A running task reports {@code true} from both {@code maybePunctuateStreamTime()} and
     * {@code maybePunctuateSystemTime()}; {@code taskManager.punctuate()} is expected to
     * return 2.
     */
    @Test
    public void shouldPunctuateActiveTasks() {
        StateMachineTask punctuatingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public boolean maybePunctuateStreamTime() {
                return true;
            }

            @Override
            public boolean maybePunctuateSystemTime() {
                return true;
            }
        };

        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(punctuatingTask));
        EasyMock.replay(this.activeTaskCreator, this.consumer, this.changeLogReader);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(true));
        MatcherAssert.assertThat(punctuatingTask.state(), Matchers.is(Task.State.RUNNING));

        int punctuations = this.taskManager.punctuate();
        MatcherAssert.assertThat(punctuations, IsEqual.equalTo(2));
    }

    /**
     * The task reports one changelog partition while the changelog reader reports no completed
     * changelogs. {@code tryToCompleteRestoration} must then return {@code false} and leave
     * the task in {@code RESTORING}; the consumer mock, which has no expectations recorded
     * in this test, is verified at the end.
     */
    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        StateMachineTask restoringTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) {
            @Override
            public Collection<TopicPartition> changelogPartitions() {
                return Collections.singletonList(new TopicPartition("fake", 0));
            }
        };

        EasyMock.expect(this.changeLogReader.completedChangelogs()).andReturn(Collections.emptySet());
        EasyMock.expect(this.activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(this.taskId00Assignment)))
            .andStubReturn(Collections.singletonList(restoringTask));
        EasyMock.replay(this.activeTaskCreator, this.changeLogReader, this.consumer);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());

        boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat(restored, Matchers.is(false));
        MatcherAssert.assertThat(restoringTask.state(), Matchers.is(Task.State.RESTORING));

        EasyMock.verify(this.consumer);
    }

    /**
     * Revokes one partition owned by a running task together with one partition ("unknown-0")
     * that no task owns, then asserts that the owning task is suspended and that the
     * TaskManager logger emitted a message naming the unmatched partition.
     */
    @Test
    public void shouldHaveRemainingPartitionsUncleared() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(task00));
        // Record a void-method expectation: a commit of exactly these offsets is allowed.
        this.consumer.commitSync(offsets);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.consumer, this.changeLogReader});
        // The appender is closed by try-with-resources once the assertions are done.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class);){
            LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
            this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
            MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
            MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
            // t1p0 belongs to task00; "unknown-0" belongs to no task.
            this.taskManager.handleRevocation((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, new TopicPartition("unknown", 0)}));
            MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
            List<String> messages = appender.getMessages();
            // The leading "taskManagerTest" is presumably the log prefix configured in test setup (not visible here).
            MatcherAssert.assertThat(messages, (Matcher)CoreMatchers.hasItem((Object)"taskManagerTestThe following revoked partitions [unknown-0] are missing from the current task partitions. It could potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback."));
        }
    }

    /**
     * Both tasks throw {@link TaskMigratedException} from {@code suspend()}. Handing the task
     * manager an empty assignment is expected to raise a TaskMigratedException whose message is
     * derived from the first task's failure.
     */
    @Test
    public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
        StateMachineTask firstMigrated = new StateMachineTask(this.taskId01, this.taskId01Partitions, false){

            @Override
            public void suspend() {
                super.suspend();
                RuntimeException cause = new RuntimeException();
                throw new TaskMigratedException("t1 close exception", (Throwable)cause);
            }
        };
        StateMachineTask secondMigrated = new StateMachineTask(this.taskId02, this.taskId02Partitions, false){

            @Override
            public void suspend() {
                super.suspend();
                RuntimeException cause = new RuntimeException();
                throw new TaskMigratedException("t2 close exception", (Throwable)cause);
            }
        };
        this.taskManager.addTask((Task)firstMigrated);
        this.taskManager.addTask((Task)secondMigrated);

        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(TaskMigratedException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));

        String expectedMessage = "t1 close exception; it means all tasks belonging to this thread should be migrated.";
        MatcherAssert.assertThat((Object)migrated.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * One task fails to suspend with a {@link TaskMigratedException} and the other with an
     * {@link IllegalStateException}. The test expects a RuntimeException that reports both task
     * ids and carries the non-migration failure as its cause.
     */
    @Test
    public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
        StateMachineTask migratedTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false){

            @Override
            public void suspend() {
                super.suspend();
                RuntimeException cause = new RuntimeException();
                throw new TaskMigratedException("t1 close exception", (Throwable)cause);
            }
        };
        StateMachineTask brokenTask = new StateMachineTask(this.taskId02, this.taskId02Partitions, false){

            @Override
            public void suspend() {
                super.suspend();
                RuntimeException cause = new RuntimeException();
                throw new IllegalStateException("t2 illegal state exception", cause);
            }
        };
        this.taskManager.addTask((Task)migratedTask);
        this.taskManager.addTask((Task)brokenTask);

        RuntimeException failure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));

        String expectedSummary = "Unexpected failure to close 2 task(s) [[0_1, 0_2]]. First unexpected exception (for task 0_2) follows.";
        String expectedCauseMessage = "t2 illegal state exception";
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedSummary));
        MatcherAssert.assertThat((Object)failure.getCause().getMessage(), (Matcher)IsEqual.equalTo((Object)expectedCauseMessage));
    }

    /**
     * One task fails to suspend with a {@link TaskMigratedException} and the other with a
     * {@link KafkaException}. The test expects a {@link StreamsException} tagged with the second
     * task's id and wrapping the KafkaException.
     */
    @Test
    public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
        StateMachineTask migratedTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false){

            @Override
            public void suspend() {
                super.suspend();
                RuntimeException cause = new RuntimeException();
                throw new TaskMigratedException("t1 close exception", (Throwable)cause);
            }
        };
        StateMachineTask kaboomTask = new StateMachineTask(this.taskId02, this.taskId02Partitions, false){

            @Override
            public void suspend() {
                super.suspend();
                RuntimeException cause = new RuntimeException();
                throw new KafkaException("Kaboom for t2!", (Throwable)cause);
            }
        };
        this.taskManager.addTask((Task)migratedTask);
        this.taskManager.addTask((Task)kaboomTask);

        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));

        // The exception must identify the task that raised the KafkaException.
        MatcherAssert.assertThat((Object)failure.taskId().isPresent(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat(failure.taskId().get(), (Matcher)Matchers.is((Object)this.taskId02));
        MatcherAssert.assertThat((Object)failure.getCause().getMessage(), (Matcher)IsEqual.equalTo((Object)"Kaboom for t2!"));
    }

    /**
     * The task manager's {@code producerMetrics()} should return the map obtained from the
     * active task creator.
     */
    @Test
    public void shouldTransmitProducerMetrics() {
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
        // A measurable that always reports zero; only the identity of the metric map matters here.
        Measurable constantZero = (config, now) -> 0.0;
        KafkaMetric metric = new KafkaMetric(new Object(), metricName, (MetricValueProvider)constantZero, null, (Time)new MockTime());
        Map<MetricName, KafkaMetric> expectedMetrics = Collections.singletonMap(metricName, metric);

        EasyMock.expect((Object)this.activeTaskCreator.producerMetrics()).andReturn(expectedMetrics);
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator});

        MatcherAssert.assertThat((Object)this.taskManager.producerMetrics(), (Matcher)Matchers.is(expectedMetrics));
    }

    /**
     * Test helper that builds {@link StateMachineTask}s for the three given assignments, wires the
     * task-creator mocks to return them, drives {@code handleAssignment} and
     * {@code tryToCompleteRestoration} on the task manager, and asserts the resulting task states.
     *
     * @param runningActiveAssignment   active tasks expected to end up RUNNING
     * @param standbyAssignment         standby tasks expected to end up RUNNING
     * @param restoringActiveAssignment active tasks expected to remain RESTORING
     * @return every created task, keyed by task id
     */
    private Map<TaskId, StateMachineTask> handleAssignment(Map<TaskId, Set<TopicPartition>> runningActiveAssignment, Map<TaskId, Set<TopicPartition>> standbyAssignment, Map<TaskId, Set<TopicPartition>> restoringActiveAssignment) {
        Set runningTasks = runningActiveAssignment.entrySet().stream().map(t -> new StateMachineTask((TaskId)t.getKey(), (Set)t.getValue(), true)).collect(Collectors.toSet());
        Set standbyTasks = standbyAssignment.entrySet().stream().map(t -> new StateMachineTask((TaskId)t.getKey(), (Set)t.getValue(), false)).collect(Collectors.toSet());
        Set<Task> restoringTasks = restoringActiveAssignment.entrySet().stream().map(t -> new StateMachineTask((TaskId)t.getKey(), (Set)t.getValue(), true)).collect(Collectors.toSet());
        // Give restoring tasks a changelog partition; the reader is stubbed below to report no
        // completed changelogs, which presumably is what keeps them in RESTORING (confirm in TaskManager).
        restoringTasks.forEach(t -> ((StateMachineTask)((Object)t)).setChangelogOffsets(Collections.singletonMap(new TopicPartition("changelog", 0), 0L)));
        // Running and restoring tasks are both active, so they share one active assignment.
        HashMap<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new HashMap<TaskId, Set<TopicPartition>>(runningActiveAssignment);
        allActiveTasksAssignment.putAll(restoringActiveAssignment);
        HashSet allActiveTasks = new HashSet(runningTasks);
        allActiveTasks.addAll(restoringTasks);
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(standbyAssignment))).andStubReturn(standbyTasks);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
        TaskManagerTest.expectRestoreToBeCompleted(this.consumer, this.changeLogReader);
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator, this.consumer, this.changeLogReader});
        this.taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
        // The boolean result is deliberately ignored; per-task states are asserted below instead.
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        HashMap<TaskId, StateMachineTask> allTasks = new HashMap<TaskId, StateMachineTask>();
        for (Task task : runningTasks) {
            MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
            allTasks.put(task.id(), (StateMachineTask)task);
        }
        for (Task task : restoringTasks) {
            MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
            allTasks.put(task.id(), (StateMachineTask)task);
        }
        for (Task task : standbyTasks) {
            MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
            allTasks.put(task.id(), (StateMachineTask)task);
        }
        return allTasks;
    }

    /**
     * Records, for each given task id, exactly one {@code stateDirectory.lock(..)} call that
     * succeeds (returns {@code true}).
     */
    private void expectLockObtainedFor(TaskId ... tasks) throws Exception {
        for (int i = 0; i < tasks.length; ++i) {
            TaskId lockable = tasks[i];
            EasyMock.expect((Object)this.stateDirectory.lock(lockable)).andReturn((Object)Boolean.TRUE).once();
        }
    }

    /**
     * Records, for each given task id, exactly one {@code stateDirectory.lock(..)} call that
     * fails (returns {@code false}).
     */
    private void expectLockFailedFor(TaskId ... tasks) throws Exception {
        for (int i = 0; i < tasks.length; ++i) {
            TaskId unlockable = tasks[i];
            EasyMock.expect((Object)this.stateDirectory.lock(unlockable)).andReturn((Object)Boolean.FALSE).once();
        }
    }

    /**
     * Records one expected {@code stateDirectory.unlock(..)} call for each given task id.
     */
    private void expectUnlockFor(TaskId ... tasks) throws Exception {
        for (int i = 0; i < tasks.length; ++i) {
            // Void method: invoke it on the mock in record state, then register the expectation.
            this.stateDirectory.unlock(tasks[i]);
            EasyMock.expectLastCall();
        }
    }

    /**
     * Records that the consumer reports a single-partition assignment ("assignment-0") and that
     * this same assignment is then paused.
     */
    private static void expectConsumerAssignmentPaused(Consumer<byte[], byte[]> consumer) {
        TopicPartition assignedPartition = new TopicPartition("assignment", 0);
        Set<TopicPartition> currentAssignment = Collections.singleton(assignedPartition);
        EasyMock.expect((Object)consumer.assignment()).andReturn(currentAssignment);
        // Calling the void method on the mock while recording registers the expectation.
        consumer.pause(currentAssignment);
    }

    /**
     * A {@link CommitFailedException} from {@code consumer.commitSync} during {@code commitAll()}
     * is expected to surface as a {@link TaskMigratedException} wrapping the original failure,
     * leaving the task in its initial CREATED state.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnCommitFailed() {
        OffsetAndMetadata committed = new OffsetAndMetadata(0L, null);
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p0, committed);

        StateMachineTask task = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        task.setCommittableOffsetsAndMetadata(committable);
        task.setCommitNeeded();
        this.taskManager.addTask((Task)task);

        // The consumer rejects the commit.
        this.consumer.commitSync(committable);
        EasyMock.expectLastCall().andThrow((Throwable)new CommitFailedException());
        EasyMock.replay((Object[])new Object[]{this.consumer});

        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(TaskMigratedException.class, () -> this.taskManager.commitAll());

        String expectedMessage = "Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.";
        MatcherAssert.assertThat((Object)migrated.getCause(), (Matcher)Matchers.instanceOf(CommitFailedException.class));
        MatcherAssert.assertThat((Object)migrated.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
        MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
    }

    /**
     * A {@link TimeoutException} from {@code consumer.commitSync} must not fail the commit call.
     * The first attempt reports zero committed tasks and starts the timeout on the task that
     * needed a commit; the retry succeeds, reports one committed task and clears the timeout.
     * The task that never needed a commit is left untouched throughout.
     */
    @Test
    public void shouldNotFailForTimeoutExceptionOnConsumerCommit() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        task00.setCommittableOffsetsAndMetadata(this.taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0L))));
        // Each task's offsets are keyed by its own input partitions. (The original reused
        // taskId00Partitions here; task01 never requests a commit, so the outcome is unchanged.)
        task01.setCommittableOffsetsAndMetadata(this.taskId01Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0L))));

        // First commit attempt times out, the second one succeeds.
        this.consumer.commitSync((Map)EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException("KABOOM!"));
        this.consumer.commitSync((Map)EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.consumer});

        // Only task00 has anything to commit.
        task00.setCommitNeeded();

        MatcherAssert.assertThat((Object)this.taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{task00, task01})), (Matcher)IsEqual.equalTo((Object)0));
        MatcherAssert.assertThat((Object)task00.timeout, (Matcher)IsEqual.equalTo((Object)this.time.milliseconds()));
        Assert.assertNull((Object)task01.timeout);

        MatcherAssert.assertThat((Object)this.taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{task00, task01})), (Matcher)IsEqual.equalTo((Object)1));
        Assert.assertNull((Object)task00.timeout);
        Assert.assertNull((Object)task01.timeout);
    }

    /**
     * Under EXACTLY_ONCE_ALPHA, a {@link TimeoutException} from the producer's
     * {@code commitTransaction} for task 0_0 is asserted to raise a
     * {@link TaskCorruptedException} that lists only task 0_0.
     *
     * NOTE(review): the method name says "should not fail", but the assertions expect a
     * TaskCorruptedException; the name looks stale. Also, the producer mock is never verified
     * here, so the extra recorded commitTransaction expectations are not enforced.
     */
    @Test
    public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA);
        StreamsProducer producer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        // The same producer mock is returned for up to three per-task producer lookups.
        EasyMock.expect((Object)this.activeTaskCreator.streamsProducerForTask((TaskId)EasyMock.anyObject(TaskId.class))).andReturn((Object)producer).andReturn((Object)producer).andReturn((Object)producer);
        Map<TopicPartition, OffsetAndMetadata> offsetsT00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsT01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        // First commit of task 0_0's offsets times out; the remaining recorded calls succeed.
        producer.commitTransaction(offsetsT00, null);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException("KABOOM!"));
        producer.commitTransaction(offsetsT00, null);
        EasyMock.expectLastCall();
        producer.commitTransaction(offsetsT01, null);
        EasyMock.expectLastCall();
        producer.commitTransaction(offsetsT01, null);
        EasyMock.expectLastCall();
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        task00.setCommittableOffsetsAndMetadata(offsetsT00);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        task01.setCommittableOffsetsAndMetadata(offsetsT01);
        // task02 has no committable offsets and never requests a commit.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        // groupMetadata() is stubbed to null, matching the null passed to commitTransaction above.
        EasyMock.expect((Object)this.consumer.groupMetadata()).andStubReturn(null);
        EasyMock.replay((Object[])new Object[]{producer, this.activeTaskCreator, this.consumer});
        task00.setCommitNeeded();
        task01.setCommitNeeded();
        TaskCorruptedException exception = (TaskCorruptedException)Assert.assertThrows(TaskCorruptedException.class, () -> this.taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{task00, task01, task02})));
        MatcherAssert.assertThat((Object)exception.corruptedTasks(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId00)));
    }

    /**
     * Under EXACTLY_ONCE_V2, the thread producer is expected to commit the combined offsets of
     * tasks 0_0 and 0_1 in one {@code commitTransaction} call. When that call throws a
     * {@link TimeoutException}, the test asserts a {@link TaskCorruptedException} listing both
     * tasks.
     */
    @Test
    public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2);
        StreamsProducer producer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        EasyMock.expect((Object)this.activeTaskCreator.threadProducer()).andReturn((Object)producer).andReturn((Object)producer);
        Map<TopicPartition, OffsetAndMetadata> offsetsT00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsT01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        // Union of both tasks' offsets: the argument the single commitTransaction call must match.
        HashMap<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(offsetsT00);
        allOffsets.putAll(offsetsT01);
        // First combined commit times out. A second, successful one is recorded, but the
        // producer mock is not verified in this method, so that call is not enforced.
        producer.commitTransaction(allOffsets, null);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException("KABOOM!"));
        producer.commitTransaction(allOffsets, null);
        EasyMock.expectLastCall();
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        task00.setCommittableOffsetsAndMetadata(offsetsT00);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        task01.setCommittableOffsetsAndMetadata(offsetsT01);
        // task02 never requests a commit and is not expected among the corrupted tasks.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        EasyMock.expect((Object)this.consumer.groupMetadata()).andStubReturn(null);
        EasyMock.replay((Object[])new Object[]{producer, this.activeTaskCreator, this.consumer});
        task00.setCommitNeeded();
        task01.setCommitNeeded();
        TaskCorruptedException exception = (TaskCorruptedException)Assert.assertThrows(TaskCorruptedException.class, () -> this.taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{task00, task01, task02})));
        MatcherAssert.assertThat((Object)exception.corruptedTasks(), (Matcher)IsEqual.equalTo((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01})));
    }

    /**
     * A generic {@link KafkaException} from {@code consumer.commitSync} during
     * {@code commitAll()} is expected to be wrapped in a {@link StreamsException}, leaving the
     * task in its initial CREATED state.
     */
    @Test
    public void shouldStreamsExceptionOnCommitError() {
        OffsetAndMetadata committed = new OffsetAndMetadata(0L, null);
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p0, committed);

        StateMachineTask task = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        task.setCommittableOffsetsAndMetadata(committable);
        task.setCommitNeeded();
        this.taskManager.addTask((Task)task);

        // The consumer fails the commit with a plain KafkaException.
        this.consumer.commitSync(committable);
        EasyMock.expectLastCall().andThrow((Throwable)new KafkaException());
        EasyMock.replay((Object[])new Object[]{this.consumer});

        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.taskManager.commitAll());

        MatcherAssert.assertThat((Object)failure.getCause(), (Matcher)Matchers.instanceOf(KafkaException.class));
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)"Error encountered committing offsets via consumer"));
        MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
    }

    /**
     * An arbitrary {@link RuntimeException} from {@code consumer.commitSync} is expected to
     * propagate out of {@code commitAll()} with its message intact, leaving the task in CREATED.
     */
    @Test
    public void shouldFailOnCommitFatal() {
        OffsetAndMetadata committed = new OffsetAndMetadata(0L, null);
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p0, committed);

        StateMachineTask task = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        task.setCommittableOffsetsAndMetadata(committable);
        task.setCommitNeeded();
        this.taskManager.addTask((Task)task);

        // The consumer blows up with a non-Kafka runtime failure.
        this.consumer.commitSync(committable);
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("KABOOM"));
        EasyMock.replay((Object[])new Object[]{this.consumer});

        RuntimeException failure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.commitAll());

        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)"KABOOM"));
        MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
    }

    /**
     * When one task throws from {@code suspend()} during revocation, the failure is expected to
     * be reported as the cause of a RuntimeException, while both revoked tasks still end up
     * SUSPENDED.
     */
    @Test
    public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation() {
        // Transitions to SUSPENDED via super.suspend() and only then fails.
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        StateMachineTask healthyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);

        HashMap<TaskId, Set<TopicPartition>> bothTasksAssignment = new HashMap<TaskId, Set<TopicPartition>>(this.taskId00Assignment);
        bothTasksAssignment.putAll(this.taskId01Assignment);

        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(bothTasksAssignment))).andReturn(Arrays.asList(new Task[]{failingTask, healthyTask}));
        EasyMock.replay((Object[])new Object[]{this.activeTaskCreator, this.consumer});

        this.taskManager.handleAssignment(bothTasksAssignment, Collections.emptyMap());

        RuntimeException failure = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleRevocation(Arrays.asList(this.t1p0, this.t1p1)));

        MatcherAssert.assertThat((Object)failure.getCause().getMessage(), (Matcher)Matchers.is((Object)"KABOOM!"));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        MatcherAssert.assertThat((Object)healthyTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
    }

    /**
     * Assigns task 0_0 as active and then re-assigns the same id as standby, verifying that the
     * standby creator's {@code createStandbyTaskFromActive} is invoked (via mock verification).
     */
    @Test
    public void shouldConvertActiveTaskToStandbyTask() {
        StreamTask activeTask = (StreamTask)EasyMock.mock(StreamTask.class);
        EasyMock.expect((Object)activeTask.id()).andStubReturn((Object)this.taskId00);
        EasyMock.expect((Object)activeTask.inputPartitions()).andStubReturn(this.taskId00Partitions);
        EasyMock.expect((Object)activeTask.isActive()).andStubReturn((Object)true);
        EasyMock.expect((Object)activeTask.prepareCommit()).andStubReturn(Collections.emptyMap());
        StandbyTask standbyTask = (StandbyTask)EasyMock.mock(StandbyTask.class);
        EasyMock.expect((Object)standbyTask.id()).andStubReturn((Object)this.taskId00);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(activeTask));
        // Closing the task producer is permitted any number of times (including zero).
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.expectLastCall().anyTimes();
        // The recycle call this test is about: exactly one conversion with the task's partitions.
        EasyMock.expect((Object)this.standbyTaskCreator.createStandbyTaskFromActive((StreamTask)EasyMock.anyObject(), (Set)EasyMock.eq(this.taskId00Partitions))).andReturn((Object)standbyTask);
        EasyMock.replay((Object[])new Object[]{activeTask, standbyTask, this.activeTaskCreator, this.standbyTaskCreator, this.consumer});
        // First as active, then the same id as standby.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.standbyTaskCreator});
    }

    /**
     * Assigns task 0_0 as standby and then re-assigns the same id as active, verifying that the
     * active creator's {@code createActiveTaskFromStandby} is invoked (via mock verification).
     */
    @Test
    public void shouldConvertStandbyTaskToActiveTask() {
        StandbyTask standbyTask = (StandbyTask)EasyMock.mock(StandbyTask.class);
        EasyMock.expect((Object)standbyTask.id()).andStubReturn((Object)this.taskId00);
        EasyMock.expect((Object)standbyTask.isActive()).andStubReturn((Object)false);
        EasyMock.expect((Object)standbyTask.prepareCommit()).andStubReturn(Collections.emptyMap());
        // suspend() and postCommit(true) are permitted any number of times (including zero).
        standbyTask.suspend();
        EasyMock.expectLastCall().anyTimes();
        standbyTask.postCommit(true);
        EasyMock.expectLastCall().anyTimes();
        StreamTask activeTask = (StreamTask)EasyMock.mock(StreamTask.class);
        EasyMock.expect((Object)activeTask.id()).andStubReturn((Object)this.taskId00);
        EasyMock.expect((Object)activeTask.inputPartitions()).andStubReturn(this.taskId00Partitions);
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Map)EasyMock.eq(this.taskId00Assignment))).andReturn(Collections.singletonList(standbyTask));
        // The recycle call this test is about: exactly one conversion with the task's partitions.
        EasyMock.expect((Object)this.activeTaskCreator.createActiveTaskFromStandby((StandbyTask)EasyMock.anyObject(), (Set)EasyMock.eq(this.taskId00Partitions), (Consumer)EasyMock.anyObject())).andReturn((Object)activeTask);
        EasyMock.replay((Object[])new Object[]{standbyTask, activeTask, this.standbyTaskCreator, this.activeTaskCreator, this.consumer});
        // First as standby, then the same id as active.
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        EasyMock.verify((Object[])new Object[]{this.standbyTaskCreator, this.activeTaskCreator});
    }

    /**
     * Convenience overload: records the restore-completion expectations and requires the
     * changelog reader to be queried.
     */
    private static void expectRestoreToBeCompleted(Consumer<byte[], byte[]> consumer, ChangelogReader changeLogReader) {
        final boolean changeLogUpdateRequired = true;
        TaskManagerTest.expectRestoreToBeCompleted(consumer, changeLogReader, changeLogUpdateRequired);
    }

    /**
     * Records the mock interactions for a completed restoration: the consumer reports a
     * single-partition assignment which is then resumed, and the changelog reader reports no
     * completed changelogs.
     *
     * @param changeLogUpdateRequired when {@code true} the reader must be queried exactly once;
     *                                when {@code false} the query is optional (zero or one time)
     */
    private static void expectRestoreToBeCompleted(Consumer<byte[], byte[]> consumer, ChangelogReader changeLogReader, boolean changeLogUpdateRequired) {
        TopicPartition assignedPartition = new TopicPartition("assignment", 0);
        Set<TopicPartition> currentAssignment = Collections.singleton(assignedPartition);
        EasyMock.expect((Object)consumer.assignment()).andReturn(currentAssignment);

        consumer.resume(currentAssignment);
        EasyMock.expectLastCall();

        int minCalls;
        if (changeLogUpdateRequired) {
            minCalls = 1;
        } else {
            minCalls = 0;
        }
        EasyMock.expect((Object)changeLogReader.completedChangelogs()).andReturn(Collections.emptySet()).times(minCalls, 1);
    }

    /**
     * Creates a future that is already completed with a {@code null} value.
     */
    private static KafkaFutureImpl<DeletedRecords> completedFuture() {
        KafkaFutureImpl<DeletedRecords> alreadyDone = new KafkaFutureImpl<DeletedRecords>();
        alreadyDone.complete(null);
        return alreadyDone;
    }

    /**
     * Creates one folder per name under the test folder and records that the state directory
     * lists exactly those folders, once, as its non-empty task directories.
     */
    private void makeTaskFolders(String ... names) throws Exception {
        ArrayList<StateDirectory.TaskDirectory> createdDirs = new ArrayList<StateDirectory.TaskDirectory>(names.length);
        for (String folderName : names) {
            File folder = this.testFolder.newFolder(folderName);
            createdDirs.add(new StateDirectory.TaskDirectory(folder, null));
        }
        EasyMock.expect((Object)this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(createdDirs).once();
    }

    /**
     * Creates the task's checkpoint file on disk, writes the given offsets into it, and records
     * that the state directory returns this file for the task.
     */
    private void writeCheckpointFile(TaskId task, Map<TopicPartition, Long> offsets) throws Exception {
        File target = this.getCheckpointFile(task);
        // createFile fails if the file already exists, so each task may be written only once.
        Files.createFile(target.toPath());
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(target);
        checkpoint.write(offsets);
        EasyMock.expect((Object)this.stateDirectory.checkpointFileFor(task)).andReturn((Object)target);
    }

    /**
     * Resolves {@code <test root>/<task id>/.checkpoint} for the given task.
     */
    private File getCheckpointFile(TaskId task) {
        File taskDirectory = new File(this.testFolder.getRoot(), task.toString());
        return new File(taskDirectory, ".checkpoint");
    }

    /**
     * Builds a record for the given topic-partition and offset with a {@code null} key and value.
     */
    private static ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long offset) {
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();
        return new ConsumerRecord<byte[], byte[]>(topic, partition, offset, null, null);
    }

    private static class StateMachineTask
    extends AbstractTask
    implements Task {
        private final boolean active;
        private boolean commitNeeded = false;
        private boolean commitRequested = false;
        private boolean commitPrepared = false;
        private boolean commitCompleted = false;
        private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
        private Map<TopicPartition, Long> purgeableOffsets;
        private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
        private Set<TopicPartition> partitionsForOffsetReset = Collections.emptySet();
        private Long timeout = null;
        private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>>();

        StateMachineTask(TaskId id, Set<TopicPartition> partitions, boolean active) {
            this(id, partitions, active, null);
        }

        StateMachineTask(TaskId id, Set<TopicPartition> partitions, boolean active, ProcessorStateManager processorStateManager) {
            super(id, null, null, processorStateManager, partitions, 0L, "test-task", StateMachineTask.class);
            this.active = active;
        }

        public void initializeIfNeeded() {
            if (this.state() == Task.State.CREATED) {
                this.transitionTo(Task.State.RESTORING);
                if (!this.active) {
                    this.transitionTo(Task.State.RUNNING);
                }
            }
        }

        public void addPartitionsForOffsetReset(Set<TopicPartition> partitionsForOffsetReset) {
            this.partitionsForOffsetReset = partitionsForOffsetReset;
        }

        public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
            if (this.state() == Task.State.RUNNING) {
                return;
            }
            this.transitionTo(Task.State.RUNNING);
        }

        public void setCommitNeeded() {
            this.commitNeeded = true;
        }

        public boolean commitNeeded() {
            return this.commitNeeded;
        }

        public void setCommitRequested() {
            this.commitRequested = true;
        }

        public boolean commitRequested() {
            return this.commitRequested;
        }

        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            this.commitPrepared = true;
            if (this.commitNeeded) {
                return this.committableOffsets;
            }
            return Collections.emptyMap();
        }

        public void postCommit(boolean enforceCheckpoint) {
            this.commitNeeded = false;
            this.commitCompleted = true;
        }

        public void suspend() {
            if (this.state() == Task.State.CLOSED) {
                throw new IllegalStateException("Illegal state " + this.state() + " while suspending active task " + this.id);
            }
            if (this.state() != Task.State.SUSPENDED) {
                this.transitionTo(Task.State.SUSPENDED);
            }
        }

        public void resume() {
            if (this.state() == Task.State.SUSPENDED) {
                this.transitionTo(Task.State.RUNNING);
            }
        }

        public void revive() {
            this.commitNeeded = false;
            this.commitRequested = false;
            super.revive();
        }

        public void maybeInitTaskTimeoutOrThrow(long currentWallClockMs, Exception cause) {
            this.timeout = currentWallClockMs;
        }

        public void clearTaskTimeout() {
            this.timeout = null;
        }

        public void closeClean() {
            this.transitionTo(Task.State.CLOSED);
        }

        public void closeDirty() {
            this.transitionTo(Task.State.CLOSED);
        }

        public void closeCleanAndRecycleState() {
            this.transitionTo(Task.State.CLOSED);
        }

        public void updateInputPartitions(Set<TopicPartition> topicPartitions, Map<String, List<String>> allTopologyNodesToSourceTopics) {
            this.inputPartitions = topicPartitions;
        }

        void setCommittableOffsetsAndMetadata(Map<TopicPartition, OffsetAndMetadata> committableOffsets) {
            if (!this.active) {
                throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks");
            }
            this.committableOffsets = committableOffsets;
        }

        public StateStore getStore(String name) {
            return null;
        }

        /**
         * Returns the partitions that have an entry in the changelog offsets map.
         *
         * @return the key set of the changelog offsets map
         */
        public Collection<TopicPartition> changelogPartitions() {
            return changelogOffsets.keySet();
        }

        /**
         * Reports the value of this task's {@code active} flag.
         *
         * @return the value of the {@code active} field
         */
        public boolean isActive() {
            return active;
        }

        /**
         * Sets the map later returned by {@code purgeableOffsets()}.
         *
         * @param purgeableOffsets the offsets to store; kept by reference, not copied
         */
        void setPurgeableOffsets(Map<TopicPartition, Long> purgeableOffsets) {
            this.purgeableOffsets = purgeableOffsets;
        }

        /**
         * Returns the purgeable offsets currently stored on this task.
         *
         * @return the stored map itself, not a copy
         */
        public Map<TopicPartition, Long> purgeableOffsets() {
            return purgeableOffsets;
        }

        /**
         * Sets the map backing {@code changelogOffsets()} and {@code changelogPartitions()}.
         *
         * @param changelogOffsets the offsets to store; kept by reference, not copied
         */
        void setChangelogOffsets(Map<TopicPartition, Long> changelogOffsets) {
            this.changelogOffsets = changelogOffsets;
        }

        /**
         * Returns the changelog offsets currently stored on this task.
         *
         * @return the stored map itself, not a copy
         */
        public Map<TopicPartition, Long> changelogOffsets() {
            return changelogOffsets;
        }

        /**
         * Committed offsets are not tracked by this implementation.
         *
         * @return always an empty map
         */
        public Map<TopicPartition, Long> committedOffsets() {
            final Map<TopicPartition, Long> none = Collections.emptyMap();
            return none;
        }

        /**
         * High-water marks are not tracked by this implementation.
         *
         * @return always an empty map
         */
        public Map<TopicPartition, Long> highWaterMark() {
            final Map<TopicPartition, Long> none = Collections.emptyMap();
            return none;
        }

        /**
         * Idling time is not tracked by this implementation.
         *
         * @return always an empty {@link Optional}
         */
        public Optional<Long> timeCurrentIdlingStarted() {
            final Optional<Long> notIdling = Optional.empty();
            return notIdling;
        }

        /**
         * Appends the given records to the per-partition queue of this task,
         * creating the queue for {@code partition} on first use.
         *
         * @param partition the partition the records belong to
         * @param records   the records to enqueue, in iteration order
         * @throws IllegalStateException if this task is not active
         */
        public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
            if (!this.isActive()) {
                throw new IllegalStateException("Can't add records to an inactive task.");
            }
            // Parameterized types instead of raw Deque/LinkedList keep the queue type-checked.
            Deque<ConsumerRecord<byte[], byte[]>> partitionQueue =
                this.queue.computeIfAbsent(partition, k -> new LinkedList<>());
            for (ConsumerRecord<byte[], byte[]> record : records) {
                partitionQueue.add(record);
            }
        }

        /**
         * Consumes at most one queued record: the partition queues are polled
         * in the map's iteration order until one yields a record.
         *
         * @param wallClockTime unused by this implementation
         * @return {@code true} if a record was taken from some queue,
         *         {@code false} if every queue was empty
         * @throws IllegalStateException if the task is not active or not {@code RUNNING}
         */
        public boolean process(long wallClockTime) {
            if (!this.isActive() || this.state() != Task.State.RUNNING) {
                throw new IllegalStateException("Can't process an inactive or non-running task.");
            }
            for (LinkedList<ConsumerRecord<byte[], byte[]>> pending : this.queue.values()) {
                // Stop at the first queue that actually hands back a record.
                if (pending.poll() != null) {
                    return true;
                }
            }
            return false;
        }
    }
}

