/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.SlotExecutionVertexAssignment;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.TestExecutionVertexOperationsDecorator;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class DefaultSchedulerTest
extends TestLogger {
    private static final int TIMEOUT_MS = 1000;
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutorService;
    private Configuration configuration;
    private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
    private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;
    private ExecutionVertexVersioner executionVertexVersioner;
    private TestExecutionSlotAllocatorFactory executionSlotAllocatorFactory;
    private TestExecutionSlotAllocator testExecutionSlotAllocator;
    private TestingShuffleMaster shuffleMaster;
    private TestingJobMasterPartitionTracker partitionTracker;
    private Time timeout;

    @Before
    public void setUp() throws Exception {
        this.executor = Executors.newSingleThreadExecutor();
        this.scheduledExecutorService = new DirectScheduledExecutorService();
        this.configuration = new Configuration();
        this.testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0L);
        this.testExecutionVertexOperations = new TestExecutionVertexOperationsDecorator((ExecutionVertexOperations)new DefaultExecutionVertexOperations());
        this.executionVertexVersioner = new ExecutionVertexVersioner();
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        this.testExecutionSlotAllocator = this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator();
        this.shuffleMaster = new TestingShuffleMaster();
        this.partitionTracker = new TestingJobMasterPartitionTracker();
        this.timeout = Time.seconds((long)60L);
    }

    @After
    public void tearDown() throws Exception {
        if (this.scheduledExecutorService != null) {
            ExecutorUtils.gracefulShutdown((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{this.scheduledExecutorService});
        }
        if (this.executor != null) {
            ExecutorUtils.gracefulShutdown((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{this.executor});
        }
    }

    @Test
    public void startScheduling() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        this.createSchedulerAndStartScheduling(jobGraph);
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId}));
    }

    @Test
    public void testCorrectSettingOfInitializationTimestamp() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionGraphInfo executionGraphInfo = scheduler.requestJob();
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        MatcherAssert.assertThat((Object)archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        MatcherAssert.assertThat((Object)archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        MatcherAssert.assertThat((Object)archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        MatcherAssert.assertThat((Object)(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) <= archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED) ? 1 : 0), (Matcher)Is.is((Object)true));
    }

    @Test
    public void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(4);
        JobVertexID onlyJobVertexId = DefaultSchedulerTest.getOnlyJobVertex(jobGraph).getID();
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        scheduler.startScheduling();
        List<ExecutionVertexID> verticesToSchedule = Arrays.asList(new ExecutionVertexID(onlyJobVertexId, 0), new ExecutionVertexID(onlyJobVertexId, 1), new ExecutionVertexID(onlyJobVertexId, 2), new ExecutionVertexID(onlyJobVertexId, 3));
        schedulingStrategy.schedule(verticesToSchedule);
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.hasSize((int)0));
        this.testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.hasSize((int)0));
        this.testExecutionSlotAllocator.completePendingRequests();
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.hasSize((int)4));
    }

    @Test
    public void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(10);
        JobVertexID onlyJobVertexId = DefaultSchedulerTest.getOnlyJobVertex(jobGraph).getID();
        List<ExecutionVertexID> desiredScheduleOrder = Arrays.asList(new ExecutionVertexID(onlyJobVertexId, 4), new ExecutionVertexID(onlyJobVertexId, 0), new ExecutionVertexID(onlyJobVertexId, 3), new ExecutionVertexID(onlyJobVertexId, 1), new ExecutionVertexID(onlyJobVertexId, 2));
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        schedulingStrategy.schedule(desiredScheduleOrder);
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        Assert.assertEquals(desiredScheduleOrder, deployedExecutionVertices);
    }

    @Test
    public void restartAfterDeploymentFails() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        this.testExecutionVertexOperations.enableFailDeploy();
        this.createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionVertexOperations.disableFailDeploy();
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void restartFailedTask() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        TaskExecutionState taskExecutionState = DefaultSchedulerTest.createFailedTaskExecutionState(new ExecutionAttemptID());
        Assert.assertFalse((boolean)scheduler.updateTaskExecutionState(taskExecutionState));
    }

    @Test
    public void failJobIfCannotRestart() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        this.waitForTermination(scheduler);
        JobStatus jobStatus = scheduler.requestJobStatus();
        MatcherAssert.assertThat((Object)jobStatus, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.FAILED)));
    }

    @Test
    public void failJobIfNotEnoughResources() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionSlotAllocator.timeoutPendingRequests();
        this.waitForTermination(scheduler);
        JobStatus jobStatus = scheduler.requestJobStatus();
        MatcherAssert.assertThat((Object)jobStatus, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.FAILED)));
        Throwable failureCause = scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo().getException().deserializeError(DefaultSchedulerTest.class.getClassLoader());
        Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)failureCause, NoResourceAvailableException.class).isPresent());
        Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)failureCause, (String)"Could not allocate the required slot within slot request timeout.").isPresent());
        MatcherAssert.assertThat((Object)jobStatus, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.FAILED)));
    }

    @Test
    public void restartVerticesOnSlotAllocationTimeout() throws Exception {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        this.testRestartVerticesOnFailuresInScheduling(vid -> this.testExecutionSlotAllocator.timeoutPendingRequest((ExecutionVertexID)vid));
    }

    @Test
    public void restartVerticesOnAssignedSlotReleased() throws Exception {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        this.testRestartVerticesOnFailuresInScheduling(vid -> {
            LogicalSlot slot = this.testExecutionSlotAllocator.completePendingRequest((ExecutionVertexID)vid);
            slot.releaseSlot((Throwable)new Exception("Release slot for test"));
        });
    }

    private void testRestartVerticesOnFailuresInScheduling(Consumer<ExecutionVertexID> actionsToTriggerTaskFailure) throws Exception {
        int parallelism = 2;
        JobVertex v1 = DefaultSchedulerTest.createVertex("vertex1", 2);
        JobVertex v2 = DefaultSchedulerTest.createVertex("vertex2", 2);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(v1, v2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), schedulingStrategyFactory, (FailoverStrategy.Factory)new RestartPipelinedRegionFailoverStrategy.Factory());
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        scheduler.startScheduling();
        ExecutionVertexID vid11 = new ExecutionVertexID(v1.getID(), 0);
        ExecutionVertexID vid12 = new ExecutionVertexID(v1.getID(), 1);
        ExecutionVertexID vid21 = new ExecutionVertexID(v2.getID(), 0);
        ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1);
        schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), (Matcher)Matchers.hasSize((int)4));
        actionsToTriggerTaskFailure.accept(vid11);
        Iterator vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex ev11 = (ArchivedExecutionVertex)vertexIterator.next();
        ArchivedExecutionVertex ev12 = (ArchivedExecutionVertex)vertexIterator.next();
        ArchivedExecutionVertex ev21 = (ArchivedExecutionVertex)vertexIterator.next();
        ArchivedExecutionVertex ev22 = (ArchivedExecutionVertex)vertexIterator.next();
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)ev11.getExecutionState(), (Matcher)Matchers.is((Object)ExecutionState.FAILED));
        MatcherAssert.assertThat((Object)ev21.getExecutionState(), (Matcher)Matchers.is((Object)ExecutionState.CANCELED));
        MatcherAssert.assertThat((Object)ev12.getExecutionState(), (Matcher)Matchers.is((Object)ExecutionState.SCHEDULED));
        MatcherAssert.assertThat((Object)ev22.getExecutionState(), (Matcher)Matchers.is((Object)ExecutionState.SCHEDULED));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(schedulingStrategy.getReceivedVerticesToRestart(), (Matcher)Matchers.containsInAnyOrder((Object[])new ExecutionVertexID[]{vid11, vid21}));
    }

    @Test
    public void skipDeploymentIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        List sortedJobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        ExecutionVertexID sourceExecutionVertexId = new ExecutionVertexID(((JobVertex)sortedJobVertices.get(0)).getID(), 0);
        ExecutionVertexID sinkExecutionVertexId = new ExecutionVertexID(((JobVertex)sortedJobVertices.get(1)).getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);
        ArchivedExecutionVertex sourceExecutionVertex = (ArchivedExecutionVertex)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator().next();
        ExecutionAttemptID attemptId = sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        this.testExecutionSlotAllocator.enableAutoCompletePendingRequests();
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.containsInAnyOrder((Object[])new ExecutionVertexID[]{sourceExecutionVertexId, sinkExecutionVertexId}));
        MatcherAssert.assertThat((Object)scheduler.requestJob().getArchivedExecutionGraph().getState(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.RUNNING)));
    }

    @Test
    public void releaseSlotIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(DefaultSchedulerTest.getOnlyJobVertex(jobGraph).getID(), 0);
        this.createSchedulerAndStartScheduling(jobGraph);
        this.executionVertexVersioner.recordModification(onlyExecutionVertexId);
        this.testExecutionSlotAllocator.completePendingRequests();
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getReturnedSlots(), (Matcher)Matchers.hasSize((int)1));
    }

    @Test
    public void vertexIsResetBeforeRestarted() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        SchedulingTopology topology = schedulingStrategy.getSchedulingTopology();
        scheduler.startScheduling();
        SchedulingExecutionVertex onlySchedulingVertex = (SchedulingExecutionVertex)Iterables.getOnlyElement((Iterable)topology.getVertices());
        schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(schedulingStrategy.getReceivedVerticesToRestart(), (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat((Object)onlySchedulingVertex.getState(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)ExecutionState.CREATED)));
    }

    @Test
    public void scheduleOnlyIfVertexIsCreated() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        SchedulingTopology topology = schedulingStrategy.getSchedulingTopology();
        scheduler.startScheduling();
        ExecutionVertexID onlySchedulingVertexId = (ExecutionVertexID)((SchedulingExecutionVertex)Iterables.getOnlyElement((Iterable)topology.getVertices())).getId();
        schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId));
        try {
            schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId));
            Assert.fail((String)"IllegalStateException should happen");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void handleGlobalFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        scheduler.handleGlobalFailure((Throwable)new Exception("forced failure"));
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void handleGlobalFailureWithLocalFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        List attemptIds = StreamSupport.stream(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().spliterator(), false).map(ArchivedExecutionVertex::getCurrentExecutionAttempt).map(ArchivedExecution::getAttemptId).collect(Collectors.toList());
        ExecutionAttemptID localFailureAttemptId = (ExecutionAttemptID)attemptIds.get(0);
        scheduler.handleGlobalFailure((Throwable)new Exception("global failure"));
        scheduler.updateTaskExecutionState(new TaskExecutionState(localFailureAttemptId, ExecutionState.FAILED, (Throwable)new Exception("local failure")));
        for (ExecutionAttemptID attemptId : attemptIds) {
            scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED));
        }
        this.taskRestartExecutor.triggerScheduledTasks();
        ExecutionVertexID executionVertexId0 = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        ExecutionVertexID executionVertexId1 = new ExecutionVertexID(onlyJobVertex.getID(), 1);
        MatcherAssert.assertThat((String)"The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.", this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId0, executionVertexId1, executionVertexId0, executionVertexId1}));
    }

    @Test
    public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
        this.assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(SchedulerBase::startCheckpointScheduler);
    }

    @Test
    public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
        this.assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(SchedulerBase::stopCheckpointScheduler);
    }

    private void assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(Consumer<DefaultScheduler> callSchedulingOperation) {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        MatcherAssert.assertThat((Object)scheduler.getCheckpointCoordinator(), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        scheduler.updateTaskExecutionState(new TaskExecutionState(((ExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.getExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FINISHED));
        MatcherAssert.assertThat((Object)scheduler.getCheckpointCoordinator(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
        callSchedulingOperation.accept(scheduler);
        MatcherAssert.assertThat((Object)scheduler.getCheckpointCoordinator(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
    }

    @Test
    public void vertexIsNotAffectedByOutdatedDeployment() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        Iterator vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex v1 = (ArchivedExecutionVertex)vertexIterator.next();
        ArchivedExecutionVertex v2 = (ArchivedExecutionVertex)vertexIterator.next();
        SchedulingExecutionVertex sv1 = (SchedulingExecutionVertex)scheduler.getSchedulingTopology().getVertices().iterator().next();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(v1.getCurrentExecutionAttempt().getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(v2.getCurrentExecutionAttempt().getAttemptId()));
        MatcherAssert.assertThat((Object)sv1.getState(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)ExecutionState.SCHEDULED)));
    }

    @Test
    public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator((SchedulerBase)scheduler);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        MatcherAssert.assertThat((Object)checkpointCoordinator.getNumberOfPendingCheckpoints(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)1)));
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat((Object)checkpointCoordinator.getNumberOfPendingCheckpoints(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)0)));
    }

    @Test
    public void restoreStateWhenRestartingTasks() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator((SchedulerBase)scheduler);
        TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)masterHook);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat((Object)masterHook.getRestoreCount(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)1)));
    }

    @Test
    public void failGlobalWhenRestoringStateFails() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator((SchedulerBase)scheduler);
        TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        masterHook.enableFailOnRestore();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)masterHook);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId}));
        masterHook.disableFailOnRestore();
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void failJobWillIncrementVertexVersions() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
        scheduler.failJob((Throwable)new FlinkException("Test failure."), System.currentTimeMillis());
        Assert.assertTrue((boolean)this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void cancelJobWillIncrementVertexVersions() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
        scheduler.cancel();
        Assert.assertTrue((boolean)this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void suspendJobWillIncrementVertexVersions() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
        scheduler.close();
        Assert.assertTrue((boolean)this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        Iterator vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ExecutionAttemptID attemptId1 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId1, ExecutionState.FAILED, (Throwable)new RuntimeException("expected")));
        JobStatus jobStatusAfterFirstFailure = scheduler.requestJobStatus();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId2, ExecutionState.FAILED, (Throwable)new RuntimeException("expected")));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        JobStatus jobStatusWithPendingRestarts = scheduler.requestJobStatus();
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        JobStatus jobStatusAfterRestarts = scheduler.requestJobStatus();
        MatcherAssert.assertThat((Object)jobStatusAfterFirstFailure, (Matcher)Matchers.equalTo((Object)JobStatus.RESTARTING));
        MatcherAssert.assertThat((Object)jobStatusWithPendingRestarts, (Matcher)Matchers.equalTo((Object)JobStatus.RESTARTING));
        MatcherAssert.assertThat((Object)jobStatusAfterRestarts, (Matcher)Matchers.equalTo((Object)JobStatus.RUNNING));
    }

    @Test
    public void cancelWhileRestartingShouldWaitForRunningTasks() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        SchedulingTopology topology = scheduler.getSchedulingTopology();
        Iterator vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ExecutionAttemptID attemptId1 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionVertexID executionVertex2 = scheduler.getExecutionVertexIdOrThrow(attemptId2);
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId1, ExecutionState.FAILED, (Throwable)new RuntimeException("expected")));
        scheduler.cancel();
        ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState();
        JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId2, ExecutionState.CANCELED, (Throwable)new RuntimeException("expected")));
        MatcherAssert.assertThat((Object)vertex2StateAfterCancel, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)ExecutionState.CANCELING)));
        MatcherAssert.assertThat((Object)statusAfterCancelWhileRestarting, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.CANCELLING)));
        MatcherAssert.assertThat((Object)scheduler.requestJobStatus(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.CANCELED)));
    }

    @Test
    public void failureInfoIsSetAfterTaskFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)new RuntimeException("expected exception")));
        ErrorInfo failureInfo = scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
        MatcherAssert.assertThat((Object)failureInfo, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        MatcherAssert.assertThat((Object)failureInfo.getExceptionAsString(), (Matcher)Matchers.containsString((String)"expected exception"));
    }

    @Test
    public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory(), (FailoverStrategy.Factory)new RestartAllFailoverStrategy.Factory());
        scheduler.startScheduling();
        Iterator vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex v1 = (ArchivedExecutionVertex)vertexIterator.next();
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), (Matcher)Matchers.hasSize((int)2));
        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(v1.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, (Throwable)new RuntimeException("expected exception")));
        vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        v1 = (ArchivedExecutionVertex)vertexIterator.next();
        ArchivedExecutionVertex v2 = (ArchivedExecutionVertex)vertexIterator.next();
        MatcherAssert.assertThat((Object)v1.getExecutionState(), (Matcher)Matchers.is((Object)ExecutionState.FAILED));
        MatcherAssert.assertThat((Object)v2.getExecutionState(), (Matcher)Matchers.is((Object)ExecutionState.CANCELED));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), (Matcher)Matchers.hasSize((int)0));
    }

    @Test
    public void pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots() throws Exception {
        int parallelism = 10;
        JobGraph jobGraph = DefaultSchedulerTest.sourceSinkJobGraph(10);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        this.testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory(), (FailoverStrategy.Factory)new RestartAllFailoverStrategy.Factory());
        scheduler.startScheduling();
        ExecutionVertex ev1 = (ExecutionVertex)Iterables.get((Iterable)scheduler.getExecutionGraph().getAllExecutionVertices(), (int)0);
        Set pendingLogicalSlotFutures = this.testExecutionSlotAllocator.getPendingRequests().values().stream().map(SlotExecutionVertexAssignment::getLogicalSlotFuture).collect(Collectors.toSet());
        MatcherAssert.assertThat(pendingLogicalSlotFutures, (Matcher)Matchers.hasSize((int)20));
        this.testExecutionSlotAllocator.completePendingRequest(ev1.getID());
        MatcherAssert.assertThat((Object)pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(), (Matcher)Matchers.is((Object)1L));
        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(ev1.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, (Throwable)new RuntimeException("expected exception")));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), (Matcher)Matchers.hasSize((int)0));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getReturnedSlots(), (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat((Object)pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(), (Matcher)Matchers.is((Object)18L));
    }

    @Test
    public void testExceptionHistoryWithGlobalFailOver() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        Exception expectedException = new Exception("Expected exception");
        scheduler.handleGlobalFailure((Throwable)expectedException);
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED, (Throwable)expectedException));
        this.taskRestartExecutor.triggerScheduledTasks();
        Iterable actualExceptionHistory = scheduler.getExceptionHistory();
        MatcherAssert.assertThat((Object)actualExceptionHistory, (Matcher)IsIterableWithSize.iterableWithSize((int)1));
        RootExceptionHistoryEntry failure = (RootExceptionHistoryEntry)actualExceptionHistory.iterator().next();
        MatcherAssert.assertThat((Object)failure, ExceptionHistoryEntryMatcher.matchesGlobalFailure(expectedException, scheduler.getExecutionGraph().getFailureInfo().getTimestamp()));
        MatcherAssert.assertThat((Object)failure.getConcurrentExceptions(), (Matcher)IsEmptyIterable.emptyIterable());
    }

    @Test
    public void testExceptionHistoryWithRestartableFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TestingLogicalSlotBuilder logicalSlotBuilder = new TestingLogicalSlotBuilder();
        logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation);
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(logicalSlotBuilder);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex taskFailureExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        RuntimeException restartableException = new RuntimeException("restartable exception");
        long updateStateTriggeringRestartTimestamp = DefaultSchedulerTest.initiateFailure(scheduler, taskFailureExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), restartableException);
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        ExecutionAttemptID failingAttemptId = ((ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        RuntimeException failingException = new RuntimeException("failing exception");
        long updateStateTriggeringJobFailureTimestamp = DefaultSchedulerTest.initiateFailure(scheduler, failingAttemptId, failingException);
        Iterable actualExceptionHistory = scheduler.getExceptionHistory();
        MatcherAssert.assertThat((Object)actualExceptionHistory, (Matcher)IsIterableContainingInOrder.contains((Matcher[])new Matcher[]{ExceptionHistoryEntryMatcher.matchesFailure(restartableException, updateStateTriggeringRestartTimestamp, taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), taskFailureExecutionVertex.getCurrentAssignedResourceLocation()), ExceptionHistoryEntryMatcher.matchesGlobalFailure(failingException, updateStateTriggeringJobFailureTimestamp)}));
    }

    @Test
    public void testExceptionHistoryWithPreDeployFailure() {
        this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator().disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(DefaultSchedulerTest.singleNonParallelJobVertexJobGraph());
        this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator().timeoutPendingRequests();
        ArchivedExecutionVertex taskFailureExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        MatcherAssert.assertThat((Object)taskFailureExecutionVertex.getCurrentAssignedResourceLocation(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
        ErrorInfo failureInfo = (ErrorInfo)taskFailureExecutionVertex.getFailureInfo().orElseThrow(() -> new AssertionError((Object)"A failureInfo should be set."));
        Iterable actualExceptionHistory = scheduler.getExceptionHistory();
        MatcherAssert.assertThat((Object)actualExceptionHistory, (Matcher)IsIterableContainingInOrder.contains(ExceptionHistoryEntryMatcher.matchesFailure((Throwable)failureInfo.getException(), failureInfo.getTimestamp(), taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), taskFailureExecutionVertex.getCurrentAssignedResourceLocation())));
    }

    @Test
    public void testExceptionHistoryConcurrentRestart() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TestingLogicalSlotBuilder logicalSlotBuilder = new TestingLogicalSlotBuilder();
        logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation);
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(logicalSlotBuilder);
        ReorganizableManuallyTriggeredScheduledExecutor delayExecutor = new ReorganizableManuallyTriggeredScheduledExecutor();
        TestFailoverStrategyFactory failoverStrategyFactory = new TestFailoverStrategyFactory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory(), failoverStrategyFactory, delayExecutor);
        scheduler.startScheduling();
        ExecutionVertex executionVertex0 = (ExecutionVertex)Iterables.get((Iterable)scheduler.getExecutionGraph().getAllExecutionVertices(), (int)0);
        ExecutionVertex executionVertex1 = (ExecutionVertex)Iterables.get((Iterable)scheduler.getExecutionGraph().getAllExecutionVertices(), (int)1);
        RuntimeException exception0 = new RuntimeException("failure #0");
        failoverStrategyFactory.setTasksToRestart(executionVertex0.getID());
        long updateStateTriggeringRestartTimestamp0 = DefaultSchedulerTest.initiateFailure(scheduler, executionVertex0.getCurrentExecutionAttempt().getAttemptId(), exception0);
        RuntimeException exception1 = new RuntimeException("failure #1");
        failoverStrategyFactory.setTasksToRestart(executionVertex1.getID(), executionVertex0.getID());
        long updateStateTriggeringRestartTimestamp1 = DefaultSchedulerTest.initiateFailure(scheduler, executionVertex1.getCurrentExecutionAttempt().getAttemptId(), exception1);
        Collections.reverse(delayExecutor.getCollectedScheduledTasks());
        delayExecutor.triggerNonPeriodicScheduledTasks();
        MatcherAssert.assertThat((Object)scheduler.getExceptionHistory(), (Matcher)IsIterableWithSize.iterableWithSize((int)2));
        Iterator actualExceptionHistory = scheduler.getExceptionHistory().iterator();
        RootExceptionHistoryEntry entry0 = (RootExceptionHistoryEntry)actualExceptionHistory.next();
        MatcherAssert.assertThat((Object)entry0, (Matcher)Matchers.is(ExceptionHistoryEntryMatcher.matchesFailure(exception0, updateStateTriggeringRestartTimestamp0, executionVertex0.getTaskNameWithSubtaskIndex(), executionVertex0.getCurrentAssignedResourceLocation())));
        MatcherAssert.assertThat((Object)entry0.getConcurrentExceptions(), (Matcher)IsIterableContainingInOrder.contains(ExceptionHistoryEntryMatcher.matchesFailure(exception1, updateStateTriggeringRestartTimestamp1, executionVertex1.getTaskNameWithSubtaskIndex(), executionVertex1.getCurrentAssignedResourceLocation())));
        RootExceptionHistoryEntry entry1 = (RootExceptionHistoryEntry)actualExceptionHistory.next();
        MatcherAssert.assertThat((Object)entry1, (Matcher)Matchers.is(ExceptionHistoryEntryMatcher.matchesFailure(exception1, updateStateTriggeringRestartTimestamp1, executionVertex1.getTaskNameWithSubtaskIndex(), executionVertex1.getCurrentAssignedResourceLocation())));
        MatcherAssert.assertThat((Object)entry1.getConcurrentExceptions(), (Matcher)IsEmptyIterable.emptyIterable());
    }

    @Test
    public void testExceptionHistoryTruncation() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        this.configuration.set(WebOptions.MAX_EXCEPTION_HISTORY_SIZE, (Object)1);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionAttemptID attemptId0 = ((ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        DefaultSchedulerTest.initiateFailure(scheduler, attemptId0, new RuntimeException("old exception"));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTasks();
        ArchivedExecutionVertex executionVertex1 = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        RuntimeException exception = new RuntimeException("relevant exception");
        long relevantTimestamp = DefaultSchedulerTest.initiateFailure(scheduler, executionVertex1.getCurrentExecutionAttempt().getAttemptId(), exception);
        this.taskRestartExecutor.triggerNonPeriodicScheduledTasks();
        MatcherAssert.assertThat((Object)scheduler.getExceptionHistory(), (Matcher)IsIterableContainingInOrder.contains(ExceptionHistoryEntryMatcher.matchesFailure(exception, relevantTimestamp, executionVertex1.getTaskNameWithSubtaskIndex(), executionVertex1.getCurrentAssignedResourceLocation())));
    }

    @Test
    public void testDeploymentWaitForProducedPartitionRegistration() {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        ArrayList trackedPartitions = new ArrayList();
        this.partitionTracker.setStartTrackingPartitionsConsumer((resourceID, resultPartitionDeploymentDescriptor) -> trackedPartitions.add(resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID()));
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        this.createSchedulerAndStartScheduling(jobGraph);
        MatcherAssert.assertThat(trackedPartitions, (Matcher)Matchers.hasSize((int)0));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.hasSize((int)0));
        this.shuffleMaster.completeAllPendingRegistrations();
        MatcherAssert.assertThat(trackedPartitions, (Matcher)Matchers.hasSize((int)1));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.hasSize((int)2));
    }

    @Test
    public void testFailedProducedPartitionRegistration() {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        this.createSchedulerAndStartScheduling(jobGraph);
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getCanceledVertices(), (Matcher)Matchers.hasSize((int)0));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getFailedVertices(), (Matcher)Matchers.hasSize((int)0));
        this.shuffleMaster.failAllPendingRegistrations();
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getCanceledVertices(), (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getFailedVertices(), (Matcher)Matchers.hasSize((int)1));
    }

    @Test
    public void testDirectExceptionOnProducedPartitionRegistration() {
        this.shuffleMaster.setThrowExceptionalOnRegistration(true);
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        this.createSchedulerAndStartScheduling(jobGraph);
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getCanceledVertices(), (Matcher)Matchers.hasSize((int)2));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getFailedVertices(), (Matcher)Matchers.hasSize((int)1));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProducedPartitionRegistrationTimeout() throws Exception {
        ScheduledExecutorService scheduledExecutorService = null;
        try {
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService);
            this.shuffleMaster.setAutoCompleteRegistration(false);
            JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
            this.timeout = Time.milliseconds((long)1L);
            this.createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
            this.testExecutionVertexOperations.awaitCanceledVertices(2);
            this.testExecutionVertexOperations.awaitFailedVertices(1);
        }
        finally {
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
        }
    }

    @Test
    public void testLateRegisteredPartitionsWillBeReleased() {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        ArrayList trackedPartitions = new ArrayList();
        this.partitionTracker.setStartTrackingPartitionsConsumer((resourceID, resultPartitionDeploymentDescriptor) -> trackedPartitions.add(resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID()));
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex sourceExecutionVertex = (ArchivedExecutionVertex)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator().next();
        ExecutionAttemptID attemptId = sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        this.shuffleMaster.completeAllPendingRegistrations();
        MatcherAssert.assertThat(trackedPartitions, (Matcher)Matchers.hasSize((int)0));
        MatcherAssert.assertThat(this.shuffleMaster.getExternallyReleasedPartitions(), (Matcher)Matchers.hasSize((int)1));
    }

    @Test
    public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultSchedulerTest.doTestCheckpointCleanerIsClosedAfterCheckpointServices((checkpointRecoveryFactory, checkpointCleaner) -> {
                JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(1);
                SchedulerTestingUtils.enableCheckpointing(jobGraph);
                try {
                    return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(executorService)).setCheckpointRecoveryFactory((CheckpointRecoveryFactory)checkpointRecoveryFactory).setCheckpointCleaner((CheckpointsCleaner)checkpointCleaner).build();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executorService);
        }
        finally {
            executorService.shutdownNow();
        }
    }

    public static void doTestCheckpointCleanerIsClosedAfterCheckpointServices(BiFunction<CheckpointRecoveryFactory, CheckpointsCleaner, SchedulerNG> schedulerFactory, ScheduledExecutorService executorService) throws Exception {
        final CountDownLatch checkpointServicesShutdownBlocked = new CountDownLatch(1);
        final CountDownLatch cleanerClosed = new CountDownLatch(1);
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1){

            public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {
                checkpointServicesShutdownBlocked.await();
                super.shutdown(jobStatus, checkpointsCleaner);
            }
        };
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(){

            public void shutdown(JobStatus jobStatus) throws Exception {
                checkpointServicesShutdownBlocked.await();
                super.shutdown(jobStatus);
            }
        };
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(){

            public synchronized CompletableFuture<Void> closeAsync() {
                cleanerClosed.countDown();
                return super.closeAsync();
            }
        };
        SchedulerNG scheduler = schedulerFactory.apply(new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)checkpointIDCounter), checkpointsCleaner);
        CompletableFuture schedulerClosed = new CompletableFuture();
        CountDownLatch schedulerClosing = new CountDownLatch(1);
        executorService.submit(() -> {
            scheduler.closeAsync().thenRun(() -> schedulerClosed.complete(null));
            schedulerClosing.countDown();
        });
        schedulerClosing.await();
        Assert.assertFalse((String)"CheckpointCleaner should not close before checkpoint services.", (boolean)cleanerClosed.await(10L, TimeUnit.MILLISECONDS));
        checkpointServicesShutdownBlocked.countDown();
        cleanerClosed.await();
        schedulerClosed.get();
    }

    private static TaskExecutionState createFailedTaskExecutionState(ExecutionAttemptID executionAttemptID) {
        return new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, (Throwable)new Exception("Expected failure cause"));
    }

    private static long initiateFailure(DefaultScheduler scheduler, ExecutionAttemptID executionAttemptId, Throwable exception) {
        scheduler.updateTaskExecutionState(new TaskExecutionState(executionAttemptId, ExecutionState.FAILED, exception));
        return DefaultSchedulerTest.getFailureTimestamp(scheduler, executionAttemptId);
    }

    private static long getFailureTimestamp(DefaultScheduler scheduler, ExecutionAttemptID executionAttemptId) {
        ExecutionVertex failedExecutionVertex = StreamSupport.stream(scheduler.getExecutionGraph().getAllExecutionVertices().spliterator(), false).filter(v -> executionAttemptId.equals((Object)v.getCurrentExecutionAttempt().getAttemptId())).findFirst().orElseThrow(() -> new IllegalArgumentException("No ExecutionVertex available for the passed ExecutionAttemptId " + executionAttemptId));
        return failedExecutionVertex.getFailureInfo().map(ErrorInfo::getTimestamp).orElseThrow(() -> new IllegalStateException("No failure was set for ExecutionVertex having the passed execution " + executionAttemptId));
    }

    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex v = new JobVertex(name);
        v.setParallelism(parallelism);
        v.setInvokableClass(AbstractInvokable.class);
        return v;
    }

    private void waitForTermination(DefaultScheduler scheduler) throws Exception {
        scheduler.getJobTerminationFuture().get(1000L, TimeUnit.MILLISECONDS);
    }

    private static JobGraph singleNonParallelJobVertexJobGraph() {
        return DefaultSchedulerTest.singleJobVertexJobGraph(1);
    }

    private static JobGraph singleJobVertexJobGraph(int parallelism) {
        JobVertex vertex = new JobVertex("source");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        return JobGraphTestUtils.streamingJobGraph(vertex);
    }

    private static JobGraph nonParallelSourceSinkJobGraph() {
        JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }

    private static JobGraph sourceSinkJobGraph(int parallelism) {
        JobVertex source = new JobVertex("source");
        source.setParallelism(parallelism);
        source.setInvokableClass(NoOpInvokable.class);
        JobVertex sink = new JobVertex("sink");
        sink.setParallelism(parallelism);
        sink.setInvokableClass(NoOpInvokable.class);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }

    private static JobVertex getOnlyJobVertex(JobGraph jobGraph) {
        List sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Preconditions.checkState((sortedVertices.size() == 1 ? 1 : 0) != 0);
        return (JobVertex)sortedVertices.get(0);
    }

    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        return this.createSchedulerAndStartScheduling(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread());
    }

    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
        PipelinedRegionSchedulingStrategy.Factory schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory();
        try {
            DefaultScheduler scheduler = this.createScheduler(jobGraph, mainThreadExecutor, (SchedulingStrategyFactory)schedulingStrategyFactory);
            mainThreadExecutor.execute(() -> ((DefaultScheduler)scheduler).startScheduling());
            return scheduler;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
        return this.createScheduler(jobGraph, mainThreadExecutor, schedulingStrategyFactory, (FailoverStrategy.Factory)new RestartPipelinedRegionFailoverStrategy.Factory());
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory) throws Exception {
        return this.createScheduler(jobGraph, mainThreadExecutor, schedulingStrategyFactory, failoverStrategyFactory, this.taskRestartExecutor);
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, ScheduledExecutor delayExecutor) throws Exception {
        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor).setLogger(this.log).setIoExecutor(this.executor).setJobMasterConfiguration(this.configuration).setFutureExecutor(this.scheduledExecutorService).setDelayExecutor(delayExecutor).setSchedulingStrategyFactory(schedulingStrategyFactory).setFailoverStrategyFactory(failoverStrategyFactory).setRestartBackoffTimeStrategy(this.testRestartBackoffTimeStrategy).setExecutionVertexOperations(this.testExecutionVertexOperations).setExecutionVertexVersioner(this.executionVertexVersioner).setExecutionSlotAllocatorFactory(this.executionSlotAllocatorFactory).setShuffleMaster(this.shuffleMaster).setPartitionTracker(this.partitionTracker).setRpcTimeout(this.timeout).build();
    }

    private CountDownLatch getCheckpointTriggeredLatch() {
        CountDownLatch checkpointTriggeredLatch = new CountDownLatch(1);
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        this.testExecutionSlotAllocator.getLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway);
        taskManagerGateway.setCheckpointConsumer((executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> checkpointTriggeredLatch.countDown());
        return checkpointTriggeredLatch;
    }

    private void transitionToRunning(DefaultScheduler scheduler, ExecutionAttemptID attemptId) {
        Preconditions.checkState((boolean)scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.INITIALIZING)));
        Preconditions.checkState((boolean)scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.RUNNING)));
    }

    private static class ReorganizableManuallyTriggeredScheduledExecutor
    extends ManuallyTriggeredScheduledExecutor {
        private final List<ScheduledTask<?>> scheduledTasks = new ArrayList();

        private ReorganizableManuallyTriggeredScheduledExecutor() {
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return this.schedule(() -> {
                command.run();
                return null;
            }, delay, unit);
        }

        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            ScheduledTask scheduledTask = new ScheduledTask(callable, unit.convert(delay, TimeUnit.MILLISECONDS));
            this.scheduledTasks.add(scheduledTask);
            return scheduledTask;
        }

        public List<ScheduledTask<?>> getCollectedScheduledTasks() {
            return this.scheduledTasks;
        }

        public void scheduleCollectedScheduledTasks() {
            for (ScheduledTask<?> scheduledTask : this.scheduledTasks) {
                super.schedule(scheduledTask.getCallable(), scheduledTask.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            }
            this.scheduledTasks.clear();
        }

        @Override
        public void triggerNonPeriodicScheduledTask() {
            this.scheduleCollectedScheduledTasks();
            super.triggerNonPeriodicScheduledTask();
        }

        @Override
        public void triggerNonPeriodicScheduledTasks() {
            this.scheduleCollectedScheduledTasks();
            super.triggerNonPeriodicScheduledTasks();
        }
    }
}

