/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;

/**
 * Connects a managed {@link Vertex} with the user-supplied {@link VertexManagerPlugin}.
 *
 * <p>The plugin is created reflectively from a {@link VertexManagerPluginDescriptor} in
 * {@link #initialize()}. Each {@code on*} callback of this class forwards to the plugin only
 * while the plugin context has not been marked complete, and wraps any {@link Exception}
 * thrown by the plugin in an {@link AMUserCodeException} whose source is
 * {@code AMUserCodeException.Source.VertexManager}.
 */
public class VertexManager {
    /** Descriptor naming the plugin class and carrying its user payload. */
    VertexManagerPluginDescriptor pluginDesc;
    /** Plugin instance; assigned in {@link #initialize()}. */
    VertexManagerPlugin plugin;
    /** Vertex whose behaviour the plugin controls through the context. */
    Vertex managedVertex;
    /** Context object handed to the plugin; assigned in {@link #initialize()}. */
    VertexManagerPluginContextImpl pluginContext;
    /** Payload read from {@link #pluginDesc} in {@link #initialize()}. */
    UserPayload payload = null;
    AppContext appContext;
    /** Events queued by the plugin via {@code addRootInputEvents} and drained by {@link #onRootVertexInitialized}. */
    BlockingQueue<TezEvent> rootInputInitEventQueue;
    StateChangeNotifier stateChangeNotifier;
    private static final Log LOG = LogFactory.getLog(VertexManager.class);

    /**
     * Creates a manager for the given vertex. The plugin itself is not instantiated until
     * {@link #initialize()} is called.
     *
     * @param pluginDesc descriptor of the plugin to load; must not be null
     * @param managedVertex vertex to manage; must not be null
     * @param appContext application context; must not be null
     * @param stateChangeNotifier notifier used for vertex state registrations; must not be null
     * @throws NullPointerException if any argument is null
     */
    public VertexManager(VertexManagerPluginDescriptor pluginDesc, Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
        Preconditions.checkNotNull(pluginDesc, "pluginDesc is null");
        Preconditions.checkNotNull(managedVertex, "managedVertex is null");
        Preconditions.checkNotNull(appContext, "appContext is null");
        Preconditions.checkNotNull(stateChangeNotifier, "notifier is null");
        this.pluginDesc = pluginDesc;
        this.managedVertex = managedVertex;
        this.appContext = appContext;
        this.stateChangeNotifier = stateChangeNotifier;
        this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
    }

    /**
     * @return the plugin instance, or {@code null} if {@link #initialize()} has not assigned one
     */
    public VertexManagerPlugin getPlugin() {
        return plugin;
    }

    /**
     * Creates the plugin context, instantiates the plugin from the descriptor (passing the
     * context as the single constructor argument) and invokes the plugin's own
     * {@code initialize()}.
     *
     * @throws AMUserCodeException if the plugin's {@code initialize()} throws
     */
    public void initialize() throws AMUserCodeException {
        pluginContext = new VertexManagerPluginContextImpl();
        if (pluginDesc != null) {
            plugin = (VertexManagerPlugin) ReflectionUtils.createClazzInstance(
                    pluginDesc.getClassName(),
                    new Class[]{VertexManagerPluginContext.class},
                    new Object[]{pluginContext});
            payload = pluginDesc.getUserPayload();
        }
        try {
            if (!pluginContext.isComplete()) {
                plugin.initialize();
            }
        }
        catch (Exception e) {
            throw new AMUserCodeException(AMUserCodeException.Source.VertexManager, e);
        }
    }

