/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.Preconditions;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable>
extends FlinkUntypedActor {
    protected static final int EXIT_CODE_FATAL_ERROR = -13;
    public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    protected final Configuration config;
    private final FiniteDuration messageTimeout;
    private final LeaderRetrievalService leaderRetriever;
    private final Map<ResourceID, WorkerType> startedWorkers;
    private final Set<ActorRef> infoMessageListeners;
    private ActorRef jobManager;
    private UUID leaderSessionID;
    private int designatedPoolSize;

    protected FlinkResourceManager(int numInitialTaskManagers, Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
        FiniteDuration lt;
        this.config = Objects.requireNonNull(flinkConfig);
        this.leaderRetriever = Objects.requireNonNull(leaderRetriever);
        this.startedWorkers = new HashMap<ResourceID, WorkerType>();
        try {
            lt = AkkaUtils.getLookupTimeout(this.config);
        }
        catch (Exception e) {
            lt = new FiniteDuration(Duration.apply((String)ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
        }
        this.messageTimeout = lt;
        this.designatedPoolSize = numInitialTaskManagers;
        this.infoMessageListeners = new HashSet<ActorRef>();
    }

    public void preStart() {
        try {
            this.leaderRetriever.start(new LeaderRetrievalListener(){

                @Override
                public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
                    FlinkResourceManager.this.self().tell((Object)new NewLeaderAvailable(leaderAddress, leaderSessionID), ActorRef.noSender());
                }

                @Override
                public void handleError(Exception e) {
                    FlinkResourceManager.this.self().tell((Object)new FatalErrorOccurred("Leader retrieval service failed", e), ActorRef.noSender());
                }
            });
            this.initialize();
        }
        catch (Throwable t) {
            this.self().tell((Object)new FatalErrorOccurred("Error during startup of ResourceManager actor", t), ActorRef.noSender());
        }
    }

    public void postStop() {
        try {
            this.leaderRetriever.stop();
        }
        catch (Throwable t) {
            this.LOG.error("Could not cleanly shut down leader retrieval service", t);
        }
    }

    @Override
    protected void handleMessage(Object message) {
        try {
            if (message instanceof CheckAndAllocateContainers) {
                this.checkWorkersPool();
            } else if (message instanceof SetWorkerPoolSize) {
                SetWorkerPoolSize msg = (SetWorkerPoolSize)message;
                this.adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
            } else if (message instanceof RemoveResource) {
                RemoveResource msg = (RemoveResource)message;
                this.removeRegisteredResource(msg.resourceId());
            } else if (message instanceof NotifyResourceStarted) {
                NotifyResourceStarted msg = (NotifyResourceStarted)message;
                this.handleResourceStarted(this.sender(), msg.getResourceID());
            } else if (message instanceof NewLeaderAvailable) {
                NewLeaderAvailable msg = (NewLeaderAvailable)message;
                this.newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
            } else if (message instanceof TriggerRegistrationAtJobManager) {
                TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager)message;
                this.triggerConnectingToJobManager(msg.jobManagerAddress());
            } else if (message instanceof RegisterResourceManagerSuccessful) {
                RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful)message;
                this.jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
            } else if (message instanceof StopCluster) {
                StopCluster msg = (StopCluster)message;
                this.shutdownCluster(msg.finalStatus(), msg.message());
            } else if (message instanceof RegisterInfoMessageListener) {
                if (this.jobManager != null) {
                    this.infoMessageListeners.add(this.sender());
                    this.sender().tell(this.decorateMessage(RegisterInfoMessageListenerSuccessful.get()), this.jobManager);
                }
            } else if (message instanceof UnRegisterInfoMessageListener) {
                this.infoMessageListeners.remove(this.sender());
            } else if (message instanceof FatalErrorOccurred) {
                FatalErrorOccurred fatalErrorOccurred = (FatalErrorOccurred)message;
                this.fatalError(fatalErrorOccurred.message(), fatalErrorOccurred.error());
            } else {
                this.LOG.error("Discarding unknown message: {}", message);
            }
        }
        catch (Throwable t) {
            this.fatalError("Error processing actor message", t);
        }
    }

    @Override
    protected final UUID getLeaderSessionID() {
        return this.leaderSessionID;
    }

    public int getDesignatedWorkerPoolSize() {
        return this.designatedPoolSize;
    }

    public int getNumberOfStartedTaskManagers() {
        return this.startedWorkers.size();
    }

    public Collection<WorkerType> getStartedTaskManagers() {
        return this.startedWorkers.values();
    }

    public boolean isStarted(ResourceID resourceId) {
        return this.startedWorkers.containsKey(resourceId);
    }

    public Collection<WorkerType> allStartedWorkers() {
        return this.startedWorkers.values();
    }

    private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
        if (resourceID != null) {
            ResourceIDRetrievable oldWorker = (ResourceIDRetrievable)this.startedWorkers.get(resourceID);
            if (oldWorker != null) {
                this.LOG.debug("Notification that TaskManager {} had been started was sent before.", (Object)resourceID);
            } else {
                WorkerType newWorker = this.workerStarted(resourceID);
                if (newWorker != null) {
                    this.startedWorkers.put(resourceID, newWorker);
                    this.LOG.info("TaskManager {} has started.", (Object)resourceID);
                } else {
                    this.LOG.info("TaskManager {} has not been started by this resource manager.", (Object)resourceID);
                }
            }
        }
        jobManager.tell(this.decorateMessage(Acknowledge.get()), this.self());
    }

    private void removeRegisteredResource(ResourceID resourceId) {
        ResourceIDRetrievable worker = (ResourceIDRetrievable)this.startedWorkers.remove(resourceId);
        if (worker != null) {
            this.releaseStartedWorker(worker);
        } else {
            this.LOG.warn("Resource {} could not be released", (Object)resourceId);
        }
    }

    private void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) {
        this.LOG.debug("Received new leading JobManager {}. Connecting.", (Object)leaderAddress);
        this.jobManagerLostLeadership();
        if (leaderAddress != null) {
            this.leaderSessionID = leaderSessionID;
            this.triggerConnectingToJobManager(leaderAddress);
        }
    }

    protected void triggerConnectingToJobManager(String leaderAddress) {
        this.LOG.info("Trying to associate with JobManager leader " + leaderAddress);
        Object registerMessage = this.decorateMessage(new RegisterResourceManager(this.self()));
        final Object retryMessage = this.decorateMessage(new TriggerRegistrationAtJobManager(leaderAddress));
        ActorSelection jobManagerSel = this.context().actorSelection(leaderAddress);
        Future future = Patterns.ask((ActorSelection)jobManagerSel, (Object)registerMessage, (Timeout)new Timeout(this.messageTimeout));
        future.onComplete((Function1)new OnComplete<Object>(){

            public void onComplete(Throwable failure, Object msg) {
                if (FlinkResourceManager.this.jobManager == null) {
                    if (msg != null) {
                        if (msg instanceof JobManagerMessages.LeaderSessionMessage && ((JobManagerMessages.LeaderSessionMessage)msg).message() instanceof RegisterResourceManagerSuccessful) {
                            FlinkResourceManager.this.self().tell(msg, ActorRef.noSender());
                        } else {
                            FlinkResourceManager.this.LOG.error("Invalid response type to registration at JobManager: {}", msg);
                            FlinkResourceManager.this.self().tell(retryMessage, ActorRef.noSender());
                        }
                    } else {
                        FlinkResourceManager.this.LOG.error("Resource manager could not register at JobManager", failure);
                        FlinkResourceManager.this.self().tell(retryMessage, ActorRef.noSender());
                    }
                }
            }
        }, (ExecutionContext)this.context().dispatcher());
    }

    private void jobManagerLostLeadership() {
        if (this.jobManager != null) {
            this.LOG.info("Associated JobManager {} lost leader status", (Object)this.jobManager);
            this.jobManager = null;
            this.leaderSessionID = null;
            this.infoMessageListeners.clear();
        }
    }

    private void jobManagerLeaderConnected(ActorRef newJobManagerLeader, Collection<ResourceID> workers) {
        if (this.jobManager == null) {
            this.LOG.info("Resource Manager associating with leading JobManager {} - leader session {}", (Object)newJobManagerLeader, (Object)this.leaderSessionID);
            this.jobManager = newJobManagerLeader;
            if (workers.size() > 0) {
                this.LOG.info("Received TaskManagers that were registered at the leader JobManager. Trying to consolidate.");
                HashSet<ResourceID> toHandle = new HashSet<ResourceID>(workers.size());
                toHandle.addAll(workers);
                try {
                    Collection<WorkerType> consolidated = this.reacceptRegisteredWorkers(workers);
                    this.LOG.info("Consolidated {} TaskManagers", (Object)consolidated.size());
                    for (ResourceIDRetrievable worker : consolidated) {
                        ResourceID resourceID = worker.getResourceID();
                        this.startedWorkers.put(resourceID, worker);
                        toHandle.remove(resourceID);
                    }
                }
                catch (Throwable t) {
                    this.LOG.error("Error during consolidation of known TaskManagers", t);
                    for (ResourceID id : toHandle) {
                        this.releasePendingWorker(id);
                    }
                }
            }
            this.checkWorkersPool();
        } else {
            String msg = "Attempting to associate with new JobManager leader " + newJobManagerLeader + " without previously disassociating from current leader " + this.jobManager;
            this.fatalError(msg, new Exception(msg));
        }
    }

    private void shutdownCluster(ApplicationStatus status, String diagnostics) {
        this.LOG.info("Shutting down cluster with status {} : {}", (Object)status, (Object)diagnostics);
        this.shutdownApplication(status, diagnostics);
    }

    private void checkWorkersPool() {
        int numWorkersPending = this.getNumWorkerRequestsPending();
        int numWorkersPendingRegistration = this.getNumWorkersPendingRegistration();
        Preconditions.checkState((numWorkersPending >= 0 ? 1 : 0) != 0, (Object)"Number of pending workers should never be below 0.");
        Preconditions.checkState((numWorkersPendingRegistration >= 0 ? 1 : 0) != 0, (Object)"Number of pending workers pending registration should never be below 0.");
        int allAvailableAndPending = this.startedWorkers.size() + numWorkersPending + numWorkersPendingRegistration;
        int missing = this.designatedPoolSize - allAvailableAndPending;
        if (missing > 0) {
            this.requestNewWorkers(missing);
        }
    }

    private void adjustDesignatedNumberOfWorkers(int num) {
        if (num >= 0) {
            this.LOG.info("Adjusting designated worker pool size to {}", (Object)num);
            this.designatedPoolSize = num;
            this.checkWorkersPool();
        } else {
            this.LOG.warn("Ignoring invalid designated worker pool size: " + num);
        }
    }

    public void triggerCheckWorkers() {
        this.self().tell(this.decorateMessage(CheckAndAllocateContainers.get()), ActorRef.noSender());
    }

    public void notifyWorkerFailed(ResourceID resourceID, String message) {
        ResourceIDRetrievable worker = (ResourceIDRetrievable)this.startedWorkers.remove(resourceID);
        if (worker != null) {
            this.jobManager.tell(this.decorateMessage(new ResourceRemoved(resourceID, message)), this.self());
        }
    }

    protected abstract void initialize() throws Exception;

    protected abstract void shutdownApplication(ApplicationStatus var1, String var2);

    protected abstract void fatalError(String var1, Throwable var2);

    protected abstract void requestNewWorkers(int var1);

    protected abstract void releasePendingWorker(ResourceID var1);

    protected abstract void releaseStartedWorker(WorkerType var1);

    protected abstract WorkerType workerStarted(ResourceID var1);

    protected abstract Collection<WorkerType> reacceptRegisteredWorkers(Collection<ResourceID> var1);

    protected abstract int getNumWorkerRequestsPending();

    protected abstract int getNumWorkersPendingRegistration();

    protected void sendInfoMessage(String message) {
        for (ActorRef listener : this.infoMessageListeners) {
            listener.tell(this.decorateMessage(new InfoMessage(message)), this.self());
        }
    }

    public static ActorRef startResourceManagerActors(Configuration configuration, ActorSystem actorSystem, LeaderRetrievalService leaderRetriever, Class<? extends FlinkResourceManager<?>> resourceManagerClass) {
        return FlinkResourceManager.startResourceManagerActors(configuration, actorSystem, leaderRetriever, resourceManagerClass, "resourcemanager-" + UUID.randomUUID());
    }

    public static ActorRef startResourceManagerActors(Configuration configuration, ActorSystem actorSystem, LeaderRetrievalService leaderRetriever, Class<? extends FlinkResourceManager<?>> resourceManagerClass, String resourceManagerActorName) {
        Props resourceMasterProps = Props.create(resourceManagerClass, (Object[])new Object[]{configuration, leaderRetriever});
        return actorSystem.actorOf(resourceMasterProps, resourceManagerActorName);
    }
}

