/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherCachedOperationsHandler;
import org.apache.flink.runtime.dispatcher.DispatcherException;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.OnMainThreadJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;

public abstract class Dispatcher
extends FencedRpcEndpoint<DispatcherId>
implements DispatcherGateway {
    /**
     * Internal option for the interval of the periodic client aliveness check. Defaults to one
     * minute; the constructor converts the configured value to milliseconds.
     */
    @VisibleForTesting
    @Internal
    public static final ConfigOption<Duration> CLIENT_ALIVENESS_CHECK_DURATION = ConfigOptions.key((String)"$internal.dispatcher.client-aliveness-check.interval").durationType().defaultValue((Object)Duration.ofMinutes(1L));
    /** Base name passed to {@code RpcServiceUtils.createRandomName} when naming this endpoint. */
    public static final String DISPATCHER_NAME = "dispatcher";
    /** Initial capacity (16) intended for the job manager runner registry. */
    private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16;
    // --- Services obtained from DispatcherServices in the constructor ---
    private final Configuration configuration;
    private final JobGraphWriter jobGraphWriter;
    private final JobResultStore jobResultStore;
    private final HighAvailabilityServices highAvailabilityServices;
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
    // Built in the constructor from configuration, blobServer and fatalErrorHandler; shut down in stopDispatcherServices().
    private final JobManagerSharedServices jobManagerSharedServices;
    private final HeartbeatServices heartbeatServices;
    private final BlobServer blobServer;
    private final FatalErrorHandler fatalErrorHandler;
    private final Collection<FailureEnricher> failureEnrichers;
    // Wraps the registry handed to the constructor together with this endpoint's main thread executor.
    private final OnMainThreadJobManagerRunnerRegistry jobManagerRunnerRegistry;
    // Copies of the constructor arguments; consumed and cleared by onStart().
    private final Collection<JobGraph> recoveredJobs;
    private final Collection<JobResult> recoveredDirtyJobs;
    private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
    private final ExecutionGraphInfoStore executionGraphInfoStore;
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    private final CleanupRunnerFactory cleanupRunnerFactory;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private final HistoryServerArchivist historyServerArchivist;
    private final Executor ioExecutor;
    @Nullable
    private final String metricServiceQueryAddress;
    // Per-job termination futures; runJob() registers one for every runner it starts.
    private final Map<JobID, CompletableFuture<Void>> jobManagerRunnerTerminationFutures;
    // IDs of jobs whose submission is in flight: added by internalSubmitJob(), removed when that
    // submission completes, and consulted by submitJob() to reject duplicates.
    private final Set<JobID> submittedAndWaitingTerminationJobIDs;
    /** Returned by {@link #getShutDownFuture()}. */
    protected final CompletableFuture<ApplicationStatus> shutDownFuture;
    // Assigned at the end of onStart(); stopped in onStop().
    private DispatcherBootstrap dispatcherBootstrap;
    private final DispatcherCachedOperationsHandler dispatcherCachedOperationsHandler;
    private final ResourceCleaner localResourceCleaner;
    private final ResourceCleaner globalResourceCleaner;
    private final Duration webTimeout;
    private final Map<JobID, Long> jobClientExpiredTimestamp = new HashMap<JobID, Long>();
    // Filled by initJobClientExpiredTime() with each job's initial client heartbeat timeout.
    private final Map<JobID, Long> uninitializedJobClientHeartbeatTimeout = new HashMap<JobID, Long>();
    // CLIENT_ALIVENESS_CHECK_DURATION in milliseconds.
    private final long jobClientAlivenessCheckInterval;
    // Scheduled lazily by initJobClientExpiredTime(); cancelled and reset to null in onStop().
    private ScheduledFuture<?> jobClientAlivenessCheck;
    private final Set<JobID> pendingJobResourceRequirementsUpdates = new HashSet<JobID>();

    /**
     * Creates a dispatcher that tracks its runners in a new {@link DefaultJobManagerRunnerRegistry}.
     *
     * @param rpcService RPC service hosting this endpoint
     * @param fencingToken fencing token of this dispatcher
     * @param recoveredJobs job graphs to run once the dispatcher has started
     * @param recoveredDirtyJobs job results whose cleanup is retried on start
     * @param dispatcherBootstrapFactory factory for the bootstrap created in {@code onStart()}
     * @param dispatcherServices bundle of services used by the dispatcher
     * @throws Exception if the delegated constructor fails
     */
    public Dispatcher(RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, Collection<JobResult> recoveredDirtyJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception {
        this(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobs,
                dispatcherBootstrapFactory,
                dispatcherServices,
                new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY));
    }

    /**
     * Intermediate constructor: builds a {@link DispatcherResourceCleanerFactory} from the given
     * registry and services and forwards everything to the main constructor.
     */
    private Dispatcher(RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, Collection<JobResult> recoveredDirtyJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices, JobManagerRunnerRegistry jobManagerRunnerRegistry) throws Exception {
        this(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobs,
                dispatcherBootstrapFactory,
                dispatcherServices,
                jobManagerRunnerRegistry,
                new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices));
    }

    /**
     * Main constructor. Validates the recovered state, pulls every collaborator out of
     * {@code dispatcherServices}, and creates the dispatcher-owned helpers (shared job manager
     * services, main-thread registry wrapper, cached operations handler, resource cleaners).
     *
     * @param rpcService RPC service hosting this endpoint
     * @param fencingToken fencing token of this dispatcher
     * @param recoveredJobs job graphs to run on start; must not share a job ID with
     *     {@code recoveredDirtyJobs}
     * @param recoveredDirtyJobs job results whose cleanup is retried on start
     * @param dispatcherBootstrapFactory factory for the bootstrap; must not be null
     * @param dispatcherServices bundle of services used by the dispatcher
     * @param jobManagerRunnerRegistry registry that gets wrapped for main-thread access
     * @param resourceCleanerFactory factory for the local and global resource cleaners
     * @throws Exception if any of the collaborators cannot be created
     */
    @VisibleForTesting
    protected Dispatcher(RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, Collection<JobResult> recoveredDirtyJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices, JobManagerRunnerRegistry jobManagerRunnerRegistry, ResourceCleanerFactory resourceCleanerFactory) throws Exception {
        super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
        assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs);

        // Collaborators taken from the services bundle.
        configuration = dispatcherServices.getConfiguration();
        highAvailabilityServices = dispatcherServices.getHighAvailabilityServices();
        resourceManagerGatewayRetriever = dispatcherServices.getResourceManagerGatewayRetriever();
        heartbeatServices = dispatcherServices.getHeartbeatServices();
        blobServer = dispatcherServices.getBlobServer();
        fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
        failureEnrichers = dispatcherServices.getFailureEnrichers();
        jobGraphWriter = dispatcherServices.getJobGraphWriter();
        jobResultStore = dispatcherServices.getJobResultStore();
        jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
        metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
        ioExecutor = dispatcherServices.getIoExecutor();

        // Helpers owned by the dispatcher itself.
        jobManagerSharedServices =
                JobManagerSharedServices.fromConfiguration(configuration, blobServer, fatalErrorHandler);
        this.jobManagerRunnerRegistry =
                new OnMainThreadJobManagerRunnerRegistry(jobManagerRunnerRegistry, getMainThreadExecutor());
        historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
        executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
        jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
        cleanupRunnerFactory = dispatcherServices.getCleanupRunnerFactory();
        jobManagerRunnerTerminationFutures =
                CollectionUtil.newHashMapWithExpectedSize(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY);
        submittedAndWaitingTerminationJobIDs = new HashSet<>();
        shutDownFuture = new CompletableFuture<>();
        this.dispatcherBootstrapFactory = Preconditions.checkNotNull(dispatcherBootstrapFactory);

        // Defensive copies: both collections are drained in onStart().
        this.recoveredJobs = new HashSet<>(recoveredJobs);
        this.recoveredDirtyJobs = new HashSet<>(recoveredDirtyJobs);
        blobServer.retainJobs(
                recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()),
                dispatcherServices.getIoExecutor());

        dispatcherCachedOperationsHandler =
                new DispatcherCachedOperationsHandler(
                        dispatcherServices.getOperationCaches(),
                        this::triggerCheckpointAndGetCheckpointID,
                        this::triggerSavepointAndGetLocation,
                        this::stopWithSavepointAndGetLocation);
        localResourceCleaner = resourceCleanerFactory.createLocalResourceCleaner(getMainThreadExecutor());
        globalResourceCleaner = resourceCleanerFactory.createGlobalResourceCleaner(getMainThreadExecutor());
        webTimeout = configuration.get(WebOptions.TIMEOUT);
        jobClientAlivenessCheckInterval = configuration.get(CLIENT_ALIVENESS_CHECK_DURATION).toMillis();
    }

    /**
     * Exposes the dispatcher's shut-down future.
     *
     * @return the {@code shutDownFuture} created in the constructor
     */
    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return shutDownFuture;
    }

    /**
     * Starts the dispatcher: brings up its services, retries cleanup of dirty job results,
     * launches the recovered jobs and finally creates the dispatcher bootstrap.
     *
     * @throws Exception a {@link DispatcherException} wrapping the cause if the services cannot be
     *     started; the same exception is also reported through {@code onFatalError}
     */
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Throwable cause) {
            final String message = String.format("Could not start the Dispatcher %s", getAddress());
            final DispatcherException startFailure = new DispatcherException(message, cause);
            onFatalError(startFailure);
            throw startFailure;
        }

        // Cleanup retries are kicked off before the recovered jobs are started.
        startCleanupRetries();
        startRecoveredJobs();

        dispatcherBootstrap =
                dispatcherBootstrapFactory.create(
                        getSelfGateway(DispatcherGateway.class),
                        getRpcService().getScheduledExecutor(),
                        this::onFatalError);
    }

    /**
     * Registers the dispatcher metrics. On failure the already-created services are torn down via
     * {@code handleStartDispatcherServicesException}, which rethrows the failure.
     */
    private void startDispatcherServices() throws Exception {
        try {
            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception startFailure) {
            handleStartDispatcherServicesException(startFailure);
        }
    }

    /**
     * Verifies that no recovered job graph shares its job ID with one of the dirty job results.
     *
     * @param recoveredJobs job graphs that are about to be recovered
     * @param recoveredDirtyJobResults job results that still need cleanup
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, when
     *     the two collections overlap — confirm against the Preconditions contract
     */
    private static void assertRecoveredJobsAndDirtyJobResults(Collection<JobGraph> recoveredJobs, Collection<JobResult> recoveredDirtyJobResults) {
        final Set<JobID> dirtyJobIds =
                recoveredDirtyJobResults.stream().map(JobResult::getJobId).collect(Collectors.toSet());
        final boolean disjoint =
                recoveredJobs.stream().map(JobGraph::getJobID).noneMatch(dirtyJobIds::contains);
        Preconditions.checkArgument(disjoint, "There should be no overlap between the recovered JobGraphs and the passed dirty JobResults based on their job ID.");
    }

    /** Runs every recovered job graph and then empties the recovered-jobs collection. */
    private void startRecoveredJobs() {
        recoveredJobs.forEach(this::runRecoveredJob);
        recoveredJobs.clear();
    }

    /**
     * Starts a single recovered job in {@code RECOVERY} mode. Any failure while creating or
     * running the job master runner is reported as a fatal error instead of being rethrown.
     *
     * @param recoveredJob the job graph to run; must not be null
     */
    private void runRecoveredJob(JobGraph recoveredJob) {
        Preconditions.checkNotNull(recoveredJob);
        initJobClientExpiredTime(recoveredJob);
        // The MDC context is scoped to the job for the duration of the runner start-up.
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(recoveredJob.getJobID()))) {
            final JobManagerRunner runner = createJobMasterRunner(recoveredJob);
            runJob(runner, ExecutionType.RECOVERY);
        } catch (Throwable failure) {
            final String message = String.format("Could not start recovered job %s.", recoveredJob.getJobID());
            onFatalError(new DispatcherException(message, failure));
        }
    }

    /**
     * Records the job's initial client heartbeat timeout and, on first use, schedules the periodic
     * client aliveness check. Jobs with a non-positive timeout are ignored.
     *
     * @param jobGraph job whose client heartbeat settings are inspected
     */
    private void initJobClientExpiredTime(JobGraph jobGraph) {
        final JobID jobID = jobGraph.getJobID();
        final long heartbeatTimeout = jobGraph.getInitialClientHeartbeatTimeout();
        if (heartbeatTimeout <= 0L) {
            return;
        }

        log.info("Begin to detect the client's aliveness for job {}. The heartbeat timeout is {}", jobID, heartbeatTimeout);
        uninitializedJobClientHeartbeatTimeout.put(jobID, heartbeatTimeout);

        if (jobClientAlivenessCheck != null) {
            // The periodic check is already running.
            return;
        }
        // NOTE(review): the single shared check is bound to the executor obtained for the job that
        // first triggers scheduling — confirm that is intended for a check shared across jobs.
        final Runnable dispatchCheck = () -> getMainThreadExecutor(jobID).execute(this::checkJobClientAliveness);
        jobClientAlivenessCheck =
                getRpcService()
                        .getScheduledExecutor()
                        .scheduleWithFixedDelay(dispatchCheck, 0L, jobClientAlivenessCheckInterval, TimeUnit.MILLISECONDS);
    }

    /** Retries the cleanup for every recovered dirty job result, then empties that collection. */
    private void startCleanupRetries() {
        for (JobResult dirtyJobResult : recoveredDirtyJobs) {
            runCleanupRetry(dirtyJobResult);
        }
        recoveredDirtyJobs.clear();
    }

    /**
     * Starts a cleanup runner for one dirty job result in {@code RECOVERY} mode. Failures are
     * reported as fatal errors rather than rethrown.
     *
     * @param jobResult the dirty job result to clean up; must not be null
     */
    private void runCleanupRetry(JobResult jobResult) {
        Preconditions.checkNotNull(jobResult);
        try {
            final JobManagerRunner cleanupRunner = createJobCleanupRunner(jobResult);
            runJob(cleanupRunner, ExecutionType.RECOVERY);
        } catch (Throwable failure) {
            final String message = String.format("Could not start cleanup retry for job %s.", jobResult.getJobId());
            onFatalError(new DispatcherException(message, failure));
        }
    }

    /**
     * Stops the dispatcher services after a failed start and rethrows the original failure. An
     * exception raised while stopping is attached to it as a suppressed exception.
     *
     * @param e the failure that interrupted the start
     * @throws Exception always; {@code e} itself
     */
    private void handleStartDispatcherServicesException(Exception e) throws Exception {
        try {
            stopDispatcherServices();
        } catch (Exception stopFailure) {
            e.addSuppressed(stopFailure);
        }

        throw e;
    }

    /**
     * Stops the dispatcher: cancels the client aliveness check, terminates all running jobs, and
     * afterwards stops the bootstrap and the dispatcher services.
     *
     * @return future produced by {@code FutureUtils.runAfterwards} over the job termination future
     *     and the shutdown steps
     */
    public CompletableFuture<Void> onStop() {
        log.info("Stopping dispatcher {}.", getAddress());

        final ScheduledFuture<?> alivenessCheck = jobClientAlivenessCheck;
        if (alivenessCheck != null) {
            // Cancel without interrupting a check that may currently be running.
            alivenessCheck.cancel(false);
            jobClientAlivenessCheck = null;
        }

        return FutureUtils.runAfterwards(
                terminateRunningJobsAndGetTerminationFuture(),
                () -> {
                    dispatcherBootstrap.stop();
                    stopDispatcherServices();
                    log.info("Stopped dispatcher {}.", getAddress());
                });
    }

    /**
     * Shuts down the shared job manager services and closes the metric group. The metric group is
     * closed even if the shutdown fails; that failure is rethrown afterwards.
     *
     * @throws Exception the exception raised by {@code jobManagerSharedServices.shutdown()}, if any
     */
    private void stopDispatcherServices() throws Exception {
        Exception shutdownFailure = null;
        try {
            jobManagerSharedServices.shutdown();
        } catch (Exception e) {
            // Remember the failure so the metric group still gets closed below.
            shutdownFailure = e;
        }

        jobManagerMetricGroup.close();
        ExceptionUtils.tryRethrowException(shutdownFailure);
    }

    /**
     * Handles a job submission. The job is rejected when it already has an entry in the job result
     * store, when a runner for it is registered or a submission for it is still in flight, or when
     * only some of its vertices have resources configured. Otherwise it is handed to
     * {@code internalSubmitJob}.
     *
     * @param jobGraph the job to submit
     * @param timeout RPC timeout (not used inside this method)
     * @return future completed with an acknowledgement, or exceptionally with a
     *     {@link DuplicateJobSubmissionException} or {@link JobSubmissionException}
     */
    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout) {
        final JobID jobID = jobGraph.getJobID();
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
            log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);
        }

        return isInGloballyTerminalState(jobID)
                .thenComposeAsync(
                        isTerminated -> {
                            if (isTerminated) {
                                final String globallyTerminalStates =
                                        Arrays.stream(JobStatus.values())
                                                .filter(JobStatus::isGloballyTerminalState)
                                                .map(Enum::name)
                                                .collect(Collectors.joining(", "));
                                log.warn("Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.", jobGraph.getName(), jobID, globallyTerminalStates);
                                return FutureUtils.completedExceptionally(DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
                            }

                            final boolean alreadyKnown =
                                    jobManagerRunnerRegistry.isRegistered(jobID)
                                            || submittedAndWaitingTerminationJobIDs.contains(jobID);
                            if (alreadyKnown) {
                                return FutureUtils.completedExceptionally(DuplicateJobSubmissionException.of(jobID));
                            }

                            if (isPartialResourceConfigured(jobGraph)) {
                                return FutureUtils.completedExceptionally(new JobSubmissionException(jobID, "Currently jobs is not supported if parts of the vertices have resources configured. The limitation will be removed in future versions."));
                            }

                            return internalSubmitJob(jobGraph);
                        },
                        getMainThreadExecutor(jobID));
    }

    /**
     * Records a job that failed before it could be run: a sparse archived execution graph in state
     * {@code FAILED} is written to the execution graph info store and archived to the history
     * server.
     *
     * @param jobId ID of the failed job
     * @param jobName name of the failed job
     * @param exception the failure cause
     * @return the future returned by the history server archiving step
     */
    @Override
    public CompletableFuture<Acknowledge> submitFailedJob(JobID jobId, String jobName, Throwable exception) {
        final ExecutionGraphInfo failedJobInfo =
                new ExecutionGraphInfo(
                        ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                                jobId, jobName, JobStatus.FAILED, null, exception, null, System.currentTimeMillis()));
        writeToExecutionGraphInfoStore(failedJobInfo);
        return archiveExecutionGraphToHistoryServer(failedJobInfo);
    }

    /**
     * Asks the job result store whether it holds an entry for the given job.
     *
     * @param jobId the job to look up
     * @return future with {@code true} if the job result store has an entry for {@code jobId}
     */
    private CompletableFuture<Boolean> isInGloballyTerminalState(JobID jobId) {
        final CompletableFuture<Boolean> hasResultEntry = jobResultStore.hasJobResultEntryAsync(jobId);
        return hasResultEntry;
    }

    /**
     * Tells whether the job mixes vertices whose minimum resources are
     * {@code ResourceSpec.UNKNOWN} with vertices that have them configured.
     *
     * @param jobGraph the job to inspect
     * @return {@code true} if both kinds of vertices are present
     */
    private boolean isPartialResourceConfigured(JobGraph jobGraph) {
        boolean sawUnknown = false;
        boolean sawConfigured = false;

        for (JobVertex vertex : jobGraph.getVertices()) {
            // Identity comparison against the UNKNOWN instance, as in the rest of this check.
            final boolean unknown = vertex.getMinResources() == ResourceSpec.UNKNOWN;
            sawUnknown |= unknown;
            sawConfigured |= !unknown;

            if (sawUnknown && sawConfigured) {
                return true;
            }
        }

        return false;
    }

    /**
     * Submits a job that passed the checks in {@code submitJob}: applies parallelism overrides,
     * marks the job as being submitted, waits for a possibly still terminating previous run,
     * persists and runs the job, and translates the outcome via {@code handleTermination}.
     *
     * @param jobGraph the job to submit
     * @return future completed with an acknowledgement or with the submission failure
     */
    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        applyParallelismOverrides(jobGraph);
        final JobID jobId = jobGraph.getJobID();
        log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobId);

        // Tracked until the submission future below completes, successfully or not.
        submittedAndWaitingTerminationJobIDs.add(jobId);

        return waitForTerminatingJob(jobId, jobGraph, this::persistAndRunJob)
                .handle((ignored, failure) -> handleTermination(jobId, failure))
                .thenCompose(Function.identity())
                .whenComplete((ignored, failure) -> submittedAndWaitingTerminationJobIDs.remove(jobId));
    }

    /**
     * Converts the outcome of a submission attempt into the future returned to the caller.
     *
     * <p>Without a failure the result is an immediate acknowledgement. With a failure the global
     * resource cleaner is run for the job first; the returned future then always completes
     * exceptionally with a {@link JobSubmissionException} wrapping the stripped failure.
     *
     * @param jobId ID of the submitted job
     * @param terminationThrowable failure of the submission, or {@code null} on success
     * @return acknowledgement future, or a future failing with the submission error
     */
    private CompletableFuture<Acknowledge> handleTermination(JobID jobId, @Nullable Throwable terminationThrowable) {
        if (terminationThrowable == null) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        return globalResourceCleaner
                .cleanupAsync(jobId)
                .handleAsync(
                        (unused, cleanupFailure) -> {
                            if (cleanupFailure != null) {
                                log.warn("Cleanup didn't succeed after job submission failed for job {}.", jobId, cleanupFailure);
                                // Keep the cleanup problem attached to the original failure.
                                terminationThrowable.addSuppressed(cleanupFailure);
                            }
                            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(terminationThrowable);
                            final Throwable submissionFailure = ExceptionUtils.stripCompletionException(terminationThrowable);
                            log.error("Failed to submit job {}.", jobId, submissionFailure);
                            throw new CompletionException(new JobSubmissionException(jobId, "Failed to submit job.", submissionFailure));
                        },
                        getMainThreadExecutor(jobId));
    }

    /**
     * Writes the job graph through the job graph writer, sets up client heartbeat tracking and
     * starts a job master runner for the job in {@code SUBMISSION} mode.
     *
     * @param jobGraph the job to persist and run
     * @throws Exception if persisting the graph or creating/starting the runner fails
     */
    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        initJobClientExpiredTime(jobGraph);

        final JobManagerRunner runner = createJobMasterRunner(jobGraph);
        runJob(runner, ExecutionType.SUBMISSION);
    }

    /**
     * Creates the job manager runner for a job that has no runner registered yet.
     *
     * @param jobGraph the job to create a runner for
     * @return the runner produced by the job manager runner factory
     * @throws Exception if the factory fails to create the runner
     */
    private JobManagerRunner createJobMasterRunner(JobGraph jobGraph) throws Exception {
        final boolean alreadyRegistered = jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID());
        Preconditions.checkState(!alreadyRegistered);

        return jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                getRpcService(),
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler,
                failureEnrichers,
                System.currentTimeMillis());
    }

    /**
     * Creates a cleanup runner for a dirty job result whose job has no runner registered yet.
     *
     * @param dirtyJobResult the job result that still requires cleanup
     * @return the runner produced by the cleanup runner factory
     * @throws Exception if the factory fails to create the runner
     */
    private JobManagerRunner createJobCleanupRunner(JobResult dirtyJobResult) throws Exception {
        final boolean alreadyRegistered = jobManagerRunnerRegistry.isRegistered(dirtyJobResult.getJobId());
        Preconditions.checkState(!alreadyRegistered);

        return cleanupRunnerFactory.create(
                dirtyJobResult,
                highAvailabilityServices.getCheckpointRecoveryFactory(),
                configuration,
                getIoExecutor(dirtyJobResult.getJobId()));
    }

    /**
     * Starts the given runner, registers it, and wires up what happens when it finishes.
     *
     * <p>The runner's result future is handled on the executor returned by
     * {@code getMainThreadExecutor(jobId)}. A result is passed to
     * {@code handleJobManagerRunnerResult}; a failure is passed to {@code jobManagerRunnerFailed}
     * with status {@code FAILED}. The resulting cleanup state is then handed to {@code removeJob}.
     *
     * @param jobManagerRunner the runner to start and track
     * @param executionType whether the job is run as a submission or a recovery
     * @throws Exception if starting or registering the runner fails
     */
    private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType) throws Exception {
        // The runner is started before it is registered.
        jobManagerRunner.start();
        this.jobManagerRunnerRegistry.register(jobManagerRunner);
        JobID jobId = jobManagerRunner.getJobID();
        CompletionStage cleanupJobStateFuture = ((CompletableFuture)jobManagerRunner.getResultFuture().handleAsync((jobManagerRunnerResult, throwable) -> {
            // The registry entry must still point at this very runner when its result arrives.
            Preconditions.checkState((this.jobManagerRunnerRegistry.isRegistered(jobId) && this.jobManagerRunnerRegistry.get(jobId) == jobManagerRunner ? 1 : 0) != 0, (Object)"The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
            if (jobManagerRunnerResult != null) {
                return this.handleJobManagerRunnerResult((JobManagerRunnerResult)jobManagerRunnerResult, executionType);
            }
            // No result means the result future completed exceptionally.
            return CompletableFuture.completedFuture(this.jobManagerRunnerFailed(jobId, JobStatus.FAILED, (Throwable)throwable));
        }, this.getMainThreadExecutor(jobId))).thenCompose(Function.identity());
        // A failing removeJob is only logged: logCleanupErrorWarning returns null, so the
        // termination future completes normally in that case.
        CompletionStage jobTerminationFuture = ((CompletableFuture)cleanupJobStateFuture).thenCompose(cleanupJobState -> this.removeJob(jobId, (CleanupJobState)cleanupJobState).exceptionally(throwable -> this.logCleanupErrorWarning(jobId, (Throwable)throwable)));
        // Anything that still escapes the chain is forwarded to the fatal error handler.
        FutureUtils.handleUncaughtException((CompletableFuture)jobTerminationFuture, (thread, throwable) -> this.fatalErrorHandler.onFatalError(throwable));
        this.registerJobManagerRunnerTerminationFuture(jobId, (CompletableFuture<Void>)jobTerminationFuture);
    }

    /**
     * Logs a warning that the cleanup of a job failed, listing the configured storage locations
     * that should be checked manually.
     *
     * @param jobId ID of the job whose cleanup failed
     * @param cleanupError the cleanup failure
     * @return always {@code null}, so the method can be used as an {@code exceptionally} handler
     *     for a {@code CompletableFuture<Void>}
     */
    @Nullable
    private Void logCleanupErrorWarning(JobID jobId, Throwable cleanupError) {
        final Object haStoragePath = configuration.get(HighAvailabilityOptions.HA_STORAGE_PATH);
        final Object blobStorageDirectory = configuration.get(BlobServerOptions.STORAGE_DIRECTORY);
        final Object checkpointsDirectory = configuration.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
        final Object jobResultStorePath = configuration.get(JobResultStoreOptions.STORAGE_PATH);

        log.warn(
                "The cleanup of job {} failed. The job's artifacts in the different directories ('{}', '{}', '{}') and its JobResultStore entry in '{}' (in HA mode) should be checked for manual cleanup.",
                jobId,
                haStoragePath,
                blobStorageDirectory,
                checkpointsDirectory,
                jobResultStorePath,
                cleanupError);
        return null;
    }

    /**
     * Decides how to proceed once a runner has produced a result. An initialization failure of a
     * recovered job is treated as a runner failure with status {@code INITIALIZING}; every other
     * result is handled as a job that reached a terminal state.
     *
     * @param jobManagerRunnerResult the result produced by the runner
     * @param executionType how the job was started
     * @return future with the cleanup state to apply for the job
     */
    private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) {
        final boolean recoveryInitializationFailed =
                jobManagerRunnerResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY;
        if (!recoveryInitializationFailed) {
            return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
        }

        final CleanupJobState cleanupJobState =
                jobManagerRunnerFailed(
                        jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
                        JobStatus.INITIALIZING,
                        jobManagerRunnerResult.getInitializationFailure());
        return CompletableFuture.completedFuture(cleanupJobState);
    }

    /**
     * Reports a failed job master via {@code jobMasterFailed} and requests local cleanup.
     *
     * @param jobId ID of the job whose runner failed
     * @param jobStatus status handed to {@code CleanupJobState.localCleanup}
     * @param throwable the failure cause
     * @return the local cleanup state for {@code jobStatus}
     */
    private CleanupJobState jobManagerRunnerFailed(JobID jobId, JobStatus jobStatus, Throwable throwable) {
        jobMasterFailed(jobId, throwable);
        final CleanupJobState localCleanup = CleanupJobState.localCleanup(jobStatus);
        return localCleanup;
    }

    /**
     * Lists the IDs of the jobs known to the runner registry.
     *
     * @param timeout RPC timeout (not used inside this method)
     * @return completed future holding an unmodifiable view of the running job IDs
     */
    @Override
    public CompletableFuture<Collection<JobID>> listJobs(Duration timeout) {
        final Collection<JobID> runningJobIds =
                Collections.unmodifiableSet(jobManagerRunnerRegistry.getRunningJobIds());
        return CompletableFuture.completedFuture(runningJobIds);
    }

    /**
     * Disposes the savepoint at the given path on the shared services' I/O executor, using the
     * calling thread's context class loader.
     *
     * @param savepointPath path of the savepoint to dispose
     * @param timeout RPC timeout (not used inside this method)
     * @return future completed with an acknowledgement, or exceptionally with a
     *     {@link FlinkException} if the disposal fails
     */
    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Duration timeout) {
        // Captured here because the disposal itself runs on a different thread.
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

        return CompletableFuture.supplyAsync(
                () -> {
                    log.info("Disposing savepoint {}.", savepointPath);
                    try {
                        Checkpoints.disposeSavepoint(savepointPath, configuration, contextClassLoader, log);
                        return Acknowledge.get();
                    } catch (IOException | FlinkException disposalFailure) {
                        final String message = String.format("Could not dispose savepoint %s.", savepointPath);
                        throw new CompletionException(new FlinkException(message, disposalFailure));
                    }
                },
                jobManagerSharedServices.getIoExecutor());
    }

    /**
     * Cancels a job. A job with a runner is cancelled through that runner. For a job found only in
     * the execution graph info store, the call succeeds if the job ended as {@code CANCELED} and
     * fails otherwise. Unknown jobs fail with {@link FlinkJobNotFoundException}.
     *
     * @param jobId ID of the job to cancel
     * @param timeout timeout forwarded to the runner's cancel call
     * @return future completed with an acknowledgement or with the corresponding failure
     */
    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Duration timeout) {
        final Optional<JobManagerRunner> runner = getJobManagerRunner(jobId);
        if (runner.isPresent()) {
            return runner.get().cancel(timeout);
        }

        final ExecutionGraphInfo archivedJob = executionGraphInfoStore.get(jobId);
        if (archivedJob == null) {
            log.debug("Dispatcher is unable to cancel job {}: not found", jobId);
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }

        // The job already finished; only a previous cancellation counts as success.
        final JobStatus finalStatus = archivedJob.getArchivedExecutionGraph().getState();
        if (finalStatus != JobStatus.CANCELED) {
            return FutureUtils.completedExceptionally(new FlinkJobTerminatedWithoutCancellationException(jobId, finalStatus));
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Builds the cluster overview from the resource manager's resource overview, the status of all
     * running jobs and the stored overview of completed jobs.
     *
     * <p>The intermediate futures are fully parameterized so the combining lambda works on
     * {@code Collection<JobStatus>} and {@code ResourceOverview} instead of {@code Object}. This
     * assumes {@code runResourceManagerCommand}, {@code queryJobMastersForInformation} and
     * {@code flattenOptionalCollection} are generic helpers — TODO confirm their signatures.
     *
     * @param timeout timeout forwarded to the resource manager and job status requests
     * @return future with the combined cluster overview
     */
    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Duration timeout) {
        CompletableFuture<ResourceOverview> taskManagerOverviewFuture = this.runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
        List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = this.queryJobMastersForInformation(jobManagerRunner -> jobManagerRunner.requestJobStatus(timeout));
        CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
        CompletableFuture<Collection<JobStatus>> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
        // Read synchronously, before the asynchronous results are combined.
        JobsOverview completedJobsOverview = this.executionGraphInfoStore.getStoredJobsOverview();
        return allJobsFuture.thenCombine(taskManagerOverviewFuture, (runningJobsStatus, resourceOverview) -> {
            JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
            return new ClusterOverview(resourceOverview, allJobsOverview);
        });
    }

    /**
     * Collects job details from all registered job manager runners and merges them with the
     * details available in the execution graph info store.
     *
     * <p>Entries are de-duplicated by job id; when a job appears in both sources, the detail
     * reported by the runner replaces the stored one.
     *
     * @param timeout timeout passed to each job details request
     * @return future holding the merged job details
     */
    @Override
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Duration timeout) {
        List<CompletableFuture<Optional<JobDetails>>> detailQueries =
                this.queryJobMastersForInformation(runner -> runner.requestJobDetails(timeout));
        CompletableFuture<Collection<Optional<JobDetails>>> allDetailQueries =
                FutureUtils.combineAll(detailQueries);
        CompletableFuture<Collection<JobDetails>> runningDetailsFuture =
                allDetailQueries.thenApply(this::flattenOptionalCollection);

        Collection<JobDetails> storedDetails = this.executionGraphInfoStore.getAvailableJobDetails();

        return runningDetailsFuture.thenApply(runningDetails -> {
            Map<JobID, JobDetails> detailsByJobId = new HashMap<JobID, JobDetails>();
            // Stored entries go in first so that runner-reported entries overwrite them.
            for (JobDetails stored : storedDetails) {
                detailsByJobId.put(stored.getJobId(), stored);
            }
            for (JobDetails running : runningDetails) {
                detailsByJobId.put(running.getJobId(), running);
            }
            return new MultipleJobsDetails(new HashSet<JobDetails>(detailsByJobId.values()));
        });
    }

    /**
     * Returns the status of the given job, asking its registered runner when there is one and
     * otherwise consulting the job details in the execution graph info store.
     *
     * @param jobId id of the job
     * @param timeout timeout passed to the runner request
     * @return future with the job status, or a future failed with
     *     {@code FlinkJobNotFoundException} when neither source knows the job
     */
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Duration timeout) {
        return this.getJobManagerRunner(jobId)
                .map(runner -> runner.requestJobStatus(timeout))
                .orElseGet(() -> {
                    JobDetails storedDetails = this.executionGraphInfoStore.getAvailableJobDetails(jobId);
                    if (storedDetails != null) {
                        return CompletableFuture.completedFuture(storedDetails.getStatus());
                    }
                    return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
                });
    }

    /**
     * Returns the execution graph info of the given job.
     *
     * <p>The registered runner is asked first. If there is no runner, or the runner's request
     * fails, the execution graph info store is consulted; only when the store has no entry either
     * is the original failure propagated.
     *
     * @param jobId id of the job
     * @param timeout timeout passed to the runner request
     * @return future with the execution graph info, or a failed future when it cannot be obtained
     */
    @Override
    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(JobID jobId, Duration timeout) {
        return this.getJobManagerRunner(jobId)
                .map(runner -> runner.requestJob(timeout))
                // orElseGet defers building the not-found exception (and its failed future)
                // until it is actually needed; orElse would construct it on every call.
                .orElseGet(() -> FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)))
                .exceptionally(failure -> this.getExecutionGraphInfoFromStore(failure, jobId));
    }

    /**
     * Fallback lookup used after a failed request: returns the stored execution graph info for
     * the job, or rethrows the given failure when the store has no entry.
     *
     * @param t failure that triggered the fallback
     * @param jobId id of the job
     * @return the stored execution graph info
     * @throws CompletionException wrapping {@code t} (after
     *     {@code ExceptionUtils.stripCompletionException}) when nothing is stored for the job
     */
    private ExecutionGraphInfo getExecutionGraphInfoFromStore(Throwable t, JobID jobId) {
        ExecutionGraphInfo storedInfo = this.executionGraphInfoStore.get(jobId);
        if (storedInfo != null) {
            return storedInfo;
        }
        throw new CompletionException(ExceptionUtils.stripCompletionException(t));
    }

    /**
     * Requests the checkpoint statistics of a job from its job master gateway, falling back to
     * the snapshot contained in the stored archived execution graph when that request fails.
     *
     * @param jobId id of the job
     * @param timeout timeout passed to the gateway request
     * @return future with the checkpoint statistics snapshot
     */
    @Override
    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(JobID jobId, Duration timeout) {
        CompletableFuture<CheckpointStatsSnapshot> gatewayStats =
                this.performOperationOnJobMasterGateway(
                        jobId, jobMaster -> jobMaster.requestCheckpointStats(timeout));
        return gatewayStats.exceptionally(
                failure ->
                        this.getExecutionGraphInfoFromStore(failure, jobId)
                                .getArchivedExecutionGraph()
                                .getCheckpointStatsSnapshot());
    }

    /**
     * Returns the result of the given job.
     *
     * <p>For a registered job the result is derived from the runner's result future; otherwise it
     * is created from the archived execution graph in the execution graph info store.
     *
     * @param jobId id of the job
     * @param timeout not used by this implementation
     * @return future with the job result, or a future failed with
     *     {@code FlinkJobNotFoundException} when the job is neither registered nor stored
     */
    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Duration timeout) {
        if (this.jobManagerRunnerRegistry.isRegistered(jobId)) {
            JobManagerRunner runner = this.jobManagerRunnerRegistry.get(jobId);
            return runner.getResultFuture()
                    .thenApply(
                            runnerResult ->
                                    JobResult.createFrom(
                                            runnerResult.getExecutionGraphInfo().getArchivedExecutionGraph()));
        }

        ExecutionGraphInfo storedInfo = this.executionGraphInfoStore.get(jobId);
        if (storedInfo == null) {
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
        return CompletableFuture.completedFuture(JobResult.createFrom(storedInfo.getArchivedExecutionGraph()));
    }

    /**
     * Returns the configured metric query service address as a single-element collection, or an
     * empty collection when no address is set.
     *
     * @param timeout not used by this implementation
     * @return already-completed future with zero or one address
     */
    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(Duration timeout) {
        if (this.metricServiceQueryAddress == null) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        return CompletableFuture.completedFuture(Collections.singleton(this.metricServiceQueryAddress));
    }

    /**
     * Forwards the request for task manager metric query service addresses to the resource
     * manager gateway.
     *
     * @param timeout timeout passed to the resource manager request
     * @return future with the (resource id, address) pairs reported by the resource manager
     */
    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServiceAddresses(Duration timeout) {
        return this.runResourceManagerCommand(
                rmGateway -> rmGateway.requestTaskManagerMetricQueryServiceAddresses(timeout));
    }

    /**
     * Creates a thread dump whose stack trace depth is limited by
     * {@code ClusterOptions.THREAD_DUMP_STACKTRACE_MAX_DEPTH}.
     *
     * @param timeout not used by this implementation
     * @return already-completed future with the thread dump
     */
    @Override
    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Duration timeout) {
        int maxDepth = (Integer) this.configuration.get(ClusterOptions.THREAD_DUMP_STACKTRACE_MAX_DEPTH);
        ThreadDumpInfo dump = ThreadDumpInfo.dumpAndCreate(maxDepth);
        return CompletableFuture.completedFuture(dump);
    }

    /**
     * Returns the port reported by the blob server.
     *
     * @param timeout not used by this implementation
     * @return already-completed future with the port
     */
    @Override
    public CompletableFuture<Integer> getBlobServerPort(Duration timeout) {
        int port = this.blobServer.getPort();
        return CompletableFuture.completedFuture(port);
    }

    /**
     * Triggers a checkpoint for the given job through its job master gateway.
     *
     * @param jobID id of the job
     * @param timeout timeout passed to the gateway request
     * @return future with the string returned by the gateway's {@code triggerCheckpoint}
     */
    @Override
    public CompletableFuture<String> triggerCheckpoint(JobID jobID, Duration timeout) {
        return this.performOperationOnJobMasterGateway(
                jobID, jobMaster -> jobMaster.triggerCheckpoint(timeout));
    }

    /**
     * Triggers a checkpoint identified by an operation key by delegating to the cached
     * operations handler.
     *
     * @param operationKey key identifying the asynchronous operation
     * @param checkpointType type of checkpoint to trigger
     * @param timeout timeout passed to the handler
     * @return future with the handler's acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, Duration timeout) {
        return this.dispatcherCachedOperationsHandler.triggerCheckpoint(
                operationKey,
                checkpointType,
                timeout);
    }

    /**
     * Looks up the status of a previously triggered checkpoint operation in the cached
     * operations handler.
     *
     * @param operationKey key identifying the asynchronous operation
     * @return future with the operation result
     */
    @Override
    public CompletableFuture<OperationResult<Long>> getTriggeredCheckpointStatus(AsynchronousJobOperationKey operationKey) {
        CompletableFuture<OperationResult<Long>> status =
                this.dispatcherCachedOperationsHandler.getCheckpointStatus(operationKey);
        return status;
    }

    /**
     * Triggers a checkpoint of the given type through the job master gateway and returns the id
     * of the resulting completed checkpoint.
     *
     * @param jobID id of the job
     * @param checkpointType type of checkpoint to trigger
     * @param timeout timeout passed to the gateway request
     * @return future with the checkpoint id
     */
    @Override
    public CompletableFuture<Long> triggerCheckpointAndGetCheckpointID(JobID jobID, CheckpointType checkpointType, Duration timeout) {
        return this.performOperationOnJobMasterGateway(
                jobID,
                jobMaster ->
                        jobMaster
                                .triggerCheckpoint(checkpointType, timeout)
                                .thenApply(CompletedCheckpoint::getCheckpointID));
    }

    /**
     * Triggers a savepoint identified by an operation key by delegating to the cached operations
     * handler.
     *
     * @param operationKey key identifying the asynchronous operation
     * @param targetDirectory directory argument forwarded to the handler
     * @param formatType savepoint format type
     * @param savepointMode savepoint mode forwarded to the handler
     * @param timeout timeout passed to the handler
     * @return future with the handler's acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> triggerSavepoint(AsynchronousJobOperationKey operationKey, String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, Duration timeout) {
        return this.dispatcherCachedOperationsHandler.triggerSavepoint(
                operationKey,
                targetDirectory,
                formatType,
                savepointMode,
                timeout);
    }

    /**
     * Triggers a savepoint through the job master gateway and returns the string the gateway
     * reports for it.
     *
     * @param jobId id of the job
     * @param targetDirectory directory argument forwarded to the gateway
     * @param formatType savepoint format type
     * @param savepointMode mode whose {@code isTerminalMode()} flag is forwarded to the gateway
     * @param timeout timeout passed to the gateway request
     * @return future with the gateway's result
     */
    @Override
    public CompletableFuture<String> triggerSavepointAndGetLocation(JobID jobId, String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, Duration timeout) {
        return this.performOperationOnJobMasterGateway(
                jobId,
                jobMaster ->
                        jobMaster.triggerSavepoint(
                                targetDirectory, savepointMode.isTerminalMode(), formatType, timeout));
    }

    /**
     * Looks up the status of a previously triggered savepoint operation in the cached operations
     * handler.
     *
     * @param operationKey key identifying the asynchronous operation
     * @return future with the operation result
     */
    @Override
    public CompletableFuture<OperationResult<String>> getTriggeredSavepointStatus(AsynchronousJobOperationKey operationKey) {
        CompletableFuture<OperationResult<String>> status =
                this.dispatcherCachedOperationsHandler.getSavepointStatus(operationKey);
        return status;
    }

    /**
     * Requests a stop-with-savepoint identified by an operation key by delegating to the cached
     * operations handler.
     *
     * @param operationKey key identifying the asynchronous operation
     * @param targetDirectory directory argument forwarded to the handler
     * @param formatType savepoint format type
     * @param savepointMode savepoint mode forwarded to the handler
     * @param timeout timeout passed to the handler
     * @return future with the handler's acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> stopWithSavepoint(AsynchronousJobOperationKey operationKey, String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, Duration timeout) {
        return this.dispatcherCachedOperationsHandler.stopWithSavepoint(
                operationKey,
                targetDirectory,
                formatType,
                savepointMode,
                timeout);
    }

    /**
     * Requests a stop-with-savepoint through the job master gateway and returns the string the
     * gateway reports for it.
     *
     * @param jobId id of the job
     * @param targetDirectory directory argument forwarded to the gateway
     * @param formatType savepoint format type
     * @param savepointMode mode whose {@code isTerminalMode()} flag is forwarded to the gateway
     * @param timeout timeout passed to the gateway request
     * @return future with the gateway's result
     */
    @Override
    public CompletableFuture<String> stopWithSavepointAndGetLocation(JobID jobId, String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, Duration timeout) {
        return this.performOperationOnJobMasterGateway(
                jobId,
                jobMaster ->
                        jobMaster.stopWithSavepoint(
                                targetDirectory, formatType, savepointMode.isTerminalMode(), timeout));
    }

    /**
     * Shuts the cluster down with {@code ApplicationStatus.SUCCEEDED}.
     *
     * @return future with the acknowledgement of the shutdown request
     */
    @Override
    public CompletableFuture<Acknowledge> shutDownCluster() {
        ApplicationStatus status = ApplicationStatus.SUCCEEDED;
        return this.shutDownCluster(status);
    }

    /**
     * Completes the dispatcher's shut-down future with the given application status and
     * acknowledges the request.
     *
     * @param applicationStatus status to complete the shut-down future with
     * @return already-completed future with the acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
        this.shutDownFuture.complete(applicationStatus);
        Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Delivers a serialized coordination request to a coordinator of the given job through its
     * job master gateway.
     *
     * <p>The operator id is derived from {@code operatorUid} via
     * {@code StreamingJobGraphGenerator.generateOperatorID} once the gateway is available.
     *
     * @param jobId id of the job
     * @param operatorUid uid from which the operator id is generated
     * @param serializedRequest the serialized request
     * @param timeout timeout passed to the gateway request
     * @return future with the coordinator's response
     */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(JobID jobId, String operatorUid, SerializedValue<CoordinationRequest> serializedRequest, Duration timeout) {
        return this.performOperationOnJobMasterGateway(
                jobId,
                jobMaster ->
                        jobMaster.deliverCoordinationRequestToCoordinator(
                                StreamingJobGraphGenerator.generateOperatorID(operatorUid),
                                serializedRequest,
                                timeout));
    }

    /**
     * Records a heartbeat from a job client by storing the new expiration timestamp for the job.
     *
     * <p>Heartbeats for jobs without a registered runner are only logged.
     *
     * @param jobId id of the job the client is attached to
     * @param expiredTimestamp timestamp stored as the job client's expiration time
     * @param timeout not used by this implementation
     * @return already-completed void future
     */
    @Override
    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, long expiredTimestamp, Duration timeout) {
        boolean jobKnown = this.getJobManagerRunner(jobId).isPresent();
        if (jobKnown) {
            this.log.debug("Job {} receives client's heartbeat which expiredTimestamp is {}.", jobId, expiredTimestamp);
            this.jobClientExpiredTimestamp.put(jobId, expiredTimestamp);
        } else {
            this.log.warn("Fail to find job {} for client.", jobId);
        }
        return FutureUtils.completedVoidFuture();
    }

    /**
     * Walks over the recorded job client expiration timestamps: entries of jobs without a
     * registered runner are dropped, and jobs whose timestamp is not in the future are cancelled.
     *
     * <p>Before the scan, heartbeat timeouts of jobs that have finished initializing are moved
     * into the expiration map.
     */
    private void checkJobClientAliveness() {
        this.setClientHeartbeatTimeoutForInitializedJob();
        final long now = System.currentTimeMillis();
        for (Iterator<Map.Entry<JobID, Long>> it = this.jobClientExpiredTimestamp.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<JobID, Long> heartbeat = it.next();
            JobID jobID = heartbeat.getKey();
            long deadline = heartbeat.getValue();
            if (!this.getJobManagerRunner(jobID).isPresent()) {
                // The job is no longer registered, so its heartbeat entry is obsolete.
                it.remove();
            } else if (deadline <= now) {
                this.log.warn("The heartbeat from the job client is timeout and cancel the job {}. You can adjust the heartbeat interval by 'client.heartbeat.interval' and the timeout by 'client.heartbeat.timeout'", jobID);
                this.cancelJob(jobID, this.webTimeout);
            }
        }
    }

    /**
     * Requests the current resource requirements of the given job from its job master gateway.
     *
     * @param jobId id of the job
     * @return future with the job's resource requirements
     */
    @Override
    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
        return this.performOperationOnJobMasterGateway(
                jobId, jobMaster -> jobMaster.requestJobResourceRequirements());
    }

    /**
     * Updates the resource requirements of a running job.
     *
     * <p>Only one update per job may be in flight; a concurrent second request is rejected with
     * an HTTP {@code CONFLICT} {@code RestHandlerException}. Otherwise the update proceeds in
     * these steps:
     *
     * <ol>
     *   <li>request the job from the job master gateway and collect the max parallelism of every
     *       job vertex,
     *   <li>validate the requested requirements against those max parallelisms,
     *   <li>persist the requirements through the job graph writer on the I/O executor,
     *   <li>apply the requirements via the job master gateway on the main thread executor.
     * </ol>
     *
     * <p>The job is removed from the set of pending updates once the chain completes, whether it
     * succeeded or failed.
     *
     * @param jobId id of the job to update
     * @param jobResourceRequirements the new requirements
     * @return future with the gateway's acknowledgement, or a failed future describing why the
     *     update was rejected or could not be applied
     */
    @Override
    public CompletableFuture<Acknowledge> updateJobResourceRequirements(JobID jobId, JobResourceRequirements jobResourceRequirements) {
        if (!this.pendingJobResourceRequirementsUpdates.add(jobId)) {
            // The message template carries a %s placeholder, so it has to be formatted with the
            // job id; previously the raw template was sent to the caller.
            return FutureUtils.completedExceptionally(
                    new RestHandlerException(
                            String.format("Another update to the job [%s] resource requirements is in progress.", jobId),
                            HttpResponseStatus.CONFLICT));
        }
        return this.performOperationOnJobMasterGateway(jobId, gateway -> gateway.requestJob(this.webTimeout))
                .thenApply(job -> {
                    HashMap<JobVertexID, Integer> maxParallelismPerJobVertex = new HashMap<JobVertexID, Integer>();
                    for (ArchivedExecutionJobVertex vertex : job.getArchivedExecutionGraph().getVerticesTopologically()) {
                        maxParallelismPerJobVertex.put(vertex.getJobVertexId(), vertex.getMaxParallelism());
                    }
                    return maxParallelismPerJobVertex;
                })
                .thenAccept(maxParallelismPerJobVertex -> Dispatcher.validateMaxParallelism(jobResourceRequirements, maxParallelismPerJobVertex))
                .thenRunAsync(() -> {
                    try {
                        this.jobGraphWriter.putJobResourceRequirements(jobId, jobResourceRequirements);
                    }
                    catch (Exception e) {
                        // Persisting failed, so the requirements are not applied either.
                        throw new CompletionException(new RestHandlerException("The resource requirements could not be persisted and have not been applied. Please retry.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e));
                    }
                }, this.getIoExecutor(jobId))
                .thenComposeAsync(ignored -> this.performOperationOnJobMasterGateway(jobId, jobMasterGateway -> jobMasterGateway.updateJobResourceRequirements(jobResourceRequirements)), this.getMainThreadExecutor(jobId))
                .whenComplete((ack, error) -> {
                    if (error != null) {
                        this.log.debug("Failed to update requirements for job {}.", jobId, error);
                    }
                    // Release the per-job guard so that later updates are accepted again.
                    this.pendingJobResourceRequirementsUpdates.remove(jobId);
                });
    }

    /**
     * Validates the requested resource requirements against the max parallelism of each job
     * vertex.
     *
     * @param jobResourceRequirements requirements to validate
     * @param maxParallelismPerJobVertex max parallelism keyed by job vertex id
     * @throws CompletionException wrapping a {@code BAD_REQUEST} {@code RestHandlerException}
     *     whose message lists all validation errors, one per line
     */
    private static void validateMaxParallelism(JobResourceRequirements jobResourceRequirements, Map<JobVertexID, Integer> maxParallelismPerJobVertex) {
        List<String> problems = JobResourceRequirements.validate(jobResourceRequirements, maxParallelismPerJobVertex);
        if (problems.isEmpty()) {
            return;
        }
        String message = String.join(System.lineSeparator(), problems);
        throw new CompletionException(new RestHandlerException(message, HttpResponseStatus.BAD_REQUEST));
    }

    /**
     * Processes the heartbeat timeouts recorded for jobs that were not yet initialized.
     *
     * <p>For each entry: if the job has no registered runner the entry is dropped; if the runner
     * is initialized, an expiration timestamp (current time plus the recorded timeout) is stored
     * and the entry is dropped; otherwise the entry is kept for a later pass.
     */
    private void setClientHeartbeatTimeoutForInitializedJob() {
        for (Iterator<Map.Entry<JobID, Long>> it = this.uninitializedJobClientHeartbeatTimeout.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<JobID, Long> pending = it.next();
            JobID jobID = pending.getKey();
            Optional<JobManagerRunner> runner = this.getJobManagerRunner(jobID);
            if (runner.isPresent()) {
                if (!runner.get().isInitialized()) {
                    // Still initializing: revisit on the next pass.
                    continue;
                }
                this.jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + pending.getValue());
            }
            it.remove();
        }
    }

    /**
     * Registers the termination future of a job's runner and schedules its removal from the map
     * for when the future completes normally.
     *
     * @param jobId id of the job; must not already have a termination future registered
     * @param jobManagerRunnerTerminationFuture future tracking the runner's termination
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when a
     *     future is already registered for the job (exception type not visible here)
     */
    private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
        Preconditions.checkState(!this.jobManagerRunnerTerminationFutures.containsKey(jobId));
        this.jobManagerRunnerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);

        Runnable unregister = () -> {
            CompletableFuture<Void> removed = this.jobManagerRunnerTerminationFutures.remove(jobId);
            boolean belongsToOtherRegistration = removed != null && removed != jobManagerRunnerTerminationFuture;
            if (belongsToOtherRegistration) {
                // A different future was registered under this id in the meantime; restore it.
                this.jobManagerRunnerTerminationFutures.put(jobId, removed);
            }
        };
        jobManagerRunnerTerminationFuture.thenRunAsync(unregister, this.getMainThreadExecutor(jobId));
    }

    /**
     * Cleans up the resources of a job.
     *
     * <p>For a local cleanup only the local resource cleaner runs. For a global cleanup the
     * global resource cleaner runs, the job result is marked as clean, the outcome is logged
     * (failures are logged and not propagated), and finally
     * {@link #runPostJobGloballyTerminated} is invoked on the main thread executor.
     *
     * @param jobId id of the job to clean up
     * @param cleanupJobState describes whether the cleanup is global and carries the job status
     * @return future completing when the cleanup steps have run
     */
    private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
        if (!cleanupJobState.isGlobalCleanup()) {
            return this.localResourceCleaner.cleanupAsync(jobId);
        }

        return this.globalResourceCleaner
                .cleanupAsync(jobId)
                .thenCompose(unused -> this.jobResultStore.markResultAsCleanAsync(jobId))
                .handle((unusedVoid, failure) -> {
                    if (failure != null) {
                        this.log.warn("Could not properly mark job {} result as clean.", jobId, failure);
                    } else {
                        this.log.debug("Cleanup for the job '{}' has finished. Job has been marked as clean.", jobId);
                    }
                    return null;
                })
                .thenRunAsync(
                        () -> this.runPostJobGloballyTerminated(jobId, cleanupJobState.getJobStatus()),
                        this.getMainThreadExecutor(jobId));
    }

    /**
     * Hook invoked by {@code removeJob} after a global cleanup has run. This implementation does
     * nothing.
     *
     * @param jobId id of the job
     * @param jobStatus status carried by the cleanup state of the job
     */
    protected void runPostJobGloballyTerminated(JobID jobId, JobStatus jobStatus) {
        // Intentionally empty.
    }

    /** Logs the shutdown and terminates every job the runner registry reports as running. */
    private void terminateRunningJobs() {
        this.log.info("Stopping all currently running jobs of dispatcher {}.", this.getAddress());
        Set<JobID> runningJobIds = this.jobManagerRunnerRegistry.getRunningJobIds();
        for (JobID runningJobId : runningJobIds) {
            this.terminateJob(runningJobId);
        }
    }

    /**
     * Asynchronously closes the runner registered for the given job; does nothing when the job
     * is not registered.
     *
     * @param jobId id of the job to terminate
     */
    private void terminateJob(JobID jobId) {
        if (!this.jobManagerRunnerRegistry.isRegistered(jobId)) {
            return;
        }
        JobManagerRunner runner = this.jobManagerRunnerRegistry.get(jobId);
        runner.closeAsync();
    }

    /**
     * Terminates all running jobs and returns a future built by {@code FutureUtils.completeAll}
     * from all registered runner termination futures.
     *
     * @return future tracking the termination of the registered runners
     */
    private CompletableFuture<Void> terminateRunningJobsAndGetTerminationFuture() {
        this.terminateRunningJobs();
        Collection<CompletableFuture<Void>> terminationFutures = this.jobManagerRunnerTerminationFutures.values();
        CompletableFuture<Void> allTerminated = FutureUtils.completeAll(terminationFutures);
        return allTerminated;
    }

    /**
     * Forwards a fatal error to the configured fatal error handler.
     *
     * @param throwable the fatal error
     */
    protected void onFatalError(Throwable throwable) {
        final Throwable fatal = throwable;
        this.fatalErrorHandler.onFatalError(fatal);
    }

    /**
     * Handles a job that reached a terminal state: logs the outcome, writes the execution graph
     * info to the store and determines the kind of cleanup that has to follow.
     *
     * <p>For a terminal state that is not globally terminal, a local cleanup state is returned
     * immediately. Otherwise the execution graph is archived to the history server and the job is
     * registered in the job result store before a global cleanup state is returned.
     *
     * @param executionGraphInfo information about the terminated job; its state must be terminal
     * @return future with the cleanup state to apply
     */
    @VisibleForTesting
    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        JobStatus terminalJobStatus = archivedExecutionGraph.getState();
        Preconditions.checkArgument(
                terminalJobStatus.isTerminalState(),
                "Job %s is in state %s which is not terminal.",
                archivedExecutionGraph.getJobID(),
                terminalJobStatus);

        // Failure details are only logged for SUSPENDED and FAILED jobs.
        boolean isFailureInfoRelatedToJobTermination =
                terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus == JobStatus.FAILED;
        if (archivedExecutionGraph.getFailureInfo() != null && isFailureInfoRelatedToJobTermination) {
            this.log.info(
                    "Job {} reached terminal state {}.\n{}",
                    archivedExecutionGraph.getJobID(),
                    terminalJobStatus,
                    archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim());
        } else {
            this.log.info("Job {} reached terminal state {}.", archivedExecutionGraph.getJobID(), terminalJobStatus);
        }

        this.writeToExecutionGraphInfoStore(executionGraphInfo);

        if (terminalJobStatus.isGloballyTerminalState()) {
            CompletableFuture<Acknowledge> archived = this.archiveExecutionGraphToHistoryServer(executionGraphInfo);
            return archived.thenCompose(ignored -> this.registerGloballyTerminatedJobInJobResultStore(executionGraphInfo));
        }
        return CompletableFuture.completedFuture(CleanupJobState.localCleanup(terminalJobStatus));
    }

    /**
     * Ensures a globally terminated job has an entry in the job result store, creating a dirty
     * entry when no clean one exists.
     *
     * <p>A failure while doing so is reported to the fatal error handler; the returned future
     * still completes with a global cleanup state.
     *
     * @param executionGraphInfo information about the job; its state must be globally terminal
     * @return future with the global cleanup state, completed on the main thread executor
     */
    private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobResultStore(ExecutionGraphInfo executionGraphInfo) {
        JobID jobId = executionGraphInfo.getJobId();
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        JobStatus terminalJobStatus = archivedExecutionGraph.getState();
        Preconditions.checkArgument(
                terminalJobStatus.isGloballyTerminalState(),
                "Job %s is in state %s which is not globally terminal.",
                jobId,
                terminalJobStatus);

        CompletableFuture<Void> resultEntryEnsured =
                this.jobResultStore
                        .hasCleanJobResultEntryAsync(jobId)
                        .thenCompose(hasCleanEntry -> this.createDirtyJobResultEntryIfMissingAsync(archivedExecutionGraph, hasCleanEntry));

        return resultEntryEnsured.handleAsync(
                (ignored, failure) -> {
                    if (failure != null) {
                        this.fatalErrorHandler.onFatalError(
                                new FlinkException(
                                        String.format("The job %s couldn't be marked as pre-cleanup finished in JobResultStore.", executionGraphInfo.getJobId()),
                                        failure));
                    }
                    return CleanupJobState.globalCleanup(terminalJobStatus);
                },
                this.getMainThreadExecutor(jobId));
    }

    /**
     * Creates a dirty job result entry unless the job already has a clean or a dirty one.
     *
     * @param executionGraph execution graph of the terminated job
     * @param hasCleanJobResultEntry whether the job result store already holds a clean entry
     * @return future completing once the check (and, if needed, the creation) has finished
     */
    private CompletableFuture<Void> createDirtyJobResultEntryIfMissingAsync(AccessExecutionGraph executionGraph, boolean hasCleanJobResultEntry) {
        JobID jobId = executionGraph.getJobID();
        if (!hasCleanJobResultEntry) {
            return this.jobResultStore
                    .hasDirtyJobResultEntryAsync(jobId)
                    .thenCompose(hasDirtyEntry -> this.createDirtyJobResultEntryAsync(executionGraph, hasDirtyEntry));
        }
        this.log.warn("Job {} is already marked as clean but clean up was triggered again.", jobId);
        return FutureUtils.completedVoidFuture();
    }

    /**
     * Creates a dirty job result entry for the given execution graph unless one already exists.
     *
     * @param executionGraph execution graph the job result is created from
     * @param hasDirtyJobResultEntry whether the job result store already holds a dirty entry
     * @return future completing once the entry exists
     */
    private CompletableFuture<Void> createDirtyJobResultEntryAsync(AccessExecutionGraph executionGraph, boolean hasDirtyJobResultEntry) {
        if (!hasDirtyJobResultEntry) {
            JobResultEntry dirtyEntry = new JobResultEntry(JobResult.createFrom(executionGraph));
            return this.jobResultStore.createDirtyResultAsync(dirtyEntry);
        }
        return FutureUtils.completedVoidFuture();
    }

    /**
     * Writes the execution graph info to the store. An {@link IOException} is logged and
     * otherwise ignored.
     *
     * @param executionGraphInfo information to store
     */
    private void writeToExecutionGraphInfoStore(ExecutionGraphInfo executionGraphInfo) {
        try {
            this.executionGraphInfoStore.put(executionGraphInfo);
        } catch (IOException storeFailure) {
            // Best effort: a job that cannot be stored must not fail the caller.
            this.log.info(
                    "Could not store completed job {}({}).",
                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                    storeFailure);
        }
    }

    /**
     * Archives the execution graph through the history server archivist. A failure is logged and
     * converted into a normal acknowledgement.
     *
     * @param executionGraphInfo information about the job to archive
     * @return future with an acknowledgement, completed on the job's main thread executor
     */
    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(ExecutionGraphInfo executionGraphInfo) {
        return this.historyServerArchivist
                .archiveExecutionGraph(executionGraphInfo)
                .handleAsync(
                        (ignored, failure) -> {
                            if (failure != null) {
                                this.log.info(
                                        "Could not archive completed job {}({}) to the history server.",
                                        executionGraphInfo.getArchivedExecutionGraph().getJobName(),
                                        executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                                        failure);
                            }
                            return Acknowledge.get();
                        },
                        this.getMainThreadExecutor(executionGraphInfo.getArchivedExecutionGraph().getJobID()));
    }

    /**
     * Reports a failed job master as a fatal error.
     *
     * @param jobId id of the job whose job master failed
     * @param cause the failure
     */
    private void jobMasterFailed(JobID jobId, Throwable cause) {
        String message = String.format("JobMaster for job %s failed.", jobId);
        this.onFatalError(new FlinkException(message, cause));
    }

    /**
     * Returns the job master gateway of a registered, initialized job.
     *
     * @param jobId id of the job
     * @return future with the gateway; failed with {@code FlinkJobNotFoundException} when the job
     *     is not registered, or with {@code UnavailableDispatcherOperationException} while the
     *     runner is not yet initialized
     */
    private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) {
        if (!this.jobManagerRunnerRegistry.isRegistered(jobId)) {
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }

        JobManagerRunner runner = this.jobManagerRunnerRegistry.get(jobId);
        if (runner.isInitialized()) {
            return runner.getJobMasterGateway();
        }
        return FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing."));
    }

    /**
     * Obtains the job master gateway of the given job and applies an asynchronous operation to
     * it.
     *
     * @param jobId id of the job
     * @param operation operation to run against the gateway
     * @param <T> result type of the operation
     * @return future with the operation's result, or failed when the gateway is unavailable
     */
    private <T> CompletableFuture<T> performOperationOnJobMasterGateway(JobID jobId, Function<JobMasterGateway, CompletableFuture<T>> operation) {
        CompletableFuture<JobMasterGateway> gatewayFuture = this.getJobMasterGateway(jobId);
        return gatewayFuture.thenCompose(operation);
    }

    /**
     * Returns the future provided by the resource manager gateway retriever.
     *
     * @return future with the resource manager gateway
     */
    private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
        CompletableFuture<ResourceManagerGateway> gatewayFuture = this.resourceManagerGatewayRetriever.getFuture();
        return gatewayFuture;
    }

    /**
     * Looks up the runner registered for the given job.
     *
     * @param jobId id of the job
     * @return the registered runner, or an empty optional when the job is not registered
     */
    private Optional<JobManagerRunner> getJobManagerRunner(JobID jobId) {
        if (!this.jobManagerRunnerRegistry.isRegistered(jobId)) {
            return Optional.empty();
        }
        return Optional.of(this.jobManagerRunnerRegistry.get(jobId));
    }

    /**
     * Runs an asynchronous command against the resource manager gateway once it is available.
     *
     * @param resourceManagerCommand command to apply to the gateway
     * @param <T> result type of the command
     * @return future with the command's result
     */
    private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
        CompletableFuture<CompletableFuture<T>> nestedResult =
                this.getResourceManagerGateway().thenApply(resourceManagerCommand);
        // Flatten the nested future produced by thenApply.
        return nestedResult.thenCompose(Function.identity());
    }

    /**
     * Extracts the values of all non-empty optionals, preserving iteration order.
     *
     * @param optionalCollection optionals to unwrap
     * @param <T> value type
     * @return list with the values of the present optionals
     */
    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
        List<T> presentValues = new ArrayList<T>();
        for (Optional<T> candidate : optionalCollection) {
            if (candidate.isPresent()) {
                presentValues.add(candidate.get());
            }
        }
        return presentValues;
    }

    /**
     * Applies a query to every registered job manager runner.
     *
     * <p>Each resulting future completes with an optional holding the query's value; when the
     * query fails (or yields {@code null}) the optional is empty and the failure is dropped.
     *
     * @param queryFunction query to run against each runner
     * @param <T> type of the queried information
     * @return one future per registered runner
     */
    @Nonnull
    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobManagerRunner, CompletableFuture<T>> queryFunction) {
        int runnerCount = this.jobManagerRunnerRegistry.size();
        List<CompletableFuture<Optional<T>>> answers = new ArrayList<CompletableFuture<Optional<T>>>(runnerCount);
        for (JobManagerRunner runner : this.jobManagerRunnerRegistry.getJobManagerRunners()) {
            CompletableFuture<Optional<T>> answer =
                    queryFunction.apply(runner).handle((value, failure) -> Optional.ofNullable(value));
            answers.add(answer);
        }
        return answers;
    }

    /**
     * Runs an action on the job graph after the job's termination future has completed.
     *
     * <p>A failed termination is turned into a {@code DispatcherException} wrapped in a
     * {@link CompletionException}. Otherwise the job's termination future is removed from the map
     * and the action is invoked; scheduling is delegated to
     * {@code FutureUtils.thenAcceptAsyncIfNotDone} with the job's main thread executor.
     *
     * @param jobId id of the job
     * @param jobGraph job graph handed to the action
     * @param action action to run once the termination future has completed
     * @return future completing after the action has run
     */
    private CompletableFuture<Void> waitForTerminatingJob(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
        CompletableFuture<Void> previousTermination =
                this.getJobTerminationFuture(jobId)
                        .exceptionally(failure -> {
                            throw new CompletionException(
                                    new DispatcherException(
                                            String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
                                            failure));
                        });

        return FutureUtils.thenAcceptAsyncIfNotDone(
                previousTermination,
                this.getMainThreadExecutor(jobId),
                FunctionUtils.uncheckedConsumer(ignored -> {
                    this.jobManagerRunnerTerminationFutures.remove(jobId);
                    action.accept(jobGraph);
                }));
    }

    /**
     * Returns the termination future registered for the given job, or an already-completed
     * future when none is registered.
     *
     * @param jobId id of the job
     * @return the job's termination future
     */
    @VisibleForTesting
    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
        CompletableFuture<Void> alreadyTerminated = CompletableFuture.completedFuture(null);
        return this.jobManagerRunnerTerminationFutures.getOrDefault(jobId, alreadyTerminated);
    }

    /**
     * Registers the {@code numRunningJobs} gauge, which reports the size of the runner
     * registry's wrapped delegate.
     *
     * @param jobManagerMetricGroup metric group the gauge is registered on
     */
    private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
        jobManagerMetricGroup.gauge(
                "numRunningJobs",
                () -> (long) this.jobManagerRunnerRegistry.getWrappedDelegate().size());
    }

    /**
     * Terminates the given job on its main thread executor after its job graph was removed.
     *
     * @param jobId id of the job whose job graph was removed
     * @return future completing once the termination request has been issued
     */
    public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
        Runnable terminate = () -> this.terminateJob(jobId);
        return CompletableFuture.runAsync(terminate, this.getMainThreadExecutor(jobId));
    }

    /**
     * Applies the parallelism overrides configured under
     * {@code PipelineOptions.PARALLELISM_OVERRIDES} to the vertices of the job graph.
     *
     * <p>Overrides from the job's own configuration take precedence over those from the
     * dispatcher's configuration. Vertices are matched by the hex string of their id.
     *
     * @param jobGraph job graph whose vertex parallelisms are adjusted in place
     */
    private void applyParallelismOverrides(JobGraph jobGraph) {
        Map<String, String> overridesByVertexId = new HashMap<String, String>();
        // Cluster-level overrides first, so that job-level ones replace them.
        overridesByVertexId.putAll(this.configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
        overridesByVertexId.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));

        for (JobVertex vertex : jobGraph.getVertices()) {
            String configuredParallelism = overridesByVertexId.get(vertex.getID().toHexString());
            if (configuredParallelism != null) {
                int previousParallelism = vertex.getParallelism();
                int newParallelism = Integer.parseInt(configuredParallelism);
                this.log.info("Changing job vertex {} parallelism from {} to {}", vertex.getID(), previousParallelism, newParallelism);
                vertex.setParallelism(newParallelism);
            }
        }
    }

    /**
     * Returns the I/O executor wrapped by {@code MdcUtils.scopeToJob} for the given job.
     *
     * @param jobID id of the job the executor is scoped to
     * @return the job-scoped I/O executor
     */
    private Executor getIoExecutor(JobID jobID) {
        Executor jobScopedExecutor = MdcUtils.scopeToJob(jobID, this.ioExecutor);
        return jobScopedExecutor;
    }

    /**
     * Immutable pair of a job status and a flag telling whether the job requires a global or only
     * a local cleanup. Instances are created through {@link #localCleanup} and
     * {@link #globalCleanup}.
     */
    private static class CleanupJobState {
        private final boolean globalCleanup;
        private final JobStatus jobStatus;

        private CleanupJobState(boolean globalCleanup, JobStatus jobStatus) {
            this.globalCleanup = globalCleanup;
            this.jobStatus = jobStatus;
        }

        /** Creates a state that requests a local cleanup for a job with the given status. */
        public static CleanupJobState localCleanup(JobStatus jobStatus) {
            final boolean global = false;
            return new CleanupJobState(global, jobStatus);
        }

        /** Creates a state that requests a global cleanup for a job with the given status. */
        public static CleanupJobState globalCleanup(JobStatus jobStatus) {
            final boolean global = true;
            return new CleanupJobState(global, jobStatus);
        }

        /** Returns {@code true} when a global cleanup is requested. */
        public boolean isGlobalCleanup() {
            return globalCleanup;
        }

        /** Returns the job status this state was created with. */
        public JobStatus getJobStatus() {
            return jobStatus;
        }
    }

    protected static enum ExecutionType {
        SUBMISSION,
        RECOVERY;

    }
}

