/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.cleanup;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobManagerRunner} that does not run a job but only cleans up the checkpoint-related
 * resources of a job for which a {@link JobResult} already exists.
 *
 * <p>Calling {@link #start()} completes the result future with a success result derived from the
 * given {@link JobResult}. The cleanup is chained onto that result future: once it completes, the
 * completed checkpoint store and the checkpoint ID counter of the job are shut down on the cleanup
 * executor, followed by closing the {@link CheckpointsCleaner}.
 */
public class CheckpointResourcesCleanupRunner
implements JobManagerRunner {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointResourcesCleanupRunner.class);
    private final JobResult jobResult;
    private final CheckpointRecoveryFactory checkpointRecoveryFactory;
    private final CheckpointsCleaner checkpointsCleaner;
    private final SharedStateRegistryFactory sharedStateRegistryFactory;
    private final Configuration jobManagerConfiguration;
    private final Executor cleanupExecutor;
    private final long initializationTimestamp;
    // Completes once the checkpoint resources are cleaned up and the checkpoints cleaner is closed.
    private final CompletableFuture<Void> cleanupFuture;
    // Completed by start(); its completion is what triggers the cleanup.
    private final CompletableFuture<JobManagerRunnerResult> resultFuture;

    /**
     * Creates the cleanup runner. No cleanup work is started here; it is only wired up to run
     * after the result future has been completed.
     *
     * @param jobResult result of the job whose checkpoint resources shall be cleaned up
     * @param checkpointRecoveryFactory factory used to access the job's checkpoint store and
     *     checkpoint ID counter
     * @param sharedStateRegistryFactory factory passed on when creating the checkpoint store
     * @param jobManagerConfiguration configuration from which the cleaner mode and the number of
     *     retained checkpoints are read
     * @param cleanupExecutor executor on which the cleanup runs
     * @param initializationTimestamp timestamp used when generating the {@link ExecutionGraphInfo}
     * @throws NullPointerException if any of the reference arguments is {@code null}
     */
    public CheckpointResourcesCleanupRunner(JobResult jobResult, CheckpointRecoveryFactory checkpointRecoveryFactory, SharedStateRegistryFactory sharedStateRegistryFactory, Configuration jobManagerConfiguration, Executor cleanupExecutor, long initializationTimestamp) {
        this.jobResult = Preconditions.checkNotNull(jobResult);
        this.checkpointRecoveryFactory = Preconditions.checkNotNull(checkpointRecoveryFactory);
        this.sharedStateRegistryFactory = Preconditions.checkNotNull(sharedStateRegistryFactory);
        this.jobManagerConfiguration = Preconditions.checkNotNull(jobManagerConfiguration);
        this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor);
        this.initializationTimestamp = initializationTimestamp;
        this.checkpointsCleaner = new CheckpointsCleaner(jobManagerConfiguration.get(CheckpointingOptions.CLEANER_PARALLEL_MODE));
        this.resultFuture = new CompletableFuture<>();
        // The cleanup only begins after the result future has been completed.
        this.cleanupFuture = this.resultFuture.thenCompose(ignored -> this.runCleanupAsync());
    }

    /**
     * Runs the checkpoint cleanup on the cleanup executor and closes the checkpoints cleaner
     * afterwards.
     *
     * @return future that completes when the cleanup and the closing of the cleaner are done; a
     *     failure of the cleanup is forwarded wrapped in a {@link CompletionException}
     */
    private CompletableFuture<Void> runCleanupAsync() {
        return CompletableFuture.runAsync(() -> {
            try {
                this.cleanupCheckpoints();
            }
            catch (Exception e) {
                // Runnable cannot throw checked exceptions; wrap to fail the future.
                throw new CompletionException(e);
            }
        }, this.cleanupExecutor).thenCompose(ignore -> this.checkpointsCleaner.closeAsync());
    }

    /**
     * Returns the future of the cleanup. Because the cleanup is chained onto the result future,
     * the returned future can only complete after the result future has been completed, which
     * this class does in {@link #start()}.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.cleanupFuture;
    }

    /**
     * Completes the result future with a success result built from the job result, which in turn
     * triggers the cleanup.
     */
    @Override
    public void start() throws Exception {
        this.resultFuture.complete(JobManagerRunnerResult.forSuccess(this.createExecutionGraphInfoFromJobResult()));
    }

    /**
     * Shuts down the job's completed checkpoint store and checkpoint ID counter. Both shutdowns
     * are attempted even if the first one fails.
     *
     * @throws Exception the first failure that occurred, with a later failure attached through
     *     {@link ExceptionUtils#firstOrSuppressed}
     */
    private void cleanupCheckpoints() throws Exception {
        CompletedCheckpointStore completedCheckpointStore = this.createCompletedCheckpointStore();
        CheckpointIDCounter checkpointIDCounter = this.createCheckpointIDCounter();
        Exception exception = null;
        try {
            completedCheckpointStore.shutdown(this.getJobStatus(), this.checkpointsCleaner);
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            // Blocks the cleanup thread until the counter has been shut down.
            checkpointIDCounter.shutdown(this.getJobStatus()).get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of the thread can still observe it.
            Thread.currentThread().interrupt();
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }
        catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Creates the recovered completed checkpoint store for this job, using the configured maximum
     * number of retained checkpoints and {@link RestoreMode#CLAIM}.
     */
    private CompletedCheckpointStore createCompletedCheckpointStore() throws Exception {
        return this.checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore(this.getJobID(), DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints(this.jobManagerConfiguration, LOG), this.sharedStateRegistryFactory, this.cleanupExecutor, RestoreMode.CLAIM);
    }

    /** Creates the checkpoint ID counter for this job. */
    private CheckpointIDCounter createCheckpointIDCounter() throws Exception {
        return this.checkpointRecoveryFactory.createCheckpointIDCounter(this.getJobID());
    }

    /**
     * Not supported by this runner.
     *
     * @return a future failed with an {@link UnavailableDispatcherOperationException}
     */
    @Override
    public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
        return FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for job in cleanup phase. The requested operation is not available in that stage."));
    }

    /** Returns the result future, which is completed when {@link #start()} is called. */
    @Override
    public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
        return this.resultFuture;
    }

    /** Returns the ID of the job taken from the job result. */
    @Override
    public JobID getJobID() {
        return this.jobResult.getJobId();
    }

    /**
     * Not supported by this runner.
     *
     * @param timeout ignored
     * @return a future failed with a {@link JobCancellationFailedException}
     */
    @Override
    public CompletableFuture<Acknowledge> cancel(Time timeout) {
        return FutureUtils.completedExceptionally(new JobCancellationFailedException("Cleanup tasks are not meant to be cancelled."));
    }

    /**
     * Returns the job status derived from the job result.
     *
     * @param timeout ignored; the returned future is already completed
     */
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
        return CompletableFuture.completedFuture(this.getJobStatus());
    }

    /**
     * Returns job details created from the execution graph generated for the job result.
     *
     * @param timeout forwarded to {@link #requestJob(Time)}
     */
    @Override
    public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
        return this.requestJob(timeout).thenApply(executionGraphInfo -> JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph()));
    }

    /**
     * Returns the execution graph information generated from the job result.
     *
     * @param timeout ignored; the returned future is already completed
     */
    @Override
    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
        return CompletableFuture.completedFuture(this.createExecutionGraphInfoFromJobResult());
    }

    /** Always {@code true} for this runner. */
    @Override
    public boolean isInitialized() {
        return true;
    }

    /** Generates the execution graph information for this runner's job result and timestamp. */
    private ExecutionGraphInfo createExecutionGraphInfoFromJobResult() {
        return CheckpointResourcesCleanupRunner.generateExecutionGraphInfo(this.jobResult, this.initializationTimestamp);
    }

    /** Returns the job status derived from this runner's job result. */
    private JobStatus getJobStatus() {
        return CheckpointResourcesCleanupRunner.getJobStatus(this.jobResult);
    }

    /** Derives the job status from the application status stored in the given job result. */
    private static JobStatus getJobStatus(JobResult jobResult) {
        return jobResult.getApplicationStatus().deriveJobStatus();
    }

    /**
     * Builds an {@link ExecutionGraphInfo} around a sparse archived execution graph that carries
     * the job ID, the derived job status, the job's serialized throwable (if present) and the
     * given timestamp. The job name is set to {@code "unknown"}; {@code null} is passed for the
     * remaining arguments.
     */
    private static ExecutionGraphInfo generateExecutionGraphInfo(JobResult jobResult, long initializationTimestamp) {
        return new ExecutionGraphInfo(ArchivedExecutionGraph.createSparseArchivedExecutionGraph(jobResult.getJobId(), "unknown", CheckpointResourcesCleanupRunner.getJobStatus(jobResult), null, jobResult.getSerializedThrowable().orElse(null), null, initializationTimestamp));
    }
}

