/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.AbstractCounters;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DAGImpl
implements DAG,
EventHandler<DAGEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(DAGImpl.class);
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    private final TezDAGID dagId;
    private final Clock clock;
    private final Lock dagStatusLock = new ReentrantLock();
    private final Condition dagCompletionCondition = this.dagStatusLock.newCondition();
    private final AtomicBoolean isFinalState = new AtomicBoolean(false);
    private final Lock readLock;
    private final Lock writeLock;
    private final String dagName;
    private final TaskAttemptListener taskAttemptListener;
    private final TaskHeartbeatHandler taskHeartbeatHandler;
    private final Object tasksSyncHandle = new Object();
    private AtomicBoolean committed = new AtomicBoolean(false);
    private AtomicBoolean aborted = new AtomicBoolean(false);
    private AtomicBoolean commitCanceled = new AtomicBoolean(false);
    boolean commitAllOutputsOnSuccess = true;
    @VisibleForTesting
    DAGScheduler dagScheduler;
    private final EventHandler eventHandler;
    private final String userName;
    private final AppContext appContext;
    private final UserGroupInformation dagUGI;
    private final ACLManager aclManager;
    @VisibleForTesting
    StateChangeNotifier entityUpdateTracker;
    volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
    @VisibleForTesting
    Map<String, Edge> edges = new HashMap<String, Edge>();
    private TezCounters dagCounters = new TezCounters();
    private Object fullCountersLock = new Object();
    @VisibleForTesting
    TezCounters fullCounters = null;
    private TezCounters cachedCounters = null;
    private long cachedCountersTimestamp = 0L;
    private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
    private final Configuration dagConf;
    private final DAGProtos.DAGPlan jobPlan;
    private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
    Map<String, LocalResource> localResources;
    long startDAGCpuTime = 0L;
    long startDAGGCTime = 0L;
    private final List<String> diagnostics = new ArrayList<String>();
    boolean recoveryInitEventSeen = false;
    boolean recoveryStartEventSeen = false;
    private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
    private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new DagStateChangedCallback();
    @VisibleForTesting
    Map<OutputKey, ListenableFuture<Void>> commitFutures = new HashMap<OutputKey, ListenableFuture<Void>>();
    private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
    private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
    private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION = new CounterUpdateTransition();
    private static final DAGSchedulerUpdateTransition DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
    private static final CommitCompletedTransition COMMIT_COMPLETED_TRANSITION = new CommitCompletedTransition();
    protected static final StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent> stateMachineFactory = new StateMachineFactory((Enum)DAGState.NEW).addTransition((Enum)DAGState.NEW, (Enum)DAGState.NEW, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.NEW, EnumSet.of(DAGState.NEW, new DAGState[]{DAGState.INITED, DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED, DAGState.ERROR, DAGState.TERMINATING}), (Enum)DAGEventType.DAG_RECOVER, (MultipleArcTransition)new RecoverTransition()).addTransition((Enum)DAGState.NEW, (Enum)DAGState.NEW, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.NEW, EnumSet.of(DAGState.INITED, DAGState.FAILED), (Enum)DAGEventType.DAG_INIT, (MultipleArcTransition)new InitTransition()).addTransition((Enum)DAGState.NEW, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new KillNewJobTransition()).addTransition((Enum)DAGState.NEW, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.INITED, (Enum)DAGState.INITED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.INITED, (Enum)DAGState.INITED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.INITED, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_START, (SingleArcTransition)new StartTransition()).addTransition((Enum)DAGState.INITED, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new KillInitedJobTransition()).addTransition((Enum)DAGState.INITED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.COMMITTING, DAGState.SUCCEEDED, DAGState.TERMINATING, DAGState.FAILED), (Enum)DAGEventType.DAG_VERTEX_COMPLETED, (MultipleArcTransition)new VertexCompletedTransition()).addTransition((Enum)DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING), (Enum)DAGEventType.DAG_VERTEX_RERUNNING, (MultipleArcTransition)new VertexReRunningTransition()).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new DAGKilledTransition()).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_SCHEDULER_UPDATE, (SingleArcTransition)DAG_SCHEDULER_UPDATE_TRANSITION).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING), (Enum)DAGEventType.DAG_COMMIT_COMPLETED, (MultipleArcTransition)new CommitCompletedWhileRunning()).addTransition((Enum)DAGState.COMMITTING, EnumSet.of(DAGState.COMMITTING, DAGState.TERMINATING, DAGState.FAILED, DAGState.SUCCEEDED), (Enum)DAGEventType.DAG_COMMIT_COMPLETED, (MultipleArcTransition)COMMIT_COMPLETED_TRANSITION).addTransition((Enum)DAGState.COMMITTING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new DAGKilledWhileCommittingTransition()).addTransition((Enum)DAGState.COMMITTING, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.COMMITTING, (Enum)DAGState.COMMITTING, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.COMMITTING, (Enum)DAGState.COMMITTING, (Enum)DAGEventType.DAG_SCHEDULER_UPDATE, (SingleArcTransition)DAG_SCHEDULER_UPDATE_TRANSITION).addTransition((Enum)DAGState.COMMITTING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_VERTEX_RERUNNING, (SingleArcTransition)new VertexRerunWhileCommitting()).addTransition((Enum)DAGState.COMMITTING, (Enum)DAGState.COMMITTING, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.TERMINATING, EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED, DAGState.ERROR), (Enum)DAGEventType.DAG_VERTEX_COMPLETED, (MultipleArcTransition)new VertexCompletedTransition()).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.TERMINATING, EnumSet.of(DAGState.TERMINATING, DAGState.FAILED, DAGState.KILLED, DAGState.ERROR), (Enum)DAGEventType.DAG_COMMIT_COMPLETED, (MultipleArcTransition)COMMIT_COMPLETED_TRANSITION).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.TERMINATING, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE)).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.SUCCEEDED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.SUCCEEDED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.SUCCEEDED, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.FAILED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.FAILED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.FAILED, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.KILLED, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)).addTransition((Enum)DAGState.ERROR, (Enum)DAGState.ERROR, EnumSet.of(DAGEventType.DAG_KILL, new DAGEventType[]{DAGEventType.DAG_INIT, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_COMPLETED, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_DIAGNOSTIC_UPDATE, DAGEventType.INTERNAL_ERROR, DAGEventType.DAG_COUNTER_UPDATE})).addTransition((Enum)DAGState.ERROR, (Enum)DAGState.ERROR, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).installTopology();
    private final StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl> stateMachine;
    @VisibleForTesting
    int numCompletedVertices = 0;
    private int numVertices;
    private int numSuccessfulVertices = 0;
    private int numFailedVertices = 0;
    private int numKilledVertices = 0;
    private boolean isUber = false;
    private DAGTerminationCause terminationCause;
    private Credentials credentials;
    @VisibleForTesting
    long initTime;
    @VisibleForTesting
    long startTime;
    @VisibleForTesting
    long finishTime;
    Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
    Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
    private DAGState recoveredState = DAGState.NEW;
    @VisibleForTesting
    boolean recoveryCommitInProgress = false;
    Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
    LinkedHashMap<String, Vertex> vertexMap = new LinkedHashMap();

    public DAGImpl(TezDAGID dagId, Configuration amConf, DAGProtos.DAGPlan jobPlan, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Credentials dagCredentials, Clock clock, String appUserName, TaskHeartbeatHandler thh, AppContext appContext) {
        this.dagId = dagId;
        this.jobPlan = jobPlan;
        this.dagConf = new Configuration(amConf);
        for (DAGProtos.PlanKeyValuePair keyValPair : jobPlan.getDagConf().getConfKeyValuesList()) {
            TezConfiguration.validateProperty((String)keyValPair.getKey(), (Scope)Scope.DAG);
            this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
        }
        this.dagName = jobPlan.getName() != null ? jobPlan.getName() : "<missing app name>";
        this.userName = appUserName;
        this.clock = clock;
        this.appContext = appContext;
        this.taskAttemptListener = taskAttemptListener;
        this.taskHeartbeatHandler = thh;
        this.eventHandler = eventHandler;
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan((List)jobPlan.getLocalResourceList());
        this.credentials = dagCredentials;
        if (this.credentials == null) {
            try {
                this.dagUGI = UserGroupInformation.getCurrentUser();
            }
            catch (IOException e) {
                throw new TezUncheckedException("Failed to set UGI for dag based on currentUser", (Throwable)e);
            }
        } else {
            this.dagUGI = UserGroupInformation.createRemoteUser((String)this.userName);
            this.dagUGI.addCredentials(this.credentials);
        }
        this.aclManager = new ACLManager(appContext.getAMACLManager(), this.dagUGI.getShortUserName(), this.dagConf);
        this.startDAGCpuTime = appContext.getCumulativeCPUTime();
        this.startDAGGCTime = appContext.getCumulativeGCTime();
        this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(this.dagConf);
        this.stateMachine = new StateMachineTez(stateMachineFactory.make((Object)this), this);
        this.augmentStateMachine();
        this.entityUpdateTracker = new StateChangeNotifier(this);
    }

    private void augmentStateMachine() {
        this.stateMachine.registerStateEnteredCallback(DAGState.SUCCEEDED, STATE_CHANGED_CALLBACK).registerStateEnteredCallback(DAGState.FAILED, STATE_CHANGED_CALLBACK).registerStateEnteredCallback(DAGState.KILLED, STATE_CHANGED_CALLBACK).registerStateEnteredCallback(DAGState.ERROR, STATE_CHANGED_CALLBACK);
    }

    protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
        return this.stateMachine;
    }

    @Override
    public TezDAGID getID() {
        return this.dagId;
    }

    @Override
    public Map<String, LocalResource> getLocalResources() {
        return this.localResources;
    }

    @Override
    public Configuration getConf() {
        return this.dagConf;
    }

    @Override
    public DAGProtos.DAGPlan getJobPlan() {
        return this.jobPlan;
    }

    @Override
    public EventHandler getEventHandler() {
        return this.eventHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Vertex getVertex(TezVertexID vertexID) {
        this.readLock.lock();
        try {
            Vertex vertex = this.vertices.get(vertexID);
            return vertex;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public boolean isUber() {
        return this.isUber;
    }

    @Override
    public Credentials getCredentials() {
        return this.credentials;
    }

    @Override
    public UserGroupInformation getDagUGI() {
        return this.dagUGI;
    }

    @Override
    public DAGState restoreFromEvent(HistoryEvent historyEvent) {
        this.writeLock.lock();
        try {
            switch (historyEvent.getEventType()) {
                case DAG_INITIALIZED: {
                    this.recoveredState = this.initializeDAG((DAGInitializedEvent)historyEvent);
                    this.recoveryInitEventSeen = true;
                    DAGState dAGState = this.recoveredState;
                    return dAGState;
                }
                case DAG_STARTED: {
                    if (!this.recoveryInitEventSeen) {
                        throw new RuntimeException("Started Event seen but no Init Event was encountered earlier");
                    }
                    this.recoveryStartEventSeen = true;
                    this.startTime = ((DAGStartedEvent)historyEvent).getStartTime();
                    DAGState dAGState = this.recoveredState = DAGState.RUNNING;
                    return dAGState;
                }
                case DAG_COMMIT_STARTED: {
                    this.recoveryCommitInProgress = true;
                    DAGState dAGState = this.recoveredState;
                    return dAGState;
                }
                case VERTEX_GROUP_COMMIT_STARTED: {
                    VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = (VertexGroupCommitStartedEvent)historyEvent;
                    this.recoveredGroupCommits.put(vertexGroupCommitStartedEvent.getVertexGroupName(), false);
                    DAGState dAGState = this.recoveredState;
                    return dAGState;
                }
                case VERTEX_GROUP_COMMIT_FINISHED: {
                    VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = (VertexGroupCommitFinishedEvent)historyEvent;
                    this.recoveredGroupCommits.put(vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
                    DAGState dAGState = this.recoveredState;
                    return dAGState;
                }
                case DAG_KILL_REQUEST: {
                    this.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
                    DAGState dAGState = this.recoveredState = DAGState.KILLED;
                    return dAGState;
                }
                case DAG_FINISHED: {
                    this.recoveryCommitInProgress = false;
                    DAGFinishedEvent finishedEvent = (DAGFinishedEvent)historyEvent;
                    this.setFinishTime(finishedEvent.getFinishTime());
                    this.recoveredState = finishedEvent.getState();
                    this.fullCounters = finishedEvent.getTezCounters();
                    DAGState dAGState = this.recoveredState;
                    return dAGState;
                }
            }
            throw new RuntimeException("Unexpected event received for restoring state, eventType=" + (Object)((Object)historyEvent.getEventType()));
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public ACLManager getACLManager() {
        return this.aclManager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<String, TezVertexID> getVertexNameIDMapping() {
        this.readLock.lock();
        try {
            HashMap<String, TezVertexID> idNameMap = new HashMap<String, TezVertexID>();
            for (Vertex v : this.getVertices().values()) {
                idNameMap.put(v.getName(), v.getVertexId());
            }
            HashMap<String, TezVertexID> hashMap = idNameMap;
            return hashMap;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TezCounters getAllCounters() {
        this.readLock.lock();
        try {
            DAGState state = this.getInternalState();
            if (state == DAGState.ERROR || state == DAGState.FAILED || state == DAGState.KILLED || state == DAGState.SUCCEEDED) {
                this.mayBeConstructFinalFullCounters();
                TezCounters tezCounters = this.fullCounters;
                return tezCounters;
            }
            this.updateCpuCounters();
            TezCounters counters = new TezCounters();
            counters.incrAllCounters((AbstractCounters)this.dagCounters);
            TezCounters tezCounters = DAGImpl.incrTaskCounters(counters, this.vertices.values());
            return tezCounters;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TezCounters getCachedCounters() {
        this.readLock.lock();
        try {
            if (this.fullCounters == null && this.cachedCounters != null && this.cachedCountersTimestamp + 10000L > System.currentTimeMillis()) {
                LOG.info("Asked for counters, cachedCountersTimestamp=" + this.cachedCountersTimestamp + ", currentTime=" + System.currentTimeMillis());
                TezCounters tezCounters = this.cachedCounters;
                return tezCounters;
            }
            this.cachedCountersTimestamp = System.currentTimeMillis();
            if (this.inTerminalState()) {
                this.mayBeConstructFinalFullCounters();
                TezCounters tezCounters = this.fullCounters;
                return tezCounters;
            }
            this.updateCpuCounters();
            TezCounters counters = new TezCounters();
            counters.incrAllCounters((AbstractCounters)this.dagCounters);
            TezCounters tezCounters = DAGImpl.incrTaskCounters(counters, this.vertices.values());
            return tezCounters;
        }
        finally {
            this.readLock.unlock();
        }
    }

    boolean inTerminalState() {
        DAGState state = this.getInternalState();
        return state == DAGState.ERROR || state == DAGState.FAILED || state == DAGState.KILLED || state == DAGState.SUCCEEDED;
    }

    public static TezCounters incrTaskCounters(TezCounters counters, Collection<Vertex> vertices) {
        for (Vertex vertex : vertices) {
            counters.incrAllCounters((AbstractCounters)vertex.getAllCounters());
        }
        return counters;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<String> getDiagnostics() {
        this.readLock.lock();
        try {
            List<String> list = this.diagnostics;
            return list;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DAGReport getReport() {
        this.readLock.lock();
        try {
            DAGReport dAGReport;
            StringBuilder diagsb = new StringBuilder();
            for (String s : this.getDiagnostics()) {
                diagsb.append(s).append("\n");
            }
            if (this.getInternalState() == DAGState.NEW) {
                dAGReport = TezBuilderUtils.newDAGReport();
                return dAGReport;
            }
            dAGReport = TezBuilderUtils.newDAGReport();
            return dAGReport;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public float getProgress() {
        this.readLock.lock();
        try {
            float progress = 0.0f;
            for (Vertex v : this.getVertices().values()) {
                progress += v.getProgress();
            }
            float f = progress / (float)this.getTotalVertices();
            return f;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public float getCompletedTaskProgress() {
        this.readLock.lock();
        try {
            int totalTasks = 0;
            int completedTasks = 0;
            for (Vertex v : this.getVertices().values()) {
                int vTotalTasks = v.getTotalTasks();
                int vCompletedTasks = v.getSucceededTasks();
                if (vTotalTasks <= 0) continue;
                totalTasks += vTotalTasks;
                completedTasks += vCompletedTasks;
            }
            if (totalTasks == 0) {
                DAGState state = (DAGState)this.getStateMachine().getCurrentState();
                if (state == DAGState.ERROR || state == DAGState.FAILED || state == DAGState.KILLED || state == DAGState.SUCCEEDED) {
                    float f = 1.0f;
                    return f;
                }
                float f = 0.0f;
                return f;
            }
            float f = (float)completedTasks / (float)totalTasks;
            return f;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TezVertexID, Vertex> getVertices() {
        Object object = this.tasksSyncHandle;
        synchronized (object) {
            return Collections.unmodifiableMap(this.vertices);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DAGState getState() {
        this.readLock.lock();
        try {
            DAGState dAGState = (DAGState)this.getStateMachine().getCurrentState();
            return dAGState;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) {
        DAGStatusBuilder status = new DAGStatusBuilder();
        int totalTaskCount = 0;
        int totalSucceededTaskCount = 0;
        int totalRunningTaskCount = 0;
        int totalFailedTaskCount = 0;
        int totalKilledTaskCount = 0;
        int totalFailedTaskAttemptCount = 0;
        int totalKilledTaskAttemptCount = 0;
        this.readLock.lock();
        try {
            for (Map.Entry<String, Vertex> entry : this.vertexMap.entrySet()) {
                ProgressBuilder progress = entry.getValue().getVertexProgress();
                status.addVertexProgress(entry.getKey(), progress);
                totalTaskCount += progress.getTotalTaskCount();
                totalSucceededTaskCount += progress.getSucceededTaskCount();
                totalRunningTaskCount += progress.getRunningTaskCount();
                totalFailedTaskCount += progress.getFailedTaskCount();
                totalKilledTaskCount += progress.getKilledTaskCount();
                totalFailedTaskAttemptCount += progress.getFailedTaskAttemptCount();
                totalKilledTaskAttemptCount += progress.getKilledTaskAttemptCount();
            }
            ProgressBuilder dagProgress = new ProgressBuilder();
            dagProgress.setTotalTaskCount(totalTaskCount);
            dagProgress.setSucceededTaskCount(totalSucceededTaskCount);
            dagProgress.setRunningTaskCount(totalRunningTaskCount);
            dagProgress.setFailedTaskCount(totalFailedTaskCount);
            dagProgress.setKilledTaskCount(totalKilledTaskCount);
            dagProgress.setFailedTaskAttemptCount(totalFailedTaskAttemptCount);
            dagProgress.setKilledTaskAttemptCount(totalKilledTaskAttemptCount);
            status.setState(this.getState());
            status.setDiagnostics(this.diagnostics);
            status.setDAGProgress(dagProgress);
            if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
                status.setDAGCounters(this.getAllCounters());
            }
            DAGStatusBuilder dAGStatusBuilder = status;
            return dAGStatusBuilder;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions, long timeoutMillis) throws TezException {
        long timeoutNanos = timeoutMillis * 1000L * 1000L;
        if (timeoutMillis < 0L) {
            timeoutNanos = Long.MAX_VALUE;
        }
        if (timeoutMillis == 0L || this.isComplete()) {
            return this.getDAGStatus(statusOptions);
        }
        while (true) {
            long nanosLeft;
            this.dagStatusLock.lock();
            try {
                if (this.isFinalState.get()) break;
                nanosLeft = this.dagCompletionCondition.awaitNanos(timeoutNanos);
            }
            catch (InterruptedException e) {
                throw new TezException("Interrupted while waiting for dag to complete", (Throwable)e);
            }
            finally {
                this.dagStatusLock.unlock();
            }
            if (nanosLeft <= 0L) break;
            timeoutNanos = nanosLeft;
        }
        return this.getDAGStatus(statusOptions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ProgressBuilder getDAGProgress() {
        int totalTaskCount = 0;
        int totalSucceededTaskCount = 0;
        int totalRunningTaskCount = 0;
        int totalFailedTaskCount = 0;
        int totalKilledTaskCount = 0;
        int totalFailedTaskAttemptCount = 0;
        int totalKilledTaskAttemptCount = 0;
        this.readLock.lock();
        try {
            for (Map.Entry<String, Vertex> entry : this.vertexMap.entrySet()) {
                ProgressBuilder progress = entry.getValue().getVertexProgress();
                totalTaskCount += progress.getTotalTaskCount();
                totalSucceededTaskCount += progress.getSucceededTaskCount();
                totalRunningTaskCount += progress.getRunningTaskCount();
                totalFailedTaskCount += progress.getFailedTaskCount();
                totalKilledTaskCount += progress.getKilledTaskCount();
                totalFailedTaskAttemptCount += progress.getFailedTaskAttemptCount();
                totalKilledTaskAttemptCount += progress.getKilledTaskAttemptCount();
            }
            ProgressBuilder dagProgress = new ProgressBuilder();
            dagProgress.setTotalTaskCount(totalTaskCount);
            dagProgress.setSucceededTaskCount(totalSucceededTaskCount);
            dagProgress.setRunningTaskCount(totalRunningTaskCount);
            dagProgress.setFailedTaskCount(totalFailedTaskCount);
            dagProgress.setKilledTaskCount(totalKilledTaskCount);
            dagProgress.setFailedTaskAttemptCount(totalFailedTaskAttemptCount);
            dagProgress.setKilledTaskAttemptCount(totalKilledTaskAttemptCount);
            ProgressBuilder progressBuilder = dagProgress;
            return progressBuilder;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public VertexStatusBuilder getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) {
        Vertex vertex = this.vertexMap.get(vertexName);
        if (vertex == null) {
            return null;
        }
        return vertex.getVertexStatus(statusOptions);
    }

    public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) {
        return (TaskAttemptImpl)this.getVertex(taId.getTaskID().getVertexID()).getTask(taId.getTaskID()).getAttempt(taId);
    }

    public TaskImpl getTask(TezTaskID tId) {
        return (TaskImpl)this.getVertex(tId.getVertexID()).getTask(tId);
    }

    protected void initializeVerticesAndStart() {
        for (Vertex v : this.vertices.values()) {
            if (v.getInputVerticesCount() != 0) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Initing root vertex " + v.getLogIdentifier());
            }
            this.eventHandler.handle((Event)new VertexEvent(v.getVertexId(), VertexEventType.V_INIT));
        }
        for (Vertex v : this.vertices.values()) {
            if (v.getInputVerticesCount() != 0) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Starting root vertex " + v.getLogIdentifier());
            }
            this.eventHandler.handle((Event)new VertexEvent(v.getVertexId(), VertexEventType.V_START));
        }
    }

    private void commitOutput(OutputCommitter outputCommitter) throws Exception {
        final OutputCommitter committer = outputCommitter;
        this.getDagUGI().doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws Exception {
                committer.commitOutput();
                return null;
            }
        });
    }

    private synchronized DAGState commitOrFinish() {
        HashMap<OutputKey, CallableEvent> commitEvents = new HashMap<OutputKey, CallableEvent>();
        for (VertexGroupInfo vertexGroupInfo : this.vertexGroups.values()) {
            if (vertexGroupInfo.outputs.isEmpty()) continue;
            vertexGroupInfo.commitStarted = true;
            final Vertex v = this.getVertex(vertexGroupInfo.groupMembers.iterator().next());
            for (final String outputName : vertexGroupInfo.outputs) {
                final OutputKey outputKey = new OutputKey(outputName, vertexGroupInfo.groupName, true);
                CommitCallback groupCommitCallback = new CommitCallback(outputKey);
                CallableEvent groupCommitCallableEvent = new CallableEvent(groupCommitCallback){

                    @Override
                    public Void call() throws Exception {
                        OutputCommitter committer = v.getOutputCommitters().get(outputName);
                        LOG.info("Committing output: " + outputKey);
                        DAGImpl.this.commitOutput(committer);
                        return null;
                    }
                };
                commitEvents.put(outputKey, groupCommitCallableEvent);
            }
        }
        for (final Vertex vertex : this.vertices.values()) {
            if (vertex.getOutputCommitters() == null) {
                LOG.info("No output committers for vertex: " + vertex.getLogIdentifier());
                continue;
            }
            HashMap<String, OutputCommitter> outputCommitters = new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
            Set<String> sharedOutputs = vertex.getSharedOutputs();
            if (sharedOutputs != null) {
                Iterator iter = outputCommitters.entrySet().iterator();
                while (iter.hasNext()) {
                    if (!sharedOutputs.contains(iter.next().getKey())) continue;
                    iter.remove();
                }
            }
            if (outputCommitters.isEmpty()) {
                LOG.info("No exclusive output committers for vertex: " + vertex.getLogIdentifier());
                continue;
            }
            for (final Map.Entry entry : outputCommitters.entrySet()) {
                if (vertex.getState() != VertexState.SUCCEEDED) {
                    throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() + " not in SUCCEEDED state. State= " + (Object)((Object)vertex.getState()));
                }
                OutputKey outputKey = new OutputKey((String)entry.getKey(), vertex.getName(), false);
                CommitCallback commitCallback = new CommitCallback(outputKey);
                CallableEvent commitCallableEvent = new CallableEvent(commitCallback){

                    @Override
                    public Void call() throws Exception {
                        LOG.info("Committing output: " + (String)entry.getKey() + " for vertex: " + vertex.getLogIdentifier() + ", outputName: " + (String)entry.getKey());
                        DAGImpl.this.commitOutput((OutputCommitter)entry.getValue());
                        return null;
                    }
                };
                commitEvents.put(outputKey, commitCallableEvent);
            }
        }
        if (!commitEvents.isEmpty()) {
            try {
                LOG.info("Start writing dag commit event, " + this.getID());
                this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getID(), new DAGCommitStartedEvent(this.getID(), this.clock.getTime())));
            }
            catch (IOException e) {
                LOG.error("Failed to send commit event to history/recovery handler", (Throwable)e);
                this.trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
                return this.finished(DAGState.FAILED);
            }
            for (Map.Entry entry : commitEvents.entrySet()) {
                ListenableFuture commitFuture = this.appContext.getExecService().submit((Callable)entry.getValue());
                Futures.addCallback((ListenableFuture)commitFuture, ((CallableEvent)entry.getValue()).getCallback());
                this.commitFutures.put((OutputKey)entry.getKey(), (ListenableFuture<Void>)commitFuture);
            }
        }
        if (this.commitFutures.isEmpty()) {
            return this.finished(DAGState.SUCCEEDED);
        }
        return DAGState.COMMITTING;
    }

    private void abortOutputs() {
        if (this.aborted.getAndSet(true)) {
            LOG.info("Ignoring multiple output abort");
            return;
        }
        for (Vertex vertex : this.vertices.values()) {
            ((VertexImpl)vertex).abortVertex(VertexStatus.State.FAILED);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handle(DAGEvent event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing DAGEvent " + event.getDAGId() + " of type " + event.getType() + " while in state " + (Object)((Object)this.getInternalState()) + ". Event: " + (Object)((Object)event));
        }
        try {
            DAGState oldState;
            block8: {
                this.writeLock.lock();
                oldState = this.getInternalState();
                try {
                    this.getStateMachine().doTransition(event.getType(), (Object)event);
                }
                catch (InvalidStateTransitonException e) {
                    String message = "Invalid event " + event.getType() + " on Dag " + this.dagId + " at currentState=" + (Object)((Object)oldState);
                    LOG.error("Can't handle " + message, (Throwable)e);
                    this.addDiagnostic(message);
                    this.eventHandler.handle((Event)new DAGEvent(this.dagId, DAGEventType.INTERNAL_ERROR));
                }
                catch (RuntimeException e) {
                    String message = "Uncaught Exception when handling event " + event.getType() + " on Dag " + this.dagId + " at currentState=" + (Object)((Object)oldState);
                    LOG.error(message, (Throwable)e);
                    this.addDiagnostic(message);
                    if (this.internalErrorTriggered.getAndSet(true)) break block8;
                    this.eventHandler.handle((Event)new DAGEvent(this.dagId, DAGEventType.INTERNAL_ERROR));
                }
            }
            if (oldState != this.getInternalState()) {
                LOG.info(this.dagId + " transitioned from " + (Object)((Object)oldState) + " to " + (Object)((Object)this.getInternalState()) + " due to event " + event.getType());
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @InterfaceAudience.Private
    public DAGState getInternalState() {
        this.readLock.lock();
        try {
            DAGState dAGState = (DAGState)this.getStateMachine().getCurrentState();
            return dAGState;
        }
        finally {
            this.readLock.unlock();
        }
    }

    void setFinishTime() {
        this.finishTime = this.clock.getTime();
    }

    synchronized void setFinishTime(long finishTime) {
        this.finishTime = finishTime;
    }

    private Map<String, Integer> constructTaskStats(ProgressBuilder progressBuilder) {
        HashMap<String, Integer> taskStats = new HashMap<String, Integer>();
        taskStats.put("numCompletedTasks", progressBuilder.getTotalTaskCount());
        taskStats.put("numSucceededTasks", progressBuilder.getSucceededTaskCount());
        taskStats.put("numFailedTasks", progressBuilder.getFailedTaskCount());
        taskStats.put("numKilledTasks", progressBuilder.getKilledTaskCount());
        taskStats.put("numFailedTaskAttempts", progressBuilder.getFailedTaskAttemptCount());
        taskStats.put("numKilledTaskAttempts", progressBuilder.getKilledTaskAttemptCount());
        return taskStats;
    }

    void logJobHistoryFinishedEvent() throws IOException {
        this.setFinishTime();
        Map<String, Integer> taskStats = this.constructTaskStats(this.getDAGProgress());
        DAGFinishedEvent finishEvt = new DAGFinishedEvent(this.dagId, this.startTime, this.finishTime, DAGState.SUCCEEDED, "", this.getAllCounters(), this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId());
        this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.dagId, finishEvt));
    }

    void logJobHistoryInitedEvent() {
        DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId, this.initTime, this.userName, this.dagName, this.getVertexNameIDMapping());
        this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.dagId, initEvt));
    }

    void logJobHistoryStartedEvent() {
        DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId, this.startTime, this.userName, this.dagName);
        this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.dagId, startEvt));
    }

    void logJobHistoryFinishedEvent(TezCounters counters) throws IOException {
        Map<String, Integer> taskStats = this.constructTaskStats(this.getDAGProgress());
        if (this.finishTime < this.startTime) {
            LOG.warn("DAG finish time is smaller than start time. startTime=" + this.startTime + ", finishTime=" + this.finishTime);
        }
        DAGFinishedEvent finishEvt = new DAGFinishedEvent(this.dagId, this.startTime, this.finishTime, DAGState.SUCCEEDED, "", counters, this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId());
        this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.dagId, finishEvt));
    }

    void logJobHistoryUnsuccesfulEvent(DAGState state, TezCounters counters) throws IOException {
        Map<String, Integer> taskStats = this.constructTaskStats(this.getDAGProgress());
        if (this.finishTime < this.startTime) {
            LOG.warn("DAG finish time is smaller than start time. startTime=" + this.startTime + ", finishTime=" + this.finishTime);
        }
        DAGFinishedEvent finishEvt = new DAGFinishedEvent(this.dagId, this.startTime, this.clock.getTime(), state, StringUtils.join(this.getDiagnostics(), (String)LINE_SEPARATOR), counters, this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId());
        this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.dagId, finishEvt));
    }

    static DAGState checkVerticesForCompletion(DAGImpl dag) {
        LOG.info("Checking vertices for DAG completion, numCompletedVertices=" + dag.numCompletedVertices + ", numSuccessfulVertices=" + dag.numSuccessfulVertices + ", numFailedVertices=" + dag.numFailedVertices + ", numKilledVertices=" + dag.numKilledVertices + ", numVertices=" + dag.numVertices + ", commitInProgress=" + dag.commitFutures.size() + ", terminationCause=" + (Object)((Object)dag.terminationCause));
        if (dag.numCompletedVertices > dag.numVertices) {
            LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices, numCompletedVertices=" + dag.numCompletedVertices + ", numVertices=" + dag.numVertices);
        }
        if (dag.numCompletedVertices == dag.numVertices) {
            if (dag.numSuccessfulVertices == dag.numVertices && dag.isCommittable()) {
                if (dag.commitAllOutputsOnSuccess && !dag.committed.getAndSet(true)) {
                    return dag.commitOrFinish();
                }
                if (!dag.commitFutures.isEmpty()) {
                    return DAGState.COMMITTING;
                }
                return dag.finished(DAGState.SUCCEEDED);
            }
            if (dag.commitFutures.isEmpty()) {
                return DAGImpl.finishWithTerminationCause(dag);
            }
            return DAGState.TERMINATING;
        }
        return dag.getInternalState();
    }

    private boolean isCommittable() {
        return this.terminationCause == null && (this.getState() == DAGState.RUNNING || this.getState() == DAGState.COMMITTING);
    }

    static DAGState checkCommitsForCompletion(DAGImpl dag) {
        LOG.info("Checking commits for DAG completion, numCompletedVertices=" + dag.numCompletedVertices + ", numSuccessfulVertices=" + dag.numSuccessfulVertices + ", numFailedVertices=" + dag.numFailedVertices + ", numKilledVertices=" + dag.numKilledVertices + ", numVertices=" + dag.numVertices + ", commitInProgress=" + dag.commitFutures.size() + ", terminationCause=" + (Object)((Object)dag.terminationCause));
        if (dag.isCommittable()) {
            Preconditions.checkState((dag.getState() == DAGState.COMMITTING ? 1 : 0) != 0, (Object)("DAG should be in COMMITTING state, but in " + (Object)((Object)dag.getState())));
            if (!dag.commitFutures.isEmpty()) {
                return DAGState.COMMITTING;
            }
            return dag.finished(DAGState.SUCCEEDED);
        }
        Preconditions.checkState((dag.getState() == DAGState.TERMINATING || dag.getState() == DAGState.COMMITTING ? 1 : 0) != 0, (Object)("DAG should be in COMMITTING/TERMINATING state, but in " + (Object)((Object)dag.getState())));
        if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices != dag.numVertices) {
            return DAGState.TERMINATING;
        }
        return DAGImpl.finishWithTerminationCause(dag);
    }

    private static DAGState finishWithTerminationCause(DAGImpl dag) {
        Preconditions.checkArgument((dag.getTerminationCause() != null ? 1 : 0) != 0, (Object)"TerminationCause is not set.");
        String diagnosticMsg = "DAG did not succeed due to " + (Object)((Object)dag.terminationCause) + ". failedVertices:" + dag.numFailedVertices + " killedVertices:" + dag.numKilledVertices;
        LOG.info(diagnosticMsg);
        dag.addDiagnostic(diagnosticMsg);
        return dag.finished(dag.getTerminationCause().getFinishedState());
    }

    private void updateCpuCounters() {
        long stopDAGCpuTime = this.appContext.getCumulativeCPUTime();
        long totalDAGCpuTime = stopDAGCpuTime - this.startDAGCpuTime;
        long stopDAGGCTime = this.appContext.getCumulativeGCTime();
        long totalDAGGCTime = stopDAGGCTime - this.startDAGGCTime;
        this.dagCounters.findCounter((Enum)DAGCounter.AM_CPU_MILLISECONDS).setValue(totalDAGCpuTime);
        this.dagCounters.findCounter((Enum)DAGCounter.AM_GC_TIME_MILLIS).setValue(totalDAGGCTime);
    }

    private DAGState finished(DAGState finalState) {
        if (this.finishTime == 0L) {
            this.setFinishTime();
        }
        this.entityUpdateTracker.stop();
        boolean recoveryError = false;
        this.updateCpuCounters();
        TezCounters counters = null;
        try {
            counters = this.getAllCounters();
        }
        catch (LimitExceededException e) {
            this.addDiagnostic("Counters limit exceeded: " + e.getMessage());
            finalState = DAGState.FAILED;
        }
        try {
            if (finalState == DAGState.SUCCEEDED) {
                this.logJobHistoryFinishedEvent(counters);
            } else {
                this.logJobHistoryUnsuccesfulEvent(finalState, counters);
            }
        }
        catch (IOException e) {
            LOG.warn("Failed to persist recovery event for DAG completion, dagId=" + this.dagId + ", finalState=" + (Object)((Object)finalState));
            recoveryError = true;
        }
        if (finalState != DAGState.SUCCEEDED) {
            this.abortOutputs();
        }
        if (recoveryError) {
            this.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(this.getID(), DAGState.ERROR));
        } else {
            this.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(this.getID(), finalState));
        }
        LOG.info("DAG: " + this.getID() + " finished with state: " + (Object)((Object)finalState));
        return finalState;
    }

    @Override
    public String getUserName() {
        return this.userName;
    }

    @Override
    public String getName() {
        return this.dagName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getTotalVertices() {
        this.readLock.lock();
        try {
            int n = this.numVertices;
            return n;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getSuccessfulVertices() {
        this.readLock.lock();
        try {
            int n = this.numSuccessfulVertices;
            return n;
        }
        finally {
            this.readLock.unlock();
        }
    }

    boolean trySetTerminationCause(DAGTerminationCause trigger) {
        if (this.terminationCause == null) {
            this.terminationCause = trigger;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    DAGTerminationCause getTerminationCause() {
        this.readLock.lock();
        try {
            DAGTerminationCause dAGTerminationCause = this.terminationCause;
            return dAGTerminationCause;
        }
        finally {
            this.readLock.unlock();
        }
    }

    public DAGState initializeDAG() {
        return this.initializeDAG(null);
    }

    DAGState initializeDAG(DAGInitializedEvent event) {
        this.initTime = event != null ? event.getInitTime() : this.clock.getTime();
        this.commitAllOutputsOnSuccess = this.dagConf.getBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        this.numVertices = this.getJobPlan().getVertexCount();
        if (this.numVertices == 0) {
            this.addDiagnostic("No vertices for dag");
            this.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
            if (event != null) {
                return DAGState.FAILED;
            }
            return this.finished(DAGState.FAILED);
        }
        if (this.jobPlan.getVertexGroupsCount() > 0) {
            for (DAGProtos.PlanVertexGroupInfo planVertexGroupInfo : this.jobPlan.getVertexGroupsList()) {
                this.vertexGroups.put(planVertexGroupInfo.getGroupName(), new VertexGroupInfo(planVertexGroupInfo));
            }
            for (VertexGroupInfo vertexGroupInfo : this.vertexGroups.values()) {
                for (String vertexName : vertexGroupInfo.groupMembers) {
                    LinkedList groupList = this.vertexGroupInfo.get(vertexName);
                    if (groupList == null) {
                        groupList = Lists.newLinkedList();
                        this.vertexGroupInfo.put(vertexName, groupList);
                    }
                    groupList.add(vertexGroupInfo);
                }
            }
        }
        for (int i = 0; i < this.numVertices; ++i) {
            String string = this.getJobPlan().getVertex(i).getName();
            VertexImpl vertexImpl = DAGImpl.createVertex(this, string, i);
            this.addVertex(vertexImpl);
        }
        if (!this.appContext.isLocal()) {
            for (Vertex vertex : this.vertexMap.values()) {
                if (vertex.getTaskResource().compareTo((Object)this.appContext.getClusterInfo().getMaxContainerCapability()) <= 0) continue;
                String string = "Vertex's TaskResource is beyond the cluster container capability,Vertex=" + vertex.getLogIdentifier() + ", Requested TaskResource=" + vertex.getTaskResource() + ", Cluster MaxContainerCapability=" + this.appContext.getClusterInfo().getMaxContainerCapability();
                LOG.error(string);
                this.addDiagnostic(string);
                this.finished(DAGState.FAILED);
                return DAGState.FAILED;
            }
        }
        try {
            this.createDAGEdges(this);
        }
        catch (TezException e2) {
            String string = "Fail to create edges, " + ExceptionUtils.getStackTrace((Throwable)e2);
            this.addDiagnostic(string);
            LOG.error(string);
            this.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
            this.finished(DAGState.FAILED);
            return DAGState.FAILED;
        }
        Map edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan((List)this.getJobPlan().getEdgeList());
        for (Vertex vertex : this.vertices.values()) {
            DAGImpl.parseVertexEdges(this, edgePlans, vertex);
        }
        for (Edge edge : this.edges.values()) {
            try {
                edge.initialize();
            }
            catch (AMUserCodeException ex) {
                String msg3 = "Exception in " + (Object)((Object)ex.getSource());
                LOG.error(msg3, (Throwable)((Object)ex));
                this.addDiagnostic(msg3 + ", " + ex.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)ex.getCause()));
                this.finished(DAGState.FAILED);
                return DAGState.FAILED;
            }
        }
        try {
            DAGImpl.assignDAGScheduler(this);
        }
        catch (TezException tezException) {
            String string = "Fail to assign DAGScheduler for dag:" + this.dagName + " due to " + ExceptionUtils.getStackTrace((Throwable)tezException);
            LOG.error(string);
            this.addDiagnostic(string);
            this.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
            this.finished(DAGState.FAILED);
            return DAGState.FAILED;
        }
        for (Map.Entry<String, VertexGroupInfo> entry : this.vertexGroups.entrySet()) {
            String groupName = entry.getKey();
            VertexGroupInfo groupInfo = entry.getValue();
            if (groupInfo.outputs.isEmpty()) continue;
            for (String vertexName : groupInfo.groupMembers) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Setting shared outputs for group: " + groupName + " on vertex: " + vertexName);
                }
                Vertex v = this.getVertex(vertexName);
                v.addSharedOutputs(groupInfo.outputs);
            }
        }
        return DAGState.INITED;
    }

    private void createDAGEdges(DAGImpl dag) throws TezException {
        for (DAGProtos.EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
            EdgeProperty edgeProperty = DagTypeConverters.createEdgePropertyMapFromDAGPlan((DAGProtos.EdgePlan)edgePlan);
            dag.edges.put(edgePlan.getId(), new Edge(edgeProperty, dag.getEventHandler(), this.dagConf));
        }
    }

    private static void assignDAGScheduler(DAGImpl dag) throws TezException {
        String dagSchedulerClassName = dag.dagConf.get("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder");
        LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
        dag.dagScheduler = (DAGScheduler)ReflectionUtils.createClazzInstance((String)dagSchedulerClassName, (Class[])new Class[]{DAG.class, EventHandler.class}, (Object[])new Object[]{dag, dag.eventHandler});
        for (Vertex v : dag.vertices.values()) {
            dag.dagScheduler.addVertexConcurrencyLimit(v.getVertexId(), v.getMaxTaskConcurrency());
        }
    }

    private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
        TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
        DAGProtos.VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
        VertexLocationHint vertexLocationHint = DagTypeConverters.convertFromDAGPlan((List)vertexPlan.getTaskLocationHintList());
        VertexImpl v = new VertexImpl(vertexId, vertexPlan, vertexName, dag.dagConf, dag.eventHandler, dag.taskAttemptListener, dag.clock, dag.taskHeartbeatHandler, !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint, dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
        return v;
    }

    private static void parseVertexEdges(DAGImpl dag, Map<String, DAGProtos.EdgePlan> edgePlans, Vertex vertex) {
        Edge edge;
        DAGProtos.EdgePlan edgePlan;
        DAGProtos.VertexPlan vertexPlan = vertex.getVertexPlan();
        HashMap<Vertex, Edge> inVertices = new HashMap<Vertex, Edge>();
        HashMap<Vertex, Edge> outVertices = new HashMap<Vertex, Edge>();
        for (String inEdgeId : vertexPlan.getInEdgeIdList()) {
            edgePlan = edgePlans.get(inEdgeId);
            Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
            edge = dag.edges.get(inEdgeId);
            edge.setSourceVertex(inVertex);
            edge.setDestinationVertex(vertex);
            inVertices.put(inVertex, edge);
        }
        for (String outEdgeId : vertexPlan.getOutEdgeIdList()) {
            edgePlan = edgePlans.get(outEdgeId);
            Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
            edge = dag.edges.get(outEdgeId);
            edge.setSourceVertex(vertex);
            edge.setDestinationVertex(outVertex);
            outVertices.put(outVertex, edge);
        }
        vertex.setInputVertices(inVertices);
        vertex.setOutputVertices(outVertices);
    }

    void addVertex(Vertex v) {
        this.vertices.put(v.getVertexId(), v);
        this.vertexMap.put(v.getName(), v);
    }

    @Override
    public Vertex getVertex(String vertexName) {
        return this.vertexMap.get(vertexName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mayBeConstructFinalFullCounters() {
        Object object = this.fullCountersLock;
        synchronized (object) {
            if (this.fullCounters != null) {
                return;
            }
            this.constructFinalFullcounters();
        }
    }

    @InterfaceAudience.Private
    public void constructFinalFullcounters() {
        this.fullCounters = new TezCounters();
        this.fullCounters.incrAllCounters((AbstractCounters)this.dagCounters);
        for (Vertex v : this.vertices.values()) {
            this.fullCounters.incrAllCounters((AbstractCounters)v.getAllCounters());
        }
    }

    void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
        if (this.trySetTerminationCause(dagTerminationCause)) {
            for (Vertex v : this.vertices.values()) {
                this.eventHandler.handle((Event)new VertexEventTermination(v.getVertexId(), vertexTerminationCause));
            }
        }
    }

    private void cancelCommits() {
        if (!this.commitCanceled.getAndSet(true)) {
            for (Map.Entry<OutputKey, ListenableFuture<Void>> entry : this.commitFutures.entrySet()) {
                OutputKey outputKey = entry.getKey();
                LOG.info("Canceling commit of output=" + outputKey);
                entry.getValue().cancel(true);
            }
        }
    }

    private boolean vertexSucceeded(Vertex vertex) {
        List<VertexGroupInfo> groupsList;
        ++this.numSuccessfulVertices;
        boolean recoveryFailed = false;
        if (!this.commitAllOutputsOnSuccess && this.isCommittable() && (groupsList = this.vertexGroupInfo.get(vertex.getName())) != null) {
            ArrayList commitList = Lists.newArrayListWithCapacity((int)groupsList.size());
            for (VertexGroupInfo groupInfo : groupsList) {
                ++groupInfo.successfulMembers;
                if (groupInfo.groupMembers.size() != groupInfo.successfulMembers || groupInfo.outputs.isEmpty()) continue;
                LOG.info("All members of group: " + groupInfo.groupName + " are succeeded. Commiting outputs");
                commitList.add(groupInfo);
            }
            for (VertexGroupInfo groupInfo : commitList) {
                if (this.recoveredGroupCommits.containsKey(groupInfo.groupName)) {
                    LOG.info("VertexGroup was already committed as per recovery data, groupName=" + groupInfo.groupName);
                    continue;
                }
                groupInfo.commitStarted = true;
                final Vertex v = this.getVertex(groupInfo.groupMembers.iterator().next());
                try {
                    this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getID(), new VertexGroupCommitStartedEvent(this.dagId, groupInfo.groupName, this.clock.getTime())));
                }
                catch (IOException e) {
                    LOG.error("Failed to send commit recovery event to handler", (Throwable)e);
                    recoveryFailed = true;
                }
                if (recoveryFailed) continue;
                for (final String outputName : groupInfo.outputs) {
                    OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true);
                    CommitCallback groupCommitCallback = new CommitCallback(outputKey);
                    CallableEvent groupCommitCallableEvent = new CallableEvent(groupCommitCallback){

                        @Override
                        public Void call() throws Exception {
                            OutputCommitter committer = v.getOutputCommitters().get(outputName);
                            LOG.info("Committing output: " + outputName);
                            DAGImpl.this.commitOutput(committer);
                            return null;
                        }
                    };
                    ListenableFuture groupCommitFuture = this.appContext.getExecService().submit((Callable)groupCommitCallableEvent);
                    Futures.addCallback((ListenableFuture)groupCommitFuture, groupCommitCallableEvent.getCallback());
                    this.commitFutures.put(outputKey, (ListenableFuture<Void>)groupCommitFuture);
                }
            }
        }
        if (recoveryFailed) {
            LOG.info("Recovery failure occurred during commit");
            this.enactKill(DAGTerminationCause.RECOVERY_FAILURE, VertexTerminationCause.COMMIT_FAILURE);
        }
        return !recoveryFailed;
    }

    private boolean vertexReRunning(Vertex vertex) {
        List<VertexGroupInfo> groupList;
        this.reRunningVertices.add(vertex.getVertexId());
        --this.numSuccessfulVertices;
        this.addDiagnostic("Vertex re-running, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId());
        if (!this.commitAllOutputsOnSuccess && (groupList = this.vertexGroupInfo.get(vertex.getName())) != null) {
            for (VertexGroupInfo groupInfo : groupList) {
                if (groupInfo.isInCommitting()) {
                    String msg = "Aborting job as committing vertex: " + vertex.getLogIdentifier() + " is re-running";
                    LOG.info(msg);
                    this.addDiagnostic(msg);
                    this.enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING);
                    return true;
                }
                if (groupInfo.isCommitted()) {
                    String msg = "Aborting job as committed vertex: " + vertex.getLogIdentifier() + " is re-running";
                    LOG.info(msg);
                    this.addDiagnostic(msg);
                    this.enactKill(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT);
                    return true;
                }
                --groupInfo.successfulMembers;
            }
        }
        return false;
    }

    private void vertexFailed(Vertex vertex) {
        ++this.numFailedVertices;
        this.addDiagnostic("Vertex failed, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId() + ", diagnostics=" + vertex.getDiagnostics());
    }

    private void vertexKilled(Vertex vertex) {
        ++this.numKilledVertices;
        this.addDiagnostic("Vertex killed, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId() + ", diagnostics=" + vertex.getDiagnostics());
    }

    private void addDiagnostic(String diag) {
        this.diagnostics.add(diag);
    }

    private boolean commitCompleted(DAGEventCommitCompleted commitCompletedEvent) {
        Preconditions.checkState((this.commitFutures.remove(commitCompletedEvent.getOutputKey()) != null ? 1 : 0) != 0, (Object)("Unknown commit:" + commitCompletedEvent.getOutputKey()));
        boolean commitFailed = false;
        boolean recoveryFailed = false;
        if (commitCompletedEvent.isSucceeded()) {
            LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputKey());
            OutputKey outputKey = commitCompletedEvent.getOutputKey();
            if (outputKey.isVertexGroupOutput) {
                VertexGroupInfo vertexGroup = this.vertexGroups.get(outputKey.getEntityName());
                ++vertexGroup.successfulCommits;
                if (vertexGroup.isCommitted() && !this.commitAllOutputsOnSuccess) {
                    try {
                        this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getID(), new VertexGroupCommitFinishedEvent(this.getID(), commitCompletedEvent.getOutputKey().getEntityName(), this.clock.getTime())));
                    }
                    catch (IOException e) {
                        String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace((Throwable)e);
                        this.addDiagnostic(diag);
                        LOG.error(diag);
                        recoveryFailed = true;
                    }
                }
            }
        } else {
            String diag = "Commit failed for output: " + commitCompletedEvent.getOutputKey() + ", " + ExceptionUtils.getStackTrace((Throwable)commitCompletedEvent.getException());
            this.addDiagnostic(diag);
            LOG.error(diag);
            commitFailed = true;
        }
        if (commitFailed) {
            this.enactKill(DAGTerminationCause.COMMIT_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
            this.cancelCommits();
        }
        if (recoveryFailed) {
            this.enactKill(DAGTerminationCause.RECOVERY_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
            this.cancelCommits();
        }
        return !commitFailed && !recoveryFailed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isComplete() {
        this.readLock.lock();
        try {
            DAGState state = this.getState();
            if (state.equals((Object)DAGState.SUCCEEDED) || state.equals((Object)DAGState.FAILED) || state.equals((Object)DAGState.KILLED) || state.equals((Object)DAGState.ERROR)) {
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.readLock.unlock();
        }
    }

    private class CommitCallback
    implements FutureCallback<Void> {
        private OutputKey outputKey;

        public CommitCallback(OutputKey outputKey) {
            this.outputKey = outputKey;
        }

        public void onSuccess(Void result) {
            DAGImpl.this.eventHandler.handle((Event)new DAGEventCommitCompleted(DAGImpl.this.dagId, this.outputKey, true, null));
        }

        public void onFailure(Throwable t) {
            DAGImpl.this.eventHandler.handle((Event)new DAGEventCommitCompleted(DAGImpl.this.dagId, this.outputKey, false, t));
        }
    }

    public static class OutputKey {
        String outputName;
        String entityName;
        boolean isVertexGroupOutput;

        public OutputKey(String outputName, String entityName, boolean isVertexGroupOutput) {
            this.outputName = outputName;
            this.entityName = entityName;
            this.isVertexGroupOutput = isVertexGroupOutput;
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + (this.entityName == null ? 0 : this.entityName.hashCode());
            result = 31 * result + (this.isVertexGroupOutput ? 1231 : 1237);
            result = 31 * result + (this.outputName == null ? 0 : this.outputName.hashCode());
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            OutputKey other = (OutputKey)obj;
            if (this.entityName == null ? other.entityName != null : !this.entityName.equals(other.entityName)) {
                return false;
            }
            if (this.isVertexGroupOutput != other.isVertexGroupOutput) {
                return false;
            }
            return !(this.outputName == null ? other.outputName != null : !this.outputName.equals(other.outputName));
        }

        public String getEntityName() {
            return this.entityName;
        }

        public String toString() {
            return "outputName:" + this.outputName + " of vertex/vertexGroup:" + this.entityName + " isVertexGroupOutput:" + this.isVertexGroupOutput;
        }
    }

    private static class InternalErrorTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private InternalErrorTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            LOG.info(job.getID() + " terminating due to internal error");
            job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
            job.setFinishTime();
            job.cancelCommits();
            job.finished(DAGState.ERROR);
        }
    }

    private static class VertexRerunWhileCommitting
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private VertexRerunWhileCommitting() {
        }

        public void transition(DAGImpl dag, DAGEvent event) {
            LOG.info("Vertex rerun while dag it is COMMITTING");
            DAGEventVertexReRunning rerunEvent = (DAGEventVertexReRunning)event;
            Vertex vertex = dag.getVertex(rerunEvent.getVertexId());
            dag.reRunningVertices.add(vertex.getVertexId());
            dag.numSuccessfulVertices--;
            --dag.numCompletedVertices;
            dag.addDiagnostic("Vertex re-running, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId());
            dag.cancelCommits();
            dag.enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING);
        }
    }

    private static class CommitCompletedTransition
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private CommitCompletedTransition() {
        }

        public DAGState transition(DAGImpl dag, DAGEvent event) {
            DAGEventCommitCompleted commitCompletedEvent = (DAGEventCommitCompleted)event;
            dag.commitCompleted(commitCompletedEvent);
            return DAGImpl.checkCommitsForCompletion(dag);
        }
    }

    private static class CommitCompletedWhileRunning
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private CommitCompletedWhileRunning() {
        }

        public DAGState transition(DAGImpl dag, DAGEvent event) {
            DAGEventCommitCompleted commitCompletedEvent = (DAGEventCommitCompleted)event;
            if (dag.commitCompleted(commitCompletedEvent)) {
                return DAGState.RUNNING;
            }
            return DAGState.TERMINATING;
        }
    }

    private static class DAGSchedulerUpdateTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DAGSchedulerUpdateTransition() {
        }

        public void transition(DAGImpl dag, DAGEvent event) {
            DAGEventSchedulerUpdate sEvent = (DAGEventSchedulerUpdate)event;
            switch (sEvent.getUpdateType()) {
                case TA_SCHEDULE: {
                    dag.dagScheduler.scheduleTask(sEvent);
                    break;
                }
                case TA_COMPLETED: {
                    dag.dagScheduler.taskCompleted(sEvent);
                    break;
                }
                default: {
                    throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:" + (Object)((Object)sEvent.getUpdateType()));
                }
            }
        }
    }

    private static class CounterUpdateTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private CounterUpdateTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            DAGEventCounterUpdate jce = (DAGEventCounterUpdate)event;
            for (DAGEventCounterUpdate.CounterIncrementalUpdate ci : jce.getCounterUpdates()) {
                job.dagCounters.findCounter(ci.getCounterKey()).increment(ci.getIncrementValue());
            }
        }
    }

    private static class DiagnosticsUpdateTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DiagnosticsUpdateTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            job.addDiagnostic(((DAGEventDiagnosticsUpdate)event).getDiagnosticUpdate());
        }
    }

    private static class VertexReRunningTransition
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private VertexReRunningTransition() {
        }

        public DAGState transition(DAGImpl job, DAGEvent event) {
            DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning)event;
            Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
            boolean failed = job.vertexReRunning(vertex);
            if (!failed) {
                --job.numCompletedVertices;
            }
            LOG.info("Vertex " + vertex.getLogIdentifier() + " re-running." + ", numCompletedVertices=" + job.numCompletedVertices + ", numSuccessfulVertices=" + job.numSuccessfulVertices + ", numFailedVertices=" + job.numFailedVertices + ", numKilledVertices=" + job.numKilledVertices + ", numVertices=" + job.numVertices);
            if (failed) {
                return DAGState.TERMINATING;
            }
            return DAGState.RUNNING;
        }
    }

    private static class VertexCompletedTransition
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private VertexCompletedTransition() {
        }

        public DAGState transition(DAGImpl job, DAGEvent event) {
            boolean forceTransitionToKillWait = false;
            DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted)event;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received a vertex completion event, vertexId=" + vertexEvent.getVertexId() + ", vertexState=" + (Object)((Object)vertexEvent.getVertexState()));
            }
            Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
            ++job.numCompletedVertices;
            if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
                forceTransitionToKillWait = !job.vertexSucceeded(vertex);
            } else if (vertexEvent.getVertexState() == VertexState.FAILED) {
                job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
                job.cancelCommits();
                job.vertexFailed(vertex);
                forceTransitionToKillWait = true;
            } else if (vertexEvent.getVertexState() == VertexState.KILLED) {
                job.vertexKilled(vertex);
                job.cancelCommits();
                forceTransitionToKillWait = true;
            }
            job.reRunningVertices.remove(vertex.getVertexId());
            LOG.info("Vertex " + vertex.getLogIdentifier() + " completed." + ", numCompletedVertices=" + job.numCompletedVertices + ", numSuccessfulVertices=" + job.numSuccessfulVertices + ", numFailedVertices=" + job.numFailedVertices + ", numKilledVertices=" + job.numKilledVertices + ", numVertices=" + job.numVertices);
            DAGState state = DAGImpl.checkVerticesForCompletion(job);
            if (state == DAGState.RUNNING && forceTransitionToKillWait) {
                job.cancelCommits();
                return DAGState.TERMINATING;
            }
            return state;
        }
    }

    private static class DAGKilledWhileCommittingTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DAGKilledWhileCommittingTransition() {
        }

        public void transition(DAGImpl dag, DAGEvent event) {
            String diag = "DAG received Kill while in COMMITTING state.";
            LOG.info(diag);
            dag.addDiagnostic(diag);
            dag.cancelCommits();
            dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
        }
    }

    private static class DAGKilledTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DAGKilledTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            String msg = "Job received Kill while in RUNNING state.";
            LOG.info(msg);
            job.addDiagnostic(msg);
            job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
            job.cancelCommits();
        }
    }

    private static class KillInitedJobTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private KillInitedJobTransition() {
        }

        public void transition(DAGImpl dag, DAGEvent dagEvent) {
            dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
            dag.addDiagnostic("Job received Kill in INITED state.");
            dag.finished(DAGState.KILLED);
        }
    }

    private static class KillNewJobTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        private KillNewJobTransition() {
        }

        public void transition(DAGImpl dag, DAGEvent dagEvent) {
            dag.setFinishTime();
            dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
            dag.finished(DAGState.KILLED);
        }
    }

    public static class StartTransition
    implements SingleArcTransition<DAGImpl, DAGEvent> {
        public void transition(DAGImpl dag, DAGEvent event) {
            DAGEventStartDag startEvent = (DAGEventStartDag)event;
            dag.startTime = dag.clock.getTime();
            dag.logJobHistoryStartedEvent();
            List<URL> additionalUrlsForClasspath = startEvent.getAdditionalUrlsForClasspath();
            if (additionalUrlsForClasspath != null) {
                LOG.info("Added additional resources : [" + additionalUrlsForClasspath + "] to classpath");
                RelocalizationUtils.addUrlsToClassPath(additionalUrlsForClasspath);
            }
            dag.initializeVerticesAndStart();
        }
    }

    private static class InitTransition
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private InitTransition() {
        }

        public DAGState transition(DAGImpl dag, DAGEvent event) {
            dag.startDAGCpuTime = dag.appContext.getCumulativeCPUTime();
            dag.startDAGGCTime = dag.appContext.getCumulativeGCTime();
            DAGState state = dag.initializeDAG();
            if (state != DAGState.INITED) {
                dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
                return state;
            }
            dag.logJobHistoryInitedEvent();
            return DAGState.INITED;
        }
    }

    private static class RecoverTransition
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private RecoverTransition() {
        }

        public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
            DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent)dagEvent;
            if (recoverEvent.hasDesiredState()) {
                dag.recoveredState = recoverEvent.getDesiredState();
            }
            if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
                LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath() + "] to classpath");
                RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
            }
            switch (dag.recoveredState) {
                case NEW: {
                    dag.eventHandler.handle((Event)new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
                    dag.eventHandler.handle((Event)new DAGEventStartDag(dag.getID(), null));
                    return DAGState.NEW;
                }
                case INITED: {
                    for (Vertex v : dag.vertices.values()) {
                        if (v.getInputVerticesCount() != 0) continue;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sending Running Recovery event to root vertex " + v.getLogIdentifier());
                        }
                        dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), VertexState.RUNNING));
                    }
                    return DAGState.RUNNING;
                }
                case RUNNING: {
                    boolean groupCommitInProgress = false;
                    if (!dag.recoveredGroupCommits.isEmpty()) {
                        for (Map.Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
                            if (entry.getValue().booleanValue()) continue;
                            LOG.info("Found a pending Vertex Group commit, vertexGroup=" + entry.getKey());
                            groupCommitInProgress = true;
                            break;
                        }
                    }
                    if (groupCommitInProgress || dag.recoveryCommitInProgress) {
                        dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
                        dag.setFinishTime();
                        for (Vertex v : dag.vertices.values()) {
                            VertexState desiredState = VertexState.SUCCEEDED;
                            if (dag.recoveredState.equals((Object)DAGState.KILLED)) {
                                desiredState = VertexState.KILLED;
                            } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains((Object)dag.recoveredState)) {
                                desiredState = VertexState.FAILED;
                            }
                            dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), desiredState));
                        }
                        DAGState endState = DAGState.FAILED;
                        try {
                            dag.logJobHistoryUnsuccesfulEvent(endState, dag.getAllCounters());
                        }
                        catch (IOException e) {
                            LOG.warn("Failed to persist recovery event for DAG completion, dagId=" + dag.dagId + ", finalState=" + (Object)((Object)endState));
                        }
                        dag.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(dag.getID(), endState));
                        return endState;
                    }
                    for (Vertex v : dag.vertices.values()) {
                        if (v.getInputVerticesCount() != 0) continue;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sending Running Recovery event to root vertex " + v.getLogIdentifier());
                        }
                        dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), VertexState.RUNNING));
                    }
                    return DAGState.RUNNING;
                }
                case SUCCEEDED: 
                case ERROR: 
                case FAILED: 
                case KILLED: {
                    for (Vertex v : dag.vertices.values()) {
                        VertexState desiredState = VertexState.SUCCEEDED;
                        if (dag.recoveredState.equals((Object)DAGState.KILLED)) {
                            desiredState = VertexState.KILLED;
                        } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains((Object)dag.recoveredState)) {
                            desiredState = VertexState.FAILED;
                        }
                        dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), desiredState));
                    }
                    dag.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(dag.getID(), dag.recoveredState));
                    LOG.info("Recovered DAG: " + dag.getID() + " finished with state: " + (Object)((Object)dag.recoveredState));
                    return dag.recoveredState;
                }
            }
            LOG.warn("Trying to recover DAG, failed to recover from non-handled state" + (Object)((Object)dag.recoveredState));
            dag.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(dag.getID(), DAGState.ERROR));
            return DAGState.FAILED;
        }
    }

    private static class DagStateChangedCallback
    implements OnStateChangedCallback<DAGState, DAGImpl> {
        private DagStateChangedCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onStateChanged(DAGImpl dag, DAGState dagState) {
            dag.isFinalState.set(true);
            dag.dagStatusLock.lock();
            try {
                dag.dagCompletionCondition.signal();
            }
            finally {
                dag.dagStatusLock.unlock();
            }
        }
    }

    static class VertexGroupInfo {
        String groupName;
        Set<String> groupMembers;
        Set<String> outputs;
        Map<String, InputDescriptor> edgeMergedInputs;
        int successfulMembers;
        int successfulCommits;
        boolean commitStarted;

        VertexGroupInfo(DAGProtos.PlanVertexGroupInfo groupInfo) {
            this.groupName = groupInfo.getGroupName();
            this.groupMembers = Sets.newHashSet((Iterable)groupInfo.getGroupMembersList());
            this.edgeMergedInputs = Maps.newHashMapWithExpectedSize((int)groupInfo.getEdgeMergedInputsCount());
            for (DAGProtos.PlanGroupInputEdgeInfo edgInfo : groupInfo.getEdgeMergedInputsList()) {
                this.edgeMergedInputs.put(edgInfo.getDestVertexName(), DagTypeConverters.convertInputDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)edgInfo.getMergedInput()));
            }
            this.outputs = Sets.newHashSet((Iterable)groupInfo.getOutputsList());
            this.successfulMembers = 0;
            this.successfulCommits = 0;
            this.commitStarted = false;
        }

        public boolean isInCommitting() {
            return this.commitStarted && this.successfulCommits < this.outputs.size();
        }

        public boolean isCommitted() {
            return this.commitStarted && this.successfulCommits == this.outputs.size();
        }
    }
}

