/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class DefaultTaskExecutorTest {
    private static final long VERIFICATION_TIMEOUT = 15000L;
    private final Time time = new MockTime(1L);
    private final StreamTask task = (StreamTask)Mockito.mock(StreamTask.class);
    private final TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
    private final DefaultTaskExecutor taskExecutor = new DefaultTaskExecutor(this.taskManager, "TaskExecutor", this.time);

    @BeforeEach
    public void setUp() {
        Mockito.when((Object)this.taskManager.assignNextTask((TaskExecutor)this.taskExecutor)).thenReturn((Object)this.task).thenReturn(null);
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.task.process(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.task.prepareCommit()).thenReturn(Collections.emptyMap());
    }

    @AfterEach
    public void tearDown() {
        this.taskExecutor.shutdown(Duration.ofMinutes(1L));
    }

    @Test
    public void shouldShutdownTaskExecutor() {
        Assertions.assertNull((Object)this.taskExecutor.currentTask(), (String)"Have task assigned before startup");
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).assignNextTask((TaskExecutor)this.taskExecutor);
        this.taskExecutor.shutdown(Duration.ofMinutes(1L));
        ((StreamTask)Mockito.verify((Object)this.task)).prepareCommit();
        ((TaskManager)Mockito.verify((Object)this.taskManager)).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        Assertions.assertNull((Object)this.taskExecutor.currentTask(), (String)"Have task assigned after shutdown");
    }

    @Test
    public void shouldUnassignTaskWhenNotProcessable() {
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)false);
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        ((StreamTask)Mockito.verify((Object)this.task)).prepareCommit();
        Assertions.assertNull((Object)this.taskExecutor.currentTask());
    }

    @Test
    public void shouldUnassignTaskWhenRequired() throws Exception {
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).assignNextTask((TaskExecutor)this.taskExecutor);
        Assertions.assertNotNull((Object)this.taskExecutor.currentTask());
        KafkaFuture future = this.taskExecutor.unassign();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        ((StreamTask)Mockito.verify((Object)this.task)).prepareCommit();
        Assertions.assertNull((Object)this.taskExecutor.currentTask());
        Assertions.assertTrue((boolean)future.isDone(), (String)"Unassign is not completed");
        Assertions.assertEquals((Object)this.task, (Object)future.get(), (String)"Unexpected task was unassigned");
    }
}

