/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests what happens to the {@link CheckpointCoordinator} of a {@link DefaultScheduler}'s
 * {@link ExecutionGraph} when the scheduler is closed.
 *
 * <p>Every test builds a scheduler with checkpointing enabled, optionally drives the execution
 * graph into a particular state, closes the scheduler and then asserts that the coordinator is
 * shut down and that the checkpoint ID counter and the completed checkpoint store were shut down
 * with the expected {@link JobStatus}.
 */
class DefaultSchedulerCheckpointCoordinatorTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    /** Failing the job before closing the scheduler must shut checkpointing down with FAILED. */
    @Test
    void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph() throws Exception {
        SchedulerFixture fixture = newFixture();

        fixture.graph.failJob(new Exception("Test Exception"), System.currentTimeMillis());
        fixture.scheduler.closeAsync().get();

        fixture.assertCheckpointingShutDownWith(JobStatus.FAILED);
    }

    /** Suspending the graph before closing the scheduler must shut checkpointing down with SUSPENDED. */
    @Test
    void testClosingSchedulerShutsDownCheckpointCoordinatorOnSuspendedExecutionGraph() throws Exception {
        SchedulerFixture fixture = newFixture();

        fixture.graph.suspend(new Exception("Test Exception"));
        fixture.scheduler.closeAsync().get();

        fixture.assertCheckpointingShutDownWith(JobStatus.SUSPENDED);
    }

    /** Finishing every execution before closing the scheduler must shut checkpointing down with FINISHED. */
    @Test
    void testClosingSchedulerShutsDownCheckpointCoordinatorOnFinishedExecutionGraph() throws Exception {
        SchedulerFixture fixture = newFixture();

        fixture.scheduler.startScheduling();
        // Report each current execution attempt as FINISHED so that the whole job finishes.
        for (ExecutionVertex executionVertex : fixture.graph.getAllExecutionVertices()) {
            Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
            fixture.scheduler.updateTaskExecutionState(
                    new TaskExecutionState(
                            currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED));
        }
        // The job has to be terminal already, i.e. before the scheduler is closed.
        Assertions.assertThat(fixture.graph.getTerminationFuture())
                .isCompletedWithValue(JobStatus.FINISHED);

        fixture.scheduler.closeAsync().get();

        fixture.assertCheckpointingShutDownWith(JobStatus.FINISHED);
    }

    /**
     * Closing the scheduler without touching the graph first must leave the graph SUSPENDED and
     * shut checkpointing down with SUSPENDED.
     */
    @Test
    void testClosingSchedulerSuspendsExecutionGraphAndShutsDownCheckpointCoordinator() throws Exception {
        SchedulerFixture fixture = newFixture();

        fixture.scheduler.closeAsync().get();

        Assertions.assertThat(fixture.graph.getState()).isEqualTo(JobStatus.SUSPENDED);
        fixture.assertCheckpointingShutDownWith(JobStatus.SUSPENDED);
    }

    /**
     * Creates the shared test setup: a testing ID counter and a testing completed checkpoint
     * store (each wired to its own shutdown future), plus a scheduler that uses them.
     *
     * <p>Also asserts the common precondition that the execution graph has a checkpoint
     * coordinator and that this coordinator is not yet shut down.
     *
     * @return the fixture holding the scheduler, its graph and coordinator, and both futures
     * @throws Exception if the scheduler cannot be built
     */
    private SchedulerFixture newFixture() throws Exception {
        CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        CheckpointIDCounter counter =
                TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(
                        counterShutdownFuture);

        CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        CompletedCheckpointStore store =
                TestingCompletedCheckpointStore
                        .createStoreWithShutdownCheckAndNoCompletedCheckpoints(storeShutdownFuture);

        DefaultScheduler scheduler = createSchedulerAndEnableCheckpointing(counter, store);
        ExecutionGraph graph = scheduler.getExecutionGraph();
        CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();

        Assertions.assertThat(checkpointCoordinator).isNotNull();
        Assertions.assertThat(checkpointCoordinator.isShutdown()).isFalse();

        return new SchedulerFixture(
                scheduler, graph, checkpointCoordinator, counterShutdownFuture, storeShutdownFuture);
    }

    /**
     * Builds a {@link DefaultScheduler} for a streaming job graph with a single vertex of
     * parallelism 1 and checkpointing enabled (interval and timeout both set to 100).
     *
     * @param counter checkpoint ID counter handed out by the testing recovery factory
     * @param store completed checkpoint store handed out by the testing recovery factory
     * @return the newly built scheduler
     * @throws Exception if building the scheduler fails
     */
    private DefaultScheduler createSchedulerAndEnableCheckpointing(CheckpointIDCounter counter, CompletedCheckpointStore store) throws Exception {
        Duration timeout = Duration.ofDays(1L);

        JobVertex jobVertex = new JobVertex("MockVertex");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(AbstractInvokable.class);

        CheckpointCoordinatorConfiguration chkConfig =
                CheckpointCoordinatorConfiguration.builder()
                        .setCheckpointInterval(100L)
                        .setCheckpointTimeout(100L)
                        .build();
        JobCheckpointingSettings checkpointingSettings =
                new JobCheckpointingSettings(chkConfig, null);
        JobGraph jobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder()
                        .addJobVertex(jobVertex)
                        .setJobCheckpointingSettings(checkpointingSettings)
                        .build();

        return new DefaultSchedulerBuilder(
                        jobGraph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        EXECUTOR_EXTENSION.getExecutor())
                .setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(store, counter))
                .setRpcTimeout(timeout)
                .build();
    }

    /** Bundles the objects that every test in this class arranges and later asserts on. */
    private static final class SchedulerFixture {
        private final DefaultScheduler scheduler;
        private final ExecutionGraph graph;
        private final CheckpointCoordinator checkpointCoordinator;
        private final CompletableFuture<JobStatus> counterShutdownFuture;
        private final CompletableFuture<JobStatus> storeShutdownFuture;

        private SchedulerFixture(
                DefaultScheduler scheduler,
                ExecutionGraph graph,
                CheckpointCoordinator checkpointCoordinator,
                CompletableFuture<JobStatus> counterShutdownFuture,
                CompletableFuture<JobStatus> storeShutdownFuture) {
            this.scheduler = scheduler;
            this.graph = graph;
            this.checkpointCoordinator = checkpointCoordinator;
            this.counterShutdownFuture = counterShutdownFuture;
            this.storeShutdownFuture = storeShutdownFuture;
        }

        /**
         * Asserts that the coordinator reports being shut down and that both shutdown futures
         * are completed with {@code expectedStatus}.
         */
        private void assertCheckpointingShutDownWith(JobStatus expectedStatus) {
            Assertions.assertThat(checkpointCoordinator.isShutdown()).isTrue();
            Assertions.assertThat(counterShutdownFuture).isCompletedWithValue(expectedStatus);
            Assertions.assertThat(storeShutdownFuture).isCompletedWithValue(expectedStatus);
        }
    }
}

