/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Tests how an {@link ExecutionGraph} driven by a {@link DefaultScheduler} reacts to cancellation,
 * global failures and task failures, in particular around restarts.
 *
 * <p>Every test wires the scheduler to a {@link DeclarativeSlotPoolBridge} and offers slots to it
 * explicitly, so that slot availability is under the test's control.
 */
public class ExecutionGraphRestartTest extends TestLogger {

    /** Parallelism of the default job vertex and the number of slots most tests offer. */
    private static final int NUM_TASKS = 31;

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    private static final ComponentMainThreadExecutor mainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forMainThread();

    /** Delay executor handed to the scheduler; scheduled tasks only run when a test triggers them. */
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;

    @Before
    public void setUp() {
        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /** Completes cancellation on the current execution attempt of every vertex in {@code eg}. */
    private void completeCanceling(ExecutionGraph eg) {
        executeOperationForAllExecutions(eg, Execution::completeCancelling);
    }

    /**
     * Applies {@code operation} to the current execution attempt of every vertex of the graph.
     *
     * @param eg graph whose vertices are visited
     * @param operation action invoked once per current execution attempt
     */
    private void executeOperationForAllExecutions(
            ExecutionGraph eg, Consumer<Execution> operation) {
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            operation.accept(vertex.getCurrentExecutionAttempt());
        }
    }

    /**
     * Schedules more tasks than there are slots and verifies that cancelling the job leaves no
     * pending slot requests behind.
     */
    @Test
    public void testCancelAllPendingRequestWhileCanceling() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            final int numTasksExceedSlotPool = 50;

