/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.rm;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.Utils;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService;
import org.apache.tez.dag.app.rm.TaskSchedulerContextImpl;
import org.apache.tez.dag.app.rm.TaskSchedulerContextImplWrapper;
import org.apache.tez.dag.app.rm.TaskSchedulerWrapper;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerServiceError;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskSchedulerManager
extends AbstractService
implements EventHandler<AMSchedulerEvent> {
    static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerManager.class);
    // Placeholder tokens; not referenced in the visible part of this class.
    // NOTE(review): presumably substituted when the history URL is built - confirm against getHistoryUrl().
    static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
    static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
    protected final AppContext appContext;
    // Raw-typed sink to which sendEvent() forwards every outgoing event.
    private final EventHandler eventHandler;
    // Handed to the web UI (when present) and included in the AppFinalStatus; null when the test constructor is used.
    private final String historyUrl;
    // Assigned in serviceStart() from appContext.getAppMaster(); null before that.
    private DAGAppMaster dagAppMaster;
    // Assigned by setApplicationRegistrationData(); null until then.
    private Map<ApplicationAccessType, String> appAcls = null;
    // Created and started in serviceStart(); drains eventQueue and dispatches to handleEvent().
    private Thread eventHandlingThread;
    // Set to true by serviceStop() so that the event-handling loop exits.
    private volatile boolean stopEventHandling;
    // Written by setSignalled(); read by getFinalAppStatus() to report a RUNNING app as KILLED.
    protected volatile boolean isSignalled = false;
    final DAGClientServer clientService;
    private final ContainerSignatureMatcher containerSignatureMatcher;
    // Last node count seen by getProgress(); -1 until the first update.
    private int cachedNodeCount = -1;
    // Checked in serviceStart() after each scheduler is started; where it is set is not visible in this section.
    private AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false);
    private final WebUIService webUI;
    // Plugin descriptors, one per scheduler; null when the test constructor is used.
    private final NamedEntityDescriptor[] taskSchedulerDescriptors;
    // Parallel arrays indexed by scheduler id: the scheduler wrapper and its lifecycle service.
    protected final TaskSchedulerWrapper[] taskSchedulers;
    protected final ServicePluginLifecycleAbstractService[] taskSchedulerServiceWrappers;
    // Passed to the TaskSchedulerContextImplWrapper of each scheduler; shut down in serviceStop().
    @VisibleForTesting
    final ExecutorService appCallbackExecutor;
    private final boolean isLocalMode;
    // Base and step for the synthetic app-id identifiers handed out in instantiateSchedulers().
    private final long SCHEDULER_APP_ID_BASE = 111101111L;
    private final long SCHEDULER_APP_ID_INCREMENT = 111111111L;
    // Events queued by handle() and consumed by the event-handling thread; created without a capacity bound.
    BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>();

    @InterfaceAudience.Private
    @VisibleForTesting
    public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext, ContainerSignatureMatcher containerSignatureMatcher, DAGClientServer clientService, ExecutorService appCallbackExecutor) {
        super(TaskSchedulerManager.class.getName());
        this.appContext = appContext;
        this.containerSignatureMatcher = containerSignatureMatcher;
        this.clientService = clientService;
        this.eventHandler = appContext.getEventHandler();
        this.appCallbackExecutor = appCallbackExecutor;
        this.taskSchedulers = new TaskSchedulerWrapper[]{new TaskSchedulerWrapper(taskScheduler)};
        this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[]{new ServicePluginLifecycleAbstractService<TaskScheduler>(taskScheduler)};
        this.taskSchedulerDescriptors = null;
        this.webUI = null;
        this.historyUrl = null;
        this.isLocalMode = false;
    }

    public TaskSchedulerManager(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, List<NamedEntityDescriptor> schedulerDescriptors, boolean isLocalMode) {
        super(TaskSchedulerManager.class.getName());
        Preconditions.checkArgument((schedulerDescriptors != null && !schedulerDescriptors.isEmpty() ? 1 : 0) != 0, (Object)"TaskSchedulerDescriptors must be specified");
        this.appContext = appContext;
        this.eventHandler = eventHandler;
        this.clientService = clientService;
        this.containerSignatureMatcher = containerSignatureMatcher;
        this.webUI = webUI;
        this.historyUrl = this.getHistoryUrl();
        this.isLocalMode = isLocalMode;
        this.appCallbackExecutor = this.createAppCallbackExecutorService();
        if (this.webUI != null) {
            this.webUI.setHistoryUrl(this.historyUrl);
        }
        this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
        this.taskSchedulers = new TaskSchedulerWrapper[this.taskSchedulerDescriptors.length];
        this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
    }

    /**
     * Returns the application ACLs last stored by {@code setApplicationRegistrationData}.
     *
     * @return the stored ACL map, or {@code null} if none has been stored yet
     */
    public Map<ApplicationAccessType, String> getApplicationAcls() {
        return appAcls;
    }

    /**
     * Records whether the application has been signalled and logs the new value.
     *
     * @param isSignalled the new value of the signalled flag
     */
    public void setSignalled(boolean isSignalled) {
        this.isSignalled = isSignalled;
        String notice = "TaskScheduler notified that iSignalled was : " + isSignalled;
        LOG.info(notice);
    }

    /**
     * Returns the cached cluster node count.
     *
     * @return the node count last cached by {@code getProgress}, or -1 if none has been cached
     */
    public int getNumClusterNodes() {
        return cachedNodeCount;
    }

    /**
     * Returns the resources the given scheduler currently reports as available.
     *
     * <p>Any exception raised while querying the scheduler is logged, reported through a
     * {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event, and rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param schedulerId index of the scheduler in the scheduler array
     * @return the result of the scheduler's {@code getAvailableResources()}
     * @throws RuntimeException wrapping any exception thrown while querying the scheduler
     */
    public Resource getAvailableResources(int schedulerId) {
        try {
            return this.taskSchedulers[schedulerId].getAvailableResources();
        }
        catch (Exception e) {
            // The key is "scheduler=", matching every other scheduler error message in this class.
            String msg = "Error in TaskScheduler while getting available resources, scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, this.appContext);
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e));
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the total resources reported by the given scheduler.
     *
     * <p>Any exception raised while querying the scheduler is logged, reported through a
     * {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event, and rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param schedulerId index of the scheduler in the scheduler array
     * @return the result of the scheduler's {@code getTotalResources()}
     * @throws RuntimeException wrapping any exception thrown while querying the scheduler
     */
    public Resource getTotalResources(int schedulerId) {
        try {
            TaskSchedulerWrapper scheduler = taskSchedulers[schedulerId];
            return scheduler.getTotalResources();
        } catch (Exception failure) {
            String schedulerName = Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
            String errorMessage = "Error in TaskScheduler while getting total resources, scheduler=" + schedulerName;
            LOG.error(errorMessage, (Throwable)failure);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
            throw new RuntimeException(failure);
        }
    }

    /**
     * Builds the single-threaded executor used for application callbacks.
     *
     * @return a single-thread executor whose daemon thread is named
     *         {@code TaskSchedulerAppCallbackExecutor #<n>}
     */
    private ExecutorService createAppCallbackExecutorService() {
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder()
            .setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
            .setDaemon(true);
        return Executors.newSingleThreadExecutor(threadFactoryBuilder.build());
    }

    /**
     * Dispatches a scheduler event to the matching handler method.
     *
     * <p>Node healthy/unhealthy events are accepted but ignored; event types not listed
     * in the switch are silently dropped.
     *
     * @param sEvent the event to process
     * @throws TezUncheckedException if a task-attempt-ended event carries a state other
     *         than FAILED, KILLED or SUCCEEDED
     */
    public synchronized void handleEvent(AMSchedulerEvent sEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing the event " + sEvent.toString());
        }
        switch ((AMSchedulerEventType)sEvent.getType()) {
            case S_TA_LAUNCH_REQUEST:
                handleTaLaunchRequest((AMSchedulerEventTALaunchRequest)sEvent);
                break;
            case S_TA_ENDED: {
                AMSchedulerEventTAEnded endedEvent = (AMSchedulerEventTAEnded)sEvent;
                switch (endedEvent.getState()) {
                    case FAILED:
                    case KILLED:
                        handleTAUnsuccessfulEnd(endedEvent);
                        break;
                    case SUCCEEDED:
                        handleTASucceeded(endedEvent);
                        break;
                    default:
                        throw new TezUncheckedException("Unexpected TA_ENDED state: " + endedEvent.getState());
                }
                break;
            }
            case S_CONTAINER_DEALLOCATE:
                handleContainerDeallocate((AMSchedulerEventDeallocateContainer)sEvent);
                break;
            case S_NODE_BLACKLISTED:
            case S_NODE_UNBLACKLISTED:
                handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
                break;
            case S_NODE_UNHEALTHY:
            case S_NODE_HEALTHY:
                // Nothing to do for node health changes.
                break;
        }
    }

    /**
     * Enqueues an event for asynchronous processing by the event-handling thread.
     *
     * <p>Logs the queue size whenever it is a non-zero multiple of 1000, and warns when
     * the queue reports fewer than 1000 free slots.
     *
     * @param event the event to enqueue
     * @throws TezUncheckedException if the calling thread is interrupted while enqueueing;
     *         the thread's interrupt status is restored before the exception is thrown
     */
    public void handle(AMSchedulerEvent event) {
        int qSize = this.eventQueue.size();
        if (qSize != 0 && qSize % 1000 == 0) {
            LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
        }
        int remCapacity = this.eventQueue.remainingCapacity();
        if (remCapacity < 1000) {
            LOG.warn("Very low remaining capacity in the event-queue of RMContainerAllocator: " + remCapacity);
        }
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            // put() cleared the interrupt flag when it threw; restore it so that callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new TezUncheckedException((Throwable)e);
        }
    }

    /**
     * Forwards an event to the configured event handler.
     *
     * @param event the event to deliver
     */
    private void sendEvent(Event<?> event) {
        eventHandler.handle(event);
    }

    /**
     * Applies a node blacklist or unblacklist request to the scheduler named by the event.
     *
     * <p>An exception from the scheduler is logged and reported through a
     * {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event; it is not rethrown.
     *
     * @param event the blacklist update event
     * @throws TezUncheckedException if the event type is neither blacklisted nor unblacklisted
     */
    private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
        // The invalid-type error is raised after the try block so the catch below cannot swallow it.
        boolean recognisedType = true;
        try {
            if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
                TaskSchedulerWrapper scheduler = taskSchedulers[event.getSchedulerId()];
                scheduler.blacklistNode(event.getNodeId());
            } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
                TaskSchedulerWrapper scheduler = taskSchedulers[event.getSchedulerId()];
                scheduler.unblacklistNode(event.getNodeId());
            } else {
                recognisedType = false;
            }
        } catch (Exception failure) {
            String errorMessage = "Error in TaskScheduler for handling node blacklisting, eventType=" + event.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext);
            LOG.error(errorMessage, (Throwable)failure);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
            return;
        }
        if (!recognisedType) {
            throw new TezUncheckedException("Invalid event type: " + event.getType());
        }
    }

    /**
     * Asks the scheduler to deallocate a container and then requests that the container stop.
     *
     * <p>If the scheduler throws, the failure is logged and reported through a
     * {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event, and no stop request is sent.
     *
     * @param event the deallocation event naming the container and scheduler
     */
    private void handleContainerDeallocate(AMSchedulerEventDeallocateContainer event) {
        ContainerId containerId = event.getContainerId();
        try {
            TaskSchedulerWrapper scheduler = taskSchedulers[event.getSchedulerId()];
            scheduler.deallocateContainer(containerId);
        } catch (Exception failure) {
            String errorMessage = "Error in TaskScheduler for handling Container De-allocation, eventType=" + event.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + ", containerId=" + containerId;
            LOG.error(errorMessage, (Throwable)failure);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
            return;
        }
        sendEvent(new AMContainerEventStopRequest(containerId));
    }

    /**
     * Handles a failed or killed task attempt.
     *
     * <p>The attempt is deallocated from its scheduler. If the attempt reports an assigned
     * container, a stop request is sent for that container and the owning node is told
     * that the attempt ended. A scheduler exception is logged and reported through a
     * {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event, after which nothing else is sent.
     *
     * @param event the task-attempt-ended event
     */
    private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
        TaskAttempt attempt = event.getAttempt();
        boolean schedulerHadContainer;
        try {
            TaskSchedulerWrapper scheduler = taskSchedulers[event.getSchedulerId()];
            schedulerHadContainer = scheduler.deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
        } catch (Exception failure) {
            String errorMessage = "Error in TaskScheduler for handling Task De-allocation, eventType=" + event.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + ", taskAttemptId=" + attempt.getID();
            LOG.error(errorMessage, (Throwable)failure);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
            return;
        }

        ContainerId attemptContainerId = attempt.getAssignedContainerID();
        if (!schedulerHadContainer) {
            LOG.info("Task: " + attempt.getID() + " has no container assignment in the scheduler");
            if (attemptContainerId != null) {
                // The scheduler and the attempt disagree about the container assignment.
                LOG.error("No container allocated to task: " + attempt.getID() + " according to scheduler. Task reported container id: " + attemptContainerId);
            }
        }
        if (attemptContainerId == null) {
            return;
        }

        sendEvent(new AMContainerEventStopRequest(attemptContainerId));
        AMContainer amContainer = appContext.getAllContainers().get(attemptContainerId);
        sendEvent(new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId, attempt.getID(), event.getState() == TaskAttemptState.FAILED));
    }

    /**
     * Handles a successfully completed task attempt.
     *
     * <p>If the event names a used container, the container and its node are notified of
     * the success first. The attempt is then deallocated from its scheduler. A scheduler
     * exception is logged and reported through a {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event.
     *
     * @param event the task-attempt-ended event
     */
    private void handleTASucceeded(AMSchedulerEventTAEnded event) {
        TaskAttempt attempt = event.getAttempt();
        ContainerId usedContainerId = event.getUsedContainerId();
        if (usedContainerId != null) {
            sendEvent(new AMContainerEventTASucceeded(usedContainerId, event.getAttemptID()));
            AMContainer amContainer = appContext.getAllContainers().get(usedContainerId);
            sendEvent(new AMNodeEventTaskAttemptSucceeded(amContainer.getContainer().getNodeId(), event.getSchedulerId(), usedContainerId, event.getAttemptID()));
        }

        boolean schedulerHadContainer;
        try {
            TaskSchedulerWrapper scheduler = taskSchedulers[event.getSchedulerId()];
            schedulerHadContainer = scheduler.deallocateTask(attempt, true, null, event.getDiagnostics());
        } catch (Exception failure) {
            String errorMessage = "Error in TaskScheduler for handling Task De-allocation, eventType=" + event.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + ", taskAttemptId=" + attempt.getID();
            LOG.error(errorMessage, (Throwable)failure);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
            return;
        }
        if (!schedulerHadContainer) {
            LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task");
        }
    }

    /**
     * Asks the target scheduler to allocate a container for a task attempt.
     *
     * <p>When the location hint names an affinitized task that has a successful attempt,
     * allocation is requested against that attempt's container. Otherwise allocation is
     * requested with the hint's hosts and racks (both {@code null} when there is no hint
     * or when the affinitized task has no successful attempt). Scheduler exceptions are
     * logged and reported through a {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event.
     *
     * @param event the launch request
     * @throws NullPointerException if the affinity names an unknown vertex, or if the
     *         successful affinity attempt has no assigned container
     * @throws IllegalStateException if the affinity's task index is out of range for the vertex
     */
    private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
        TaskAttempt taskAttempt = event.getTaskAttempt();
        TaskLocationHint locationHint = event.getLocationHint();
        String[] hosts = null;
        String[] racks = null;

        TaskLocationHint.TaskBasedLocationAffinity taskAffinity = locationHint == null ? null : locationHint.getAffinitizedTask();
        if (taskAffinity != null) {
            Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
            Preconditions.checkNotNull((Object)vertex, (Object)("Invalid vertex in task based affinity " + taskAffinity + " for attempt: " + taskAttempt.getID()));
            int taskIndex = taskAffinity.getTaskIndex();
            boolean indexInRange = taskIndex >= 0 && taskIndex < vertex.getTotalTasks();
            Preconditions.checkState(indexInRange, (Object)("Invalid taskIndex in task based affinity " + taskAffinity + " for attempt: " + taskAttempt.getID()));
            TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
            if (affinityAttempt != null) {
                Preconditions.checkNotNull((Object)affinityAttempt.getAssignedContainerID(), (Object)affinityAttempt.getID());
                // Container-affinity path: allocate against the container of the successful attempt.
                try {
                    taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, event.getCapability(), affinityAttempt.getAssignedContainerID(), Priority.newInstance((int)event.getPriority()), event.getContainerContext(), (Object)event);
                } catch (Exception failure) {
                    String errorMessage = "Error in TaskScheduler for handling Task Allocation, eventType=" + event.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + ", taskAttemptId=" + taskAttempt.getID();
                    LOG.error(errorMessage, (Throwable)failure);
                    sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
                }
                return;
            }
            LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt " + taskAttempt.getID() + " Ignoring.");
        } else if (locationHint != null) {
            if (locationHint.getHosts() != null) {
                hosts = locationHint.getHosts().toArray(new String[locationHint.getHosts().size()]);
            }
            if (locationHint.getRacks() != null) {
                racks = locationHint.getRacks().toArray(new String[locationHint.getRacks().size()]);
            }
        }

        // Host/rack path (either array may be null).
        try {
            taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, event.getCapability(), hosts, racks, Priority.newInstance((int)event.getPriority()), event.getContainerContext(), (Object)event);
        } catch (Exception failure) {
            String errorMessage = "Error in TaskScheduler for handling Task Allocation, eventType=" + event.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + ", taskAttemptId=" + taskAttempt.getID();
            LOG.error(errorMessage, (Throwable)failure);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, errorMessage, failure));
        }
    }

    /**
     * Creates the scheduler described by {@code taskSchedulerDescriptor}.
     *
     * <p>A context is built for the scheduler and wrapped via {@code wrapTaskSchedulerContext}.
     * The descriptor's entity name then selects the YARN scheduler, the uber (local)
     * scheduler, or a custom scheduler.
     *
     * @param host host passed to the scheduler context
     * @param port port passed to the scheduler context
     * @param trackingUrl tracking URL passed to the scheduler context
     * @param appContext application context passed to the scheduler context
     * @param taskSchedulerDescriptor descriptor naming the scheduler and carrying its payload
     * @param customAppIdIdentifier identifier passed to the scheduler context
     * @param schedulerId index of this scheduler
     * @return the created scheduler
     * @throws TezException if creating a custom scheduler fails
     */
    @VisibleForTesting
    TaskScheduler createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext, NamedEntityDescriptor taskSchedulerDescriptor, long customAppIdIdentifier, int schedulerId) throws TezException {
        TaskSchedulerContext context = wrapTaskSchedulerContext(new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl, customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload()));
        String schedulerName = taskSchedulerDescriptor.getEntityName();
        TaskScheduler scheduler;
        if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
            scheduler = createYarnTaskScheduler(context, schedulerId);
        } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
            scheduler = createUberTaskScheduler(context, schedulerId);
        } else {
            scheduler = createCustomTaskScheduler(context, taskSchedulerDescriptor, schedulerId);
        }
        return scheduler;
    }

    /**
     * Wraps a raw scheduler context together with the app callback executor.
     *
     * @param rawContext the context to wrap
     * @return a {@code TaskSchedulerContextImplWrapper} around {@code rawContext}
     */
    @VisibleForTesting
    TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
        TaskSchedulerContext wrapped = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
        return wrapped;
    }

    /**
     * Creates the YARN-backed scheduler.
     *
     * @param taskSchedulerContext context handed to the scheduler
     * @param schedulerId index of this scheduler (not used here)
     * @return a new {@code YarnTaskSchedulerService}
     */
    @VisibleForTesting
    TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) {
        LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
        TaskScheduler yarnScheduler = new YarnTaskSchedulerService(taskSchedulerContext);
        return yarnScheduler;
    }

    /**
     * Creates the local ("uber") scheduler.
     *
     * @param taskSchedulerContext context handed to the scheduler
     * @param schedulerId index of this scheduler (not used here)
     * @return a new {@code LocalTaskSchedulerService}
     */
    @VisibleForTesting
    TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) {
        Object clusterIdentifier = taskSchedulerContext.getCustomClusterIdentifier();
        LOG.info("Creating TaskScheduler: Local TaskScheduler with clusterIdentifier={}", clusterIdentifier);
        TaskScheduler localScheduler = new LocalTaskSchedulerService(taskSchedulerContext);
        return localScheduler;
    }

    /**
     * Reflectively creates a custom scheduler from the class named in the descriptor,
     * passing the context as the single {@code TaskSchedulerContext} constructor argument.
     *
     * @param taskSchedulerContext context handed to the scheduler's constructor
     * @param taskSchedulerDescriptor descriptor naming the scheduler class
     * @param schedulerId index of this scheduler (not used here)
     * @return the instantiated scheduler
     * @throws TezException declared by this method; presumably raised when instantiation fails
     */
    TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) throws TezException {
        Object[] logArguments = new Object[]{taskSchedulerDescriptor.getEntityName(), taskSchedulerDescriptor.getClassName(), taskSchedulerContext.getCustomClusterIdentifier()};
        LOG.info("Creating custom TaskScheduler {}:{} with clusterIdentifier={}", logArguments);
        Class[] constructorTypes = new Class[]{TaskSchedulerContext.class};
        Object[] constructorArguments = new Object[]{taskSchedulerContext};
        return (TaskScheduler)ReflectionUtils.createClazzInstance((String)taskSchedulerDescriptor.getClassName(), (Class[])constructorTypes, (Object[])constructorArguments);
    }

    /**
     * Creates one scheduler, and its lifecycle service wrapper, per configured descriptor.
     *
     * <p>The YARN scheduler, and the uber scheduler when running in local mode, receive the
     * application's cluster timestamp as their identifier. Every other scheduler receives
     * {@code SCHEDULER_APP_ID_BASE + k * SCHEDULER_APP_ID_INCREMENT}, where {@code k}
     * counts such schedulers starting from zero.
     *
     * @param host host passed to each scheduler context
     * @param port port passed to each scheduler context
     * @param trackingUrl tracking URL passed to each scheduler context
     * @param appContext application context passed to each scheduler context
     * @throws TezException if a scheduler cannot be created
     */
    @VisibleForTesting
    protected void instantiateSchedulers(String host, int port, String trackingUrl, AppContext appContext) throws TezException {
        int nextCustomIndex = 0;
        for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
            NamedEntityDescriptor descriptor = taskSchedulerDescriptors[i];
            boolean usesClusterTimestamp = (isLocalMode && descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName()))
                || descriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName());
            long customAppIdIdentifier;
            if (usesClusterTimestamp) {
                customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
            } else {
                customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (long)nextCustomIndex * SCHEDULER_APP_ID_INCREMENT;
                nextCustomIndex++;
            }
            TaskSchedulerWrapper wrapper = new TaskSchedulerWrapper(createTaskScheduler(host, port, trackingUrl, appContext, descriptor, customAppIdIdentifier, i));
            taskSchedulers[i] = wrapper;
            taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<TaskScheduler>(wrapper.getTaskScheduler());
        }
    }

    /**
     * Starts the manager.
     *
     * <p>Instantiates the schedulers using the client service's bind address and the web
     * UI's tracking URL, initialises and starts each scheduler's lifecycle service, and
     * then starts the thread that drains {@code eventQueue}.
     *
     * @throws Exception if the schedulers cannot be instantiated, initialised or started
     */
    public synchronized void serviceStart() throws Exception {
        InetSocketAddress serviceAddr = this.clientService.getBindAddress();
        this.dagAppMaster = this.appContext.getAppMaster();
        // No web UI means an empty tracking URL.
        String trackingUrl = this.webUI != null ? this.webUI.getTrackingURL() : "";
        this.instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, this.appContext);
        for (int i = 0; i < this.taskSchedulers.length; ++i) {
            this.taskSchedulerServiceWrappers[i].init(this.getConfig());
            this.taskSchedulerServiceWrappers[i].start();
            // Propagate the unregister flag only if it was already set when this scheduler started.
            if (!this.shouldUnregisterFlag.get()) continue;
            this.taskSchedulers[i].setShouldUnregister();
        }
        this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread"){

            /*
             * Event loop: takes events off the queue one at a time and dispatches them
             * to handleEvent() until stop is requested or the thread is interrupted.
             * (Decompiler note: a self-referential try/catch was removed here, so the
             * structure may differ slightly from the original source.)
             */
            @Override
            public void run() {
                while (!TaskSchedulerManager.this.stopEventHandling && !Thread.currentThread().isInterrupted()) {
                    AMSchedulerEvent event;
                    try {
                        // Signal the test hook when the queue is observed empty, before blocking.
                        if (TaskSchedulerManager.this.eventQueue.peek() == null) {
                            TaskSchedulerManager.this.notifyForTest();
                        }
                        event = TaskSchedulerManager.this.eventQueue.take();
                    }
                    catch (InterruptedException e) {
                        // An interrupt during shutdown is expected; the loop condition then ends the loop.
                        if (TaskSchedulerManager.this.stopEventHandling) continue;
                        LOG.warn("Continuing after interrupt : ", (Throwable)e);
                        continue;
                    }
                    try {
                        TaskSchedulerManager.this.handleEvent(event);
                    }
                    catch (Throwable t) {
                        // Any failure while handling an event is reported as an internal error and ends this thread.
                        LOG.error("Error in handling event type " + event.getType() + " to the TaskScheduler", t);
                        TaskSchedulerManager.this.sendEvent((Event)new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
                        return;
                    }
                    finally {
                        // Runs after every handled event, including the failure path above.
                        TaskSchedulerManager.this.notifyForTest();
                    }
                }
            }
        };
        this.eventHandlingThread.start();
    }

    /**
     * Hook called by the event-handling thread when it observes an empty queue and after
     * each event has been handled. Does nothing in this class; it can be overridden by
     * subclasses.
     */
    protected void notifyForTest() {
    }

    /**
     * Calls {@code initiateStop()} on every scheduler.
     *
     * <p>A failure in one scheduler is logged and does not prevent the remaining
     * schedulers from being notified.
     */
    public void initiateStop() {
        for (int i = 0; i < taskSchedulers.length; i++) {
            try {
                TaskScheduler scheduler = taskSchedulers[i].getTaskScheduler();
                scheduler.initiateStop();
            } catch (Exception failure) {
                LOG.error("Failed to do a clean initiateStop for Scheduler: " + Utils.getTaskSchedulerIdentifierString(i, appContext), (Throwable)failure);
            }
        }
    }

    /**
     * Stops the manager.
     *
     * <p>Flags the event loop to stop and interrupts its thread, stops the lifecycle
     * service of every instantiated scheduler, then shuts down the app callback executor
     * and waits up to one second for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    public void serviceStop() throws InterruptedException {
        synchronized (this) {
            stopEventHandling = true;
            if (eventHandlingThread != null) {
                eventHandlingThread.interrupt();
            }
        }
        for (int i = 0; i < taskSchedulers.length; i++) {
            // Slots may be empty if the schedulers were never instantiated.
            if (taskSchedulers[i] != null) {
                taskSchedulerServiceWrappers[i].stop();
            }
        }
        LOG.info("Shutting down AppCallbackExecutor");
        appCallbackExecutor.shutdownNow();
        appCallbackExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Callback invoked when a scheduler has assigned a container to a task.
     *
     * <p>Registers the container if it is new (also noting its node and announcing the
     * allocation), requests a launch when the container is still in the ALLOCATED state,
     * and finally assigns the task attempt to the container.
     *
     * @param schedulerId index of the scheduler that made the allocation
     * @param task the task that was allocated; asserted equal to the attempt in the cookie
     * @param appCookie the {@code AMSchedulerEventTALaunchRequest} supplied with the allocation request
     * @param container the allocated container
     */
    public synchronized void taskAllocated(int schedulerId, Object task, Object appCookie, Container container) {
        AMSchedulerEventTALaunchRequest launchRequest = (AMSchedulerEventTALaunchRequest)appCookie;
        ContainerId containerId = container.getId();

        boolean newlyRegistered = appContext.getAllContainers().addContainerIfNew(container, schedulerId, launchRequest.getLauncherId(), launchRequest.getTaskCommId());
        if (newlyRegistered) {
            appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
            sendEvent(new AMNodeEventContainerAllocated(container.getNodeId(), schedulerId, container.getId()));
        }

        TaskAttempt taskAttempt = launchRequest.getTaskAttempt();
        assert task.equals(taskAttempt);

        AMContainerState containerState = appContext.getAllContainers().get(containerId).getState();
        if (containerState == AMContainerState.ALLOCATED) {
            sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(), launchRequest.getContainerContext(), launchRequest.getLauncherId(), launchRequest.getTaskCommId()));
        }
        sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(), launchRequest.getRemoteTaskSpec(), launchRequest.getContainerContext().getLocalResources(), launchRequest.getContainerContext().getCredentials(), launchRequest.getPriority()));
    }

    /**
     * Callback invoked when a container has completed.
     *
     * <p>If the container is known, a completion event is sent carrying the exit status,
     * a message derived from that status (with any diagnostics appended) and a
     * termination cause. Unknown containers are ignored.
     *
     * @param schedulerId index of the reporting scheduler (not used here)
     * @param task the task associated with the container (not used here)
     * @param containerStatus status reported for the completed container
     */
    public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
        AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
        if (amContainer == null) {
            return;
        }

        String message = "Container completed. ";
        TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
        int exitStatus = containerStatus.getExitStatus();
        switch (exitStatus) {
            case -102:
                // NOTE(review): -102 appears to be YARN's ContainerExitStatus.PREEMPTED - confirm.
                message = "Container preempted externally. ";
                errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
                break;
            case -101:
                // NOTE(review): -101 appears to be YARN's ContainerExitStatus.DISKS_FAILED - confirm.
                message = "Container disk failed. ";
                errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
                break;
            case 0:
                break;
            default:
                message = "Container failed, exitCode=" + exitStatus + ". ";
                break;
        }
        if (containerStatus.getDiagnostics() != null) {
            message += containerStatus.getDiagnostics();
        }
        sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
    }

    /**
     * Callback invoked when a scheduler is releasing a container; sends a stop request
     * for the container if it is known to the application.
     *
     * @param schedulerId index of the reporting scheduler (not used here)
     * @param containerId id of the container being released
     */
    public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
        boolean knownContainer = appContext.getAllContainers().get(containerId) != null;
        if (!knownContainer) {
            return;
        }
        sendEvent(new AMContainerEventStopRequest(containerId));
    }

    /**
     * Callback invoked with node reports from a scheduler; emits one node state-change
     * event per report.
     *
     * @param schedulerId index of the reporting scheduler
     * @param updatedNodes reports for the nodes whose state changed
     */
    public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
        for (NodeReport nodeReport : updatedNodes) {
            sendEvent((Event<?>)new AMNodeEventStateChanged(nodeReport, schedulerId));
        }
    }

    /**
     * Callback invoked when a scheduler requests application shutdown; emits an
     * {@code AM_REBOOT} event.
     *
     * @param schedulerId index of the requesting scheduler
     */
    public synchronized void appShutdownRequested(int schedulerId) {
        LOG.info("App shutdown requested by scheduler {}", (Object)Integer.valueOf(schedulerId));
        DAGAppMasterEvent rebootEvent = new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT);
        sendEvent(rebootEvent);
    }

    /**
     * Stores the data obtained when the application was registered.
     *
     * @param schedulerId index of the reporting scheduler (not used here)
     * @param maxContainerCapability maximum container capability, recorded on the cluster info
     * @param appAcls application ACLs, later returned by {@code getApplicationAcls()}
     * @param clientAMSecretKey secret key handed to the client service
     */
    public synchronized void setApplicationRegistrationData(int schedulerId, Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, ByteBuffer clientAMSecretKey) {
        appContext.getClusterInfo().setMaxContainerCapability(maxContainerCapability);
        this.appAcls = appAcls;
        clientService.setClientAMSecretKey(clientAMSecretKey);
    }

    /**
     * Derives the final application status from the app master's current state.
     *
     * <p>SUCCEEDED maps to SUCCEEDED; KILLED, or RUNNING while signalled, maps to KILLED;
     * FAILED and ERROR map to FAILED; anything else, including an app master that has not
     * been set yet, maps to UNDEFINED. The app master's diagnostics are joined with
     * newlines.
     *
     * @return the final status, the diagnostics text and the history URL
     */
    public TaskSchedulerContext.AppFinalStatus getFinalAppStatus() {
        FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
        StringBuilder diagnosticsText = new StringBuilder();
        if (this.dagAppMaster == null) {
            diagnosticsText.append("App not yet initialized");
        } else {
            DAGAppMasterState appMasterState = this.dagAppMaster.getState();
            if (appMasterState == DAGAppMasterState.SUCCEEDED) {
                finishState = FinalApplicationStatus.SUCCEEDED;
            } else if (appMasterState == DAGAppMasterState.KILLED || (appMasterState == DAGAppMasterState.RUNNING && this.isSignalled)) {
                finishState = FinalApplicationStatus.KILLED;
            } else if (appMasterState == DAGAppMasterState.FAILED || appMasterState == DAGAppMasterState.ERROR) {
                finishState = FinalApplicationStatus.FAILED;
            }
            List<String> diagnostics = this.dagAppMaster.getDiagnostics();
            if (diagnostics != null) {
                for (String line : diagnostics) {
                    diagnosticsText.append(line).append("\n");
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting job diagnostics to " + diagnosticsText.toString());
        }
        return new TaskSchedulerContext.AppFinalStatus(finishState, diagnosticsText.toString(), this.historyUrl);
    }

    /**
     * Returns the DAG app master's progress, and refreshes the cached cluster
     * node count along the way.
     *
     * <p>When the node count differs from the cached value, the cache is
     * updated and an {@code AMNodeEventNodeCountUpdated} is sent for
     * {@code schedulerId}. If the node count cannot be read, the failure is
     * logged, a fatal-error event is sent, and the exception is rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param schedulerId identifier of the calling scheduler; used for the
     *        error message and the node-count event
     * @return the progress reported by the DAG app master
     * @throws RuntimeException if reading the cluster node count fails
     */
    public float getProgress(int schedulerId) {
        int currentNodeCount;
        try {
            // NOTE(review): the count is always read from the scheduler at index 0,
            // not from schedulerId - confirm this is intended with multiple schedulers.
            currentNodeCount = this.taskSchedulers[0].getClusterNodeCount();
        }
        catch (Exception e) {
            String schedulerName = Utils.getTaskSchedulerIdentifierString(schedulerId, this.appContext);
            String msg = "Error in TaskScheduler while getting node count, scheduler=" + schedulerName;
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e));
            throw new RuntimeException(e);
        }
        boolean nodeCountChanged = currentNodeCount != this.cachedNodeCount;
        if (nodeCountChanged) {
            this.cachedNodeCount = currentNodeCount;
            this.sendEvent((Event<?>)new AMNodeEventNodeCountUpdated(this.cachedNodeCount, schedulerId));
        }
        return this.dagAppMaster.getProgress();
    }

    /**
     * Handles an error reported by a task scheduler.
     *
     * <ul>
     *   <li>{@code RESOURCEMANAGER_ERROR}: logged; additionally forwarded as a
     *       {@code DAGAppMasterEventSchedulingServiceError} when the reporting
     *       scheduler's class is {@code YarnTaskSchedulerService}.</li>
     *   <li>Any other error of type {@code PERMANENT}: logged and sent as a
     *       fatal-error event.</li>
     *   <li>Everything else: delegated to
     *       {@code Utils.processNonFatalServiceErrorReport}.</li>
     * </ul>
     *
     * @param taskSchedulerIndex index of the reporting scheduler
     * @param servicePluginError the reported error
     * @param diagnostics diagnostic text; may be {@code null}
     * @param dagInfo DAG information passed through for non-fatal errors
     */
    public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagInfo) {
        if (servicePluginError == YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR) {
            // Pass the scheduler name and the diagnostics as two arguments so that
            // both "{}" placeholders in the format string are filled.
            LOG.info("Error reported by scheduler {} - {}", (Object)Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, this.appContext), (Object)diagnostics);
            if (this.taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(YarnTaskSchedulerService.class.getName())) {
                LOG.warn("Reporting a SchedulerServiceError to the DAGAppMaster since the error was reported by the default YARN Task Scheduler");
                this.sendEvent((Event<?>)new DAGAppMasterEventSchedulingServiceError(diagnostics));
            }
        } else if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
            String msg = "Fatal error reported by TaskScheduler, scheduler=" + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, this.appContext) + ", servicePluginError=" + servicePluginError + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
            LOG.error(msg);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, null));
        } else {
            Utils.processNonFatalServiceErrorReport(Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, this.appContext), servicePluginError, diagnostics, dagInfo, this.appContext, "TaskScheduler");
        }
    }

    /**
     * Notifies every task scheduler that the DAG has completed.
     *
     * <p>A failure in one scheduler is logged and sent as a fatal-error event;
     * the remaining schedulers are still notified.
     */
    public void dagCompleted() {
        int schedulerCount = this.taskSchedulers.length;
        for (int idx = 0; idx < schedulerCount; idx++) {
            try {
                this.taskSchedulers[idx].dagComplete();
            }
            catch (Exception e) {
                String schedulerName = Utils.getTaskSchedulerIdentifierString(idx, this.appContext);
                String msg = "Error in TaskScheduler when notified for Dag Completion, scheduler=" + schedulerName;
                LOG.error(msg, (Throwable)e);
                this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e));
            }
        }
    }

    /**
     * Notification hook for DAG submission. The body is empty, so calling it
     * currently has no effect.
     */
    public void dagSubmitted() {
    }

    /**
     * Preempts a container: asks the scheduler that owns it to deallocate it
     * and then sends a completion event for the container.
     *
     * <p>If the container is not present in the application context's
     * container map, the request is logged and ignored. This matches
     * {@code containerBeingReleased}, which also sends nothing for an unknown
     * container, and avoids a {@link NullPointerException} both in the
     * deallocation call and in the error handler.
     *
     * <p>A failure during deallocation is logged and sent as a fatal-error
     * event; the completion event is still sent afterwards.
     *
     * @param schedulerId identifier of the scheduler requesting the preemption;
     *        used only for logging, the owning scheduler is taken from the container
     * @param containerId the container to preempt
     */
    public void preemptContainer(int schedulerId, ContainerId containerId) {
        AMContainer amContainer = this.appContext.getAllContainers().get(containerId);
        if (amContainer == null) {
            LOG.warn("Ignoring preemption request from scheduler {} for unknown container {}", (Object)schedulerId, (Object)containerId);
            return;
        }
        int owningSchedulerId = amContainer.getTaskSchedulerIdentifier();
        try {
            this.taskSchedulers[owningSchedulerId].deallocateContainer(containerId);
        }
        catch (Exception e) {
            String msg = "Error in TaskScheduler when preempting container, scheduler=" + Utils.getTaskSchedulerIdentifierString(owningSchedulerId, this.appContext) + ", containerId=" + containerId;
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e));
        }
        // -1000 is the exit status passed with the internal-preemption completion event.
        this.sendEvent((Event<?>)new AMContainerEventCompleted(containerId, -1000, "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
    }

    /**
     * Sets the unregister flag and tells every non-null task scheduler that it
     * should unregister.
     *
     * <p>A failure in one scheduler is logged and sent as a fatal-error event;
     * the remaining schedulers are still notified.
     */
    public void setShouldUnregisterFlag() {
        LOG.info("TaskScheduler notified that it should unregister from RM");
        this.shouldUnregisterFlag.set(true);
        for (int idx = 0; idx < this.taskSchedulers.length; idx++) {
            if (this.taskSchedulers[idx] != null) {
                try {
                    this.taskSchedulers[idx].setShouldUnregister();
                }
                catch (Exception e) {
                    String schedulerName = Utils.getTaskSchedulerIdentifierString(idx, this.appContext);
                    String msg = "Error in TaskScheduler when setting Unregister Flag, scheduler=" + schedulerName;
                    LOG.error(msg, (Throwable)e);
                    this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e));
                }
            }
        }
    }

    /**
     * Returns the container signature matcher held by this instance.
     *
     * @return the value of the {@code containerSignatureMatcher} field
     */
    public ContainerSignatureMatcher getContainerSignatureMatcher() {
        return this.containerSignatureMatcher;
    }

    /**
     * Reports whether all task schedulers have unregistered.
     *
     * <p>Schedulers are checked in order and the method returns {@code false}
     * at the first one that reports it has not unregistered. A scheduler whose
     * check throws is logged, reported through a fatal-error event, and does
     * not by itself make the result {@code false}.
     *
     * @return {@code true} if no scheduler reported that it is still
     *         registered (including when there are no schedulers)
     */
    public boolean hasUnregistered() {
        for (int idx = 0; idx < this.taskSchedulers.length; idx++) {
            // Stays true when the check throws, so a failing scheduler is not
            // counted as "still registered".
            boolean schedulerUnregistered = true;
            try {
                schedulerUnregistered = this.taskSchedulers[idx].hasUnregistered();
            }
            catch (Exception e) {
                String schedulerName = Utils.getTaskSchedulerIdentifierString(idx, this.appContext);
                String msg = "Error in TaskScheduler when checking if a scheduler has unregistered, scheduler=" + schedulerName;
                LOG.error(msg, (Throwable)e);
                this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e));
            }
            if (!schedulerUnregistered) {
                return false;
            }
        }
        return true;
    }

    /**
     * Builds the history URL for this application from the AM configuration.
     *
     * <p>The URL template ({@code tez.am.tez-ui.history-url.template}) has its
     * application-id and base-URL placeholders substituted, runs of two or
     * more {@code /} not preceded by {@code :} are collapsed to one, and
     * {@code http://} is prepended when the result does not start with
     * {@code http}.
     *
     * <p>The substituted values are inserted literally: {@code $} and
     * {@code \} in the configured base URL are not interpreted as regex
     * replacement syntax.
     *
     * @return the history URL, or an empty string when either the template or
     *         the base URL ({@code tez.tez-ui.history-url.base}) is empty
     */
    @VisibleForTesting
    public String getHistoryUrl() {
        Configuration config = this.appContext.getAMConf();
        String historyUrlTemplate = config.get("tez.am.tez-ui.history-url.template", "__HISTORY_URL_BASE__/#/tez-app/__APPLICATION_ID__");
        String historyUrlBase = config.get("tez.tez-ui.history-url.base", "");
        if (historyUrlTemplate.isEmpty() || historyUrlBase.isEmpty()) {
            return "";
        }
        // replaceAll treats '$' and '\' in the replacement specially, so quote the
        // substituted values to keep a configured base URL from throwing or being mangled.
        String applicationId = java.util.regex.Matcher.quoteReplacement(this.appContext.getApplicationID().toString());
        String quotedBase = java.util.regex.Matcher.quoteReplacement(historyUrlBase);
        String historyUrl = historyUrlTemplate
                .replaceAll(APPLICATION_ID_PLACEHOLDER, applicationId)
                .replaceAll(HISTORY_URL_BASE, quotedBase)
                .replaceAll("([^:])/{2,}", "$1/");
        if (!historyUrl.startsWith("http")) {
            historyUrl = "http://" + historyUrl;
        }
        return historyUrl;
    }
}

