/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobmaster.JobAlreadyDoneException;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobMasterServiceLeadershipRunner
implements JobManagerRunner,
LeaderContender {
    private static final Logger LOG = LoggerFactory.getLogger(JobMasterServiceLeadershipRunner.class);

    /** Monitor protecting every field annotated with {@code @GuardedBy("lock")}. */
    private final Object lock = new Object();

    // Collaborators handed in through the constructor.
    private final JobMasterServiceProcessFactory jobMasterServiceProcessFactory;
    private final LeaderElection leaderElection;
    private final JobResultStore jobResultStore;
    private final LibraryCacheManager.ClassLoaderLease classLoaderLease;
    private final FatalErrorHandler fatalErrorHandler;

    /** Future returned by {@code closeAsync()}; it receives the outcome of the shutdown chain. */
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Future exposed through {@code getResultFuture()}. */
    private final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();

    /** Lifecycle state of this runner; starts out as {@code RUNNING}. */
    @GuardedBy("lock")
    private State state = State.RUNNING;

    /** Tail of the chain onto which leadership grant/revoke handling is appended. */
    @GuardedBy("lock")
    private CompletableFuture<Void> sequentialOperation = FutureUtils.completedVoidFuture();

    /** Current process; a "waiting for leadership" placeholder until one is created. */
    @GuardedBy("lock")
    private JobMasterServiceProcess jobMasterServiceProcess = JobMasterServiceProcess.waitingForLeadership();

    /** Gateway future handed to callers; replaced by a fresh future when the process is stopped. */
    @GuardedBy("lock")
    private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>();

    /** Set by {@code cancel}; reset to {@code false} when the process is stopped. */
    @GuardedBy("lock")
    private boolean hasCurrentLeaderBeenCancelled = false;

    /**
     * Creates a runner that starts in the {@code RUNNING} state without an active process.
     *
     * @param jobMasterServiceProcessFactory supplies the job id, placeholder execution graphs and
     *     new {@link JobMasterServiceProcess} instances
     * @param leaderElection used to start the election, check and confirm leadership, and closed
     *     when the runner terminates
     * @param jobResultStore queried for an existing job result when leadership is granted
     * @param classLoaderLease released when the runner terminates
     * @param fatalErrorHandler receives leader election errors and JVM-fatal errors
     */
    public JobMasterServiceLeadershipRunner(
            JobMasterServiceProcessFactory jobMasterServiceProcessFactory,
            LeaderElection leaderElection,
            JobResultStore jobResultStore,
            LibraryCacheManager.ClassLoaderLease classLoaderLease,
            FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
        this.classLoaderLease = classLoaderLease;
        this.jobResultStore = jobResultStore;
        this.leaderElection = leaderElection;
        this.jobMasterServiceProcessFactory = jobMasterServiceProcessFactory;
    }

    /**
     * Stops this runner.
     *
     * <p>The first call switches the state to {@code STOPPED}, fails the gateway future, completes
     * the result future with a {@code SUSPENDED} execution graph (a no-op if it is already
     * completed) and closes the current process. The class loader lease release and the leader
     * election close are handed to {@code FutureUtils.runAfterwards} together with the process
     * termination future. Later calls only return the termination future.
     *
     * @return the termination future of this runner
     */
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> processTermination;
        synchronized (lock) {
            if (State.STOPPED == state) {
                // Already closed (or closing): hand out the same future again.
                return terminationFuture;
            }
            state = State.STOPPED;
            LOG.debug("Terminating the leadership runner for job {}.", getJobID());

            final FlinkException closedCause =
                    new FlinkException(
                            "JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership.");
            jobMasterGatewayFuture.completeExceptionally(closedCause);

            final ExecutionGraphInfo suspendedInfo =
                    createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED);
            resultFuture.complete(JobManagerRunnerResult.forSuccess(suspendedInfo));

            processTermination = jobMasterServiceProcess.closeAsync();
        }

        // Remaining work happens outside the lock.
        final CompletableFuture<Void> serviceTermination =
                FutureUtils.runAfterwards(
                        processTermination,
                        () -> {
                            classLoaderLease.release();
                            leaderElection.close();
                        });
        FutureUtils.forward(serviceTermination, terminationFuture);

        terminationFuture.whenComplete(
                (ignored, failure) ->
                        LOG.debug("Leadership runner for job {} has been terminated.", getJobID()));
        return terminationFuture;
    }

    /**
     * Registers this runner as a contender with the leader election.
     *
     * @throws Exception declared by the interface; propagated from the leader election start
     */
    @Override
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        leaderElection.startLeaderElection(this);
    }

    /**
     * Returns the current gateway future, read under the lock because the field is replaced when
     * the process is stopped.
     *
     * @return the gateway future that is current at the time of the call
     */
    @Override
    public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
        synchronized (lock) {
            return jobMasterGatewayFuture;
        }
    }

    /**
     * Returns the future that carries this runner's result.
     *
     * @return the result future; the same instance on every call
     */
    @Override
    public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
        return resultFuture;
    }

    /**
     * Returns the id of the job, as reported by the process factory.
     *
     * @return the job id
     */
    @Override
    public JobID getJobID() {
        return jobMasterServiceProcessFactory.getJobId();
    }

    /**
     * Records that cancellation was requested and forwards the request to the job master gateway
     * once the gateway future completes.
     *
     * @param timeout timeout passed on to the gateway's cancel call
     * @return future of the gateway's acknowledgement; any failure is rethrown as a {@link
     *     CompletionException} wrapping a {@link JobCancellationFailedException}
     */
    @Override
    public CompletableFuture<Acknowledge> cancel(Duration timeout) {
        synchronized (lock) {
            hasCurrentLeaderBeenCancelled = true;

            final CompletableFuture<Acknowledge> cancellation =
                    getJobMasterGateway().thenCompose(gateway -> gateway.cancel(timeout));

            return cancellation.exceptionally(
                    failure -> {
                        // Strip the CompletionException wrapper before re-wrapping the cause.
                        final Throwable rootCause =
                                ExceptionUtils.stripCompletionException(failure);
                        throw new CompletionException(
                                new JobCancellationFailedException("Cancellation failed.", rootCause));
                    });
        }
    }

    /**
     * Requests the job and extracts the state of its archived execution graph.
     *
     * @param timeout timeout passed on to {@link #requestJob(Duration)}
     * @return future with the job status
     */
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(Duration timeout) {
        final CompletableFuture<ExecutionGraphInfo> jobFuture = requestJob(timeout);
        return jobFuture.thenApply(info -> info.getArchivedExecutionGraph().getState());
    }

    /**
     * Requests the job and builds {@link JobDetails} from its archived execution graph.
     *
     * @param timeout timeout passed on to {@link #requestJob(Duration)}
     * @return future with the job details
     */
    @Override
    public CompletableFuture<JobDetails> requestJobDetails(Duration timeout) {
        final CompletableFuture<ExecutionGraphInfo> jobFuture = requestJob(timeout);
        return jobFuture.thenApply(
                info -> JobDetails.createDetailsForJob(info.getArchivedExecutionGraph()));
    }

    /**
     * Returns information about the job, choosing the source by the runner's current state.
     *
     * <ul>
     *   <li>Not {@code RUNNING}: derived from the result future.
     *   <li>{@code RUNNING} but the process is not initialized and running: a placeholder graph
     *       with status {@code CANCELLING} if cancellation was requested, otherwise {@code
     *       INITIALIZING}.
     *   <li>{@code RUNNING} with an initialized process: delegated to the job master gateway.
     * </ul>
     *
     * @param timeout timeout passed on to the gateway's request
     * @return future with the execution graph information
     */
    @Override
    public CompletableFuture<ExecutionGraphInfo> requestJob(Duration timeout) {
        synchronized (lock) {
            if (state != State.RUNNING) {
                return resultFuture.thenApply(JobManagerRunnerResult::getExecutionGraphInfo);
            }

            if (!jobMasterServiceProcess.isInitializedAndRunning()) {
                final JobStatus placeholderStatus;
                if (hasCurrentLeaderBeenCancelled) {
                    placeholderStatus = JobStatus.CANCELLING;
                } else {
                    placeholderStatus = JobStatus.INITIALIZING;
                }
                return CompletableFuture.completedFuture(
                        createExecutionGraphInfoWithJobStatus(placeholderStatus));
            }

            return getJobMasterGateway().thenCompose(gateway -> gateway.requestJob(timeout));
        }
    }

    /**
     * Reports whether the current process is initialized and running.
     *
     * @return the value of the process's {@code isInitializedAndRunning()}, read under the lock
     */
    @Override
    public boolean isInitialized() {
        synchronized (lock) {
            return jobMasterServiceProcess.isInitializedAndRunning();
        }
    }

    /**
     * Leader election callback: schedules the start of a new process if the runner is still
     * running; otherwise the grant is ignored with a debug log.
     *
     * @param leaderSessionID session id of the granted leadership
     */
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        final Runnable startProcess = () -> startJobMasterServiceProcessAsync(leaderSessionID);
        runIfStateRunning(startProcess, "starting a new JobMasterServiceProcess");
    }

    /**
     * Appends the "leadership granted" handling to the sequential operation chain.
     *
     * <p>The appended step asks the job result store whether a result already exists for this job.
     * If so, the job is treated as already done; otherwise a new process is created. Both branches
     * only act if the given session is still the valid leader. Failures of the chain are routed to
     * {@code handleAsyncOperationError}.
     *
     * @param leaderSessionId session id of the granted leadership
     */
    @GuardedBy("lock")
    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
        final CompletableFuture<Void> chained =
                sequentialOperation.thenCompose(
                        ignored ->
                                jobResultStore
                                        .hasJobResultEntryAsync(getJobID())
                                        .thenCompose(
                                                hasJobResult ->
                                                        hasJobResult.booleanValue()
                                                                ? handleJobAlreadyDoneIfValidLeader(
                                                                        leaderSessionId)
                                                                : createNewJobMasterServiceProcessIfValidLeader(
                                                                        leaderSessionId)));
        sequentialOperation = chained;
        handleAsyncOperationError(chained, "Could not start the job manager.");
    }

    /**
     * Runs {@code jobAlreadyDone} if the given session is still the valid leader.
     *
     * @param leaderSessionId leader session to validate
     * @return future of the leadership-checked action
     */
    private CompletableFuture<Void> handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
        final Runnable markDone = () -> jobAlreadyDone(leaderSessionId);
        return runIfValidLeader(leaderSessionId, markDone, "check completed job");
    }

    /**
     * Creates a new process if the given session is still the valid leader.
     *
     * @param leaderSessionId leader session to validate and to create the process for
     * @return future of the leadership-checked action
     */
    private CompletableFuture<Void> createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) {
        // The unchecked wrapper is built only when the action actually runs.
        final Runnable createProcess =
                () ->
                        ThrowingRunnable.unchecked(
                                        () -> createNewJobMasterServiceProcess(leaderSessionId))
                                .run();
        return runIfValidLeader(
                leaderSessionId, createProcess, "create new job master service process");
    }

    /**
     * Logs at debug level that a leader action was skipped.
     *
     * @param actionDescription description of the skipped action
     * @param leaderSessionId session id the action was meant for
     */
    private void printLogIfNotValidLeader(String actionDescription, UUID leaderSessionId) {
        LOG.debug(
                "Ignore leader action '{}' because the leadership runner is no longer the valid leader for {}.",
                actionDescription,
                leaderSessionId);
    }

    /**
     * Builds an {@link ExecutionGraphInfo} around an archived execution graph that the process
     * factory creates for the given status, with no failure cause.
     *
     * @param jobStatus status passed to the factory
     * @return the wrapping execution graph info
     */
    private ExecutionGraphInfo createExecutionGraphInfoWithJobStatus(JobStatus jobStatus) {
        return new ExecutionGraphInfo(
                jobMasterServiceProcessFactory.createArchivedExecutionGraph(jobStatus, null));
    }

    /**
     * Completes the result future for a job that already has a result: the result is a {@code
     * FAILED} archived execution graph carrying a {@link JobAlreadyDoneException}.
     *
     * @param leaderSessionId session id of the granted leadership; used for logging only
     */
    private void jobAlreadyDone(UUID leaderSessionId) {
        LOG.info(
                "{} for job {} was granted leadership with leader id {}, but job was already done.",
                getClass().getSimpleName(),
                getJobID(),
                leaderSessionId);

        final Throwable alreadyDone = new JobAlreadyDoneException(getJobID());
        final ExecutionGraphInfo failedInfo =
                new ExecutionGraphInfo(
                        jobMasterServiceProcessFactory.createArchivedExecutionGraph(
                                JobStatus.FAILED, alreadyDone));
        resultFuture.complete(JobManagerRunnerResult.forSuccess(failedInfo));
    }

    /**
     * Replaces the current process with a newly created one and wires up its futures.
     *
     * <p>Requires that the previous process's {@code closeAsync()} future is already done. The new
     * process's gateway future is forwarded to the runner's gateway future, its result future to
     * the job completion handling, and its leader address future to the leadership confirmation.
     *
     * @param leaderSessionId session id the new process is created for
     * @throws IllegalStateException if the previous process has not finished closing
     */
    @GuardedBy("lock")
    private void createNewJobMasterServiceProcess(UUID leaderSessionId) {
        final boolean previousProcessClosed = jobMasterServiceProcess.closeAsync().isDone();
        Preconditions.checkState(previousProcessClosed);

        LOG.info(
                "{} for job {} was granted leadership with leader id {}. Creating new {}.",
                getClass().getSimpleName(),
                getJobID(),
                leaderSessionId,
                JobMasterServiceProcess.class.getSimpleName());

        final JobMasterServiceProcess newProcess =
                jobMasterServiceProcessFactory.create(leaderSessionId);
        jobMasterServiceProcess = newProcess;

        forwardIfValidLeader(
                leaderSessionId,
                newProcess.getJobMasterGatewayFuture(),
                jobMasterGatewayFuture,
                "JobMasterGatewayFuture from JobMasterServiceProcess");
        forwardResultFuture(leaderSessionId, newProcess.getResultFuture());
        confirmLeadership(leaderSessionId, newProcess.getLeaderAddressFuture());
    }

    /**
     * Confirms leadership with the leader election once the leader address is known, provided the
     * runner is still running at that point. The resulting future is passed to {@code
     * FutureUtils.assertNoException}.
     *
     * @param leaderSessionId session id to confirm
     * @param leaderAddressFuture future with the address to confirm
     */
    private void confirmLeadership(UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
        final CompletableFuture<Void> confirmation =
                leaderAddressFuture.thenCompose(
                        address -> {
                            final Optional<CompletableFuture<Void>> confirmed =
                                    callIfRunning(
                                            () -> {
                                                LOG.debug("Confirm leadership {}.", leaderSessionId);
                                                return leaderElection.confirmLeadershipAsync(
                                                        leaderSessionId, address);
                                            },
                                            "confirming leadership");
                            // No longer running: nothing to confirm.
                            return confirmed.orElse(FutureUtils.completedVoidFuture());
                        });
        FutureUtils.assertNoException(confirmation);
    }

    /**
     * Hooks the job completion handling onto a process's result future; the handling only runs if
     * the given session is still the valid leader when the future completes.
     *
     * @param leaderSessionId session id the process was created for
     * @param resultFuture result future of that process
     */
    private void forwardResultFuture(UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> resultFuture) {
        resultFuture.whenComplete(
                (result, failure) -> {
                    final Runnable completeJob = () -> onJobCompletion(result, failure);
                    runIfValidLeader(leaderSessionId, completeJob, "result future forwarding");
                });
    }

    /**
     * Moves the runner to {@code JOB_COMPLETED} and publishes the outcome of the process.
     *
     * <p>A failure completes both the result future and the gateway future exceptionally. A result
     * completes the result future; if that result is not a success, the gateway future is failed
     * first with the initialization failure.
     *
     * @param jobManagerRunnerResult result of the process; read only when {@code throwable} is null
     * @param throwable failure of the process, or {@code null}
     */
    @GuardedBy("lock")
    private void onJobCompletion(JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) {
        state = State.JOB_COMPLETED;
        LOG.debug("Completing the result for job {}.", getJobID());

        if (throwable != null) {
            resultFuture.completeExceptionally(throwable);
            jobMasterGatewayFuture.completeExceptionally(
                    new FlinkException(
                            "Could not retrieve JobMasterGateway because the JobMaster failed.",
                            throwable));
            return;
        }

        final boolean initializationFailed = !jobManagerRunnerResult.isSuccess();
        if (initializationFailed) {
            jobMasterGatewayFuture.completeExceptionally(
                    new FlinkException(
                            "Could not retrieve JobMasterGateway because the JobMaster initialization failed.",
                            jobManagerRunnerResult.getInitializationFailure()));
        }
        resultFuture.complete(jobManagerRunnerResult);
    }

    /**
     * Leader election callback: schedules the stop of the current process if the runner is still
     * running; otherwise the revocation is ignored with a debug log.
     */
    @Override
    public void revokeLeadership() {
        final Runnable stopProcess = () -> stopJobMasterServiceProcessAsync();
        runIfStateRunning(stopProcess, "revoke leadership from JobMasterServiceProcess");
    }

    /**
     * Appends the "leadership revoked" handling to the sequential operation chain: when the step
     * runs, the current process is stopped if the runner is still running. Failures of the chain
     * are routed to {@code handleAsyncOperationError}.
     */
    @GuardedBy("lock")
    private void stopJobMasterServiceProcessAsync() {
        final CompletableFuture<Void> chained =
                sequentialOperation.thenCompose(
                        ignored -> {
                            final Optional<CompletableFuture<Void>> stopped =
                                    callIfRunning(
                                            this::stopJobMasterServiceProcess,
                                            "stop leading JobMasterServiceProcess");
                            return stopped.orElse(FutureUtils.completedVoidFuture());
                        });
        sequentialOperation = chained;
        handleAsyncOperationError(chained, "Could not suspend the job manager.");
    }

    /**
     * Stops the current process after leadership was revoked.
     *
     * <p>The current gateway future is failed and replaced by a fresh one, the cancellation flag is
     * cleared, and the process is closed.
     *
     * @return the process's close future
     */
    @GuardedBy("lock")
    private CompletableFuture<Void> stopJobMasterServiceProcess() {
        LOG.info(
                "{} for job {} was revoked leadership with leader id {}. Stopping current {}.",
                getClass().getSimpleName(),
                getJobID(),
                jobMasterServiceProcess.getLeaderSessionId(),
                JobMasterServiceProcess.class.getSimpleName());

        final FlinkException leadershipLost =
                new FlinkException(
                        "Cannot obtain JobMasterGateway because the JobMaster lost leadership.");
        jobMasterGatewayFuture.completeExceptionally(leadershipLost);

        // Later callers get a new, not yet completed gateway future.
        jobMasterGatewayFuture = new CompletableFuture<>();
        hasCurrentLeaderBeenCancelled = false;

        return jobMasterServiceProcess.closeAsync();
    }

    /**
     * Leader election callback: reports the given exception to the fatal error handler.
     *
     * @param exception error reported by the leader election
     */
    @Override
    public void handleError(Exception exception) {
        fatalErrorHandler.onFatalError(exception);
    }

    /**
     * Attaches failure handling to an asynchronous operation: if it fails while the runner is
     * still running, the failure is wrapped in a {@link FlinkException} with the given message and
     * passed to {@code handleJobMasterServiceLeadershipRunnerError}.
     *
     * @param operation operation to observe
     * @param message message of the wrapping exception
     */
    private void handleAsyncOperationError(CompletableFuture<Void> operation, String message) {
        operation.whenComplete(
                (ignored, failure) -> {
                    if (failure == null) {
                        return;
                    }
                    runIfStateRunning(
                            () ->
                                    handleJobMasterServiceLeadershipRunnerError(
                                            new FlinkException(message, failure)),
                            "handle JobMasterServiceLeadershipRunner error");
                });
    }

    /**
     * Handles a runner error: JVM-fatal errors go to the fatal error handler, everything else
     * fails the result future.
     *
     * @param cause the error to handle
     */
    private void handleJobMasterServiceLeadershipRunnerError(Throwable cause) {
        if (!ExceptionUtils.isJvmFatalError(cause)) {
            resultFuture.completeExceptionally(cause);
            return;
        }
        fatalErrorHandler.onFatalError(cause);
    }

    /**
     * Runs the action under the lock if the runner is in the {@code RUNNING} state; otherwise logs
     * at debug level that the action was ignored.
     *
     * @param action action to run while holding the lock
     * @param actionDescription description used in the log message
     */
    private void runIfStateRunning(Runnable action, String actionDescription) {
        synchronized (lock) {
            if (!isRunning()) {
                LOG.debug(
                        "Ignore '{}' because the leadership runner is no longer running.",
                        actionDescription);
                return;
            }
            action.run();
        }
    }

    /**
     * Evaluates the supplier under the lock if the runner is in the {@code RUNNING} state.
     *
     * @param supplier supplier to evaluate while holding the lock; its value must not be null
     *     because it is wrapped with {@link Optional#of(Object)}
     * @param supplierDescription description used in the log message
     * @param <T> type of the supplied value
     * @return the supplied value, or an empty optional if the runner is not running
     */
    private <T> Optional<T> callIfRunning(Supplier<? extends T> supplier, String supplierDescription) {
        synchronized (lock) {
            if (!isRunning()) {
                LOG.debug(
                        "Ignore '{}' because the leadership runner is no longer running.",
                        supplierDescription);
                return Optional.empty();
            }
            return Optional.of(supplier.get());
        }
    }

    /**
     * Checks the lifecycle state.
     *
     * @return {@code true} if the state is {@code RUNNING}
     */
    @GuardedBy("lock")
    private boolean isRunning() {
        return State.RUNNING == state;
    }

    /**
     * Runs {@code action} only if this runner is running and the leader election reports that
     * {@code expectedLeaderId} still holds leadership; otherwise runs {@code noLeaderFallback}.
     *
     * <p>The running state is checked twice: once before the leadership query is issued, and again
     * under the lock when the query result arrives, because the state may change in between.
     * Whichever of {@code action} or {@code noLeaderFallback} runs does so while the lock is held.
     *
     * @param expectedLeaderId leader session id that must still be valid
     * @param action executed when the runner is running and leadership is confirmed
     * @param noLeaderFallback executed in every other case
     * @return future of the leadership check and the callback it triggers; an already completed
     *     future if the fallback ran immediately
     */
    private CompletableFuture<Void> runIfValidLeader(UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) {
        // Synchronize on the field directly. The decompiled temporaries were both named `object`,
        // and the inner one sat in a lambda body, where redeclaring an enclosing local is a compile
        // error.
        synchronized (this.lock) {
            if (this.isRunning() && this.leaderElection != null) {
                return this.leaderElection.hasLeadershipAsync(expectedLeaderId).thenAccept(hasLeadership -> {
                    synchronized (this.lock) {
                        if (this.isRunning() && hasLeadership.booleanValue()) {
                            action.run();
                        } else {
                            noLeaderFallback.run();
                        }
                    }
                });
            }
            noLeaderFallback.run();
            return FutureUtils.completedVoidFuture();
        }
    }

    /**
     * Variant of the leadership-checked execution whose fallback only logs that the described
     * action was ignored.
     *
     * @param expectedLeaderId leader session id that must still be valid
     * @param action executed when leadership is confirmed
     * @param noLeaderFallbackCommandDescription description used in the fallback's log message
     * @return future of the leadership-checked action
     */
    private CompletableFuture<Void> runIfValidLeader(UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) {
        final Runnable logSkipped =
                () -> printLogIfNotValidLeader(noLeaderFallbackCommandDescription, expectedLeaderId);
        return runIfValidLeader(expectedLeaderId, action, logSkipped);
    }

    /**
     * Forwards the outcome of {@code source} to {@code target}, but only if the given session is
     * still the valid leader when {@code source} completes.
     *
     * @param expectedLeaderId leader session id that must still be valid
     * @param source future whose outcome is forwarded
     * @param target future that receives the outcome
     * @param forwardDescription description used in the log message when forwarding is skipped
     * @param <T> value type of the target future
     */
    private <T> void forwardIfValidLeader(UUID expectedLeaderId, CompletableFuture<? extends T> source, CompletableFuture<T> target, String forwardDescription) {
        source.whenComplete(
                (value, failure) -> {
                    final Runnable completeTarget =
                            () -> {
                                if (failure == null) {
                                    target.complete(value);
                                } else {
                                    target.completeExceptionally(failure);
                                }
                            };
                    runIfValidLeader(expectedLeaderId, completeTarget, forwardDescription);
                });
    }

    /** Lifecycle states of the leadership runner. */
    static enum State {
        /** Initial state; leadership callbacks are only acted upon in this state. */
        RUNNING,
        /** Entered when the runner is closed. */
        STOPPED,
        /** Entered when the outcome of the process has been published. */
        JOB_COMPLETED
    }
}