    /**
     * Notifies the plugin that the vertex has started, passing the already completed task
     * attempts grouped by the name of the vertex they belong to.
     *
     * @param completions completed task attempts; may be null or empty, in which case the
     *                    plugin receives an empty map
     * @throws AMUserCodeException if the plugin callback throws
     */
    public void onVertexStarted(List<TezTaskAttemptID> completions) throws AMUserCodeException {
        // vertex name -> ids of the tasks (of that vertex) reported in 'completions'
        Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
        if (completions != null && !completions.isEmpty()) {
            for (TezTaskAttemptID attemptId : completions) {
                TezTaskID completedTask = attemptId.getTaskID();
                String vertexName = appContext.getCurrentDAG().getVertex(completedTask.getVertexID()).getName();
                List<Integer> taskIds = pluginCompletionsMap.get(vertexName);
                if (taskIds == null) {
                    taskIds = Lists.newArrayList();
                    pluginCompletionsMap.put(vertexName, taskIds);
                }
                // Integer.valueOf instead of the deprecated boxing constructor.
                taskIds.add(Integer.valueOf(completedTask.getId()));
            }
        }
        try {
            if (!pluginContext.isComplete()) {
                plugin.onVertexStarted(pluginCompletionsMap);
            }
        }
        catch (Exception e) {
            throw new AMUserCodeException(AMUserCodeException.Source.VertexManager, e);
        }
    }

