/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherException;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public abstract class Dispatcher
extends FencedRpcEndpoint<DispatcherId>
implements DispatcherGateway,
LeaderContender {
    public static final String DISPATCHER_NAME = "dispatcher";
    private final Configuration configuration;
    private final SubmittedJobGraphStore submittedJobGraphStore;
    private final RunningJobsRegistry runningJobsRegistry;
    private final HighAvailabilityServices highAvailabilityServices;
    private final ResourceManagerGateway resourceManagerGateway;
    private final JobManagerServices jobManagerServices;
    private final HeartbeatServices heartbeatServices;
    private final MetricRegistry metricRegistry;
    private final FatalErrorHandler fatalErrorHandler;
    private final Map<JobID, JobManagerRunner> jobManagerRunners;
    private final LeaderElectionService leaderElectionService;
    private final CompletableFuture<String> restAddressFuture;

    protected Dispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> restAddress) throws Exception {
        super(rpcService, endpointId);
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityServices);
        this.resourceManagerGateway = (ResourceManagerGateway)Preconditions.checkNotNull((Object)resourceManagerGateway);
        this.jobManagerServices = JobManagerServices.fromConfiguration(configuration, (BlobServer)Preconditions.checkNotNull((Object)blobServer));
        this.heartbeatServices = (HeartbeatServices)Preconditions.checkNotNull((Object)heartbeatServices);
        this.metricRegistry = (MetricRegistry)Preconditions.checkNotNull((Object)metricRegistry);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
        this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
        this.jobManagerRunners = new HashMap<JobID, JobManagerRunner>(16);
        this.leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
        this.restAddressFuture = restAddress.map(CompletableFuture::completedFuture).orElse(FutureUtils.completedExceptionally((Throwable)((Object)new DispatcherException("The Dispatcher has not been started with a REST endpoint."))));
    }

    @Override
    public void postStop() throws Exception {
        Throwable exception = null;
        this.clearState();
        try {
            this.jobManagerServices.shutdown();
        }
        catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed((Throwable)t, exception);
        }
        try {
            this.submittedJobGraphStore.stop();
        }
        catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.leaderElectionService.stop();
        }
        catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            super.postStop();
        }
        catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
        }
    }

    @Override
    public void start() throws Exception {
        super.start();
        this.leaderElectionService.start(this);
    }

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
        JobID jobId = jobGraph.getJobID();
        this.log.info("Submitting job {} ({}).", (Object)jobGraph.getJobID(), (Object)jobGraph.getName());
        try {
            jobSchedulingStatus = this.runningJobsRegistry.getJobSchedulingStatus(jobId);
        }
        catch (IOException e) {
            this.log.warn("Cannot retrieve job status for {}.", (Object)jobId, (Object)e);
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobSubmissionException(jobId, "Could not retrieve the job status.", e)));
        }
        if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
            JobManagerRunner jobManagerRunner;
            try {
                this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
            }
            catch (Exception e) {
                this.log.warn("Cannot persist JobGraph.", (Throwable)e);
                return FutureUtils.completedExceptionally((Throwable)((Object)new JobSubmissionException(jobId, "Could not persist JobGraph.", e)));
            }
            try {
                jobManagerRunner = this.createJobManagerRunner(ResourceID.generate(), jobGraph, this.configuration, this.getRpcService(), this.highAvailabilityServices, this.heartbeatServices, this.jobManagerServices, this.metricRegistry, new DispatcherOnCompleteActions(jobGraph.getJobID()), this.fatalErrorHandler);
                jobManagerRunner.start();
            }
            catch (Exception e) {
                try {
                    this.submittedJobGraphStore.removeJobGraph(jobId);
                }
                catch (Throwable t) {
                    this.log.warn("Cannot remove job graph from submitted job graph store.", t);
                    e.addSuppressed(t);
                }
                return FutureUtils.completedExceptionally((Throwable)((Object)new JobSubmissionException(jobId, "Could not start JobManager.", e)));
            }
            this.jobManagerRunners.put(jobId, jobManagerRunner);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new JobSubmissionException(jobId, "Job has already been submitted and is currently in state " + (Object)((Object)jobSchedulingStatus) + '.')));
    }

    @Override
    public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
        return CompletableFuture.completedFuture(this.jobManagerRunners.keySet());
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.get(jobId);
        if (jobManagerRunner == null) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
        }
        return jobManagerRunner.getJobManagerGateway().cancel(timeout);
    }

    @Override
    public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.get(jobId);
        if (jobManagerRunner == null) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
        }
        return jobManagerRunner.getJobManagerGateway().stop(timeout);
    }

    @Override
    public CompletableFuture<String> requestRestAddress(Time timeout) {
        return this.restAddressFuture;
    }

    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
        CompletableFuture<ResourceOverview> taskManagerOverviewFuture = this.resourceManagerGateway.requestResourceOverview(timeout);
        ArrayList<CompletableFuture<JobStatus>> jobStatus = new ArrayList<CompletableFuture<JobStatus>>(this.jobManagerRunners.size());
        for (Map.Entry<JobID, JobManagerRunner> jobManagerRunnerEntry : this.jobManagerRunners.entrySet()) {
            CompletableFuture<JobStatus> jobStatusFuture = jobManagerRunnerEntry.getValue().getJobManagerGateway().requestJobStatus(timeout);
            jobStatus.add(jobStatusFuture);
        }
        FutureUtils.ConjunctFuture allJobsFuture = FutureUtils.combineAll(jobStatus);
        return allJobsFuture.thenCombine(taskManagerOverviewFuture, (allJobsStatus, resourceOverview) -> {
            int numberRunningOrPendingJobs = 0;
            int numberFinishedJobs = 0;
            int numberCancelledJobs = 0;
            int numberFailedJobs = 0;
            block5: for (JobStatus status : allJobsStatus) {
                switch (status) {
                    case FINISHED: {
                        ++numberFinishedJobs;
                        continue block5;
                    }
                    case FAILED: {
                        ++numberFailedJobs;
                        continue block5;
                    }
                    case CANCELED: {
                        ++numberCancelledJobs;
                        continue block5;
                    }
                }
                ++numberRunningOrPendingJobs;
            }
            return new ClusterOverview(resourceOverview.getNumberTaskManagers(), resourceOverview.getNumberRegisteredSlots(), resourceOverview.getNumberFreeSlots(), numberRunningOrPendingJobs, numberFinishedJobs, numberCancelledJobs, numberFailedJobs);
        });
    }

    @Override
    public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
        int numberJobsRunning = this.jobManagerRunners.size();
        ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<CompletableFuture<JobDetails>>(numberJobsRunning);
        for (JobManagerRunner jobManagerRunner : this.jobManagerRunners.values()) {
            individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout));
        }
        FutureUtils.ConjunctFuture combinedJobDetails = FutureUtils.combineAll(individualJobDetails);
        return combinedJobDetails.thenApply(jobDetails -> new MultipleJobsDetails((Collection<JobDetails>)jobDetails, null));
    }

    @Override
    public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.get(jobId);
        if (jobManagerRunner == null) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
        }
        return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout);
    }

    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
        String metricQueryServicePath = this.metricRegistry.getMetricQueryServicePath();
        if (metricQueryServicePath != null) {
            return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
        }
        return CompletableFuture.completedFuture(Collections.emptyList());
    }

    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
        return this.resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
    }

    @Override
    public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
        return CompletableFuture.completedFuture(this.jobManagerServices.blobServer.getPort());
    }

    private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.remove(jobId);
        if (jobManagerRunner != null) {
            jobManagerRunner.shutdown();
        }
        if (cleanupHA) {
            this.submittedJobGraphStore.removeJobGraph(jobId);
        }
    }

    private void clearState() throws Exception {
        Exception exception = null;
        for (JobManagerRunner jobManagerRunner : this.jobManagerRunners.values()) {
            try {
                jobManagerRunner.shutdown();
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
            }
        }
        this.jobManagerRunners.clear();
        if (exception != null) {
            throw exception;
        }
    }

    private void recoverJobs() {
        this.log.info("Recovering all persisted jobs.");
        this.getRpcService().execute(() -> {
            Collection<JobID> jobIds;
            try {
                jobIds = this.submittedJobGraphStore.getJobIds();
            }
            catch (Exception e) {
                this.log.error("Could not recover job ids from the submitted job graph store. Aborting recovery.", (Throwable)e);
                return;
            }
            for (JobID jobId : jobIds) {
                try {
                    SubmittedJobGraph submittedJobGraph = this.submittedJobGraphStore.recoverJobGraph(jobId);
                    this.runAsync(() -> this.submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT));
                }
                catch (Exception e) {
                    this.log.error("Could not recover the job graph for " + jobId + '.', (Throwable)e);
                }
            }
        });
    }

    private void onFatalError(Throwable throwable) {
        this.log.error("Fatal error occurred in dispatcher {}.", (Object)this.getAddress(), (Object)throwable);
        this.fatalErrorHandler.onFatalError(throwable);
    }

    protected abstract JobManagerRunner createJobManagerRunner(ResourceID var1, JobGraph var2, Configuration var3, RpcService var4, HighAvailabilityServices var5, HeartbeatServices var6, JobManagerServices var7, MetricRegistry var8, OnCompletionActions var9, FatalErrorHandler var10) throws Exception;

    @Override
    public void grantLeadership(UUID newLeaderSessionID) {
        this.runAsyncWithoutFencing(() -> {
            DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID);
            this.log.info("Dispatcher {} was granted leadership with fencing token {}", (Object)this.getAddress(), (Object)dispatcherId);
            if (this.getFencingToken() != null) {
                try {
                    this.clearState();
                }
                catch (Exception e) {
                    this.log.warn("Could not properly clear the Dispatcher state while granting leadership.", (Throwable)e);
                }
            }
            this.setFencingToken(dispatcherId);
            this.getRpcService().execute(() -> this.leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
            this.recoverJobs();
        });
    }

    @Override
    public void revokeLeadership() {
        this.runAsyncWithoutFencing(() -> {
            this.log.info("Dispatcher {} was revoked leadership.", (Object)this.getAddress());
            try {
                this.clearState();
            }
            catch (Exception e) {
                this.log.warn("Could not properly clear the Dispatcher state while revoking leadership.", (Throwable)e);
            }
            this.setFencingToken(null);
        });
    }

    @Override
    public void handleError(Exception exception) {
        this.onFatalError((Throwable)((Object)new DispatcherException("Received an error from the LeaderElectionService.", exception)));
    }

    private class DispatcherOnCompleteActions
    implements OnCompletionActions {
        private final JobID jobId;

        private DispatcherOnCompleteActions(JobID jobId) {
            this.jobId = (JobID)Preconditions.checkNotNull((Object)jobId);
        }

        @Override
        public void jobFinished(JobExecutionResult result) {
            Dispatcher.this.log.info("Job {} finished.", (Object)this.jobId);
            Dispatcher.this.runAsync(() -> {
                try {
                    Dispatcher.this.removeJob(this.jobId, true);
                }
                catch (Exception e) {
                    Dispatcher.this.log.warn("Could not properly remove job {} from the dispatcher.", (Object)this.jobId, (Object)e);
                }
            });
        }

        @Override
        public void jobFailed(Throwable cause) {
            Dispatcher.this.log.info("Job {} failed.", (Object)this.jobId);
            Dispatcher.this.runAsync(() -> {
                try {
                    Dispatcher.this.removeJob(this.jobId, true);
                }
                catch (Exception e) {
                    Dispatcher.this.log.warn("Could not properly remove job {} from the dispatcher.", (Object)this.jobId, (Object)e);
                }
            });
        }

        @Override
        public void jobFinishedByOther() {
            Dispatcher.this.log.info("Job {} was finished by other JobManager.", (Object)this.jobId);
            Dispatcher.this.runAsync(() -> {
                try {
                    Dispatcher.this.removeJob(this.jobId, false);
                }
                catch (Exception e) {
                    Dispatcher.this.log.warn("Could not properly remove job {} from the dispatcher.", (Object)this.jobId, (Object)e);
                }
            });
        }
    }
}

