/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.ThreadMetadataProvider;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=EasyMockRunner.class)
public class TaskManagerTest {
    private final TaskId taskId0 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("t1", 0);
    private final Set<TopicPartition> taskId0Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0});
    private final Map<TaskId, Set<TopicPartition>> taskId0Assignment = Collections.singletonMap(this.taskId0, this.taskId0Partitions);
    @Mock(type=MockType.NICE)
    private ChangelogReader changeLogReader;
    @Mock(type=MockType.NICE)
    private Consumer<byte[], byte[]> restoreConsumer;
    @Mock(type=MockType.NICE)
    private Consumer<byte[], byte[]> consumer;
    @Mock(type=MockType.NICE)
    private StreamThread.AbstractTaskCreator activeTaskCreator;
    @Mock(type=MockType.NICE)
    private StreamThread.AbstractTaskCreator standbyTaskCreator;
    @Mock(type=MockType.NICE)
    private ThreadMetadataProvider threadMetadataProvider;
    @Mock(type=MockType.NICE)
    private Task firstTask;
    @Mock(type=MockType.NICE)
    private AssignedTasks active;
    @Mock(type=MockType.NICE)
    private AssignedTasks standby;
    private TaskManager taskManager;

    @Before
    public void setUp() throws Exception {
        this.taskManager = new TaskManager(this.changeLogReader, "", this.restoreConsumer, this.activeTaskCreator, this.standbyTaskCreator, this.active, this.standby);
        this.taskManager.setThreadMetadataProvider(this.threadMetadataProvider);
        this.taskManager.setConsumer(this.consumer);
    }

    private void replay() {
        EasyMock.replay((Object[])new Object[]{this.changeLogReader, this.restoreConsumer, this.consumer, this.activeTaskCreator, this.standbyTaskCreator, this.threadMetadataProvider, this.active, this.standby});
    }

    @Test
    public void shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        this.mockSingleActiveTask();
        this.active.closeNonAssignedSuspendedTasks(this.taskId0Assignment);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldCloseStandbyUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        this.mockSingleActiveTask();
        this.standby.closeNonAssignedSuspendedTasks(this.taskId0Assignment);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldResetChangeLogReaderOnCreateTasks() {
        this.mockSingleActiveTask();
        this.changeLogReader.reset();
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.changeLogReader});
    }

    @Test
    public void shouldAddNonResumedActiveTasks() {
        this.mockSingleActiveTask();
        EasyMock.expect((Object)this.active.maybeResumeSuspendedTask(this.taskId0, this.taskId0Partitions)).andReturn((Object)false);
        this.active.addNewTask((Task)EasyMock.same((Object)this.firstTask));
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.activeTaskCreator, this.active});
    }

    @Test
    public void shouldNotAddResumedActiveTasks() {
        EasyMock.checkOrder((Object)this.active, (boolean)true);
        this.mockThreadMetadataProvider(Collections.emptyMap(), this.taskId0Assignment);
        EasyMock.expect((Object)this.active.maybeResumeSuspendedTask(this.taskId0, this.taskId0Partitions)).andReturn((Object)true);
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.active, this.activeTaskCreator});
    }

    @Test
    public void shouldAddNonResumedStandbyTasks() {
        this.mockStandbyTaskExpectations();
        EasyMock.expect((Object)this.standby.maybeResumeSuspendedTask(this.taskId0, this.taskId0Partitions)).andReturn((Object)false);
        this.standby.addNewTask((Task)EasyMock.same((Object)this.firstTask));
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.standbyTaskCreator, this.active});
    }

    @Test
    public void shouldNotAddResumedStandbyTasks() {
        EasyMock.checkOrder((Object)this.active, (boolean)true);
        this.mockThreadMetadataProvider(this.taskId0Assignment, Collections.emptyMap());
        EasyMock.expect((Object)this.standby.maybeResumeSuspendedTask(this.taskId0, this.taskId0Partitions)).andReturn((Object)true);
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.standby, this.standbyTaskCreator});
    }

    @Test
    public void shouldPauseActiveUninitializedPartitions() {
        this.mockSingleActiveTask();
        EasyMock.expect((Object)this.active.uninitializedPartitions()).andReturn(this.taskId0Partitions);
        this.consumer.pause(this.taskId0Partitions);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.createTasks(this.taskId0Partitions);
        EasyMock.verify((Object[])new Object[]{this.consumer});
    }

    @Test
    public void shouldSuspendActiveTasks() {
        EasyMock.expect((Object)this.active.suspend()).andReturn(null);
        this.replay();
        this.taskManager.suspendTasksAndState();
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldSuspendStandbyTasks() {
        EasyMock.expect((Object)this.standby.suspend()).andReturn(null);
        this.replay();
        this.taskManager.suspendTasksAndState();
        EasyMock.verify((Object[])new Object[]{this.standby});
    }

    @Test
    public void shouldUnassignChangelogPartitionsOnSuspend() {
        this.restoreConsumer.assign(Collections.emptyList());
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.suspendTasksAndState();
        EasyMock.verify((Object[])new Object[]{this.restoreConsumer});
    }

    @Test
    public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
        EasyMock.expect((Object)this.active.suspend()).andReturn((Object)new RuntimeException(""));
        EasyMock.expect((Object)this.standby.suspend()).andReturn((Object)new RuntimeException(""));
        EasyMock.expectLastCall();
        this.restoreConsumer.assign(Collections.emptyList());
        this.replay();
        try {
            this.taskManager.suspendTasksAndState();
            Assert.fail((String)"Should have thrown streams exception");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
        EasyMock.verify((Object[])new Object[]{this.restoreConsumer, this.active, this.standby});
    }

    @Test
    public void shouldCloseActiveTasksOnShutdown() {
        this.active.close(true);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.shutdown(true);
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        this.standby.close(false);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.shutdown(false);
        EasyMock.verify((Object[])new Object[]{this.standby});
    }

    @Test
    public void shouldUnassignChangelogPartitionsOnShutdown() {
        this.restoreConsumer.assign(Collections.emptyList());
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.shutdown(true);
        EasyMock.verify((Object[])new Object[]{this.restoreConsumer});
    }

    @Test
    public void shouldCloseThreadMetadataProviderOnShutdown() {
        this.threadMetadataProvider.close();
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.shutdown(true);
        EasyMock.verify((Object[])new Object[]{this.threadMetadataProvider});
    }

    @Test
    public void shouldNotPropagateExceptionsOnShutdown() {
        this.threadMetadataProvider.close();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException());
        this.replay();
        this.taskManager.shutdown(false);
    }

    @Test
    public void shouldInitializeNewActiveTasks() {
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.active.updateRestored((Collection)EasyMock.anyObject())).andReturn(Collections.emptySet());
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldInitializeNewStandbyTasks() {
        EasyMock.expect((Object)this.standby.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.active.updateRestored((Collection)EasyMock.anyObject())).andReturn(Collections.emptySet());
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.standby});
    }

    @Test
    public void shouldRestoreStateFromChangeLogReader() {
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.changeLogReader.restore((RestoringTasks)this.active)).andReturn(this.taskId0Partitions);
        EasyMock.expect((Object)this.active.updateRestored(this.taskId0Partitions)).andReturn(Collections.emptySet());
        this.replay();
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.changeLogReader, this.active});
    }

    @Test
    public void shouldResumeRestoredPartitions() {
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.changeLogReader.restore((RestoringTasks)this.active)).andReturn(this.taskId0Partitions);
        EasyMock.expect((Object)this.active.updateRestored(this.taskId0Partitions)).andReturn(this.taskId0Partitions);
        this.consumer.resume(this.taskId0Partitions);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.consumer});
    }

    @Test
    public void shouldAssignStandbyPartitionsWhenAllActiveTasksAreRunning() {
        this.mockAssignStandbyPartitions(1L);
        this.replay();
        Assert.assertTrue((boolean)this.taskManager.updateNewAndRestoringTasks());
        EasyMock.verify((Object[])new Object[]{this.restoreConsumer});
    }

    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.active.allTasksRunning()).andReturn((Object)false);
        EasyMock.expect((Object)this.active.updateRestored((Collection)EasyMock.anyObject())).andReturn(Collections.emptySet());
        this.replay();
        Assert.assertFalse((boolean)this.taskManager.updateNewAndRestoringTasks());
    }

    @Test
    public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
        this.mockAssignStandbyPartitions(1L);
        this.restoreConsumer.seek(this.t1p0, 1L);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.restoreConsumer});
    }

    @Test
    public void shouldSeekToBeginningIfOffsetIsLessThan0() {
        this.mockAssignStandbyPartitions(-1L);
        this.restoreConsumer.seekToBeginning(this.taskId0Partitions);
        EasyMock.expectLastCall();
        this.replay();
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.restoreConsumer});
    }

    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        EasyMock.expect((Object)this.active.commit()).andReturn((Object)1);
        EasyMock.expect((Object)this.standby.commit()).andReturn((Object)2);
        this.replay();
        MatcherAssert.assertThat((Object)this.taskManager.commitAll(), (Matcher)IsEqual.equalTo((Object)3));
        EasyMock.verify((Object[])new Object[]{this.active, this.standby});
    }

    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        EasyMock.checkOrder((Object)this.standby, (boolean)true);
        this.active.commit();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException(""));
        this.replay();
        try {
            this.taskManager.commitAll();
            Assert.fail((String)"should have thrown first exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        EasyMock.verify((Object[])new Object[]{this.active, this.standby});
    }

    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        EasyMock.expect((Object)this.standby.commit()).andThrow((Throwable)new RuntimeException(""));
        this.replay();
        try {
            this.taskManager.commitAll();
            Assert.fail((String)"should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        EasyMock.verify((Object[])new Object[]{this.standby});
    }

    @Test
    public void shouldMaybeCommitActiveTasks() {
        EasyMock.expect((Object)this.active.maybeCommit()).andReturn((Object)5);
        this.replay();
        MatcherAssert.assertThat((Object)this.taskManager.maybeCommitActiveTasks(), (Matcher)IsEqual.equalTo((Object)5));
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldProcessActiveTasks() {
        EasyMock.expect((Object)this.active.process()).andReturn((Object)10);
        this.replay();
        MatcherAssert.assertThat((Object)this.taskManager.process(), (Matcher)IsEqual.equalTo((Object)10));
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldPunctuateActiveTasks() {
        EasyMock.expect((Object)this.active.punctuate()).andReturn((Object)20);
        this.replay();
        MatcherAssert.assertThat((Object)this.taskManager.punctuate(), (Matcher)IsEqual.equalTo((Object)20));
        EasyMock.verify((Object[])new Object[]{this.active});
    }

    @Test
    public void shouldResumeConsumptionOfInitializedPartitions() {
        Set<TopicPartition> resumed = Collections.singleton(new TopicPartition("topic", 0));
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(resumed);
        EasyMock.expect((Object)this.active.updateRestored((Collection)EasyMock.anyObject())).andReturn(Collections.emptySet());
        this.consumer.resume(resumed);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.active, this.consumer});
        this.taskManager.updateNewAndRestoringTasks();
        EasyMock.verify((Object[])new Object[]{this.consumer});
    }

    private void mockAssignStandbyPartitions(long offset) {
        Task task = (Task)EasyMock.createNiceMock(Task.class);
        EasyMock.expect((Object)this.active.initializeNewTasks()).andReturn(new HashSet());
        EasyMock.expect((Object)this.active.allTasksRunning()).andReturn((Object)true);
        EasyMock.expect((Object)this.active.updateRestored((Collection)EasyMock.anyObject())).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.standby.running()).andReturn(Collections.singletonList(task));
        EasyMock.expect((Object)task.checkpointedOffsets()).andReturn(Collections.singletonMap(this.t1p0, offset));
        this.restoreConsumer.assign(this.taskId0Partitions);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{task});
    }

    private void mockStandbyTaskExpectations() {
        this.mockThreadMetadataProvider(this.taskId0Assignment, Collections.emptyMap());
        EasyMock.expect((Object)this.standbyTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(this.taskId0Assignment))).andReturn(Collections.singletonList(this.firstTask));
    }

    private void mockSingleActiveTask() {
        this.mockThreadMetadataProvider(Collections.emptyMap(), this.taskId0Assignment);
        EasyMock.expect((Object)this.activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(Consumer.class), (Map)EasyMock.eq(this.taskId0Assignment))).andReturn(Collections.singletonList(this.firstTask));
    }

    private void mockThreadMetadataProvider(Map<TaskId, Set<TopicPartition>> standbyAssignment, Map<TaskId, Set<TopicPartition>> activeAssignment) {
        EasyMock.expect((Object)this.threadMetadataProvider.standbyTasks()).andReturn(standbyAssignment).anyTimes();
        EasyMock.expect((Object)this.threadMetadataProvider.activeTasks()).andReturn(activeAssignment).anyTimes();
    }
}