    /**
     * Forwards a source-task completion to the plugin as (vertex name, task id).
     *
     * @param tezTaskId id of the completed task
     * @throws AMUserCodeException if the plugin callback throws
     */
    public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
        Integer taskId = Integer.valueOf(tezTaskId.getId());
        String vertexName = appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
        try {
            if (!pluginContext.isComplete()) {
                plugin.onSourceTaskCompleted(vertexName, taskId);
            }
        }
        catch (Exception e) {
            throw new AMUserCodeException(AMUserCodeException.Source.VertexManager, e);
        }
    }

    /**
     * Forwards a {@link VertexManagerEvent} to the plugin unless the context is complete.
     *
     * @param vmEvent event to deliver
     * @throws AMUserCodeException if the plugin callback throws
     */
    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws AMUserCodeException {
        try {
            if (!pluginContext.isComplete()) {
                plugin.onVertexManagerEventReceived(vmEvent);
            }
        }
        catch (Exception e) {
            throw new AMUserCodeException(AMUserCodeException.Source.VertexManager, e);
        }
    }

    /**
     * Informs the plugin that a root input was initialized and returns every event currently
     * held in {@link #rootInputInitEventQueue}; the queue is emptied by this call.
     *
     * @param inputName name of the initialized input
     * @param inputDescriptor descriptor of that input
     * @param events events produced by the input initializer
     * @return the events drained from the queue, possibly empty
     * @throws AMUserCodeException if the plugin callback throws
     */
    public List<TezEvent> onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<org.apache.tez.runtime.api.Event> events) throws AMUserCodeException {
        try {
            if (!pluginContext.isComplete()) {
                plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
            }
        }
        catch (Exception e) {
            throw new AMUserCodeException(AMUserCodeException.Source.VertexManager, e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("vertex:" + managedVertex.getName() + "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size());
        }
        List<TezEvent> resultEvents = new ArrayList<TezEvent>();
        rootInputInitEventQueue.drainTo(resultEvents);
        return resultEvents;
    }

    /**
     * Context implementation given to the plugin. Every public context method first calls
     * {@link #checkAndThrowIfDone()}, so it throws {@link TezUncheckedException} once
     * {@link #vertexManagerDone()} has been invoked. The instance also acts as the
     * {@link VertexStateUpdateListener} registered on behalf of the plugin.
     */
    class VertexManagerPluginContextImpl
    implements VertexManagerPluginContext,
    VertexStateUpdateListener {
        /** Source metadata attached to every root-input event created by this context. */
        private EventMetaData rootEventSourceMetadata;
        /** Cache of destination metadata keyed by input name. */
        private Map<String, EventMetaData> destinationEventMetadataMap;
        /** Vertex names for which state-update notifications were registered. */
        private final List<String> notificationRegisteredVertices;
        /** Set to true by {@link #vertexManagerDone()}. */
        AtomicBoolean isComplete;

        VertexManagerPluginContextImpl() {
            this.rootEventSourceMetadata = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, managedVertex.getName(), "NULL_VERTEX", null);
            this.destinationEventMetadataMap = Maps.newHashMap();
            this.notificationRegisteredVertices = Lists.newArrayList();
            this.isComplete = new AtomicBoolean(false);
        }

        /**
         * @throws TezUncheckedException if the plugin already reported done
         */
        private void checkAndThrowIfDone() {
            if (isComplete()) {
                throw new TezUncheckedException("Cannot invoke context methods after reporting done");
            }
        }

        /**
         * @return a new map from each input vertex's name to the property of its edge
         */
        public synchronized Map<String, EdgeProperty> getInputVertexEdgeProperties() {
            checkAndThrowIfDone();
            Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
            Map<String, EdgeProperty> vertexEdgeMap = Maps.newHashMapWithExpectedSize(inputs.size());
            for (Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
                vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
            }
            return vertexEdgeMap;
        }

        /** @return the name of the managed vertex */
        public synchronized String getVertexName() {
            checkAndThrowIfDone();
            return managedVertex.getName();
        }

        /**
         * @param vertexName name of a vertex in the current DAG
         * @return that vertex's total task count
         */
        public synchronized int getVertexNumTasks(String vertexName) {
            checkAndThrowIfDone();
            return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
        }

        /**
         * Delegates to {@code Vertex.setParallelism} on the managed vertex, passing
         * {@code true} as the final argument.
         *
         * @throws TezUncheckedException wrapping any {@link AMUserCodeException} raised by the vertex
         */
        public synchronized void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, Map<String, InputSpecUpdate> rootInputSpecUpdate) {
            checkAndThrowIfDone();
            try {
                managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdate, true);
            }
            catch (AMUserCodeException e) {
                throw new TezUncheckedException(e);
            }
        }

        /** Asks the managed vertex to schedule the given tasks. */
        public synchronized void scheduleVertexTasks(List<VertexManagerPluginContext.TaskWithLocationHint> tasks) {
            checkAndThrowIfDone();
            managedVertex.scheduleTasks(tasks);
        }

        /**
         * @return the key set of the managed vertex's additional inputs, or {@code null} when
         *         the vertex reports no additional-input map
         */
        @Nullable
        public synchronized Set<String> getVertexInputNames() {
            checkAndThrowIfDone();
            Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs = managedVertex.getAdditionalInputs();
            return inputs == null ? null : inputs.keySet();
        }

        /** @return the payload taken from the plugin descriptor */
        public synchronized UserPayload getUserPayload() {
            checkAndThrowIfDone();
            return payload;
        }

        /**
         * Wraps each supplied event in a {@link TezEvent} addressed to the named input of the
         * managed vertex and appends the results to the root-input event queue.
         *
         * @param inputName name of a root input of the managed vertex
         * @param events events to enqueue
         * @throws IllegalStateException if {@code inputName} is not a root input
         */
        public synchronized void addRootInputEvents(final String inputName, Collection<InputDataInformationEvent> events) {
            checkAndThrowIfDone();
            verifyIsRootInput(inputName);
            // Collections2.transform yields a lazy view; the wrapping runs during addAll below.
            Collection<TezEvent> tezEvents = Collections2.transform(events, new Function<InputDataInformationEvent, TezEvent>() {

                public TezEvent apply(InputDataInformationEvent riEvent) {
                    TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata);
                    tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
                    return tezEvent;
                }
            });
            if (LOG.isDebugEnabled()) {
                LOG.debug("vertex:" + managedVertex.getName() + "; Added " + events.size() + " for input name " + inputName);
            }
            rootInputInitEventQueue.addAll(tezEvents);
        }

        /**
         * @param locationHint hint to apply to the managed vertex; must not be null
         * @throws NullPointerException if {@code locationHint} is null
         */
        public synchronized void setVertexLocationHint(VertexLocationHint locationHint) {
            checkAndThrowIfDone();
            Preconditions.checkNotNull(locationHint, "locationHint is null");
            managedVertex.setVertexLocationHint(locationHint);
        }

        /** @return the attempt id number of the current application attempt */
        public synchronized int getDAGAttemptNumber() {
            checkAndThrowIfDone();
            return appContext.getApplicationAttemptId().getAttemptId();
        }

        /**
         * Ensures {@code inputName} is one of the managed vertex's additional inputs.
         * A vertex that reports no additional-input map is treated as having no root inputs,
         * so the check fails with the intended message rather than a NullPointerException.
         *
         * @throws IllegalStateException if the input is unknown
         */
        private void verifyIsRootInput(String inputName) {
            Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs = managedVertex.getAdditionalInputs();
            Preconditions.checkState(inputs != null && inputs.get(inputName) != null, "Cannot add events for non-root inputs");
        }

        /**
         * Returns the cached destination metadata for the input, creating and caching it on
         * first use.
         */
        private EventMetaData getDestinationMetaData(String inputName) {
            EventMetaData destMeta = destinationEventMetadataMap.get(inputName);
            if (destMeta == null) {
                destMeta = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, managedVertex.getName(), inputName, null);
                destinationEventMetadataMap.put(inputName, destMeta);
            }
            return destMeta;
        }

        /** @return the task resource reported by the managed vertex */
        public synchronized Resource getVertexTaskResource() {
            checkAndThrowIfDone();
            return managedVertex.getTaskResource();
        }

        /** @return the total resources reported by the task scheduler */
        public synchronized Resource getTotalAvailableResource() {
            checkAndThrowIfDone();
            return appContext.getTaskScheduler().getTotalResources();
        }

        /** @return the number of cluster nodes reported by the task scheduler */
        public synchronized int getNumClusterNodes() {
            checkAndThrowIfDone();
            return appContext.getTaskScheduler().getNumClusterNodes();
        }

        /**
         * @param vertexName name of a vertex in the current DAG
         * @param taskIndex index of the task within that vertex
         * @return the container assigned to the task's successful attempt, or {@code null}
         *         when the task has no successful attempt
         */
        public synchronized Container getTaskContainer(String vertexName, Integer taskIndex) {
            checkAndThrowIfDone();
            Vertex vertex = appContext.getCurrentDAG().getVertex(vertexName);
            Task task = vertex.getTask(taskIndex);
            TaskAttempt attempt = task.getSuccessfulAttempt();
            return attempt != null ? attempt.getAssignedContainer() : null;
        }

        /**
         * Records the vertex name and registers this context as a listener for the given
         * states of that vertex.
         */
        public synchronized void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
            checkAndThrowIfDone();
            synchronized (notificationRegisteredVertices) {
                notificationRegisteredVertices.add(vertexName);
            }
            stateChangeNotifier.registerForVertexUpdates(vertexName, stateSet, this);
        }

        /** Unregisters this context from every vertex recorded by {@link #registerForVertexStateUpdates}. */
        private void unregisterForVertexStateUpdates() {
            synchronized (notificationRegisteredVertices) {
                for (String vertexName : notificationRegisteredVertices) {
                    stateChangeNotifier.unregisterForVertexUpdates(vertexName, this);
                }
            }
        }

        /** @return whether {@link #vertexManagerDone()} has been called */
        boolean isComplete() {
            return isComplete.get();
        }

        /**
         * Marks the context complete and drops all state-update registrations. Subsequent
         * context calls throw {@link TezUncheckedException}.
         */
        public synchronized void vertexManagerDone() {
            checkAndThrowIfDone();
            LOG.info("Vertex Manager reported done for : " + managedVertex.getLogIdentifier());
            isComplete.set(true);
            unregisterForVertexStateUpdates();
        }

        /** Delegates to the managed vertex's {@code vertexReconfigurationPlanned()}. */
        public synchronized void vertexReconfigurationPlanned() {
            checkAndThrowIfDone();
            managedVertex.vertexReconfigurationPlanned();
        }

        /** Delegates to the managed vertex's {@code doneReconfiguringVertex()}. */
        public synchronized void doneReconfiguringVertex() {
            checkAndThrowIfDone();
            managedVertex.doneReconfiguringVertex();
        }

        /**
         * Delivers a vertex state update to the plugin. Updates arriving after completion are
         * dropped (logged at debug level). An exception thrown by the plugin is not rethrown;
         * it is reported to the event handler as a {@link VertexEventManagerUserCodeError}
         * for the managed vertex.
         */
        @Override
        public synchronized void onStateUpdated(VertexStateUpdate event) {
            if (isComplete()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" + event.getVertexState() + " since vertexmanager for " + managedVertex.getLogIdentifier() + " is complete.");
                }
                return;
            }
            try {
                plugin.onVertexStateUpdated(event);
            }
            catch (Exception e) {
                appContext.getEventHandler().handle(new VertexEventManagerUserCodeError(managedVertex.getVertexId(), new AMUserCodeException(AMUserCodeException.Source.VertexManager, e)));
            }
        }
    }
}