            // Create a graph with more tasks than the slots that will be offered.
            JobVertex sender =
                    ExecutionGraphTestUtils.createJobVertex(
                            "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
            JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            // Only NUM_TASKS slots were offered, so the surplus tasks are still waiting.
            Assert.assertEquals(numTasksExceedSlotPool, slotPool.getNumPendingRequests());

            scheduler.cancel();

            Assert.assertEquals(JobStatus.CANCELLING, executionGraph.getState());
            Assert.assertEquals(0, slotPool.getNumPendingRequests());
        }
    }

    /**
     * Schedules more tasks than there are slots and verifies that a global failure leaves no
     * pending slot requests behind.
     */
    @Test
    public void testCancelAllPendingRequestWhileFailing() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            final int numTasksExceedSlotPool = 50;

            // Create a graph with more tasks than the slots that will be offered.
            JobVertex sender =
                    ExecutionGraphTestUtils.createJobVertex(
                            "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
            JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            // Only NUM_TASKS slots were offered, so the surplus tasks are still waiting.
            Assert.assertEquals(numTasksExceedSlotPool, slotPool.getNumPendingRequests());

            scheduler.handleGlobalFailure(new Exception("test"));

            Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
            Assert.assertEquals(0, slotPool.getNumPendingRequests());
        }
    }

    /**
     * Verifies that cancelling a job in {@link JobStatus#RESTARTING} ends in {@link
     * JobStatus#CANCELED} and that running the delayed tasks afterwards does not change that.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            // The delay executor is manual, so delayed work only runs when triggered below.
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraph(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();

            startScheduling(scheduler);

            ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);

            // Losing the task manager that provided all slots puts the job into RESTARTING.
            slotPool.releaseTaskManager(taskManagerResourceId, new Exception("Test Exception"));
            Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());

            // Cancelling must win over the pending restart.
            scheduler.cancel();
            Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());

            // Running whatever was scheduled on the delay executor must not revive the job.
            taskRestartExecutor.triggerScheduledTasks();

            Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
            for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
                Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
            }
        }
    }

    /**
     * Offers {@code numSlots} slots with {@link ResourceProfile#ANY} to the given pool.
     *
     * @return the {@link ResourceID} returned by {@link SlotPoolUtils#offerSlots}; the tests use it
     *     as the id of the task manager owning the offered slots
     */
    private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
        return SlotPoolUtils.offerSlots(
                slotPool, mainThreadExecutor, Collections.nCopies(numSlots, ResourceProfile.ANY));
    }

    /**
     * Verifies that cancelling a job that is already {@link JobStatus#FAILING} switches it to
     * {@link JobStatus#CANCELLING} and finally {@link JobStatus#CANCELED}.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraph(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .build();
            ExecutionGraph graph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            Assert.assertEquals(JobStatus.RUNNING, graph.getState());

            switchAllTasksToRunning(graph);

            scheduler.handleGlobalFailure(new Exception("test"));
            Assert.assertEquals(JobStatus.FAILING, graph.getState());

            scheduler.cancel();
            Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

            // Let all tasks finish cancelling.
            completeCanceling(graph);
            Assert.assertEquals(JobStatus.CANCELED, graph.getState());
        }
    }

    /**
     * Verifies that a global failure arriving while the job is {@link JobStatus#CANCELLING}
     * switches it to {@link JobStatus#FAILING} and finally {@link JobStatus#FAILED}.
     */
    @Test
    public void testFailWhileCanceling() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraph(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .build();
            ExecutionGraph graph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            Assert.assertEquals(JobStatus.RUNNING, graph.getState());

            switchAllTasksToRunning(graph);

            scheduler.cancel();
            Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

            scheduler.handleGlobalFailure(new Exception("test"));
            Assert.assertEquals(JobStatus.FAILING, graph.getState());

            // Let all tasks finish cancelling.
            completeCanceling(graph);
            Assert.assertEquals(JobStatus.FAILED, graph.getState());
        }
    }

    /** Switches the current execution attempt of every vertex in {@code graph} to running. */
    private void switchAllTasksToRunning(ExecutionGraph graph) {
        executeOperationForAllExecutions(graph, Execution::switchToRunning);
    }

    /**
     * Verifies that failing an execution attempt obtained before a restart has no effect on the
     * job once it has been restarted: the job still reaches {@link JobStatus#FINISHED}.
     */
    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
        JobVertex receiver =
                ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph eg = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            // One slot for each of the two parallelism-1 vertices.
            offerSlots(slotPool, 2);

            Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
            Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
            Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();

            finishedExecution.markFinished();

            failedExecution.fail(new Exception("Test Exception"));
            failedExecution.completeCancelling();

            // Run the delayed tasks held by the manual executor; the job must be RUNNING after.
            taskRestartExecutor.triggerScheduledTasks();

            Assert.assertEquals(JobStatus.RUNNING, eg.getState());

            // At this point all resources have been assigned.
            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                Assert.assertNotNull(
                        "No assigned resource (test instability).",
                        vertex.getCurrentAssignedResource());
                vertex.getCurrentExecutionAttempt().switchToRecovering();
                vertex.getCurrentExecutionAttempt().switchToRunning();
            }

            // Failing the attempt captured before the restart must not affect the job.
            finishedExecution.fail(new Exception("This should have no effect"));

            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                vertex.getCurrentExecutionAttempt().markFinished();
            }

            Assert.assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
            Assert.assertEquals(JobStatus.FINISHED, eg.getState());
        }
    }

    /**
     * Verifies that failing the executions of an already cancelled job does not take it out of
     * {@link JobStatus#CANCELED}, even though the job graph is configured with a restart strategy.
     */
    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraphToCancel(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph eg = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, 1);

            // Fail right after cancel, for all vertices.
            scheduler.cancel();

            for (ExecutionVertex v : eg.getAllExecutionVertices()) {
                v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
            }

            Assert.assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());

            Execution execution =
                    eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();

            execution.completeCancelling();
            Assert.assertEquals(JobStatus.CANCELED, eg.getState());
        }
    }

    /**
     * Starts scheduling and asserts that the job moves from {@link JobStatus#CREATED} to {@link
     * JobStatus#RUNNING}.
     */
    private static void startScheduling(SchedulerBase scheduler) {
        Assert.assertThat(
                scheduler.getExecutionGraph().getState(), Matchers.is(JobStatus.CREATED));
        scheduler.startScheduling();
        Assert.assertThat(
                scheduler.getExecutionGraph().getState(), Matchers.is(JobStatus.RUNNING));
    }

    /**
     * Starts the given slot pool and builds a slot-sharing allocator factory on top of it.
     *
     * @param slotPool pool to start and to allocate physical slots from
     * @return factory to pass to {@link DefaultSchedulerBuilder#setExecutionSlotAllocatorFactory}
     */
    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(
            SlotPool slotPool) throws Exception {
        setupSlotPool(slotPool);
        PhysicalSlotProvider physicalSlotProvider =
                new PhysicalSlotProviderImpl(
                        LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        return SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                physicalSlotProvider);
    }

    /** Starts the slot pool with a fresh job master id and connects it to a testing gateway. */
    private static void setupSlotPool(SlotPool slotPool) throws Exception {
        final String jobManagerAddress = "foobar";
        ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor);
        slotPool.connectToResourceManager(resourceManagerGateway);
    }

    /** Creates a streaming job graph with a single vertex of parallelism {@link #NUM_TASKS}. */
    private static JobGraph createJobGraph() {
        JobVertex sender =
                ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
        return JobGraphTestUtils.streamingJobGraph(sender);
    }

    /**
     * Creates a streaming job graph with one parallelism-1 vertex whose execution config carries a
     * fixed-delay restart strategy built from {@code Integer.MAX_VALUE} for both arguments.
     */
    private static JobGraph createJobGraphToCancel() throws IOException {
        JobVertex vertex =
                ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Integer.MAX_VALUE));
        return JobGraphBuilder.newStreamingJobGraphBuilder()
                .addJobVertex(vertex)
                .setExecutionConfig(executionConfig)
                .build();
    }
}

