/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.DefaultStateUpdater;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class DefaultStateUpdaterTest {
    private static final int COMMIT_INTERVAL = 100;
    private static final long CALL_TIMEOUT = 1000L;
    private static final long VERIFICATION_TIMEOUT = 15000L;
    private static final TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
    private static final TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
    private static final TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0);
    private static final TopicPartition TOPIC_PARTITION_C_0 = new TopicPartition("topicC", 0);
    private static final TopicPartition TOPIC_PARTITION_D_0 = new TopicPartition("topicD", 0);
    private static final TaskId TASK_0_0 = new TaskId(0, 0);
    private static final TaskId TASK_0_1 = new TaskId(0, 1);
    private static final TaskId TASK_0_2 = new TaskId(0, 2);
    private static final TaskId TASK_1_0 = new TaskId(1, 0);
    private static final TaskId TASK_1_1 = new TaskId(1, 1);
    private final Time time = new MockTime(1L);
    private final StreamsConfig config = new StreamsConfig((Map)this.configProps());
    private final ChangelogReader changelogReader = (ChangelogReader)Mockito.mock(ChangelogReader.class);
    private final Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> {};
    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(this.config, this.changelogReader, this.time);

    DefaultStateUpdaterTest() {
    }

    @AfterEach
    public void tearDown() {
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
    }

    private Properties configProps() {
        return Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once_v2"), Utils.mkEntry((Object)"commit.interval.ms", (Object)100), Utils.mkEntry((Object)StreamsConfig.producerPrefix((String)"transaction.timeout.ms"), (Object)100)}));
    }

    @Test
    public void shouldShutdownStateUpdater() {
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).clear();
    }

    @Test
    public void shouldShutdownStateUpdaterAndRestart() {
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)2))).clear();
    }

    @Test
    public void shouldThrowIfStatelessTaskNotInStateRestoring() {
        this.shouldThrowIfActiveTaskNotInStateRestoring(this.createStatelessTask(TASK_0_0));
    }

    @Test
    public void shouldThrowIfStatefulTaskNotInStateRestoring() {
        this.shouldThrowIfActiveTaskNotInStateRestoring(this.createActiveStatefulTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
    }

    private void shouldThrowIfActiveTaskNotInStateRestoring(StreamTask task) {
        this.shouldThrowIfTaskNotInGivenState((Task)task, Task.State.RESTORING);
    }

    @Test
    public void shouldThrowIfStandbyTaskNotInStateRunning() {
        StandbyTask task = this.createStandbyTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        this.shouldThrowIfTaskNotInGivenState((Task)task, Task.State.RUNNING);
    }

    private void shouldThrowIfTaskNotInGivenState(Task task, Task.State correctState) {
        for (Task.State state : Task.State.values()) {
            if (state == correctState) continue;
            Mockito.when((Object)task.state()).thenReturn((Object)state);
            Assertions.assertThrows(IllegalStateException.class, () -> this.stateUpdater.add(task));
        }
    }

    @Test
    public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
        StreamTask task1 = this.createStatelessTaskInStateRestoring(TASK_0_0);
        this.shouldImmediatelyAddStatelessTasksToRestoredTasks(task1);
    }

    @Test
    public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
        StreamTask task1 = this.createStatelessTaskInStateRestoring(TASK_0_0);
        StreamTask task2 = this.createStatelessTaskInStateRestoring(TASK_0_2);
        StreamTask task3 = this.createStatelessTaskInStateRestoring(TASK_1_0);
        this.shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3);
    }

    private void shouldImmediatelyAddStatelessTasksToRestoredTasks(StreamTask ... tasks) throws Exception {
        this.stateUpdater.start();
        for (StreamTask task : tasks) {
            this.stateUpdater.add((Task)task);
        }
        this.verifyRestoredActiveTasks(tasks);
        this.verifyNeverCheckpointTasks((Task[])tasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldRestoreSingleActiveStatefulTask() throws Exception {
        StreamTask task = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyRestoredActiveTasks(task);
        this.verifyCheckpointTasks(true, new Task[]{task});
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)1))).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.atLeast((int)3))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamTask task3 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false).thenReturn((Object)false).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.verifyRestoredActiveTasks(task3, task1, task2);
        this.verifyCheckpointTasks(true, new Task[]{task3, task1, task2});
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)3))).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.atLeast((int)4))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldDrainRestoredActiveTasks() throws Exception {
        Assertions.assertTrue((boolean)this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
        StreamTask task1 = this.createStatelessTaskInStateRestoring(TASK_0_0);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.verifyDrainingRestoredActiveTasks(task1);
        StreamTask task2 = this.createStatelessTaskInStateRestoring(TASK_1_1);
        StreamTask task3 = this.createStatelessTaskInStateRestoring(TASK_1_0);
        StreamTask task4 = this.createStatelessTaskInStateRestoring(TASK_0_2);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        this.verifyDrainingRestoredActiveTasks(task2, task3, task4);
    }

    @Test
    public void shouldUpdateSingleStandbyTask() throws Exception {
        StandbyTask task = this.createStandbyTaskInStateRunning(TASK_0_0, Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        this.shouldUpdateStandbyTasks(task);
    }

    @Test
    public void shouldUpdateMultipleStandbyTasks() throws Exception {
        StandbyTask task1 = this.createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StandbyTask task2 = this.createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask task3 = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        this.shouldUpdateStandbyTasks(task1, task2, task3);
    }

    private void shouldUpdateStandbyTasks(StandbyTask ... tasks) throws Exception {
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        for (StandbyTask task : tasks) {
            this.stateUpdater.add((Task)task);
        }
        this.verifyUpdatingStandbyTasks(tasks);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)1))).transitToUpdateStandby();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.timeout((long)15000L).atLeast(1))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).enforceRestoreActive();
    }

    @Test
    public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask task3 = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask task4 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        this.verifyRestoredActiveTasks(task2, task1);
        this.verifyCheckpointTasks(true, new Task[]{task2, task1});
        this.verifyUpdatingStandbyTasks(task4, task3);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.atLeast((int)3))).restore(ArgumentMatchers.anyMap());
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader, task1, task2});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)2))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StandbyTask task2 = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StreamTask task3 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.verifyRestoredActiveTasks(task1);
        this.verifyCheckpointTasks(true, new Task[]{task1});
        this.verifyUpdatingStandbyTasks(task2);
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
        this.stateUpdater.add((Task)task3);
        this.verifyRestoredActiveTasks(task1, task3);
        this.verifyCheckpointTasks(true, new Task[]{task3});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
        StreamTask activeTask1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask activeTask2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask standbyTask = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet((Object[])new TaskId[]{activeTask1.id(), activeTask2.id()}));
        Map updatingTasks1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1), Utils.mkEntry((Object)activeTask2.id(), (Object)activeTask2), Utils.mkEntry((Object)standbyTask.id(), (Object)standbyTask)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{taskCorruptedException}).doNothing().when((Object)this.changelogReader)).restore(updatingTasks1);
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{activeTask1, activeTask2}), (RuntimeException)taskCorruptedException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.atLeast((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
        StreamTask activeTask1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask activeTask2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask standbyTask = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyUpdatingTasks(new Task[]{activeTask1, activeTask2, standbyTask});
        this.stateUpdater.remove(activeTask1.id());
        this.stateUpdater.remove(activeTask2.id());
        this.verifyRemovedTasks(new Task[]{activeTask1, activeTask2});
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.atLeast((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldRemoveActiveStatefulTask() throws Exception {
        StreamTask task = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        this.shouldRemoveStatefulTask((Task)task);
    }

    @Test
    public void shouldRemoveStandbyTask() throws Exception {
        StandbyTask task = this.createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        this.shouldRemoveStatefulTask((Task)task);
    }

    private void shouldRemoveStatefulTask(Task task) throws Exception {
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.remove(task.id());
        this.verifyRemovedTasks(task);
        this.verifyCheckpointTasks(true, task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister(task.changelogPartitions());
    }

    @Test
    public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
        StreamTask task = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        this.shouldNotRemoveTaskFromRestoredActiveTasks(task);
    }

    @Test
    public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
        StreamTask task = this.createStatelessTaskInStateRestoring(TASK_0_0);
        this.shouldNotRemoveTaskFromRestoredActiveTasks(task);
    }

    private void shouldNotRemoveTaskFromRestoredActiveTasks(StreamTask task) throws Exception {
        StreamTask controlTask = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.stateUpdater.add((Task)controlTask);
        this.verifyRestoredActiveTasks(task);
        this.stateUpdater.remove(task.id());
        this.stateUpdater.remove(controlTask.id());
        this.verifyRemovedTasks(new Task[]{controlTask});
        this.verifyRestoredActiveTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
        StreamTask task = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        this.shouldNotRemoveTaskFromFailedTasks((Task)task);
    }

    @Test
    public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
        StandbyTask task = this.createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        this.shouldNotRemoveTaskFromFailedTasks((Task)task);
    }

    private void shouldNotRemoveTaskFromFailedTasks(Task task) throws Exception {
        StreamTask controlTask = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamsException streamsException = new StreamsException("Something happened", task.id());
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task.id(), (Object)task), Utils.mkEntry((Object)controlTask.id(), (Object)controlTask)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException}).doNothing().when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.add((Task)controlTask);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task}), (RuntimeException)((Object)streamsException));
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.stateUpdater.remove(task.id());
        this.stateUpdater.remove(controlTask.id());
        this.verifyRemovedTasks(new Task[]{controlTask});
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
    }

    @Test
    public void shouldDrainRemovedTasks() throws Exception {
        Assertions.assertTrue((boolean)this.stateUpdater.drainRemovedTasks().isEmpty());
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.remove(task1.id());
        this.verifyDrainingRemovedTasks(new Task[]{task1});
        StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
        StreamTask task3 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask task4 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.remove(task2.id());
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.remove(task3.id());
        this.stateUpdater.add((Task)task4);
        this.stateUpdater.remove(task4.id());
        this.verifyDrainingRemovedTasks(new Task[]{task2, task3, task4});
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StandbyTask task2 = this.createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        String exceptionMessage = "The Streams were crossed!";
        StreamsException streamsException = new StreamsException("The Streams were crossed!");
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2)});
        ((ChangelogReader)Mockito.doNothing().doThrow(new Throwable[]{streamsException}).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1, task2}), (RuntimeException)((Object)streamsException));
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask task3 = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        String exceptionMessage = "The Streams were crossed!";
        StreamsException streamsException1 = new StreamsException("The Streams were crossed!", task1.id());
        StreamsException streamsException2 = new StreamsException("The Streams were crossed!", task3.id());
        Map updatingTasksBeforeFirstThrow = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3)});
        Map updatingTasksBeforeSecondThrow = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3)});
        ((ChangelogReader)Mockito.doNothing().doThrow(new Throwable[]{streamsException1}).when((Object)this.changelogReader)).restore(updatingTasksBeforeFirstThrow);
        ((ChangelogReader)Mockito.doNothing().doThrow(new Throwable[]{streamsException2}).when((Object)this.changelogReader)).restore(updatingTasksBeforeSecondThrow);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1}), (RuntimeException)((Object)streamsException1));
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task3}), (RuntimeException)((Object)streamsException2));
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2);
        this.verifyUpdatingTasks(new Task[]{task2});
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StandbyTask task2 = this.createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamTask task3 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        Set expectedTaskIds = Utils.mkSet((Object[])new TaskId[]{task1.id(), task2.id()});
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3)});
        ((ChangelogReader)Mockito.doNothing().doThrow(new Throwable[]{taskCorruptedException}).doNothing().when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1, task2}), (RuntimeException)taskCorruptedException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[]{task3});
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StandbyTask task2 = this.createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{illegalStateException}).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1, task2}), (RuntimeException)illegalStateException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldDrainFailedTasksAndExceptions() throws Exception {
        Assertions.assertTrue((boolean)this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
        StreamTask task3 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask task4 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
        String exceptionMessage = "The Streams were crossed!";
        StreamsException streamsException1 = new StreamsException("The Streams were crossed!", task1.id());
        Map updatingTasks1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException1}).when((Object)this.changelogReader)).restore(updatingTasks1);
        StreamsException streamsException2 = new StreamsException("The Streams were crossed!", task2.id());
        StreamsException streamsException3 = new StreamsException("The Streams were crossed!", task3.id());
        StreamsException streamsException4 = new StreamsException("The Streams were crossed!", task4.id());
        Map updatingTasks2 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3), Utils.mkEntry((Object)task4.id(), (Object)task4)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException2}).when((Object)this.changelogReader)).restore(updatingTasks2);
        Map updatingTasks3 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task3.id(), (Object)task3), Utils.mkEntry((Object)task4.id(), (Object)task4)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException3}).when((Object)this.changelogReader)).restore(updatingTasks3);
        Map updatingTasks4 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task4.id(), (Object)task4)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException4}).when((Object)this.changelogReader)).restore(updatingTasks4);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1}), (RuntimeException)((Object)streamsException1));
        this.verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task2}), (RuntimeException)((Object)streamsException2));
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks3 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task3}), (RuntimeException)((Object)streamsException3));
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks4 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task4}), (RuntimeException)((Object)streamsException4));
        this.verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
    }

    @Test
    public void shouldAutoCheckpointTasksOnInterval() throws Exception {
        StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask task3 = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask task4 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        this.verifyUpdatingTasks(new Task[]{task1, task2, task3, task4});
        this.time.sleep(101L);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyCheckpointTasks(false, new Task[]{task1, task2, task3, task4});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
        MockTime time = new MockTime();
        DefaultStateUpdater stateUpdater = new DefaultStateUpdater(this.config, this.changelogReader, (Time)time);
        try {
            StreamTask task1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
            StreamTask task2 = this.createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
            StandbyTask task3 = this.createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
            StandbyTask task4 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
            Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
            Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
            stateUpdater.start();
            stateUpdater.add((Task)task1);
            stateUpdater.add((Task)task2);
            stateUpdater.add((Task)task3);
            stateUpdater.add((Task)task4);
            time.sleep(100L);
            this.verifyNeverCheckpointTasks(new Task[]{task1, task2, task3, task4});
        }
        finally {
            stateUpdater.shutdown(Duration.ofMinutes(1L));
        }
    }

    private void verifyCheckpointTasks(boolean enforceCheckpoint, Task ... tasks) {
        for (Task task : tasks) {
            ((Task)Mockito.verify((Object)task, (VerificationMode)Mockito.timeout((long)15000L).atLeast(1))).maybeCheckpoint(enforceCheckpoint);
        }
    }

    private void verifyNeverCheckpointTasks(Task ... tasks) {
        for (Task task : tasks) {
            ((Task)Mockito.verify((Object)task, (VerificationMode)Mockito.never())).maybeCheckpoint(EasyMock.anyBoolean());
        }
    }

    @Test
    public void shouldGetTasksFromInputQueue() {
        this.stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
        StreamTask activeTask1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask activeTask2 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask standbyTask1 = this.createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask standbyTask2 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask standbyTask3 = this.createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)standbyTask2);
        this.stateUpdater.remove(TASK_0_0);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask3);
        Set tasks = this.stateUpdater.getTasks();
        Assertions.assertEquals((int)5, (int)tasks.size());
        Assertions.assertTrue((boolean)tasks.containsAll(Utils.mkSet((Object[])new AbstractTask[]{activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3})));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals((int)2, (int)activeTasks.size());
        Assertions.assertTrue((boolean)activeTasks.containsAll(Utils.mkSet((Object[])new StreamTask[]{activeTask1, activeTask2})));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals((int)3, (int)standbyTasks.size());
        Assertions.assertTrue((boolean)standbyTasks.containsAll(Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2, standbyTask3})));
    }

    @Test
    public void shouldGetTasksFromUpdatingTasks() throws Exception {
        StreamTask activeTask1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask activeTask2 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask standbyTask1 = this.createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0));
        StandbyTask standbyTask2 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask standbyTask3 = this.createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)standbyTask2);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask3);
        this.verifyUpdatingTasks(new Task[]{activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3});
        Set tasks = this.stateUpdater.getTasks();
        Assertions.assertEquals((int)5, (int)tasks.size());
        Assertions.assertTrue((boolean)tasks.containsAll(Utils.mkSet((Object[])new AbstractTask[]{activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3})));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals((int)2, (int)activeTasks.size());
        Assertions.assertTrue((boolean)activeTasks.containsAll(Utils.mkSet((Object[])new StreamTask[]{activeTask1, activeTask2})));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals((int)3, (int)standbyTasks.size());
        Assertions.assertTrue((boolean)standbyTasks.containsAll(Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2, standbyTask3})));
    }

    @Test
    public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
        StreamTask activeTask1 = this.createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
        StreamTask activeTask2 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.verifyRestoredActiveTasks(activeTask1, activeTask2);
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask1, activeTask2}), Utils.mkSet((Object[])new StandbyTask[0]));
        this.stateUpdater.drainRestoredActiveTasks(Duration.ofMinutes(1L));
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[0]), Utils.mkSet((Object[])new StandbyTask[0]));
    }

    @Test
    public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
        StreamTask activeTask1 = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask standbyTask2 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask standbyTask1 = this.createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet((Object[])new TaskId[]{standbyTask1.id(), standbyTask2.id()}));
        StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
        Map updatingTasks1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1), Utils.mkEntry((Object)standbyTask1.id(), (Object)standbyTask1), Utils.mkEntry((Object)standbyTask2.id(), (Object)standbyTask2)});
        ((ChangelogReader)Mockito.doNothing().doThrow(new Throwable[]{taskCorruptedException}).doNothing().when((Object)this.changelogReader)).restore(updatingTasks1);
        Map updatingTasks2 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1)});
        ((ChangelogReader)Mockito.doNothing().doThrow(new Throwable[]{streamsException}).doNothing().when((Object)this.changelogReader)).restore(updatingTasks2);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)standbyTask2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{standbyTask1, standbyTask2}), (RuntimeException)taskCorruptedException);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{activeTask1}), (RuntimeException)((Object)streamsException));
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2);
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask1}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2}));
        this.stateUpdater.drainExceptionsAndFailedTasks();
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[0]), Utils.mkSet((Object[])new StandbyTask[0]));
    }

    @Test
    public void shouldGetTasksFromRemovedTasks() throws Exception {
        StreamTask activeTask = this.createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
        StandbyTask standbyTask2 = this.createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
        StandbyTask standbyTask1 = this.createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)activeTask);
        this.stateUpdater.add((Task)standbyTask2);
        this.stateUpdater.remove(standbyTask1.id());
        this.stateUpdater.remove(standbyTask2.id());
        this.stateUpdater.remove(activeTask.id());
        this.verifyRemovedTasks(new Task[]{activeTask, standbyTask1, standbyTask2});
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2}));
        this.stateUpdater.drainRemovedTasks();
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[0]), Utils.mkSet((Object[])new StandbyTask[0]));
    }

    private void verifyGetTasks(Set<StreamTask> expectedActiveTasks, Set<StandbyTask> expectedStandbyTasks) {
        Set tasks = this.stateUpdater.getTasks();
        HashSet<StreamTask> expectedTasks = new HashSet<StreamTask>(expectedActiveTasks);
        expectedTasks.addAll(expectedStandbyTasks);
        Assertions.assertEquals((int)(expectedActiveTasks.size() + expectedStandbyTasks.size()), (int)tasks.size());
        Assertions.assertTrue((boolean)tasks.containsAll(expectedTasks));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals((int)expectedActiveTasks.size(), (int)activeTasks.size());
        Assertions.assertTrue((boolean)activeTasks.containsAll(expectedActiveTasks));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals((int)expectedStandbyTasks.size(), (int)standbyTasks.size());
        Assertions.assertTrue((boolean)standbyTasks.containsAll(expectedStandbyTasks));
    }

    private void verifyRestoredActiveTasks(StreamTask ... tasks) throws Exception {
        if (tasks.length == 0) {
            Assertions.assertTrue((boolean)this.stateUpdater.getRestoredActiveTasks().isEmpty());
        } else {
            Set expectedRestoredTasks = Utils.mkSet((Object[])tasks);
            HashSet restoredTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                restoredTasks.addAll(this.stateUpdater.getRestoredActiveTasks());
                return restoredTasks.containsAll(expectedRestoredTasks) && restoredTasks.size() == expectedRestoredTasks.size();
            }, (long)15000L, (String)"Did not get all restored active task within the given timeout!");
        }
    }

    private void verifyDrainingRestoredActiveTasks(StreamTask ... tasks) throws Exception {
        Set expectedRestoredTasks = Utils.mkSet((Object[])tasks);
        HashSet restoredTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            restoredTasks.addAll(this.stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(1000L)));
            return restoredTasks.containsAll(expectedRestoredTasks) && restoredTasks.size() == expectedRestoredTasks.size();
        }, (long)15000L, (String)"Did not get all restored active task within the given timeout!");
        Assertions.assertTrue((boolean)this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
    }

    private void verifyUpdatingTasks(Task ... tasks) throws Exception {
        if (tasks.length == 0) {
            Assertions.assertTrue((boolean)this.stateUpdater.getUpdatingTasks().isEmpty());
        } else {
            Set expectedUpdatingTasks = Utils.mkSet((Object[])tasks);
            HashSet updatingTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                updatingTasks.addAll(this.stateUpdater.getUpdatingTasks());
                return updatingTasks.containsAll(expectedUpdatingTasks) && updatingTasks.size() == expectedUpdatingTasks.size();
            }, (long)15000L, (String)"Did not get all updating task within the given timeout!");
        }
    }

    private void verifyUpdatingStandbyTasks(StandbyTask ... tasks) throws Exception {
        Set expectedStandbyTasks = Utils.mkSet((Object[])tasks);
        HashSet standbyTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            standbyTasks.addAll(this.stateUpdater.getUpdatingStandbyTasks());
            return standbyTasks.containsAll(expectedStandbyTasks) && standbyTasks.size() == expectedStandbyTasks.size();
        }, (long)15000L, (String)"Did not see all standby task within the given timeout!");
    }

    private void verifyRemovedTasks(Task ... tasks) throws Exception {
        if (tasks.length == 0) {
            Assertions.assertTrue((boolean)this.stateUpdater.getRemovedTasks().isEmpty());
        } else {
            Set expectedRemovedTasks = Utils.mkSet((Object[])tasks);
            HashSet removedTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                removedTasks.addAll(this.stateUpdater.getRemovedTasks());
                return removedTasks.containsAll(expectedRemovedTasks) && removedTasks.size() == expectedRemovedTasks.size();
            }, (long)15000L, (String)"Did not get all removed task within the given timeout!");
        }
    }

    private void verifyDrainingRemovedTasks(Task ... tasks) throws Exception {
        Set expectedRemovedTasks = Utils.mkSet((Object[])tasks);
        HashSet removedTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            removedTasks.addAll(this.stateUpdater.drainRemovedTasks());
            return removedTasks.containsAll(Utils.mkSet((Object[])tasks)) && removedTasks.size() == expectedRemovedTasks.size();
        }, (long)15000L, (String)"Did not get all restored active task within the given timeout!");
        Assertions.assertTrue((boolean)this.stateUpdater.drainRemovedTasks().isEmpty());
    }

    private void verifyExceptionsAndFailedTasks(StateUpdater.ExceptionAndTasks ... exceptionsAndTasks) throws Exception {
        List<StateUpdater.ExceptionAndTasks> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
        HashSet failedTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            failedTasks.addAll(this.stateUpdater.getExceptionsAndFailedTasks());
            return failedTasks.containsAll(expectedExceptionAndTasks) && failedTasks.size() == expectedExceptionAndTasks.size();
        }, (long)15000L, (String)"Did not get all exceptions and failed tasks within the given timeout!");
    }

    private void verifyDrainingExceptionsAndFailedTasks(StateUpdater.ExceptionAndTasks ... exceptionsAndTasks) throws Exception {
        List<StateUpdater.ExceptionAndTasks> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
        ArrayList failedTasks = new ArrayList();
        TestUtils.waitForCondition(() -> {
            failedTasks.addAll(this.stateUpdater.drainExceptionsAndFailedTasks());
            return failedTasks.containsAll(expectedExceptionAndTasks) && failedTasks.size() == expectedExceptionAndTasks.size();
        }, (long)15000L, (String)"Did not get all exceptions and failed tasks within the given timeout!");
        Assertions.assertTrue((boolean)this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
    }

    private StreamTask createActiveStatefulTaskInStateRestoring(TaskId taskId, Collection<TopicPartition> changelogPartitions) {
        StreamTask task = this.createActiveStatefulTask(taskId, changelogPartitions);
        Mockito.when((Object)task.state()).thenReturn((Object)Task.State.RESTORING);
        return task;
    }

    private StreamTask createActiveStatefulTask(TaskId taskId, Collection<TopicPartition> changelogPartitions) {
        StreamTask task = (StreamTask)Mockito.mock(StreamTask.class);
        this.setupStatefulTask((Task)task, taskId, changelogPartitions);
        Mockito.when((Object)task.isActive()).thenReturn((Object)true);
        return task;
    }

    private StreamTask createStatelessTaskInStateRestoring(TaskId taskId) {
        StreamTask task = this.createStatelessTask(taskId);
        Mockito.when((Object)task.state()).thenReturn((Object)Task.State.RESTORING);
        return task;
    }

    private StreamTask createStatelessTask(TaskId taskId) {
        StreamTask task = (StreamTask)Mockito.mock(StreamTask.class);
        Mockito.when((Object)task.changelogPartitions()).thenReturn(Collections.emptySet());
        Mockito.when((Object)task.isActive()).thenReturn((Object)true);
        Mockito.when((Object)task.id()).thenReturn((Object)taskId);
        return task;
    }

    private StandbyTask createStandbyTaskInStateRunning(TaskId taskId, Collection<TopicPartition> changelogPartitions) {
        StandbyTask task = this.createStandbyTask(taskId, changelogPartitions);
        Mockito.when((Object)task.state()).thenReturn((Object)Task.State.RUNNING);
        return task;
    }

    private StandbyTask createStandbyTask(TaskId taskId, Collection<TopicPartition> changelogPartitions) {
        StandbyTask task = (StandbyTask)Mockito.mock(StandbyTask.class);
        this.setupStatefulTask((Task)task, taskId, changelogPartitions);
        Mockito.when((Object)task.isActive()).thenReturn((Object)false);
        return task;
    }

    private void setupStatefulTask(Task task, TaskId taskId, Collection<TopicPartition> changelogPartitions) {
        Mockito.when((Object)task.changelogPartitions()).thenReturn(changelogPartitions);
        Mockito.when((Object)task.id()).thenReturn((Object)taskId);
    }
}

