/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.ComponentMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a single job's {@link JobMaster}: it constructs the master, enters it into leader
 * election through the {@link LeaderElectionService} obtained from the high-availability
 * services, and starts the master once leadership has been granted.
 *
 * <p>The runner passes itself to the {@link JobMaster} as both the completion listener and the
 * fatal error handler. Completion and fatal-error callbacks shut the runner down and are
 * forwarded to the {@link OnCompletionActions} / {@link FatalErrorHandler} supplied by the
 * creator of the runner.
 */
public class JobManagerRunner
implements LeaderContender,
OnCompletionActions,
FatalErrorHandler {
    private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);

    /** Guards the shutdown sequence and the leadership callbacks against each other. */
    private final Object lock = new Object();

    /** The job executed by this runner. */
    private final JobGraph jobGraph;

    /** Receiver of the job completion callbacks after this runner has handled them. */
    private final OnCompletionActions toNotifyOnComplete;

    /** Receiver of fatal errors after this runner has logged them. */
    private final FatalErrorHandler errorHandler;

    /** Registry queried and updated with the job's scheduling status. */
    private final RunningJobsRegistry runningJobsRegistry;

    /** Leader election service for this job's JobManager. */
    private final LeaderElectionService leaderElectionService;

    /** Shared services (executor, library cache, ...) handed to the {@link JobMaster}. */
    private final JobManagerServices jobManagerServices;

    private final JobMaster jobManager;

    private final JobManagerMetricGroup jobManagerMetricGroup;

    /** Set to {@code true} (under {@link #lock}) as the first step of the shutdown sequence. */
    private volatile boolean shutdown;

    /**
     * Creates a runner using a {@link MetricRegistry} built from the given configuration and
     * {@link JobManagerServices} derived from the configuration and HA services.
     *
     * <p>NOTE(review): the registry created here is not retained by this class, so nothing in
     * this class shuts it down - confirm whether that is intended.
     *
     * @throws Exception if the delegate constructor fails to set up the JobManager
     */
    public JobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, OnCompletionActions toNotifyOnComplete, FatalErrorHandler errorHandler) throws Exception {
        this(resourceId, jobGraph, configuration, rpcService, haServices, heartbeatServices, new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), toNotifyOnComplete, errorHandler);
    }

    /**
     * Creates a runner using the given metric registry and {@link JobManagerServices} derived
     * from the configuration and HA services.
     *
     * @throws Exception if the services cannot be created or the delegate constructor fails
     */
    public JobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, OnCompletionActions toNotifyOnComplete, FatalErrorHandler errorHandler) throws Exception {
        this(resourceId, jobGraph, configuration, rpcService, haServices, heartbeatServices, JobManagerServices.fromConfiguration(configuration, haServices), metricRegistry, toNotifyOnComplete, errorHandler);
    }

    /**
     * Creates a runner from fully specified services.
     *
     * <p>Registers the job's user code libraries, resolves the user code class loader and
     * constructs the {@link JobMaster}. If any step fails, the given {@code jobManagerServices}
     * are shut down, the metric group (if already created) is closed, and the failure is
     * rethrown wrapped in a {@link JobExecutionException}.
     *
     * @throws JobExecutionException if any part of the setup fails; the original failure is the
     *     cause
     */
    public JobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions toNotifyOnComplete, FatalErrorHandler errorHandler) throws Exception {
        JobManagerMetricGroup jobManagerMetrics = null;
        try {
            this.jobGraph = Preconditions.checkNotNull(jobGraph);
            this.toNotifyOnComplete = Preconditions.checkNotNull(toNotifyOnComplete);
            this.errorHandler = Preconditions.checkNotNull(errorHandler);
            this.jobManagerServices = Preconditions.checkNotNull(jobManagerServices);
            Preconditions.checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

            // Fall back to "localhost" when the RPC service reports an empty address.
            String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
            jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress);
            this.jobManagerMetricGroup = jobManagerMetrics;

            BlobLibraryCacheManager libraryCacheManager = jobManagerServices.libraryCacheManager;
            try {
                libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
            } catch (IOException e) {
                throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
            }
            ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
            if (userCodeLoader == null) {
                throw new Exception("The user code class loader could not be initialized.");
            }

            this.runningJobsRegistry = haServices.getRunningJobsRegistry();
            this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
            // The runner acts as both completion listener and fatal error handler of the master.
            this.jobManager = new JobMaster(rpcService, resourceId, jobGraph, configuration, haServices, heartbeatServices, jobManagerServices.executorService, jobManagerServices.libraryCacheManager, jobManagerServices.restartStrategyFactory, jobManagerServices.rpcAskTimeout, jobManagerMetrics, this, this, userCodeLoader);
        } catch (Throwable t) {
            // Best-effort cleanup. Nothing in here may throw, otherwise the original failure
            // 't' would be replaced by a secondary one.
            if (jobManagerServices != null) {
                try {
                    jobManagerServices.shutdown();
                } catch (Throwable tt) {
                    log.error("Error while shutting down JobManager services", tt);
                }
            }
            if (jobManagerMetrics != null) {
                try {
                    jobManagerMetrics.close();
                } catch (Throwable tt) {
                    log.error("Error while unregistering metrics", tt);
                }
            }
            // jobGraph itself may be the null argument that caused the failure.
            throw new JobExecutionException(jobGraph == null ? null : jobGraph.getJobID(), "Could not set up JobManager", t);
        }
    }

    /**
     * Starts the leader election service with this runner as the contender.
     *
     * @throws Exception if the leader election service fails to start; the failure is the cause
     */
    public void start() throws Exception {
        try {
            this.leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    /** Shuts down the runner and the components it owns. */
    public void shutdown() {
        this.shutdownInternally();
    }

    /**
     * Marks the runner as shut down and then stops, in order, the leader election service, the
     * {@link JobMaster}, the {@link JobManagerServices} and the metric group. A failure in one
     * step is logged and does not prevent the remaining steps from running.
     */
    private void shutdownInternally() {
        synchronized (this.lock) {
            this.shutdown = true;
            if (this.leaderElectionService != null) {
                try {
                    this.leaderElectionService.stop();
                } catch (Throwable t) {
                    log.error("Could not properly shutdown the leader election service", t);
                }
            }
            try {
                this.jobManager.shutDown();
            } catch (Throwable t) {
                log.error("Error shutting down JobManager", t);
            }
            try {
                this.jobManagerServices.shutdown();
            } catch (Throwable t) {
                log.error("Error shutting down JobManager services", t);
            }
            try {
                this.jobManagerMetricGroup.close();
            } catch (Throwable t) {
                log.error("Error while unregistering metrics", t);
            }
        }
    }

    /**
     * Marks the job as finished in the running jobs registry, shuts the runner down and then
     * forwards the result to the completion listener (even if the preceding steps throw).
     */
    @Override
    public void jobFinished(JobExecutionResult result) {
        try {
            this.unregisterJobFromHighAvailability();
            this.shutdownInternally();
        } finally {
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFinished(result);
            }
        }
    }

    /**
     * Marks the job as finished in the running jobs registry, shuts the runner down and then
     * forwards the failure cause to the completion listener (even if the preceding steps throw).
     */
    @Override
    public void jobFailed(Throwable cause) {
        try {
            this.unregisterJobFromHighAvailability();
            this.shutdownInternally();
        } finally {
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFailed(cause);
            }
        }
    }

    /**
     * Shuts the runner down and notifies the completion listener. Unlike the other completion
     * callbacks, this one does not touch the running jobs registry.
     */
    @Override
    public void jobFinishedByOther() {
        try {
            this.shutdownInternally();
        } finally {
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFinishedByOther();
            }
        }
    }

    /**
     * Logs the error, forwards it to the configured error handler and finally shuts the runner
     * down, regardless of whether the handler itself throws.
     */
    @Override
    public void onFatalError(Throwable exception) {
        try {
            log.error("JobManager runner encountered a fatal error.", exception);
        } catch (Throwable ignored) {
            // Deliberately ignored: a failure while logging must not stop the error from
            // reaching the handler or the shutdown from running.
        }
        try {
            if (this.errorHandler != null) {
                this.errorHandler.onFatalError(exception);
            }
        } finally {
            this.shutdownInternally();
        }
    }

    /**
     * Marks the job as finished in the running jobs registry. Failures are logged and otherwise
     * ignored.
     */
    private void unregisterJobFromHighAvailability() {
        try {
            this.runningJobsRegistry.setJobFinished(this.jobGraph.getJobID());
        } catch (Throwable t) {
            log.error("Could not un-register from high-availability services job {} ({}).Other JobManager's may attempt to recover it and re-execute it.", this.jobGraph.getName(), this.jobGraph.getJobID(), t);
        }
    }

    /**
     * Confirms the leader session and, depending on the job's registry status, either reports
     * the job as finished by another JobManager or starts the {@link JobMaster}.
     *
     * <p>Runs entirely under {@link #lock}; the fatal-error and completion callbacks triggered
     * from here are therefore invoked while the lock is held.
     */
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        synchronized (this.lock) {
            if (this.shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }
            log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.", this.jobGraph.getName(), this.jobGraph.getJobID(), leaderSessionID, this.getAddress());
            this.leaderElectionService.confirmLeaderSessionID(leaderSessionID);

            RunningJobsRegistry.JobSchedulingStatus schedulingStatus;
            try {
                schedulingStatus = this.runningJobsRegistry.getJobSchedulingStatus(this.jobGraph.getJobID());
            } catch (Throwable t) {
                log.error("Could not access status (running/finished) of job {}. ", this.jobGraph.getJobID(), t);
                this.onFatalError(t);
                return;
            }

            if (schedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
                log.info("Granted leader ship but job {} has been finished. ", this.jobGraph.getJobID());
                this.jobFinishedByOther();
                return;
            }

            // Re-check leadership before starting, since it is queried separately from the grant.
            if (this.leaderElectionService.hasLeadership()) {
                try {
                    if (schedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
                        this.runningJobsRegistry.setJobRunning(this.jobGraph.getJobID());
                    }
                    this.jobManager.start(leaderSessionID);
                } catch (Exception e) {
                    this.onFatalError(new Exception("Could not start the job manager.", e));
                }
            }
        }
    }

    /**
     * Asks the {@link JobMaster} to suspend execution because leadership was lost. Does nothing
     * beyond logging if the runner has already been shut down.
     */
    @Override
    public void revokeLeadership() {
        synchronized (this.lock) {
            if (this.shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }
            log.info("JobManager for job {} ({}) was revoked leadership at {}.", this.jobGraph.getName(), this.jobGraph.getJobID(), this.getAddress());
            ((JobMasterGateway)this.jobManager.getSelf()).suspendExecution(new Exception("JobManager is no longer the leader."));
        }
    }

    /** Returns the address of the underlying {@link JobMaster}. */
    @Override
    public String getAddress() {
        return this.jobManager.getAddress();
    }

    /** Treats an error reported by the leader election service as a fatal error. */
    @Override
    public void handleError(Exception exception) {
        log.error("Leader Election Service encountered a fatal error.", exception);
        this.onFatalError(exception);
    }

    /** Returns whether the shutdown sequence has been entered. */
    @VisibleForTesting
    boolean isShutdown() {
        return this.shutdown;
    }
}

