/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;

/**
 * Dispatcher leader process that recovers persisted {@link ExecutionPlan}s and dirty
 * {@link JobResult}s, creates the dispatcher service from them, and afterwards reacts to
 * execution plans being added to or removed from the {@link ExecutionPlanStore}.
 *
 * <p>All recovery work is chained onto {@link #onGoingRecoveryOperation} so that the initial
 * recovery and any later add/remove notifications are processed one after another. Store access
 * is dispatched to the supplied I/O executor.
 */
public class SessionDispatcherLeaderProcess
        extends AbstractDispatcherLeaderProcess
        implements ExecutionPlanStore.ExecutionPlanListener {

    /** Factory used to create the dispatcher service once recovery has finished. */
    private final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory;

    /** Store holding the persisted execution plans; this process registers itself as its listener. */
    private final ExecutionPlanStore executionPlanStore;

    /** Store that is queried for dirty job results during recovery. */
    private final JobResultStore jobResultStore;

    /** Executor on which store look-ups, job recovery and the store shutdown are run. */
    private final Executor ioExecutor;

    /**
     * Tail of the chain of recovery / add / remove operations. Every new operation is appended to
     * this future and the field is replaced by the resulting future.
     */
    private CompletableFuture<Void> onGoingRecoveryOperation = FutureUtils.completedVoidFuture();

    private SessionDispatcherLeaderProcess(
            UUID leaderSessionId,
            AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
            ExecutionPlanStore executionPlanStore,
            JobResultStore jobResultStore,
            Executor ioExecutor,
            FatalErrorHandler fatalErrorHandler) {
        super(leaderSessionId, fatalErrorHandler);
        this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
        this.executionPlanStore = executionPlanStore;
        this.jobResultStore = jobResultStore;
        this.ioExecutor = ioExecutor;
    }

    /**
     * Starts the execution plan store and kicks off the asynchronous recovery that ends with the
     * creation of the dispatcher.
     */
    @Override
    protected void onStart() {
        this.startServices();
        this.onGoingRecoveryOperation =
                this.createDispatcherBasedOnRecoveredExecutionPlansAndRecoveredDirtyJobResults();
    }

    /**
     * Starts the execution plan store with this process registered as listener.
     *
     * @throws FlinkRuntimeException if the store could not be started
     */
    private void startServices() {
        try {
            this.executionPlanStore.start(this);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Could not start %s when trying to start the %s.",
                            this.executionPlanStore.getClass().getSimpleName(),
                            this.getClass().getSimpleName()),
                    e);
        }
    }

    /** Creates the dispatcher only if this process is still in state {@code RUNNING}. */
    private void createDispatcherIfRunning(
            Collection<ExecutionPlan> executionPlans,
            Collection<JobResult> recoveredDirtyJobResults) {
        this.runIfStateIs(
                AbstractDispatcherLeaderProcess.State.RUNNING,
                () -> this.createDispatcher(executionPlans, recoveredDirtyJobResults));
    }

    /**
     * Creates the dispatcher service from the recovered data and completes the dispatcher setup
     * with it. The dispatcher id is derived from the leader session id.
     */
    private void createDispatcher(
            Collection<ExecutionPlan> executionPlans,
            Collection<JobResult> recoveredDirtyJobResults) {
        final AbstractDispatcherLeaderProcess.DispatcherGatewayService dispatcherService =
                this.dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(this.getLeaderSessionId()),
                        executionPlans,
                        recoveredDirtyJobResults,
                        this.executionPlanStore,
                        this.jobResultStore);
        this.completeDispatcherSetup(dispatcherService);
    }

    /**
     * Builds the asynchronous recovery pipeline: fetch the dirty job results, recover every
     * persisted execution plan that has no dirty result, then create the dispatcher from both.
     *
     * @return future that completes once the pipeline has run; failures are routed through
     *     {@code onErrorIfRunning}
     */
    private CompletableFuture<Void>
            createDispatcherBasedOnRecoveredExecutionPlansAndRecoveredDirtyJobResults() {
        final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
                CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, this.ioExecutor);

        // Jobs that already have a dirty result must not be recovered again, so their ids are
        // handed to the recovery step as an exclusion set.
        final CompletableFuture<Collection<ExecutionPlan>> recoveredPlansFuture =
                dirtyJobsFuture.thenApplyAsync(
                        dirtyJobs ->
                                this.recoverJobsIfRunning(
                                        dirtyJobs.stream()
                                                .map(JobResult::getJobId)
                                                .collect(Collectors.toSet())),
                        this.ioExecutor);

        return recoveredPlansFuture
                .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
                .handle(this::onErrorIfRunning);
    }

    /**
     * Recovers the persisted jobs if this process is still running.
     *
     * @param recoveredDirtyJobResults ids of jobs that already have a dirty job result
     * @return the recovered execution plans, or an empty collection if the guarded supplier did
     *     not produce a value
     */
    private Collection<ExecutionPlan> recoverJobsIfRunning(Set<JobID> recoveredDirtyJobResults) {
        return this.supplyUnsynchronizedIfRunning(() -> this.recoverJobs(recoveredDirtyJobResults))
                .orElse(Collections.emptyList());
    }

    /**
     * Recovers every persisted job whose id is not contained in the given set.
     *
     * @param recoveredDirtyJobResults ids of jobs to skip because a dirty job result exists
     * @return the execution plans that could be recovered
     */
    private Collection<ExecutionPlan> recoverJobs(Set<JobID> recoveredDirtyJobResults) {
        this.log.info("Recover all persisted job graphs that are not finished, yet.");
        final Collection<JobID> jobIds = this.getJobIds();
        final Collection<ExecutionPlan> recoveredExecutionPlans = new ArrayList<>();

        for (JobID jobId : jobIds) {
            if (recoveredDirtyJobResults.contains(jobId)) {
                this.log.info(
                        "Skipping recovery of a job with job id {}, because it already reached a globally terminal state",
                        jobId);
            } else {
                this.tryRecoverJob(jobId).ifPresent(recoveredExecutionPlans::add);
            }
        }

        this.log.info(
                "Successfully recovered {} persisted job graphs.", recoveredExecutionPlans.size());
        return recoveredExecutionPlans;
    }

    /**
     * Reads the ids of all persisted jobs from the execution plan store.
     *
     * @throws FlinkRuntimeException if the ids could not be retrieved
     */
    private Collection<JobID> getJobIds() {
        try {
            return this.executionPlanStore.getJobIds();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not retrieve job ids of persisted jobs.", e);
        }
    }

    /**
     * Tries to recover the execution plan of a single job.
     *
     * @param jobId id of the job to recover
     * @return the recovered plan, or an empty optional if the store returned {@code null}
     * @throws FlinkRuntimeException if the store failed while recovering the plan
     */
    private Optional<ExecutionPlan> tryRecoverJob(JobID jobId) {
        this.log.info("Trying to recover job with job id {}.", jobId);
        try {
            final ExecutionPlan executionPlan = this.executionPlanStore.recoverExecutionPlan(jobId);
            if (executionPlan == null) {
                this.log.info(
                        "Skipping recovery of job with job id {}, because it already finished in a previous execution",
                        jobId);
            }
            return Optional.ofNullable(executionPlan);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format("Could not recover job with job id %s.", jobId), e);
        }
    }

    /**
     * Fetches the dirty job results if this process is still running.
     *
     * @return the dirty job results, or an empty collection if the guarded supplier did not
     *     produce a value
     */
    private Collection<JobResult> getDirtyJobResultsIfRunning() {
        return this.supplyUnsynchronizedIfRunning(this::getDirtyJobResults)
                .orElse(Collections.emptyList());
    }

    /**
     * Reads the dirty job results from the job result store.
     *
     * @throws FlinkRuntimeException if the store could not be read
     */
    private Collection<JobResult> getDirtyJobResults() {
        try {
            return this.jobResultStore.getDirtyResults();
        } catch (IOException e) {
            throw new FlinkRuntimeException(
                    "Could not retrieve JobResults of globally-terminated jobs from JobResultStore",
                    e);
        }
    }

    /**
     * Stops the execution plan store on the I/O executor.
     *
     * @return future that completes when the store has been stopped
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        return CompletableFuture.runAsync(this::stopServices, this.ioExecutor);
    }

    /** Stops the execution plan store, rethrowing any failure via {@code ExceptionUtils.rethrow}. */
    private void stopServices() {
        try {
            this.executionPlanStore.stop();
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Listener callback for an execution plan that was added to the store; handled only while
     * this process is in state {@code RUNNING}.
     */
    @Override
    public void onAddedExecutionPlan(JobID jobId) {
        this.runIfStateIs(
                AbstractDispatcherLeaderProcess.State.RUNNING,
                () -> this.handleAddedExecutionPlan(jobId));
    }

    /**
     * Appends "recover the added job and submit it to the dispatcher" to the ongoing recovery
     * chain.
     */
    private void handleAddedExecutionPlan(JobID jobId) {
        this.log.debug(
                "Job {} has been added to the {} by another process.",
                jobId,
                this.executionPlanStore.getClass().getSimpleName());

        this.onGoingRecoveryOperation =
                this.onGoingRecoveryOperation
                        .thenApplyAsync(ignored -> this.recoverJobIfRunning(jobId), this.ioExecutor)
                        .thenCompose(
                                optionalExecutionPlan ->
                                        optionalExecutionPlan
                                                .flatMap(this::submitAddedJobIfRunning)
                                                .orElse(FutureUtils.completedVoidFuture()))
                        .handle(this::onErrorIfRunning);
    }

    /** Submits the added job through the {@code supplyIfRunning} guard. */
    private Optional<CompletableFuture<Void>> submitAddedJobIfRunning(ExecutionPlan executionPlan) {
        return this.supplyIfRunning(() -> this.submitAddedJob(executionPlan));
    }

    /**
     * Submits the execution plan to the dispatcher gateway. A
     * {@link DuplicateJobSubmissionException} is swallowed; any other failure is propagated.
     */
    private CompletableFuture<Void> submitAddedJob(ExecutionPlan executionPlan) {
        final DispatcherGateway dispatcherGateway = this.getDispatcherGatewayInternal();
        return dispatcherGateway
                .submitJob(executionPlan, RpcUtils.INF_TIMEOUT)
                .thenApply(FunctionUtils.nullFn())
                .exceptionally(this::filterOutDuplicateJobSubmissionException);
    }

    /**
     * Exception filter for job submission: ignores duplicate submissions and rethrows everything
     * else.
     *
     * @param throwable failure reported by the submission future
     * @return always {@code null} when the failure is a duplicate submission
     * @throws CompletionException wrapping {@code throwable} for any other failure
     */
    private Void filterOutDuplicateJobSubmissionException(Throwable throwable) {
        final Throwable strippedException = ExceptionUtils.stripCompletionException(throwable);
        if (strippedException instanceof DuplicateJobSubmissionException) {
            final DuplicateJobSubmissionException duplicateJobSubmissionException =
                    (DuplicateJobSubmissionException) strippedException;
            this.log.debug(
                    "Ignore recovered job {} because the job is currently being executed.",
                    duplicateJobSubmissionException.getJobID(),
                    duplicateJobSubmissionException);
            return null;
        }
        throw new CompletionException(throwable);
    }

    /**
     * Returns the dispatcher gateway, which must already be available.
     *
     * @throws NullPointerException if the gateway future has not completed with a value yet
     */
    private DispatcherGateway getDispatcherGatewayInternal() {
        return Preconditions.checkNotNull(this.getDispatcherGateway().getNow(null));
    }

    /**
     * Recovers a single job through the {@code supplyUnsynchronizedIfRunning} guard.
     *
     * @return the recovered plan, or empty if the guard produced no value or nothing was recovered
     */
    private Optional<ExecutionPlan> recoverJobIfRunning(JobID jobId) {
        // The guard wraps the Optional returned by tryRecoverJob in another Optional; flatten it.
        return this.supplyUnsynchronizedIfRunning(() -> this.tryRecoverJob(jobId))
                .flatMap(recovered -> recovered);
    }

    /**
     * Listener callback for an execution plan that was removed from the store; handled only while
     * this process is in state {@code RUNNING}.
     */
    @Override
    public void onRemovedExecutionPlan(JobID jobId) {
        this.runIfStateIs(
                AbstractDispatcherLeaderProcess.State.RUNNING,
                () -> this.handleRemovedExecutionPlan(jobId));
    }

    /** Appends "notify the dispatcher service about the removal" to the ongoing recovery chain. */
    private void handleRemovedExecutionPlan(JobID jobId) {
        this.log.debug(
                "Job {} has been removed from the {} by another process.",
                jobId,
                this.executionPlanStore.getClass().getSimpleName());

        this.onGoingRecoveryOperation =
                this.onGoingRecoveryOperation
                        .thenCompose(
                                ignored ->
                                        this.removeExecutionPlanIfRunning(jobId)
                                                .orElse(FutureUtils.completedVoidFuture()))
                        .handle(this::onErrorIfRunning);
    }

    /** Forwards the removal through the {@code supplyIfRunning} guard. */
    private Optional<CompletableFuture<Void>> removeExecutionPlanIfRunning(JobID jobId) {
        return this.supplyIfRunning(() -> this.removeExecutionPlan(jobId));
    }

    /**
     * Notifies the dispatcher service, if one exists, that the execution plan was removed.
     *
     * @return the future returned by the dispatcher service, or a completed future if there is no
     *     dispatcher service
     */
    private CompletableFuture<Void> removeExecutionPlan(JobID jobId) {
        return this.getDispatcherService()
                .map(dispatcherService -> dispatcherService.onRemovedExecutionPlan(jobId))
                .orElseGet(FutureUtils::completedVoidFuture);
    }

    /**
     * Creates a new {@link SessionDispatcherLeaderProcess}.
     *
     * @param leaderSessionId leader session id; also used to derive the dispatcher id
     * @param dispatcherFactory factory that creates the dispatcher service after recovery
     * @param executionPlanStore store of persisted execution plans
     * @param jobResultStore store queried for dirty job results
     * @param ioExecutor executor for store access and recovery work
     * @param fatalErrorHandler handler passed on to the base class
     * @return the new, not yet started, leader process
     */
    public static SessionDispatcherLeaderProcess create(
            UUID leaderSessionId,
            AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherFactory,
            ExecutionPlanStore executionPlanStore,
            JobResultStore jobResultStore,
            Executor ioExecutor,
            FatalErrorHandler fatalErrorHandler) {
        return new SessionDispatcherLeaderProcess(
                leaderSessionId,
                dispatcherFactory,
                executionPlanStore,
                jobResultStore,
                ioExecutor,
                fatalErrorHandler);
    }
}

