/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.CompletedScheduledFuture;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.ExecutingTest;
import org.apache.flink.runtime.scheduler.adaptive.MockStateWithExecutionGraphContext;
import org.apache.flink.runtime.scheduler.adaptive.Restarting;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.TestingOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the {@link Restarting} state.
 *
 * <p>Every test builds a {@link Restarting} instance on top of a {@link MockRestartingContext}
 * and an execution graph, triggers one action on the state and checks either the resulting
 * execution graph status or the state transition requested from the context.
 */
public class RestartingTest extends TestLogger {

    /** Creating the state must leave the execution graph in {@link JobStatus#CANCELLING}. */
    @Test
    public void testExecutionGraphCancellationOnEnter() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph mockExecutionGraph = new StateTrackingMockExecutionGraph();
            createRestartingState(ctx, mockExecutionGraph);
            Assert.assertThat(mockExecutionGraph.getState(), CoreMatchers.is(JobStatus.CANCELLING));
        }
    }

    /**
     * Reporting {@link JobStatus#CANCELED} as the globally terminal state must request a
     * transition to WaitingForResources.
     */
    @Test
    public void testTransitionToWaitingForResourcesWhenCancellationComplete() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            // The expectation is registered before the trigger; presumably it is verified when
            // the context (and with it the validator) is closed -- confirm in StateValidator.
            ctx.setExpectWaitingForResources();
            restarting.onGloballyTerminalState(JobStatus.CANCELED);
        }
    }

    /** Cancelling the state must request a transition to Cancelling with non-null arguments. */
    @Test
    public void testCancel() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            ctx.setExpectCancelling(WaitingForResourcesTest.assertNonNull());
            restarting.cancel();
        }
    }

    /** Suspends while the termination future of the execution graph is not yet completed. */
    @Test
    public void testSuspendWithJobInCancellingState() throws Exception {
        testSuspend(false);
    }

    /** Suspends after the termination future was completed with {@link JobStatus#CANCELED}. */
    @Test
    public void testSuspendWithJobInCancelledState() throws Exception {
        testSuspend(true);
    }

    /**
     * Suspends a {@link Restarting} state and expects a transition to Finished whose archived
     * execution graph reports {@link JobStatus#SUSPENDED}.
     *
     * @param cancellationCompleted whether the execution graph's termination future is completed
     *     with {@link JobStatus#CANCELED} before suspending
     */
    private void testSuspend(boolean cancellationCompleted) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
            Restarting restarting = createRestartingState(ctx, executionGraph);
            if (cancellationCompleted) {
                executionGraph.completeTerminationFuture(JobStatus.CANCELED);
            }
            ctx.setExpectFinished(
                    archivedExecutionGraph ->
                            Assert.assertThat(
                                    archivedExecutionGraph.getState(),
                                    CoreMatchers.is(JobStatus.SUSPENDED)));
            // Typed as Throwable so the call resolves exactly as it did with the former cast.
            Throwable cause = new RuntimeException("suspend");
            restarting.suspend(cause);
        }
    }

    /** A global failure reported to the state must not cause any state transition. */
    @Test
    public void testGlobalFailuresAreIgnored() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            Throwable failure = new RuntimeException();
            restarting.handleGlobalFailure(failure, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            ctx.assertNoStateTransition();
        }
    }

    /**
     * Once the execution graph is {@link JobStatus#CANCELED}, the state itself must still report
     * {@link JobStatus#RESTARTING} and must not expose a timestamp for {@code CANCELED}.
     */
    @Test
    public void testStateDoesNotExposeGloballyTerminalExecutionGraph() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph mockExecutionGraph = new StateTrackingMockExecutionGraph();
            Restarting restarting = createRestartingState(ctx, mockExecutionGraph);

            // Completing the termination future leads to the WaitingForResources transition.
            ctx.setExpectWaitingForResources();
            mockExecutionGraph.completeTerminationFuture(JobStatus.CANCELED);

            // The underlying graph is terminal ...
            Assert.assertThat(
                    restarting.getExecutionGraph().getState(), CoreMatchers.is(JobStatus.CANCELED));
            // ... but everything exposed through the state still says RESTARTING.
            Assert.assertThat(restarting.getJobStatus(), CoreMatchers.is(JobStatus.RESTARTING));
            Assert.assertThat(restarting.getJob().getState(), CoreMatchers.is(JobStatus.RESTARTING));
            Assert.assertThat(
                    restarting.getJob().getStatusTimestamp(JobStatus.CANCELED), CoreMatchers.is(0L));
        }
    }

    /**
     * Builds a {@link Restarting} state for the given context and execution graph.
     *
     * <p>The graph is switched to running before the state is constructed. The state is created
     * with a {@link Duration#ZERO} delay, the system class loader and an empty failure list.
     *
     * @param ctx context that receives the state's callbacks and supplies the main thread executor
     * @param executionGraph execution graph handed to the state
     * @return the newly created state
     */
    public Restarting createRestartingState(MockRestartingContext ctx, ExecutionGraph executionGraph) {
        // The main thread executor doubles as the handler's plain Executor argument.
        Executor ioExecutor = ctx.getMainThreadExecutor();
        ExecutionGraphHandler executionGraphHandler =
                new ExecutionGraphHandler(executionGraph, log, ioExecutor, ctx.getMainThreadExecutor());
        OperatorCoordinatorHandler operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
        executionGraph.transitionToRunning();
        return new Restarting(
                ctx,
                executionGraph,
                executionGraphHandler,
                operatorCoordinatorHandler,
                log,
                Duration.ZERO,
                ClassLoader.getSystemClassLoader(),
                new ArrayList<>());
    }

    /**
     * Builds a {@link Restarting} state backed by a fresh {@link StateTrackingMockExecutionGraph}.
     *
     * @param ctx context that receives the state's callbacks
     * @return the newly created state
     */
    public Restarting createRestartingState(MockRestartingContext ctx) throws JobException, JobExecutionException {
        return createRestartingState(ctx, new StateTrackingMockExecutionGraph());
    }

    /**
     * Context for {@link Restarting} that checks requested state transitions against
     * expectations registered by the test.
     *
     * <p>{@code hadStateTransition} is not declared in this class; presumably it is inherited
     * from {@link MockStateWithExecutionGraphContext} -- confirm there.
     */
    private static class MockRestartingContext
            extends MockStateWithExecutionGraphContext
            implements Restarting.Context {
        private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
                new StateValidator<>("Cancelling");
        private final StateValidator<Void> waitingForResourcesStateValidator =
                new StateValidator<>("WaitingForResources");

        private MockRestartingContext() {
        }

        /**
         * Registers the check to apply to the arguments of a transition to Cancelling.
         *
         * @param asserter consumer invoked with the arguments of the transition
         */
        public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
            cancellingStateValidator.expectInput(asserter);
        }

        /** Registers the expectation of a transition to WaitingForResources (no argument checks). */
        public void setExpectWaitingForResources() {
            waitingForResourcesStateValidator.expectInput(none -> {});
        }

        /**
         * Validates the transition to Cancelling and records that a transition happened.
         *
         * <p>{@code failureCollection} is not part of the validated arguments.
         */
        // NOTE(review): presumably implements a Restarting.Context callback; add @Override once
        // the interface declaration has been confirmed.
        public void goToCanceling(
                ExecutionGraph executionGraph,
                ExecutionGraphHandler executionGraphHandler,
                OperatorCoordinatorHandler operatorCoordinatorHandler,
                List<ExceptionHistoryEntry> failureCollection) {
            cancellingStateValidator.validateInput(
                    new ExecutingTest.CancellingArguments(
                            executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            hadStateTransition = true;
        }

        /** Intentionally a no-op: archived failures are not inspected by these tests. */
        @Override
        public void archiveFailure(RootExceptionHistoryEntry failure) {
        }

        /**
         * Validates the transition to WaitingForResources and records that a transition happened.
         *
         * <p>{@code previousExecutionGraph} is ignored; the validator always receives {@code null}.
         */
        // NOTE(review): presumably implements a Restarting.Context callback; add @Override once
        // the interface declaration has been confirmed.
        public void goToWaitingForResources(ExecutionGraph previousExecutionGraph) {
            waitingForResourcesStateValidator.validateInput(null);
            hadStateTransition = true;
        }

        /**
         * Runs {@code action} synchronously unless a state transition was already recorded.
         *
         * <p>Neither {@code expectedState} nor {@code delay} is consulted.
         *
         * @return the future produced by {@code CompletedScheduledFuture.create(null)}
         */
        // NOTE(review): presumably implements a Restarting.Context callback; add @Override once
        // the interface declaration has been confirmed.
        public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
            if (!hadStateTransition) {
                action.run();
            }
            return CompletedScheduledFuture.create(null);
        }

        /** Closes the parent context first, then both state validators. */
        @Override
        public void close() throws Exception {
            super.close();
            cancellingStateValidator.close();
            waitingForResourcesStateValidator.close();
        }
    }
}

