/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import akka.actor.ActorSystem;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.MiniClusterJobDispatcher;
import org.apache.flink.runtime.minicluster.OneTimeLeaderListenerFuture;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MiniCluster {
    private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
    private final Object lock = new Object();
    private final MiniClusterConfiguration miniClusterConfiguration;
    @GuardedBy(value="lock")
    private MetricRegistryImpl metricRegistry;
    @GuardedBy(value="lock")
    private RpcService commonRpcService;
    @GuardedBy(value="lock")
    private RpcService[] jobManagerRpcServices;
    @GuardedBy(value="lock")
    private RpcService[] taskManagerRpcServices;
    @GuardedBy(value="lock")
    private RpcService[] resourceManagerRpcServices;
    @GuardedBy(value="lock")
    private HighAvailabilityServices haServices;
    @GuardedBy(value="lock")
    private BlobServer blobServer;
    @GuardedBy(value="lock")
    private HeartbeatServices heartbeatServices;
    @GuardedBy(value="lock")
    private ResourceManagerRunner[] resourceManagerRunners;
    private volatile TaskExecutor[] taskManagers;
    @GuardedBy(value="lock")
    private MiniClusterJobDispatcher jobDispatcher;
    private volatile boolean running;

    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this.miniClusterConfiguration = (MiniClusterConfiguration)Preconditions.checkNotNull((Object)miniClusterConfiguration, (String)"config may not be null");
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.running ? 1 : 0) != 0, (Object)"FlinkMiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", (Object)this.miniClusterConfiguration);
            UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            Time rpcTimeout = this.miniClusterConfiguration.getRpcTimeout();
            int numJobManagers = this.miniClusterConfiguration.getNumJobManagers();
            int numTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
            int numResourceManagers = this.miniClusterConfiguration.getNumResourceManagers();
            boolean useSingleRpcService = this.miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
            try {
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = this.createMetricRegistry((Configuration)configuration);
                RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
                RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
                RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
                LOG.info("Starting RPC Service(s)");
                this.commonRpcService = this.createRpcService((Configuration)configuration, rpcTimeout, false, null);
                ActorSystem actorSystem = ((AkkaRpcService)this.commonRpcService).getActorSystem();
                this.metricRegistry.startQueryService(actorSystem, null);
                if (useSingleRpcService) {
                    int i;
                    for (i = 0; i < numJobManagers; ++i) {
                        jobManagerRpcServices[i] = this.commonRpcService;
                    }
                    for (i = 0; i < numTaskManagers; ++i) {
                        taskManagerRpcServices[i] = this.commonRpcService;
                    }
                    for (i = 0; i < numResourceManagers; ++i) {
                        resourceManagerRpcServices[i] = this.commonRpcService;
                    }
                    this.resourceManagerRpcServices = null;
                    this.jobManagerRpcServices = null;
                    this.taskManagerRpcServices = null;
                } else {
                    int i;
                    String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    String resourceManagerBindAddress = this.miniClusterConfiguration.getResourceManagerBindAddress();
                    for (i = 0; i < numJobManagers; ++i) {
                        jobManagerRpcServices[i] = this.createRpcService((Configuration)configuration, rpcTimeout, true, jobManagerBindAddress);
                    }
                    for (i = 0; i < numTaskManagers; ++i) {
                        taskManagerRpcServices[i] = this.createRpcService((Configuration)configuration, rpcTimeout, true, taskManagerBindAddress);
                    }
                    for (i = 0; i < numResourceManagers; ++i) {
                        resourceManagerRpcServices[i] = this.createRpcService((Configuration)configuration, rpcTimeout, true, resourceManagerBindAddress);
                    }
                    this.jobManagerRpcServices = jobManagerRpcServices;
                    this.taskManagerRpcServices = taskManagerRpcServices;
                    this.resourceManagerRpcServices = resourceManagerRpcServices;
                }
                LOG.info("Starting high-availability services");
                this.haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices((Configuration)configuration, this.commonRpcService.getExecutor());
                this.blobServer = new BlobServer((Configuration)configuration, this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration((Configuration)configuration);
                LOG.info("Starting {} ResourceManger(s)", (Object)numResourceManagers);
                this.resourceManagerRunners = this.startResourceManagers((Configuration)configuration, this.haServices, this.heartbeatServices, this.metricRegistry, numResourceManagers, resourceManagerRpcServices);
                LOG.info("Starting {} TaskManger(s)", (Object)numTaskManagers);
                this.taskManagers = this.startTaskManagers((Configuration)configuration, this.haServices, this.metricRegistry, numTaskManagers, taskManagerRpcServices);
                LOG.info("Starting job dispatcher(s) for {} JobManger(s)", (Object)numJobManagers);
                this.jobDispatcher = new MiniClusterJobDispatcher((Configuration)configuration, this.haServices, this.blobServer, this.heartbeatServices, this.metricRegistry, numJobManagers, jobManagerRpcServices);
            }
            catch (Exception e) {
                try {
                    this.shutdownInternally();
                }
                catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }
            this.running = true;
            LOG.info("Flink Mini Cluster started successfully");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                LOG.info("Shutting down Flink Mini Cluster");
                try {
                    this.shutdownInternally();
                }
                finally {
                    this.running = false;
                }
                LOG.info("Flink Mini Cluster is shut down");
            }
        }
    }

    @GuardedBy(value="lock")
    private void shutdownInternally() throws Exception {
        assert (Thread.holdsLock(this.lock));
        Throwable exception = null;
        if (this.jobDispatcher != null) {
            try {
                this.jobDispatcher.shutdown();
            }
            catch (Exception e) {
                exception = e;
            }
            this.jobDispatcher = null;
        }
        if (this.resourceManagerRunners != null) {
            for (ResourceManagerRunner rm : this.resourceManagerRunners) {
                if (rm == null) continue;
                try {
                    rm.shutDown();
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)exception);
                }
            }
            this.resourceManagerRunners = null;
        }
        if (this.taskManagers != null) {
            for (TaskExecutor tm : this.taskManagers) {
                if (tm == null) continue;
                try {
                    tm.shutDown();
                    tm.getTerminationFuture().get();
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)exception);
                }
            }
            this.taskManagers = null;
        }
        if (this.metricRegistry != null) {
            this.metricRegistry.shutdown();
            this.metricRegistry = null;
        }
        exception = MiniCluster.shutDownRpc(this.commonRpcService, exception);
        exception = MiniCluster.shutDownRpcs(this.jobManagerRpcServices, exception);
        exception = MiniCluster.shutDownRpcs(this.taskManagerRpcServices, exception);
        exception = MiniCluster.shutDownRpcs(this.resourceManagerRpcServices, exception);
        this.commonRpcService = null;
        this.jobManagerRpcServices = null;
        this.taskManagerRpcServices = null;
        this.resourceManagerRpcServices = null;
        if (this.blobServer != null) {
            try {
                this.blobServer.close();
            }
            catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
            }
            this.blobServer = null;
        }
        if (this.haServices != null) {
            try {
                this.haServices.closeAndCleanupAllData();
            }
            catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
            }
            this.haServices = null;
        }
        if (exception != null) {
            ExceptionUtils.rethrowException((Throwable)exception, (String)"Error while shutting down mini cluster");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
        LeaderRetrievalService rmMasterListener = null;
        try {
            int numTaskManagersAvailable;
            CompletableFuture<LeaderAddressAndId> addressAndIdFuture;
            Object object = this.lock;
            synchronized (object) {
                Preconditions.checkState((boolean)this.running, (Object)"FlinkMiniCluster is not running");
                OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture();
                rmMasterListener = this.haServices.getResourceManagerLeaderRetriever();
                rmMasterListener.start(listenerFuture);
                addressAndIdFuture = listenerFuture.future();
            }
            LeaderAddressAndId addressAndId = addressAndIdFuture.get();
            ResourceManagerGateway resourceManager = this.commonRpcService.connect(addressAndId.leaderAddress(), new ResourceManagerId(addressAndId.leaderId()), ResourceManagerGateway.class).get();
            int numTaskManagersToWaitFor = this.taskManagers.length;
            while ((numTaskManagersAvailable = resourceManager.getNumberOfRegisteredTaskManagers().get().intValue()) < numTaskManagersToWaitFor) {
                Thread.sleep(2L);
            }
        }
        finally {
            try {
                if (rmMasterListener != null) {
                    rmMasterListener.stop();
                }
            }
            catch (Exception e) {
                LOG.warn("Error shutting down leader listener for ResourceManager");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runDetached(JobGraph job) throws JobExecutionException {
        Preconditions.checkNotNull((Object)job, (String)"job is null");
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((boolean)this.running, (Object)"mini cluster is not running");
            this.jobDispatcher.runDetached(job);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        MiniClusterJobDispatcher dispatcher;
        Preconditions.checkNotNull((Object)job, (String)"job is null");
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((boolean)this.running, (Object)"mini cluster is not running");
            dispatcher = this.jobDispatcher;
        }
        return dispatcher.runJobBlocking(job);
    }

    protected MetricRegistryImpl createMetricRegistry(Configuration config) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
    }

    protected RpcService createRpcService(Configuration configuration, Time askTimeout, boolean remoteEnabled, String bindAddress) {
        ActorSystem actorSystem = remoteEnabled ? AkkaUtils.createActorSystem(configuration, bindAddress, 0) : AkkaUtils.createLocalActorSystem(configuration);
        return new AkkaRpcService(actorSystem, askTimeout);
    }

    protected ResourceManagerRunner[] startResourceManagers(Configuration configuration, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, int numResourceManagers, RpcService[] resourceManagerRpcServices) throws Exception {
        ResourceManagerRunner[] resourceManagerRunners = new ResourceManagerRunner[numResourceManagers];
        for (int i = 0; i < numResourceManagers; ++i) {
            resourceManagerRunners[i] = new ResourceManagerRunner(ResourceID.generate(), "resourcemanager_" + i, configuration, resourceManagerRpcServices[i], haServices, heartbeatServices, metricRegistry);
            resourceManagerRunners[i].start();
        }
        return resourceManagerRunners;
    }

    protected TaskExecutor[] startTaskManagers(Configuration configuration, HighAvailabilityServices haServices, MetricRegistry metricRegistry, int numTaskManagers, RpcService[] taskManagerRpcServices) throws Exception {
        TaskExecutor[] taskExecutors = new TaskExecutor[numTaskManagers];
        boolean localCommunication = numTaskManagers == 1;
        for (int i = 0; i < numTaskManagers; ++i) {
            taskExecutors[i] = TaskManagerRunner.startTaskManager(configuration, new ResourceID(UUID.randomUUID().toString()), taskManagerRpcServices[i], haServices, this.heartbeatServices, metricRegistry, localCommunication, new TerminatingFatalErrorHandler(i));
            taskExecutors[i].start();
        }
        return taskExecutors;
    }

    private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
        if (rpcService != null) {
            try {
                rpcService.stopService();
            }
            catch (Throwable t) {
                return ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)priorException);
            }
        }
        return priorException;
    }

    private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
        if (rpcServices != null) {
            Throwable exception = priorException;
            for (RpcService service : rpcServices) {
                try {
                    if (service == null) continue;
                    service.stopService();
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)exception);
                }
            }
        }
        return priorException;
    }

    private class TerminatingFatalErrorHandler
    implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int index) {
            this.index = index;
        }

        @Override
        public void onFatalError(Throwable exception) {
            if (MiniCluster.this.running) {
                LOG.error("TaskManager #{} failed.", (Object)this.index, (Object)exception);
                TaskExecutor[] currentTaskManagers = MiniCluster.this.taskManagers;
                if (currentTaskManagers != null) {
                    currentTaskManagers[this.index].shutDown();
                }
            }
        }
    }
}

