/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AssignedTasksTest {
    private final AssignedTasks<AbstractTask> assignedTasks = new AssignedTasks("log", "task", Time.SYSTEM);
    private final AbstractTask t1 = (AbstractTask)EasyMock.createMock(AbstractTask.class);
    private final AbstractTask t2 = (AbstractTask)EasyMock.createMock(AbstractTask.class);
    private final TopicPartition tp1 = new TopicPartition("t1", 0);
    private final TopicPartition tp2 = new TopicPartition("t2", 0);
    private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
    private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
    private final TaskId taskId1 = new TaskId(0, 0);
    private final TaskId taskId2 = new TaskId(1, 0);
    private final Metrics metrics = new Metrics();
    private final Sensor punctuateSensor = this.metrics.sensor("punctuate");
    private final Sensor commitSensor = this.metrics.sensor("commit");

    @Before
    public void before() {
        EasyMock.expect((Object)this.t1.id()).andReturn((Object)this.taskId1).anyTimes();
        EasyMock.expect((Object)this.t2.id()).andReturn((Object)this.taskId2).anyTimes();
    }

    @Test
    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
        EasyMock.expect((Object)this.t1.hasStateStores()).andReturn((Object)true);
        EasyMock.expect((Object)this.t2.hasStateStores()).andReturn((Object)true);
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.expect((Object)this.t2.partitions()).andReturn(Collections.singleton(this.tp2)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.t1, this.t2});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.addNewTask(this.t2);
        Set partitions = this.assignedTasks.uninitializedPartitions();
        MatcherAssert.assertThat((Object)partitions, (Matcher)IsEqual.equalTo((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp1, this.tp2})));
        EasyMock.verify((Object[])new Object[]{this.t1, this.t2});
    }

    @Test
    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
        EasyMock.expect((Object)this.t1.hasStateStores()).andReturn((Object)false);
        EasyMock.expect((Object)this.t2.hasStateStores()).andReturn((Object)false);
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.expect((Object)this.t2.partitions()).andReturn(Collections.singleton(this.tp2)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.t1, this.t2});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.addNewTask(this.t2);
        Set partitions = this.assignedTasks.uninitializedPartitions();
        Assert.assertTrue((boolean)partitions.isEmpty());
        EasyMock.verify((Object[])new Object[]{this.t1, this.t2});
    }

    @Test
    public void shouldInitializeNewTasks() {
        EasyMock.expect((Object)this.t1.initialize()).andReturn((Object)false);
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
        EasyMock.expect((Object)this.t1.initialize()).andReturn((Object)false);
        EasyMock.expect((Object)this.t2.initialize()).andReturn((Object)true);
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.expect((Object)this.t2.partitions()).andReturn(Collections.singleton(this.tp2)).anyTimes();
        EasyMock.expect((Object)this.t2.changelogPartitions()).andReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{this.t1, this.t2});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.addNewTask(this.t2);
        this.assignedTasks.initializeNewTasks();
        Collection restoring = this.assignedTasks.restoringTasks();
        MatcherAssert.assertThat((Object)restoring.size(), (Matcher)IsEqual.equalTo((Object)1));
        Assert.assertSame(restoring.iterator().next(), (Object)this.t1);
    }

    @Test
    public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
        EasyMock.expect((Object)this.t2.initialize()).andReturn((Object)true);
        EasyMock.expect((Object)this.t2.partitions()).andReturn(Collections.singleton(this.tp2)).anyTimes();
        EasyMock.expect((Object)this.t2.changelogPartitions()).andReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{this.t2});
        this.assignedTasks.addNewTask(this.t2);
        this.assignedTasks.initializeNewTasks();
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId2)));
    }

    @Test
    public void shouldTransitionFullyRestoredTasksToRunning() {
        Set task1Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.tp1});
        EasyMock.expect((Object)this.t1.initialize()).andReturn((Object)false);
        EasyMock.expect((Object)this.t1.partitions()).andReturn((Object)task1Partitions).anyTimes();
        EasyMock.expect((Object)this.t1.changelogPartitions()).andReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{this.changeLog1, this.changeLog2})).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        Assert.assertTrue((boolean)this.assignedTasks.updateRestored((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.changeLog1})).isEmpty());
        Set partitions = this.assignedTasks.updateRestored((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.changeLog2}));
        MatcherAssert.assertThat((Object)partitions, (Matcher)IsEqual.equalTo((Object)task1Partitions));
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
    }

    @Test
    public void shouldSuspendRunningTasks() {
        this.mockRunningTaskSuspension();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.suspendTask();
        MatcherAssert.assertThat((Object)this.assignedTasks.previousTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCloseUnInitializedTasksOnSuspend() {
        this.t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.suspend();
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldNotSuspendSuspendedTasks() {
        this.mockRunningTaskSuspension();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.suspendTask();
        this.assignedTasks.suspend();
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnSuspendWhenRuntimeException() {
        this.mockInitializedTask();
        this.t1.suspend();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("KABOOM!"));
        this.t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        RuntimeException expectedException = this.suspendTask();
        MatcherAssert.assertThat((Object)expectedException, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.nullValue()));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnSuspendWhenProducerFencedException() {
        this.mockInitializedTask();
        this.t1.suspend();
        EasyMock.expectLastCall().andThrow((Throwable)new ProducerFencedException("KABOOM!"));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        MatcherAssert.assertThat((Object)this.suspendTask(), (Matcher)CoreMatchers.nullValue());
        Assert.assertTrue((boolean)this.assignedTasks.previousTaskIds().isEmpty());
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    private void mockInitializedTask() {
        EasyMock.expect((Object)this.t1.initialize()).andReturn((Object)true);
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.expect((Object)this.t1.changelogPartitions()).andReturn(Collections.emptyList());
    }

    @Test
    public void shouldResumeMatchingSuspendedTasks() {
        this.mockRunningTaskSuspension();
        this.t1.resume();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.suspendTask();
        Assert.assertTrue((boolean)this.assignedTasks.maybeResumeSuspendedTask(this.taskId1, Collections.singleton(this.tp1)));
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCommitRunningTasks() {
        this.mockInitializedTask();
        this.t1.commit();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.commit();
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnCommitIfProduceFencedException() {
        this.mockInitializedTask();
        this.t1.commit();
        EasyMock.expectLastCall().andThrow((Throwable)new ProducerFencedException(""));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.commit();
        Assert.assertTrue((boolean)this.assignedTasks.runningTasks().isEmpty());
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldNotThrowCommitFailedExceptionOnCommit() {
        this.mockInitializedTask();
        this.t1.commit();
        EasyMock.expectLastCall().andThrow((Throwable)new CommitFailedException());
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.commit();
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
        this.mockInitializedTask();
        this.t1.commit();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException(""));
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        try {
            this.assignedTasks.commit();
            Assert.fail((String)"Should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldProcessRunningTasks() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.process()).andReturn((Object)true);
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        MatcherAssert.assertThat((Object)this.assignedTasks.process(), (Matcher)IsEqual.equalTo((Object)1));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnProcessIfProducerFencedException() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.process()).andThrow((Throwable)new ProducerFencedException(""));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.process();
        Assert.assertTrue((boolean)this.assignedTasks.runningTasks().isEmpty());
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldThrowExceptionOnProcessWhenNotCommitFailedOrProducerFencedException() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.process()).andThrow((Throwable)new RuntimeException(""));
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        try {
            this.assignedTasks.process();
            Assert.fail((String)"should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldPunctuateRunningTasks() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.maybePunctuate()).andReturn((Object)true);
        EasyMock.expect((Object)this.t1.commitNeeded()).andReturn((Object)false);
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.punctuateAndCommit(this.commitSensor, this.punctuateSensor);
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCommitRunningTasksIfNeeded() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.maybePunctuate()).andReturn((Object)true);
        EasyMock.expect((Object)this.t1.commitNeeded()).andReturn((Object)true);
        this.t1.commit();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.punctuateAndCommit(this.commitSensor, this.punctuateSensor);
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldThrowExceptionOnPunctuateAndCommitWhenNotCommitFailedOrProducerFencedException() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.maybePunctuate()).andThrow((Throwable)new RuntimeException(""));
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        try {
            this.assignedTasks.punctuateAndCommit(this.commitSensor, this.punctuateSensor);
            Assert.fail((String)"should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        MatcherAssert.assertThat((Object)this.assignedTasks.runningTaskIds(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnPunctuateAndCommitIfProducerFencedException() {
        this.mockInitializedTask();
        EasyMock.expect((Object)this.t1.maybePunctuate()).andThrow((Throwable)new ProducerFencedException(""));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        this.assignedTasks.punctuateAndCommit(this.commitSensor, this.punctuateSensor);
        Assert.assertTrue((boolean)this.assignedTasks.runningTasks().isEmpty());
        EasyMock.verify((Object[])new Object[]{this.t1});
    }

    private RuntimeException suspendTask() {
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
        return this.assignedTasks.suspend();
    }

    private void mockRunningTaskSuspension() {
        EasyMock.expect((Object)this.t1.initialize()).andReturn((Object)true);
        EasyMock.expect((Object)this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.expect((Object)this.t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
        this.t1.suspend();
        EasyMock.expectLastCall();
    }
}

