/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.InteractionsCountingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExecutionGraphSuspendTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    ExecutionGraphSuspendTest() {
    }

    @Test
    void testSuspendedOutOfCreated() throws Exception {
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        int parallelism = 10;
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.CREATED);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.CANCELED);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 0);
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    @Test
    void testSuspendedOutOfDeploying() throws Exception {
        int parallelism = 10;
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RUNNING);
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    @Test
    void testSuspendedOutOfRunning() throws Exception {
        int parallelism = 10;
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RUNNING);
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.RUNNING);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    @Test
    void testSuspendedOutOfFailing() throws Exception {
        int parallelism = 10;
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        scheduler.handleGlobalFailure((Throwable)new Exception("fail global"));
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.FAILING);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    @Test
    void testSuspendedOutOfFailed() throws Exception {
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        int parallelism = 10;
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        scheduler.handleGlobalFailure((Throwable)new Exception("fail global"));
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.FAILING);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.FAILED);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.FAILED);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
    }

    @Test
    void testSuspendedOutOfCanceling() throws Exception {
        int parallelism = 10;
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        scheduler.cancel();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.CANCELLING);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    @Test
    void testSuspendedOutOfCanceled() throws Exception {
        InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        int parallelism = 10;
        SchedulerBase scheduler = ExecutionGraphSuspendTest.createScheduler(gateway, 10);
        ExecutionGraph eg = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        scheduler.cancel();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.CANCELLING);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        FlinkAssertions.assertThatFuture((CompletableFuture)eg.getTerminationFuture()).eventuallySucceeds().isEqualTo((Object)JobStatus.CANCELED);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.CANCELED);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
    }

    @Test
    void testSuspendWhileRestarting() throws Exception {
        ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(JobGraphTestUtils.emptyJobGraph(), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE)).setDelayExecutor((ScheduledExecutor)taskRestartExecutor).build();
        scheduler.startScheduling();
        ExecutionGraph eg = scheduler.getExecutionGraph();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RUNNING);
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        scheduler.handleGlobalFailure((Throwable)new Exception("test"));
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RESTARTING);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RESTARTING);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
    }

    private static void ensureCannotLeaveSuspendedState(SchedulerBase scheduler, InteractionsCountingTaskManagerGateway gateway) {
        ExecutionGraph eg = scheduler.getExecutionGraph();
        gateway.waitUntilAllTasksAreSubmitted();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        gateway.resetCounts();
        scheduler.handleGlobalFailure((Throwable)new Exception("fail"));
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.validateNoInteractions(gateway);
        scheduler.cancel();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.validateNoInteractions(gateway);
        scheduler.closeAsync();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.SUSPENDED);
        ExecutionGraphSuspendTest.validateNoInteractions(gateway);
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assertions.assertThat((int)ev.getCurrentExecutionAttempt().getAttemptNumber()).isZero();
        }
    }

    private static void validateNoInteractions(InteractionsCountingTaskManagerGateway gateway) {
        Assertions.assertThat((int)gateway.getInteractionsCount()).isZero();
    }

    private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) {
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assertions.assertThat((Comparable)ev.getCurrentExecutionAttempt().getState()).isEqualTo((Object)expected);
        }
    }

    private static void validateCancelRpcCalls(InteractionsCountingTaskManagerGateway gateway, int num) {
        Assertions.assertThat((int)gateway.getCancelTaskCount()).isEqualTo(num);
    }

    private static SchedulerBase createScheduler(TaskManagerGateway gateway, int parallelism) throws Exception {
        JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(vertex), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(parallelism, gateway))).build();
        return scheduler;
    }
}

