/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultSubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.EdgeManager;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.SubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.MockStateWithExecutionGraphContext;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.TestingOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.TestingStateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicIntegerAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExecutingTest {
    private static final Logger log = LoggerFactory.getLogger(ExecutingTest.class);
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    ExecutingTest() {
    }

    @Test
    void testExecutionGraphDeploymentOnEnter() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(MockExecutionVertex::new);
            MockExecutionVertex mockExecutionVertex = (MockExecutionVertex)mockExecutionJobVertex.getMockExecutionVertex();
            mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED);
            MockExecutionGraph executionGraph = new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
            Assertions.assertThat((boolean)mockExecutionVertex.isDeployCalled()).isTrue();
            Assertions.assertThat((Comparable)executionGraph.getState()).isEqualTo((Object)JobStatus.RUNNING);
        }
    }

    @Test
    void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(MockExecutionVertex::new);
            MockExecutionGraph executionGraph = new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
            executionGraph.transitionToRunning();
            MockExecutionVertex mockExecutionVertex = (MockExecutionVertex)mockExecutionJobVertex.getMockExecutionVertex();
            mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING);
            new Executing((ExecutionGraph)executionGraph, this.getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()), (OperatorCoordinatorHandler)new TestingOperatorCoordinatorHandler(), log, (Executing.Context)ctx, ClassLoader.getSystemClassLoader(), new ArrayList(), context -> TestingStateTransitionManager.withNoOp(), 1);
            Assertions.assertThat((boolean)mockExecutionVertex.isDeployCalled()).isFalse();
        }
    }

    @Test
    void testIllegalStateExceptionOnNotRunningExecutionGraph() {
        Assertions.assertThatThrownBy(() -> {
            try (MockExecutingContext ctx = new MockExecutingContext();){
                StateTrackingMockExecutionGraph notRunningExecutionGraph = new StateTrackingMockExecutionGraph();
                Assertions.assertThat((Comparable)notRunningExecutionGraph.getState()).isNotEqualTo((Object)JobStatus.RUNNING);
                new Executing((ExecutionGraph)notRunningExecutionGraph, this.getExecutionGraphHandler(notRunningExecutionGraph, ctx.getMainThreadExecutor()), (OperatorCoordinatorHandler)new TestingOperatorCoordinatorHandler(), log, (Executing.Context)ctx, ClassLoader.getSystemClassLoader(), new ArrayList(), context -> TestingStateTransitionManager.withNoOp(), 1);
            }
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
        AtomicBoolean rescaleTriggered = new AtomicBoolean();
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> TestingStateTransitionManager.withOnTriggerEventOnly(() -> rescaleTriggered.set(true));
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing testInstance = new ExecutingStateBuilder().setStateTransitionManagerFactory(stateTransitionManagerFactory).build(ctx);
            Assertions.assertThat((AtomicBoolean)rescaleTriggered).isFalse();
            testInstance.onCompletedCheckpoint();
            Assertions.assertThat((AtomicBoolean)rescaleTriggered).isTrue();
        }
    }

    @Test
    public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
        AtomicInteger rescaleTriggerCount = new AtomicInteger();
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> TestingStateTransitionManager.withOnTriggerEventOnly(rescaleTriggerCount::incrementAndGet);
        int rescaleOnFailedCheckpointsCount = 3;
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing testInstance = new ExecutingStateBuilder().setStateTransitionManagerFactory(stateTransitionManagerFactory).setRescaleOnFailedCheckpointCount(3).build(ctx);
            for (int rescaleIteration = 1; rescaleIteration <= 3; ++rescaleIteration) {
                testInstance.onFailedCheckpoint();
                testInstance.onNewResourceRequirements();
                for (int i = 0; i < 3; ++i) {
                    ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)rescaleTriggerCount).as("No rescale operation should have been triggered for iteration #%d, yet.", new Object[]{rescaleIteration})).hasValue(rescaleIteration - 1);
                    testInstance.onFailedCheckpoint();
                }
                ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)rescaleTriggerCount).as("The rescale operation for iteration #%d should have been properly triggered.", new Object[]{rescaleIteration})).hasValue(rescaleIteration);
            }
        }
    }

    @Test
    public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws Exception {
        AtomicInteger rescaleTriggeredCount = new AtomicInteger();
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> TestingStateTransitionManager.withOnTriggerEventOnly(rescaleTriggeredCount::incrementAndGet);
        int rescaleOnFailedCheckpointsCount = 3;
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing testInstance = new ExecutingStateBuilder().setStateTransitionManagerFactory(stateTransitionManagerFactory).setRescaleOnFailedCheckpointCount(3).build(ctx);
            testInstance.onFailedCheckpoint();
            testInstance.onNewResourcesAvailable();
            IntStream.range(0, 2).forEach(ignored -> testInstance.onFailedCheckpoint());
            ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)rescaleTriggeredCount).as("No rescaling should have been trigger, yet.", new Object[0])).hasValue(0);
            testInstance.onCompletedCheckpoint();
            testInstance.onNewResourceRequirements();
            ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)rescaleTriggeredCount).as("The completed checkpoint should have triggered a rescale.", new Object[0])).hasValue(1);
            IntStream.range(0, 2).forEach(ignored -> testInstance.onFailedCheckpoint());
            ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)rescaleTriggeredCount).as("No additional rescaling should have been trigger by any subsequent failed checkpoint, yet.", new Object[0])).hasValue(1);
            testInstance.onFailedCheckpoint();
            ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)rescaleTriggeredCount).as("The previous failed checkpoint should have triggered the rescale.", new Object[0])).hasValue(2);
        }
    }

    @Test
    void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            TestingOperatorCoordinatorHandler operatorCoordinator = new TestingOperatorCoordinatorHandler();
            Executing exec = new ExecutingStateBuilder().setOperatorCoordinatorHandler(operatorCoordinator).build(ctx);
            exec.onLeave(MockState.class);
            Assertions.assertThat((boolean)operatorCoordinator.isDisposed()).isTrue();
        }
    }

    @Test
    void testUnrecoverableGlobalFailureTransitionsToFailingState() throws Exception {
        String failureMsg = "test exception";
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing exec = new ExecutingStateBuilder().build(ctx);
            ctx.setExpectFailing(failingArguments -> {
                Assertions.assertThat((Object)failingArguments.getExecutionGraph()).isNotNull();
                Assertions.assertThat((String)failingArguments.getFailureCause().getMessage()).isEqualTo("test exception");
            });
            ctx.setHowToHandleFailure(FailureResult::canNotRestart);
            exec.handleGlobalFailure((Throwable)new RuntimeException("test exception"), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    @Test
    void testRecoverableGlobalFailureTransitionsToRestarting() throws Exception {
        Duration duration = Duration.ZERO;
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing exec = new ExecutingStateBuilder().build(ctx);
            ctx.setExpectRestarting(restartingArguments -> {
                Assertions.assertThat((Duration)restartingArguments.getBackoffTime()).isEqualTo((Object)duration);
                Assertions.assertThat(restartingArguments.getRestartWithParallelism()).isEmpty();
            });
            ctx.setHowToHandleFailure(f -> FailureResult.canRestart((Throwable)f, (Duration)duration));
            exec.handleGlobalFailure((Throwable)new RuntimeException("Recoverable error"), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    @Test
    void testCancelTransitionsToCancellingState() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing exec = new ExecutingStateBuilder().build(ctx);
            ctx.setExpectCancelling(WaitingForResourcesTest.assertNonNull());
            exec.cancel();
        }
    }

    @Test
    void testTransitionToFinishedOnFailedExecutionGraph() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing exec = new ExecutingStateBuilder().build(ctx);
            ctx.setExpectFinished(archivedExecutionGraph -> Assertions.assertThat((Comparable)archivedExecutionGraph.getState()).isEqualTo((Object)JobStatus.FAILED));
            exec.getExecutionGraph().failJob((Throwable)new RuntimeException("test failure"), System.currentTimeMillis());
        }
    }

    @Test
    void testTransitionToFinishedOnSuspend() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing exec = new ExecutingStateBuilder().build(ctx);
            ctx.setExpectFinished(archivedExecutionGraph -> Assertions.assertThat((Comparable)archivedExecutionGraph.getState()).isEqualTo((Object)JobStatus.SUSPENDED));
            exec.suspend((Throwable)new RuntimeException("suspend"));
        }
    }

    @Test
    void testFailureReportedViaUpdateTaskExecutionStateCausesFailingOnNoRestart() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            StateTrackingMockExecutionGraph returnsFailedStateExecutionGraph = new StateTrackingMockExecutionGraph();
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(returnsFailedStateExecutionGraph).build(ctx);
            ctx.setHowToHandleFailure(FailureResult::canNotRestart);
            ctx.setExpectFailing(WaitingForResourcesTest.assertNonNull());
            RuntimeException exception = new RuntimeException();
            TestingAccessExecution execution = TestingAccessExecution.newBuilder().withExecutionState(ExecutionState.FAILED).withErrorInfo(new ErrorInfo((Throwable)exception, System.currentTimeMillis())).build();
            returnsFailedStateExecutionGraph.registerExecution(execution);
            TaskExecutionStateTransition taskExecutionStateTransition = ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
            exec.updateTaskExecutionState(taskExecutionStateTransition, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    @Test
    void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            StateTrackingMockExecutionGraph returnsFailedStateExecutionGraph = new StateTrackingMockExecutionGraph();
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(returnsFailedStateExecutionGraph).build(ctx);
            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart((Throwable)failure, (Duration)Duration.ZERO));
            ctx.setExpectRestarting(restartingArguments -> {
                Assertions.assertThat((Object)restartingArguments).isNotNull();
                Assertions.assertThat(restartingArguments.getRestartWithParallelism()).isEmpty();
            });
            RuntimeException exception = new RuntimeException();
            TestingAccessExecution execution = TestingAccessExecution.newBuilder().withExecutionState(ExecutionState.FAILED).withErrorInfo(new ErrorInfo((Throwable)exception, System.currentTimeMillis())).build();
            returnsFailedStateExecutionGraph.registerExecution(execution);
            TaskExecutionStateTransition taskExecutionStateTransition = ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
            exec.updateTaskExecutionState(taskExecutionStateTransition, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    @Test
    void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            MockExecutionGraph returnsFailedStateExecutionGraph = new MockExecutionGraph(false, Collections::emptyList);
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(returnsFailedStateExecutionGraph).build(ctx);
            RuntimeException exception = new RuntimeException();
            TestingAccessExecution execution = TestingAccessExecution.newBuilder().withExecutionState(ExecutionState.FAILED).withErrorInfo(new ErrorInfo((Throwable)exception, System.currentTimeMillis())).build();
            returnsFailedStateExecutionGraph.registerExecution(execution);
            TaskExecutionStateTransition taskExecutionStateTransition = ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
            exec.updateTaskExecutionState(taskExecutionStateTransition, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            ctx.assertNoStateTransition();
        }
    }

    @Test
    void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            MockExecutionJobVertex mejv = new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new);
            MockExecutionGraph executionGraph = new MockExecutionGraph(() -> Collections.singletonList(mejv));
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
            Assertions.assertThat((Throwable)((FailOnDeployMockExecutionVertex)mejv.getMockExecutionVertex()).getMarkedFailure()).isInstanceOf(JobException.class);
        }
    }

    @Test
    void testTransitionToStopWithSavepointState() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            final CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
            StateTrackingMockExecutionGraph mockedExecutionGraphWithCheckpointCoordinator = new StateTrackingMockExecutionGraph(){

                @Override
                @Nullable
                public CheckpointCoordinator getCheckpointCoordinator() {
                    return coordinator;
                }
            };
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(mockedExecutionGraphWithCheckpointCoordinator).build(ctx);
            ctx.setExpectStopWithSavepoint(WaitingForResourcesTest.assertNonNull());
            exec.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
        }
    }

    @Test
    void testCheckpointSchedulerIsStoppedOnStopWithSavepoint() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            final CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
            StateTrackingMockExecutionGraph mockedExecutionGraphWithCheckpointCoordinator = new StateTrackingMockExecutionGraph(){

                @Override
                @Nullable
                public CheckpointCoordinator getCheckpointCoordinator() {
                    return coordinator;
                }
            };
            Executing exec = new ExecutingStateBuilder().setExecutionGraph(mockedExecutionGraphWithCheckpointCoordinator).build(ctx);
            coordinator.startCheckpointScheduler();
            Assertions.assertThat((boolean)coordinator.isPeriodicCheckpointingStarted()).isTrue();
            ctx.setExpectStopWithSavepoint(WaitingForResourcesTest.assertNonNull());
            exec.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
            Assertions.assertThat((boolean)coordinator.isPeriodicCheckpointingStarted()).isFalse();
        }
    }

    @Test
    void testJobInformationMethods() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing exec = new ExecutingStateBuilder().build(ctx);
            JobID jobId = exec.getExecutionGraph().getJobID();
            Assertions.assertThat((Object)exec.getJob()).isInstanceOf(ArchivedExecutionGraph.class);
            Assertions.assertThat((Comparable)exec.getJob().getJobID()).isEqualTo((Object)jobId);
            Assertions.assertThat((Comparable)exec.getJobStatus()).isEqualTo((Object)JobStatus.RUNNING);
        }
    }

    @Test
    void testStateDoesNotExposeGloballyTerminalExecutionGraph() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            FinishingMockExecutionGraph finishingMockExecutionGraph = new FinishingMockExecutionGraph();
            Executing executing = new ExecutingStateBuilder().setExecutionGraph(finishingMockExecutionGraph).build(ctx);
            ctx.setExpectFinished(eg -> {});
            finishingMockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED);
            Assertions.assertThat((Comparable)executing.getExecutionGraph().getState()).isEqualTo((Object)JobStatus.FINISHED);
            Assertions.assertThat((Comparable)executing.getJobStatus()).isEqualTo((Object)JobStatus.RUNNING);
            Assertions.assertThat((Comparable)executing.getJob().getState()).isEqualTo((Object)JobStatus.RUNNING);
            Assertions.assertThat((long)executing.getJob().getStatusTimestamp(JobStatus.FINISHED)).isZero();
        }
    }

    @Test
    void testExecutingChecksForNewResourcesWhenBeingCreated() throws Exception {
        String onChangeEventLabel = "onChange";
        String onTriggerEventLabel = "onTrigger";
        ArrayDeque actualEvents = new ArrayDeque();
        try (MockExecutingContext ctx = new MockExecutingContext();){
            new ExecutingStateBuilder().setStateTransitionManagerFactory(context -> new TestingStateTransitionManager(() -> actualEvents.add("onChange"), () -> actualEvents.add("onTrigger"))).build(ctx);
            ctx.triggerExecutors();
            Assertions.assertThat((String)((String)actualEvents.poll())).isEqualTo("onChange");
            Assertions.assertThat((String)((String)actualEvents.poll())).isEqualTo("onTrigger");
            Assertions.assertThat((boolean)actualEvents.isEmpty()).isTrue();
        }
    }

    @Test
    public void testOmitsWaitingForResourcesStateWhenRestarting() throws Exception {
        try (MockExecutingContext ctx = new MockExecutingContext();){
            Executing testInstance = new ExecutingStateBuilder().build(ctx);
            VertexParallelism vertexParallelism = new VertexParallelism(Collections.singletonMap(new JobVertexID(), 2));
            ctx.setVertexParallelism(vertexParallelism);
            ctx.setExpectRestarting(restartingArguments -> Assertions.assertThat(restartingArguments.getRestartWithParallelism()).hasValue((Object)vertexParallelism));
            testInstance.transitionToSubsequentState();
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInternalParallelismChangeBehavior(boolean parallelismChanged) throws Exception {
        try (MockExecutingContext adaptiveSchedulerCtx = new MockExecutingContext();){
            AtomicBoolean onChangeCalled = new AtomicBoolean();
            Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = transitionCtx -> TestingStateTransitionManager.withOnChangeEventOnly(() -> {
                Assertions.assertThat((boolean)transitionCtx.hasDesiredResources()).isEqualTo(parallelismChanged);
                Assertions.assertThat((boolean)transitionCtx.hasSufficientResources()).isEqualTo(parallelismChanged);
                onChangeCalled.set(true);
            });
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(MockExecutionVertex::new);
            MockExecutionGraph executionGraph = new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
            adaptiveSchedulerCtx.setHasDesiredResources(() -> true);
            adaptiveSchedulerCtx.setHasSufficientResources(() -> true);
            adaptiveSchedulerCtx.setVertexParallelism(new VertexParallelism(executionGraph.getAllVertices().values().stream().collect(Collectors.toMap(AccessExecutionJobVertex::getJobVertexId, v -> parallelismChanged ? 1 + v.getParallelism() : v.getParallelism()))));
            Executing exec = new ExecutingStateBuilder().setStateTransitionManagerFactory(stateTransitionManagerFactory).setExecutionGraph(executionGraph).build(adaptiveSchedulerCtx);
            exec.onNewResourcesAvailable();
            Assertions.assertThat((boolean)onChangeCalled.get()).isTrue();
        }
    }

    public static TaskExecutionStateTransition createFailingStateTransition(ExecutionAttemptID attemptId, Exception exception) throws JobException {
        return new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)exception));
    }

    private ExecutionGraphHandler getExecutionGraphHandler(ExecutionGraph executionGraph, ComponentMainThreadExecutor mainThreadExecutor) {
        return new ExecutionGraphHandler(executionGraph, log, (Executor)mainThreadExecutor, mainThreadExecutor);
    }

    private static class MockInternalExecutionGraphAccessor
    implements InternalExecutionGraphAccessor {
        private MockInternalExecutionGraphAccessor() {
        }

        public Executor getFutureExecutor() {
            return ForkJoinPool.commonPool();
        }

        public ClassLoader getUserClassLoader() {
            return null;
        }

        public JobID getJobID() {
            return null;
        }

        public BlobWriter getBlobWriter() {
            return null;
        }

        @Nonnull
        public ComponentMainThreadExecutor getJobMasterMainThreadExecutor() {
            return null;
        }

        public ShuffleMaster<?> getShuffleMaster() {
            return null;
        }

        public JobMasterPartitionTracker getPartitionTracker() {
            return null;
        }

        public void registerExecution(Execution exec) {
        }

        public void deregisterExecution(Execution exec) {
        }

        public PartitionGroupReleaseStrategy getPartitionGroupReleaseStrategy() {
            return null;
        }

        public void failGlobal(Throwable t) {
        }

        public void notifySchedulerNgAboutInternalTaskFailure(ExecutionAttemptID attemptId, Throwable t, boolean cancelTask, boolean releasePartitions) {
        }

        public void jobVertexFinished() {
        }

        public void jobVertexUnFinished() {
        }

        public ExecutionDeploymentListener getExecutionDeploymentListener() {
            return null;
        }

        public void notifyExecutionChange(Execution execution, ExecutionState previousState, ExecutionState newExecutionState) {
        }

        public EdgeManager getEdgeManager() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public IntermediateResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID id) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public void deleteBlobs(List<PermanentBlobKey> blobKeys) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public ExecutionJobVertex getJobVertex(JobVertexID id) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public boolean isDynamic() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public ExecutionGraphID getExecutionGraphID() {
            return new ExecutionGraphID();
        }

        public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(IntermediateDataSetID intermediateResultPartition) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public JobVertexInputInfo getJobVertexInputInfo(JobVertexID jobVertexId, IntermediateDataSetID resultId) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public TaskDeploymentDescriptorFactory getTaskDeploymentDescriptorFactory() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }
    }

    static class MockExecutionVertex
    extends ExecutionVertex {
        private boolean deployCalled = false;
        private ExecutionState mockedExecutionState = ExecutionState.RUNNING;

        MockExecutionVertex(ExecutionJobVertex jobVertex) {
            super(jobVertex, 1, new IntermediateResult[0], Duration.ofMillis(1L), 1L, 1, 0);
        }

        public void deploy() throws JobException {
            this.deployCalled = true;
        }

        public boolean isDeployCalled() {
            return this.deployCalled;
        }

        public ExecutionState getExecutionState() {
            return this.mockedExecutionState;
        }

        public void setMockedExecutionState(ExecutionState mockedExecutionState) {
            this.mockedExecutionState = mockedExecutionState;
        }
    }

    static class FailOnDeployMockExecutionVertex
    extends ExecutionVertex {
        @Nullable
        private Throwable markFailed = null;

        public FailOnDeployMockExecutionVertex(ExecutionJobVertex jobVertex) {
            super(jobVertex, 1, new IntermediateResult[0], Duration.ofMillis(1L), 1L, 1, 0);
        }

        public void deploy() throws JobException {
            throw new JobException("Intentional Test exception");
        }

        public void markFailed(Throwable t) {
            this.markFailed = t;
        }

        @Nullable
        public Throwable getMarkedFailure() {
            return this.markFailed;
        }
    }

    static class MockExecutionJobVertex
    extends ExecutionJobVertex {
        private final ExecutionVertex mockExecutionVertex;

        MockExecutionJobVertex(Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier) throws JobException {
            this(executionVertexSupplier, new JobVertex("test"));
        }

        MockExecutionJobVertex(Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier, JobVertex jobVertex) throws JobException {
            super((InternalExecutionGraphAccessor)new MockInternalExecutionGraphAccessor(), jobVertex, (VertexParallelismInformation)new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()), (CoordinatorStore)new CoordinatorStoreImpl(), UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
            this.initialize(1, Duration.ofMillis(1L), 1L, (SubtaskAttemptNumberStore)new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
            this.mockExecutionVertex = executionVertexSupplier.apply(this);
        }

        public ExecutionVertex[] getTaskVertices() {
            return new ExecutionVertex[]{this.mockExecutionVertex};
        }

        public ExecutionVertex getMockExecutionVertex() {
            return this.mockExecutionVertex;
        }
    }

    private static class MockState
    implements State {
        private MockState() {
        }

        public void cancel() {
        }

        public void suspend(Throwable cause) {
        }

        public JobID getJobId() {
            return null;
        }

        public JobStatus getJobStatus() {
            return null;
        }

        public ArchivedExecutionGraph getJob() {
            return null;
        }

        public void handleGlobalFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
        }

        public Logger getLogger() {
            return null;
        }
    }

    private static class FinishingMockExecutionGraph
    extends StateTrackingMockExecutionGraph {
        private FinishingMockExecutionGraph() {
        }

        @Override
        public long getStatusTimestamp(JobStatus status) {
            switch (status) {
                case INITIALIZING: {
                    return 1L;
                }
                case CREATED: {
                    return 2L;
                }
                case RUNNING: {
                    return 3L;
                }
                case FINISHED: {
                    return 4L;
                }
            }
            return 0L;
        }
    }

    static class MockExecutionGraph
    extends StateTrackingMockExecutionGraph {
        private final boolean updateStateReturnValue;
        private final Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier;

        MockExecutionGraph(Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier) {
            this(false, getVerticesTopologicallySupplier);
        }

        private MockExecutionGraph(boolean updateStateReturnValue, Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier) {
            this.updateStateReturnValue = updateStateReturnValue;
            this.getVerticesTopologicallySupplier = getVerticesTopologicallySupplier;
        }

        @Override
        public boolean updateState(TaskExecutionStateTransition state) {
            return this.updateStateReturnValue;
        }

        @Override
        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
            return this.getVerticesTopologicallySupplier.get();
        }
    }

    static class FailingArguments
    extends CancellingArguments {
        private final Throwable failureCause;

        public FailingArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause) {
            super(executionGraph, executionGraphHandler, operatorCoordinatorHandler);
            this.failureCause = failureCause;
        }

        public Throwable getFailureCause() {
            return this.failureCause;
        }
    }

    static class RestartingArguments
    extends CancellingArguments {
        private final Duration backoffTime;
        @Nullable
        private final VertexParallelism restartWithParallelism;

        public RestartingArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, @Nullable VertexParallelism restartWithParallelism) {
            super(executionGraph, executionGraphHandler, operatorCoordinatorHandler);
            this.backoffTime = backoffTime;
            this.restartWithParallelism = restartWithParallelism;
        }

        public Duration getBackoffTime() {
            return this.backoffTime;
        }

        public Optional<VertexParallelism> getRestartWithParallelism() {
            return Optional.ofNullable(this.restartWithParallelism);
        }
    }

    static class StopWithSavepointArguments
    extends CancellingArguments {
        private final CheckpointScheduling checkpointScheduling;
        private final CompletableFuture<String> savepointFuture;

        public StopWithSavepointArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandle, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture) {
            super(executionGraph, executionGraphHandler, operatorCoordinatorHandle);
            this.checkpointScheduling = checkpointScheduling;
            this.savepointFuture = savepointFuture;
        }
    }

    static class CancellingArguments {
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandle;

        public CancellingArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandle) {
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandle = operatorCoordinatorHandle;
        }

        public ExecutionGraph getExecutionGraph() {
            return this.executionGraph;
        }

        public ExecutionGraphHandler getExecutionGraphHandler() {
            return this.executionGraphHandler;
        }

        public OperatorCoordinatorHandler getOperatorCoordinatorHandle() {
            return this.operatorCoordinatorHandle;
        }
    }

    private static class MockExecutingContext
    extends MockStateWithExecutionGraphContext
    implements Executing.Context {
        private final StateValidator<FailingArguments> failingStateValidator = new StateValidator("failing");
        private final StateValidator<RestartingArguments> restartingStateValidator = new StateValidator("restarting");
        private final StateValidator<CancellingArguments> cancellingStateValidator = new StateValidator("cancelling");
        private Function<Throwable, FailureResult> howToHandleFailure;
        private StateValidator<StopWithSavepointArguments> stopWithSavepointValidator = new StateValidator("stopWithSavepoint");
        private CompletableFuture<String> mockedStopWithSavepointOperationFuture = new CompletableFuture();
        private VertexParallelism vertexParallelism = new VertexParallelism(Collections.emptyMap());
        private Supplier<Boolean> hasDesiredResourcesSupplier = () -> false;
        private Supplier<Boolean> hasSufficientResourcesSupplier = () -> false;

        private MockExecutingContext() {
        }

        public void setExpectFailing(Consumer<FailingArguments> asserter) {
            this.failingStateValidator.expectInput(asserter);
        }

        public void setExpectRestarting(Consumer<RestartingArguments> asserter) {
            this.restartingStateValidator.expectInput(asserter);
        }

        public void setExpectCancelling(Consumer<CancellingArguments> asserter) {
            this.cancellingStateValidator.expectInput(asserter);
        }

        public void setExpectStopWithSavepoint(Consumer<StopWithSavepointArguments> asserter) {
            this.stopWithSavepointValidator.expectInput(asserter);
        }

        public void setHowToHandleFailure(Function<Throwable, FailureResult> function) {
            this.howToHandleFailure = function;
        }

        public void setVertexParallelism(VertexParallelism vertexParallelism) {
            this.vertexParallelism = vertexParallelism;
        }

        public void setHasDesiredResources(Supplier<Boolean> sup) {
            this.hasDesiredResourcesSupplier = sup;
        }

        public void setHasSufficientResources(Supplier<Boolean> sup) {
            this.hasSufficientResourcesSupplier = sup;
        }

        public void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
            this.cancellingStateValidator.validateInput(new CancellingArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            this.hadStateTransition = true;
        }

        public FailureResult howToHandleFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
            return this.howToHandleFailure.apply(failure);
        }

        public Optional<VertexParallelism> getAvailableVertexParallelism() {
            return Optional.ofNullable(this.vertexParallelism);
        }

        public void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, @Nullable VertexParallelism restartWithParallelism, List<ExceptionHistoryEntry> failureCollection) {
            this.restartingStateValidator.validateInput(new RestartingArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler, backoffTime, restartWithParallelism));
            this.hadStateTransition = true;
        }

        public void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause, List<ExceptionHistoryEntry> failureCollection) {
            this.failingStateValidator.validateInput(new FailingArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler, failureCause));
            this.hadStateTransition = true;
        }

        public CompletableFuture<String> goToStopWithSavepoint(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture, List<ExceptionHistoryEntry> failureCollection) {
            this.stopWithSavepointValidator.validateInput(new StopWithSavepointArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler, checkpointScheduling, savepointFuture));
            this.hadStateTransition = true;
            return this.mockedStopWithSavepointOperationFuture;
        }

        public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
            return this.getMainThreadExecutor().schedule(() -> this.runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS);
        }

        public boolean hasDesiredResources() {
            return this.hasDesiredResourcesSupplier.get();
        }

        public boolean hasSufficientResources() {
            return this.hasSufficientResourcesSupplier.get();
        }

        @Override
        public void close() throws Exception {
            super.close();
            this.failingStateValidator.close();
            this.restartingStateValidator.close();
            this.cancellingStateValidator.close();
            this.stopWithSavepointValidator.close();
        }

        @Override
        public void archiveFailure(RootExceptionHistoryEntry failure) {
        }
    }

    private final class ExecutingStateBuilder {
        private ExecutionGraph executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        private OperatorCoordinatorHandler operatorCoordinatorHandler;
        private Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> TestingStateTransitionManager.withNoOp();
        private int rescaleOnFailedCheckpointCount = 1;

        private ExecutingStateBuilder() throws JobException, JobExecutionException {
            this.operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
        }

        public ExecutingStateBuilder setExecutionGraph(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
            return this;
        }

        public ExecutingStateBuilder setOperatorCoordinatorHandler(OperatorCoordinatorHandler operatorCoordinatorHandler) {
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            return this;
        }

        public ExecutingStateBuilder setStateTransitionManagerFactory(Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory) {
            this.stateTransitionManagerFactory = stateTransitionManagerFactory;
            return this;
        }

        public ExecutingStateBuilder setRescaleOnFailedCheckpointCount(int rescaleOnFailedCheckpointCount) {
            this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
            return this;
        }

        private Executing build(MockExecutingContext ctx) {
            Executing executing;
            this.executionGraph.transitionToRunning();
            try {
                executing = new Executing(this.executionGraph, ExecutingTest.this.getExecutionGraphHandler(this.executionGraph, ctx.getMainThreadExecutor()), this.operatorCoordinatorHandler, log, (Executing.Context)ctx, ClassLoader.getSystemClassLoader(), new ArrayList(), this.stateTransitionManagerFactory::apply, this.rescaleOnFailedCheckpointCount);
                Preconditions.checkState((!ctx.hadStateTransition ? 1 : 0) != 0, (Object)"State construction is an on-going state transition, during which no further transitions are allowed.");
            }
            catch (Throwable throwable) {
                Preconditions.checkState((!ctx.hadStateTransition ? 1 : 0) != 0, (Object)"State construction is an on-going state transition, during which no further transitions are allowed.");
                throw throwable;
            }
            return executing;
        }
    }
}

