/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.AbstractCounters;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EntityDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
import org.apache.tez.dag.app.dag.impl.OutputCommitterContextImpl;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexManager;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;

public class VertexImpl
implements Vertex,
EventHandler<VertexEvent> {
    // Platform line separator, read once from the "line.separator" system property.
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    private static final Log LOG = LogFactory.getLog(VertexImpl.class);
    private final Clock clock;
    // Read and write halves of a single ReentrantReadWriteLock created in the constructor.
    private final Lock readLock;
    private final Lock writeLock;
    private final TaskAttemptListener taskAttemptListener;
    private final TaskHeartbeatHandler taskHeartbeatHandler;
    // NOTE(review): presumably guards replacement of the 'tasks' map together with
    // lazyTasksCopyNeeded; the usage is not visible in this part of the file - confirm.
    private final Object tasksSyncHandle = new Object();
    private final EventHandler eventHandler;
    private final AppContext appContext;
    private boolean lazyTasksCopyNeeded = false;
    // Tasks of this vertex keyed by task id; getTask() and getRunningTasks() read it under readLock.
    volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap();
    private Object fullCountersLock = new Object();
    private TezCounters fullCounters = null;
    // Derived from the vertex plan's task configuration in the constructor.
    private Resource taskResource;
    private Configuration conf;
    // Source-vertex bookkeeping counters; package-visible so tests can inspect them.
    @VisibleForTesting
    int numStartedSourceVertices = 0;
    @VisibleForTesting
    int numInitedSourceVertices = 0;
    @VisibleForTesting
    int numRecoveredSourceVertices = 0;
    // Exposed through getDistanceFromRoot().
    private int distanceFromRoot = 0;
    private final List<String> diagnostics = new ArrayList<String>();
    // Assigned from the constructor's entityStatusTracker argument.
    protected final StateChangeNotifier stateChangeNotifier;
    @VisibleForTesting
    int numSuccessSourceAttemptCompletions = 0;
    List<GroupInputSpec> groupInputSpecList;
    Set<String> sharedOutputs = Sets.newHashSet();
    // Shared transition instances that are referenced from several addTransition(...) entries
    // of stateMachineFactory, so one object serves every vertex and every source state.
    private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
    private static final RouteEventTransition ROUTE_EVENT_TRANSITION = new RouteEventTransition();
    private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new TaskAttemptCompletedEventTransition();
    private static final SourceTaskAttemptCompletedEventTransition SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new SourceTaskAttemptCompletedEventTransition();
    // Callback registered by augmentStateMachine() for the SUCCEEDED, FAILED, KILLED and RUNNING states.
    private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK = new VertexStateChangedCallback();
    // Recovery bookkeeping; the recovered state starts out as NEW.
    private VertexState recoveredState = VertexState.NEW;
    @VisibleForTesting
    List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
    private boolean vertexAlreadyInitialized = false;
    @VisibleForTesting
    final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
    // Static definition of the vertex state machine, shared by all VertexImpl instances; the
    // constructor calls make(this) on it to obtain the per-vertex machine. The initial state is
    // NEW. Entries are grouped by source state in this order: NEW, RECOVERING, INITIALIZING,
    // INITED, RUNNING, TERMINATING, SUCCEEDED, FAILED, KILLED, ERROR. Entries that pass an EnumSet
    // of event types without a transition object keep the vertex in the same state.
    // NOTE(review): for SUCCEEDED, V_TASK_ATTEMPT_COMPLETED is registered twice - first inside the
    // no-transition EnumSet and then with a TaskAttemptCompletedEventTransition. Which registration
    // takes effect depends on StateMachineFactory's handling of duplicates - confirm.
    // NOTE(review): the FAILED, KILLED and ERROR EnumSets list V_ROOT_INPUT_FAILED twice; this is
    // redundant in a set but worth confirming that no other event type was intended.
    protected static final StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent> stateMachineFactory = new StateMachineFactory((Enum)VertexState.NEW).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW, VertexState.INITED, VertexState.INITIALIZING, VertexState.FAILED), (Enum)VertexEventType.V_INIT, (MultipleArcTransition)new InitTransition()).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW), (Enum)VertexEventType.V_NULL_EDGE_INITIALIZED, (MultipleArcTransition)new NullEdgeInitializedTransition()).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW, new VertexState[]{VertexState.INITED, VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, VertexState.RECOVERING}), (Enum)VertexEventType.V_RECOVER, (MultipleArcTransition)new StartRecoverTransition()).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.INITED, new VertexState[]{VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, VertexState.RECOVERING}), (Enum)VertexEventType.V_SOURCE_VERTEX_RECOVERED, (MultipleArcTransition)new RecoverTransition()).addTransition((Enum)VertexState.NEW, (Enum)VertexState.NEW, (Enum)VertexEventType.V_SOURCE_VERTEX_STARTED, (SingleArcTransition)new SourceVertexStartedTransition()).addTransition((Enum)VertexState.NEW, (Enum)VertexState.KILLED, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateNewVertexTransition()).addTransition((Enum)VertexState.NEW, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, 
(SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.RECOVERING, EnumSet.of(VertexState.NEW, new VertexState[]{VertexState.INITED, VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, VertexState.RECOVERING}), (Enum)VertexEventType.V_SOURCE_VERTEX_RECOVERED, (MultipleArcTransition)new RecoverTransition()).addTransition((Enum)VertexState.RECOVERING, (Enum)VertexState.RECOVERING, EnumSet.of(VertexEventType.V_INIT, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED), (SingleArcTransition)new BufferDataRecoverTransition()).addTransition((Enum)VertexState.RECOVERING, (Enum)VertexState.RECOVERING, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateDuringRecoverTransition()).addTransition((Enum)VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_ROOT_INPUT_INITIALIZED, (MultipleArcTransition)new RootInputInitializedTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING), (Enum)VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, (MultipleArcTransition)new OneToOneSourceSplitTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_READY_TO_INIT, (MultipleArcTransition)new VertexInitializedTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_ROOT_INPUT_FAILED, (MultipleArcTransition)new RootInputInitFailedTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.INITIALIZING, (Enum)VertexEventType.V_START, (SingleArcTransition)new 
StartWhileInitializingTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.INITIALIZING, (Enum)VertexEventType.V_SOURCE_VERTEX_STARTED, (SingleArcTransition)new SourceVertexStartedTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.FAILED), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.KILLED, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateInitingVertexTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_NULL_EDGE_INITIALIZED, (MultipleArcTransition)new NullEdgeInitializedTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_ROOT_INPUT_FAILED, (MultipleArcTransition)new RootInputInitFailedTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED, VertexState.ERROR), (Enum)VertexEventType.V_INIT, (MultipleArcTransition)new IgnoreInitInInitedTransition()).addTransition((Enum)VertexState.INITED, (Enum)VertexState.INITED, (Enum)VertexEventType.V_SOURCE_VERTEX_STARTED, (SingleArcTransition)new SourceVertexStartedTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED), 
(Enum)VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, (MultipleArcTransition)new OneToOneSourceSplitTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.RUNNING, VertexState.INITED, VertexState.TERMINATING), (Enum)VertexEventType.V_START, (MultipleArcTransition)new StartTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.INITED, (Enum)VertexState.KILLED, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateInitedVertexTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.INITED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING), (Enum)VertexEventType.V_ROOT_INPUT_FAILED, (MultipleArcTransition)new RootInputInitFailedTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.RUNNING, (Enum)VertexEventType.V_TASK_ATTEMPT_COMPLETED, (SingleArcTransition)TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED, VertexState.ERROR), (Enum)VertexEventType.V_TASK_COMPLETED, (MultipleArcTransition)new 
TaskCompletedTransition()).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING), (Enum)VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, (MultipleArcTransition)new OneToOneSourceSplitTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.TERMINATING, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new VertexKilledTransition()).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.RUNNING, (Enum)VertexEventType.V_TASK_RESCHEDULED, (SingleArcTransition)new TaskRescheduledTransition()).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED), (Enum)VertexEventType.V_COMPLETED, (MultipleArcTransition)new VertexNoTasksCompletedTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.TERMINATING, EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED), (Enum)VertexEventType.V_TASK_COMPLETED, (MultipleArcTransition)new TaskCompletedTransition()).addTransition((Enum)VertexState.TERMINATING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.TERMINATING, (Enum)VertexState.TERMINATING, EnumSet.of(VertexEventType.V_TERMINATE, new VertexEventType[]{VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, 
VertexEventType.V_ROUTE_EVENT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_RESCHEDULED})).addTransition((Enum)VertexState.SUCCEEDED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.SUCCEEDED, EnumSet.of(VertexState.RUNNING, VertexState.FAILED), (Enum)VertexEventType.V_TASK_RESCHEDULED, (MultipleArcTransition)new TaskRescheduledAfterVertexSuccessTransition()).addTransition((Enum)VertexState.SUCCEEDED, EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.SUCCEEDED, EnumSet.of(VertexState.FAILED, VertexState.ERROR), (Enum)VertexEventType.V_TASK_COMPLETED, (MultipleArcTransition)new TaskCompletedAfterVertexSuccessTransition()).addTransition((Enum)VertexState.SUCCEEDED, (Enum)VertexState.SUCCEEDED, EnumSet.of(VertexEventType.V_TERMINATE, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)).addTransition((Enum)VertexState.SUCCEEDED, (Enum)VertexState.SUCCEEDED, (Enum)VertexEventType.V_TASK_ATTEMPT_COMPLETED, (SingleArcTransition)new TaskAttemptCompletedEventTransition()).addTransition((Enum)VertexState.FAILED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.FAILED, (Enum)VertexState.FAILED, EnumSet.of(VertexEventType.V_TERMINATE, new VertexEventType[]{VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_ROOT_INPUT_INITIALIZED, 
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_RECOVERED})).addTransition((Enum)VertexState.KILLED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.KILLED, (Enum)VertexState.KILLED, EnumSet.of(VertexEventType.V_TERMINATE, new VertexEventType[]{VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_INIT, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_RECOVERED})).addTransition((Enum)VertexState.ERROR, (Enum)VertexState.ERROR, EnumSet.of(VertexEventType.V_INIT, new VertexEventType[]{VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TERMINATE, VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_INTERNAL_ERROR, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_RECOVERED})).installTopology();
    // Per-vertex state machine; the constructor builds it from stateMachineFactory.make(this).
    private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;
    // Initialised in the constructor from vertexPlan.getTaskConfig().getNumTasks();
    // returned by getTotalTasks().
    @VisibleForTesting
    int numTasks;
    // Task outcome counters. getCompletedTasks() reports succeeded + failed + killed and does
    // not read completedTaskCount.
    @VisibleForTesting
    int completedTaskCount = 0;
    @VisibleForTesting
    int succeededTaskCount = 0;
    @VisibleForTesting
    int failedTaskCount = 0;
    @VisibleForTesting
    int killedTaskCount = 0;
    @VisibleForTesting
    AtomicInteger failedTaskAttemptCount = new AtomicInteger(0);
    @VisibleForTesting
    AtomicInteger killedTaskAttemptCount = new AtomicInteger(0);
    // NOTE(review): presumably timestamps taken from 'clock' at the corresponding lifecycle
    // points; the assignments are not visible in this part of the file - confirm.
    @VisibleForTesting
    long initTimeRequested;
    @VisibleForTesting
    long initedTime;
    @VisibleForTesting
    long startTimeRequested;
    @VisibleForTesting
    long startedTime;
    @VisibleForTesting
    long finishTime;
    private float progress;
    // Identity and plan of this vertex, fixed at construction time.
    private final TezVertexID vertexId;
    private final DAGProtos.VertexPlan vertexPlan;
    private boolean initWaitsForRootInitializers = false;
    // Interned copy of the name passed to the constructor.
    private final String vertexName;
    // Converted from the plan's processor descriptor in the constructor.
    private final ProcessorDescriptor processorDescriptor;
    private boolean vertexToBeReconfiguredByManager = false;
    AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
    AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
    // Edges to neighbouring vertices, keyed by the vertex on the other end.
    @VisibleForTesting
    Map<Vertex, Edge> sourceVertices;
    private Map<Vertex, Edge> targetVertices;
    Set<Edge> uninitializedEdges = Sets.newHashSet();
    // Root inputs and additional outputs, keyed by String
    // (presumably the input/output name - TODO confirm).
    private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> rootInputDescriptors;
    private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs;
    private Map<String, OutputCommitter> outputCommitters;
    private Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
    private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate();
    private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
    private Set<String> inputsWithInitializers;
    private int numInitializedInputs;
    private boolean startSignalPending = false;
    private boolean tasksNotYetScheduled = true;
    // Events and source-completion reports held in pending lists.
    List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
    List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
    List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
    private RootInputInitializerManager rootInputInitializerManager;
    VertexManager vertexManager;
    // Taken from appContext.getCurrentDAG().getDagUGI() in the constructor.
    private final UserGroupInformation dagUgi;
    private boolean parallelismSet = false;
    private TezVertexID originalOneToOneSplitSource = null;
    private AtomicBoolean committed = new AtomicBoolean(false);
    private AtomicBoolean aborted = new AtomicBoolean(false);
    // Set from the constructor argument of the same name.
    private boolean commitVertexOutputs = false;
    private Map<String, DAGImpl.VertexGroupInfo> dagVertexGroups;
    private TaskLocationHint[] taskLocationHints;
    // Launch settings converted from the plan's task configuration in the constructor;
    // javaOpts is null when the plan does not specify Java options.
    private Map<String, LocalResource> localResources;
    private Map<String, String> environment;
    private final String javaOpts;
    // Built in the constructor from localResources, the DAG credentials, environment and javaOpts.
    private final ContainerContext containerContext;
    private VertexTerminationCause terminationCause;
    // "<vertexId> [<vertexName>]", composed in the constructor.
    private String logIdentifier;
    @VisibleForTesting
    boolean recoveryCommitInProgress = false;
    private boolean summaryCompleteSeen = false;
    @VisibleForTesting
    boolean hasCommitter = false;
    private boolean vertexCompleteSeen = false;
    private Map<String, EdgeManagerPluginDescriptor> recoveredSourceEdgeManagers = null;
    private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null;
    boolean recoveryInitEventSeen = false;
    boolean recoveryStartEventSeen = false;
    private VertexStats vertexStats = null;
    private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;

    /**
     * Registers {@code STATE_CHANGED_CALLBACK} as the state-entered callback for the
     * SUCCEEDED, FAILED, KILLED and RUNNING states of this vertex's state machine.
     * The registrations are chained in that order.
     */
    private void augmentStateMachine() {
        this.stateMachine
            .registerStateEnteredCallback(VertexState.SUCCEEDED, STATE_CHANGED_CALLBACK)
            .registerStateEnteredCallback(VertexState.FAILED, STATE_CHANGED_CALLBACK)
            .registerStateEnteredCallback(VertexState.KILLED, STATE_CHANGED_CALLBACK)
            .registerStateEnteredCallback(VertexState.RUNNING, STATE_CHANGED_CALLBACK);
    }

    /**
     * Creates a vertex from its entry in the DAG plan.
     *
     * <p>The constructor stores the collaborators it is given, creates the read/write lock
     * pair, converts the plan's task configuration (resource request, processor descriptor,
     * local resources, environment, Java options) into runtime objects, builds the container
     * context, registers any additional inputs and outputs declared in the plan, and finally
     * instantiates and augments the per-vertex state machine. The statements run in a fixed
     * order because later steps read fields assigned by earlier ones.
     *
     * @param vertexId id of this vertex
     * @param vertexPlan plan entry describing this vertex
     * @param vertexName vertex name; stored interned
     * @param conf configuration stored on the vertex
     * @param eventHandler handler returned by {@link #getEventHandler()}
     * @param taskAttemptListener listener stored on the vertex
     * @param clock clock stored on the vertex
     * @param thh task heartbeat handler stored on the vertex
     * @param commitVertexOutputs value stored in the field of the same name
     * @param appContext application context; its current DAG supplies the UGI and credentials
     * @param vertexLocationHint location hint passed to {@code setTaskLocationHints}
     * @param dagVertexGroups vertex group information stored on the vertex
     * @param taskSpecificLaunchCmdOption task-specific launch options stored on the vertex
     * @param entityStatusTracker notifier stored as {@code stateChangeNotifier}
     */
    public VertexImpl(
            TezVertexID vertexId,
            DAGProtos.VertexPlan vertexPlan,
            String vertexName,
            Configuration conf,
            EventHandler eventHandler,
            TaskAttemptListener taskAttemptListener,
            Clock clock,
            TaskHeartbeatHandler thh,
            boolean commitVertexOutputs,
            AppContext appContext,
            VertexLocationHint vertexLocationHint,
            Map<String, DAGImpl.VertexGroupInfo> dagVertexGroups,
            TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
            StateChangeNotifier entityStatusTracker) {
        // Identity and collaborators.
        this.vertexId = vertexId;
        this.vertexPlan = vertexPlan;
        this.vertexName = StringInterner.weakIntern((String)vertexName);
        this.conf = conf;
        this.clock = clock;
        this.appContext = appContext;
        this.commitVertexOutputs = commitVertexOutputs;
        this.taskAttemptListener = taskAttemptListener;
        this.taskHeartbeatHandler = thh;
        this.eventHandler = eventHandler;

        // Both lock halves come from the same read/write lock.
        ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        this.readLock = rwLock.readLock();
        this.writeLock = rwLock.writeLock();

        if (LOG.isDebugEnabled()) {
            VertexImpl.logLocationHints(this.vertexName, vertexLocationHint);
        }
        this.setTaskLocationHints(vertexLocationHint);
        this.dagUgi = appContext.getCurrentDAG().getDagUGI();

        // Runtime objects converted from the plan's task configuration.
        this.taskResource = DagTypeConverters.createResourceRequestFromTaskConfig((DAGProtos.PlanTaskConfiguration)vertexPlan.getTaskConfig());
        this.processorDescriptor = DagTypeConverters.convertProcessorDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)vertexPlan.getProcessorDescriptor());
        this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan((List)vertexPlan.getTaskConfig().getLocalResourceList());
        this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan((List)vertexPlan.getTaskConfig().getEnvironmentSettingList());
        if (vertexPlan.getTaskConfig().hasJavaOpts()) {
            this.javaOpts = vertexPlan.getTaskConfig().getJavaOpts();
        } else {
            this.javaOpts = null;
        }
        this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption;
        this.containerContext = new ContainerContext(this.localResources, appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);

        // Additional inputs/outputs are only registered when the plan declares some.
        if (vertexPlan.getInputsCount() > 0) {
            this.setAdditionalInputs(vertexPlan.getInputsList());
        }
        if (vertexPlan.getOutputsCount() > 0) {
            this.setAdditionalOutputs(vertexPlan.getOutputsList());
        }

        this.stateChangeNotifier = entityStatusTracker;
        this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
        this.dagVertexGroups = dagVertexGroups;
        this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]";

        // The state machine is created last, then its state-entered callbacks are attached.
        this.stateMachine = new StateMachineTez(stateMachineFactory.make((Object)this), this);
        this.augmentStateMachine();
    }

    /**
     * Gives access to this vertex's state machine.
     *
     * @return the state machine instance created in the constructor
     */
    protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
        return stateMachine;
    }

    /**
     * @return the id this vertex was constructed with
     */
    @Override
    public TezVertexID getVertexId() {
        return vertexId;
    }

    /**
     * @return the plan entry this vertex was constructed from
     */
    @Override
    public DAGProtos.VertexPlan getVertexPlan() {
        return vertexPlan;
    }

    /**
     * @return the current value of the {@code distanceFromRoot} field
     */
    @Override
    public int getDistanceFromRoot() {
        return distanceFromRoot;
    }

    /**
     * @return the (interned) vertex name assigned in the constructor
     */
    @Override
    public String getName() {
        return vertexName;
    }

    /**
     * @return the event handler supplied to the constructor
     */
    EventHandler getEventHandler() {
        return eventHandler;
    }

    /**
     * Looks up a task of this vertex by id while holding the read lock.
     *
     * @param taskID id of the task to look up
     * @return whatever the task map returns for {@code taskID}
     */
    @Override
    public Task getTask(TezTaskID taskID) {
        readLock.lock();
        try {
            return tasks.get(taskID);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Looks up a task by its index within this vertex.
     *
     * @param taskIndex index used together with this vertex's id to build the task id
     * @return the result of {@link #getTask(TezTaskID)} for the derived task id
     */
    @Override
    public Task getTask(int taskIndex) {
        TezTaskID derivedId = TezTaskID.getInstance(vertexId, taskIndex);
        return getTask(derivedId);
    }

    /**
     * @return the current value of the vertex's task count field
     */
    @Override
    public int getTotalTasks() {
        return numTasks;
    }

    /**
     * Sums the succeeded, failed and killed task counters under the read lock.
     *
     * @return succeeded + failed + killed task count
     */
    @Override
    public int getCompletedTasks() {
        readLock.lock();
        try {
            return succeededTaskCount + failedTaskCount + killedTaskCount;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Reads the succeeded task counter under the read lock.
     *
     * @return the succeeded task count
     */
    @Override
    public int getSucceededTasks() {
        readLock.lock();
        try {
            return succeededTaskCount;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Counts, under the read lock, the tasks whose state is {@code TaskState.RUNNING}.
     *
     * @return number of tasks currently reporting the RUNNING state
     */
    @Override
    public int getRunningTasks() {
        readLock.lock();
        try {
            int running = 0;
            for (Task candidate : tasks.values()) {
                if (candidate.getState() == TaskState.RUNNING) {
                    running++;
                }
            }
            return running;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the counters of this vertex.
     *
     * <p>When the internal state is ERROR, FAILED, KILLED or SUCCEEDED the final counters
     * are (possibly) constructed and the cached {@code fullCounters} is returned; otherwise
     * a fresh {@code TezCounters} aggregated over the current tasks is returned.
     *
     * @return the cached final counters or a newly aggregated snapshot
     */
    @Override
    public TezCounters getAllCounters() {
        readLock.lock();
        try {
            VertexState current = getInternalState();
            boolean terminal = current == VertexState.ERROR
                    || current == VertexState.FAILED
                    || current == VertexState.KILLED
                    || current == VertexState.SUCCEEDED;
            if (!terminal) {
                return VertexImpl.incrTaskCounters(new TezCounters(), tasks.values());
            }
            mayBeConstructFinalFullCounters();
            return fullCounters;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the statistics of this vertex.
     *
     * <p>When the internal state is ERROR, FAILED, KILLED or SUCCEEDED the final counters
     * are (possibly) constructed and the cached {@code vertexStats} field is returned;
     * otherwise a fresh {@code VertexStats} updated from the current tasks is returned.
     *
     * @return the cached stats or a newly computed snapshot
     */
    public VertexStats getVertexStats() {
        readLock.lock();
        try {
            VertexState current = getInternalState();
            boolean terminal = current == VertexState.ERROR
                    || current == VertexState.FAILED
                    || current == VertexState.KILLED
                    || current == VertexState.SUCCEEDED;
            if (!terminal) {
                return VertexImpl.updateVertexStats(new VertexStats(), tasks.values());
            }
            mayBeConstructFinalFullCounters();
            return this.vertexStats;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Adds the counters of every given task into {@code counters}.
     *
     * @param counters accumulator that is incremented in place
     * @param tasks tasks whose counters are added
     * @return the same {@code counters} instance that was passed in
     */
    public static TezCounters incrTaskCounters(TezCounters counters, Collection<Task> tasks) {
        Iterator<Task> remaining = tasks.iterator();
        while (remaining.hasNext()) {
            Task current = remaining.next();
            counters.incrAllCounters((AbstractCounters)current.getCounters());
        }
        return counters;
    }

    /**
     * Feeds the report of every given task into {@code stats}.
     *
     * @param stats accumulator that is updated in place
     * @param tasks tasks whose reports are applied
     * @return the same {@code stats} instance that was passed in
     */
    public static VertexStats updateVertexStats(VertexStats stats, Collection<Task> tasks) {
        Iterator<Task> remaining = tasks.iterator();
        while (remaining.hasNext()) {
            stats.updateStats(remaining.next().getReport());
        }
        return stats;
    }

    /**
     * Returns the diagnostics list under the read lock.
     *
     * @return the internal diagnostics list itself (not a copy)
     */
    @Override
    public List<String> getDiagnostics() {
        readLock.lock();
        try {
            return diagnostics;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Recomputes the progress field and returns it, all under the read lock.
     *
     * @return the freshly computed progress value
     */
    @Override
    public float getProgress() {
        readLock.lock();
        try {
            computeProgress();
            return this.progress;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Builds a progress snapshot from the vertex's task and task-attempt counters
     * while holding the read lock.
     *
     * @return a new {@code ProgressBuilder} populated with the current counts
     */
    @Override
    public ProgressBuilder getVertexProgress() {
        readLock.lock();
        try {
            ProgressBuilder builder = new ProgressBuilder();
            builder.setTotalTaskCount(numTasks);
            builder.setSucceededTaskCount(succeededTaskCount);
            builder.setRunningTaskCount(getRunningTasks());
            builder.setFailedTaskCount(failedTaskCount);
            builder.setKilledTaskCount(killedTaskCount);
            builder.setFailedTaskAttemptCount(failedTaskAttemptCount.get());
            builder.setKilledTaskAttemptCount(killedTaskAttemptCount.get());
            return builder;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Builds a status snapshot (state, diagnostics, progress) under the read lock.
     * Counters are included only when {@code GET_COUNTERS} is requested.
     *
     * @param statusOptions options selecting optional parts of the status
     * @return a new {@code VertexStatusBuilder} describing the current vertex
     */
    @Override
    public VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions) {
        readLock.lock();
        try {
            VertexStatusBuilder builder = new VertexStatusBuilder();
            builder.setState(getInternalState());
            builder.setDiagnostics(diagnostics);
            builder.setProgress(getVertexProgress());
            boolean countersRequested = statusOptions.contains(StatusGetOpts.GET_COUNTERS);
            if (countersRequested) {
                builder.setVertexCounters(getAllCounters());
            }
            return builder;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the location hint recorded for a task, if any.
     *
     * @param taskId id of the task; its numeric id is used as the index into the hint array
     * @return the hint at that index, or {@code null} when no hint array exists or the
     *         index is at or beyond the array length
     */
    @Override
    public TaskLocationHint getTaskLocationHint(TezTaskID taskId) {
        readLock.lock();
        try {
            if (taskLocationHints != null) {
                int index = taskId.getId();
                if (index < taskLocationHints.length) {
                    return taskLocationHints[index];
                }
            }
            return null;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Recomputes the {@code progress} field under the read lock.
     *
     * <p>Each finished task contributes 1.0, every other task contributes its own
     * progress; the sum is divided by {@code numTasks} unless that is zero.
     */
    private void computeProgress() {
        readLock.lock();
        try {
            float total = 0.0f;
            for (Task task : tasks.values()) {
                if (task.isFinished()) {
                    total += 1.0f;
                } else {
                    total += task.getProgress();
                }
            }
            if (numTasks != 0) {
                total = total / (float) numTasks;
            }
            this.progress = total;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns an unmodifiable view of the task map.
     *
     * <p>Also sets {@code lazyTasksCopyNeeded}, which makes the next {@code addTask}
     * replace the map with a copy before inserting.
     *
     * @return unmodifiable view over the current task map
     */
    @Override
    public Map<TezTaskID, Task> getTasks() {
        synchronized (tasksSyncHandle) {
            lazyTasksCopyNeeded = true;
            Map<TezTaskID, Task> view = Collections.unmodifiableMap(tasks);
            return view;
        }
    }

    /**
     * Reads the state machine's current state under the read lock.
     *
     * @return the current vertex state
     */
    @Override
    public VertexState getState() {
        readLock.lock();
        try {
            return (VertexState) getStateMachine().getCurrentState();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Records the termination cause if none has been recorded yet.
     *
     * @param trigger cause to record
     * @return {@code true} if the cause was stored, {@code false} if one was already set
     */
    boolean trySetTerminationCause(VertexTerminationCause trigger) {
        if (terminationCause != null) {
            return false;
        }
        terminationCause = trigger;
        return true;
    }

    /**
     * Reads the recorded termination cause under the read lock.
     *
     * @return the termination cause, or {@code null} if none has been recorded
     */
    @Override
    public VertexTerminationCause getTerminationCause() {
        readLock.lock();
        try {
            return terminationCause;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the application context this vertex was created with
     */
    @Override
    public AppContext getAppContext() {
        return appContext;
    }

    /**
     * Trims the task map to the first {@code newParallelism} entries (in iteration order)
     * and remembers the recovered edge managers and root input spec updates.
     *
     * @param newParallelism number of leading task entries to keep
     * @param sourceEdgeManagers stored into {@code recoveredSourceEdgeManagers}
     * @param rootInputSpecUpdates stored into {@code recoveredRootInputSpecUpdates}
     */
    private void handleParallelismUpdate(int newParallelism, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, Map<String, InputSpecUpdate> rootInputSpecUpdates) {
        int seen = 0;
        for (Iterator<Map.Entry<TezTaskID, Task>> cursor = this.tasks.entrySet().iterator(); cursor.hasNext(); ) {
            cursor.next();
            seen++;
            if (seen > newParallelism) {
                cursor.remove();
            }
        }
        recoveredSourceEdgeManagers = sourceEdgeManagers;
        recoveredRootInputSpecUpdates = rootInputSpecUpdates;
    }

    /**
     * Replays one history event onto this vertex during recovery.
     *
     * <p>Each supported event type updates the corresponding recovery fields; any other
     * event type results in a {@code RuntimeException}.
     *
     * @param historyEvent the recovered history event to apply
     * @return the value of {@code recoveredState} after the event has been applied
     * @throws RuntimeException if a started event arrives before an init event, or the
     *         event type is not one of the handled types
     */
    @Override
    public VertexState restoreFromEvent(HistoryEvent historyEvent) {
        switch (historyEvent.getEventType()) {
            case VERTEX_INITIALIZED: {
                recoveryInitEventSeen = true;
                recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
                createTasks();
                logRecoveredStateAfter("Init event");
                break;
            }
            case VERTEX_STARTED: {
                if (!recoveryInitEventSeen) {
                    throw new RuntimeException("Started Event seen but no Init Event was encountered earlier");
                }
                recoveryStartEventSeen = true;
                VertexStartedEvent started = (VertexStartedEvent) historyEvent;
                startTimeRequested = started.getStartRequestedTime();
                startedTime = started.getStartTime();
                recoveredState = VertexState.RUNNING;
                logRecoveredStateAfter("Started event");
                break;
            }
            case VERTEX_PARALLELISM_UPDATED: {
                VertexParallelismUpdatedEvent updated = (VertexParallelismUpdatedEvent) historyEvent;
                if (updated.getVertexLocationHint() != null) {
                    setTaskLocationHints(updated.getVertexLocationHint());
                }
                int previousNumTasks = numTasks;
                numTasks = updated.getNumTasks();
                stateChangeNotifier.stateChanged(vertexId, (VertexStateUpdate) new VertexStateUpdateParallelismUpdated(vertexName, numTasks, previousNumTasks));
                handleParallelismUpdate(numTasks, updated.getSourceEdgeManagers(), updated.getRootInputSpecUpdates());
                logRecoveredStateAfter("parallelism updated event");
                break;
            }
            case VERTEX_COMMIT_STARTED: {
                recoveryCommitInProgress = true;
                hasCommitter = true;
                break;
            }
            case VERTEX_FINISHED: {
                VertexFinishedEvent finished = (VertexFinishedEvent) historyEvent;
                if (finished.isFromSummary()) {
                    summaryCompleteSeen = true;
                } else {
                    vertexCompleteSeen = true;
                }
                recoveryCommitInProgress = false;
                recoveredState = finished.getState();
                diagnostics.add(finished.getDiagnostics());
                finishTime = finished.getFinishTime();
                logRecoveredStateAfter("finished event");
                break;
            }
            case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: {
                VertexRecoverableEventsGeneratedEvent generated = (VertexRecoverableEventsGeneratedEvent) historyEvent;
                recoveredEvents.addAll(generated.getTezEvents());
                break;
            }
            default: {
                throw new RuntimeException("Unexpected event received for restoring state, eventType=" + historyEvent.getEventType());
            }
        }
        return recoveredState;
    }

    /**
     * Emits the debug line describing the recovered state after a recovery step.
     *
     * @param phase text inserted after "Recovered state for vertex after "
     */
    private void logRecoveredStateAfter(String phase) {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object) ("Recovered state for vertex after " + phase + ", vertex=" + logIdentifier + ", recoveredState=" + recoveredState));
        }
    }

    /**
     * @return the identifier string this vertex uses in log messages
     */
    @Override
    public String getLogIdentifier() {
        return logIdentifier;
    }

    /**
     * Atomically adds one to the failed task attempt counter.
     */
    @Override
    public void incrementFailedTaskAttemptCount() {
        failedTaskAttemptCount.incrementAndGet();
    }

    /**
     * Atomically adds one to the killed task attempt counter.
     */
    @Override
    public void incrementKilledTaskAttemptCount() {
        killedTaskAttemptCount.incrementAndGet();
    }

    /**
     * @return the current value of the failed task attempt counter
     */
    @Override
    public int getFailedTaskAttemptCount() {
        return failedTaskAttemptCount.get();
    }

    /**
     * @return the current value of the killed task attempt counter
     */
    @Override
    public int getKilledTaskAttemptCount() {
        return killedTaskAttemptCount.get();
    }

    /**
     * Replaces the per-task location hint array from the given vertex hint.
     *
     * <p>Nothing changes when the hint is {@code null} or carries a {@code null} or
     * empty task hint list.
     *
     * @param vertexLocationHint source of the per-task hints; may be {@code null}
     */
    private void setTaskLocationHints(VertexLocationHint vertexLocationHint) {
        if (vertexLocationHint == null) {
            return;
        }
        // A typed list is required: with a raw List, toArray(T[]) erases to Object[],
        // which cannot be assigned to the TaskLocationHint[] field.
        List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints();
        if (locHints == null || locHints.isEmpty()) {
            return;
        }
        this.taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]);
    }

    /**
     * Schedules the given tasks of this vertex while holding the write lock.
     *
     * <p>Any pending task events are routed first. If routing fails with an
     * {@code AMUserCodeException}, a diagnostic is recorded, a vertex termination event
     * with cause {@code AM_USERCODE_FAILURE} is sent and nothing is scheduled. Otherwise
     * each task's location hint (when present) is stored and a {@code T_SCHEDULE} event
     * is sent for it.
     *
     * @param tasksToSchedule tasks (with optional location hints) to schedule
     * @throws TezUncheckedException if a task index is not below the current task count
     */
    @Override
    public void scheduleTasks(List<VertexManagerPluginContext.TaskWithLocationHint> tasksToSchedule) {
        this.writeLock.lock();
        try {
            this.tasksNotYetScheduled = false;
            if (!this.pendingTaskEvents.isEmpty()) {
                LOG.info((Object)("Routing pending task events for vertex: " + this.logIdentifier));
                try {
                    VertexImpl.handleRoutedTezEvents(this, this.pendingTaskEvents, false);
                }
                catch (AMUserCodeException e) {
                    String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + this.logIdentifier;
                    LOG.error((Object)msg, (Throwable)e);
                    this.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                    this.eventHandler.handle((Event)new VertexEventTermination(this.vertexId, VertexTerminationCause.AM_USERCODE_FAILURE));
                    // Do not unlock here: the enclosing finally releases the write lock.
                    // Unlocking twice throws IllegalMonitorStateException (or drops a
                    // reentrant hold the caller still expects to own).
                    return;
                }
                this.pendingTaskEvents.clear();
            }
            for (VertexManagerPluginContext.TaskWithLocationHint task : tasksToSchedule) {
                if (this.numTasks <= task.getTaskIndex()) {
                    throw new TezUncheckedException("Invalid taskId: " + task.getTaskIndex() + " for vertex: " + this.logIdentifier);
                }
                TaskLocationHint locationHint = task.getTaskLocationHint();
                if (locationHint != null) {
                    // The hint array is created lazily, sized to the current task count.
                    if (this.taskLocationHints == null) {
                        this.taskLocationHints = new TaskLocationHint[this.numTasks];
                    }
                    this.taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
                }
                this.eventHandler.handle((Event)new TaskEvent(TezTaskID.getInstance((TezVertexID)this.vertexId, (int)task.getTaskIndex()), TaskEventType.T_SCHEDULE));
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Public entry point for changing the vertex parallelism; delegates to the private
     * overload with {@code recovering} set to {@code false}.
     *
     * @param parallelism requested number of tasks
     * @param vertexLocationHint location hints to apply, may be {@code null}
     * @param sourceEdgeManagers replacement edge managers keyed by source vertex name, may be {@code null}
     * @param rootInputSpecUpdates root input spec updates keyed by input name, may be {@code null}
     * @param fromVertexManager whether the call originates from the vertex manager
     * @throws AMUserCodeException propagated from the private overload
     */
    @Override
    public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager) throws AMUserCodeException {
        final boolean recovering = false;
        setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, recovering, fromVertexManager);
    }

    /**
     * Applies a parallelism change (or, during recovery, re-applies recovered edge
     * managers and root input specs) to this vertex.
     *
     * <p>Recovery path: under the write lock, sets the recovered custom edge managers on
     * the matching source edges and merges the recovered root input specs, then returns.
     *
     * <p>Normal path: validates the request, applies the location hint, and then under
     * the write lock either
     * <ul>
     *   <li>initializes the task count when it is currently {@code -1} (creating tasks and
     *       possibly sending {@code V_READY_TO_INIT}), or</li>
     *   <li>keeps or reduces the task count: tasks beyond the new parallelism are removed,
     *       edge managers are replaced, and a {@code VertexParallelismUpdatedEvent} is
     *       written to history, with source edges buffering events in between.</li>
     * </ul>
     * Finally a {@code VertexEventOneToOneSourceSplit} is sent for every ONE_TO_ONE target edge.
     *
     * @param parallelism requested number of tasks; must be {@code >= 0}
     * @param vertexLocationHint location hints to apply, may be {@code null}
     * @param sourceEdgeManagers replacement edge managers keyed by source vertex name, may be {@code null}
     * @param rootInputSpecUpdates root input spec updates keyed by input name, may be {@code null}
     * @param recovering {@code true} to run only the recovery path
     * @param fromVertexManager whether the call originates from the vertex manager
     * @throws AMUserCodeException declared by the signature; presumably raised by callees - TODO confirm
     * @throws TezUncheckedException if parallelism was already set once, the vertex is not
     *         INITIALIZING when the task count is {@code -1}, the parallelism would
     *         increase, a task is not in the NEW state, or an edge manager cannot be set
     */
    private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean recovering, boolean fromVertexManager) throws AMUserCodeException {
        if (recovering) {
            this.writeLock.lock();
            try {
                if (sourceEdgeManagers != null) {
                    for (Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
                        LOG.info((Object)("Recovering edge manager for source:" + entry.getKey() + " destination: " + this.getVertexId()));
                        Vertex sourceVertex = this.appContext.getCurrentDAG().getVertex(entry.getKey());
                        Edge edge = this.sourceVertices.get(sourceVertex);
                        try {
                            edge.setCustomEdgeManager(entry.getValue());
                        }
                        catch (Exception e) {
                            throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,sourceVertex:" + edge.getSourceVertexName() + "destinationVertex:" + edge.getDestinationVertexName(), (Throwable)e);
                        }
                    }
                }
                if (rootInputSpecUpdates != null) {
                    LOG.info((Object)("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString()));
                    this.rootInputSpecs.putAll(rootInputSpecUpdates);
                }
                return;
            }
            finally {
                this.writeLock.unlock();
            }
        }
        Preconditions.checkArgument(parallelism >= 0, (Object)("Parallelism must be >=0. Value: " + parallelism + " for vertex: " + this.logIdentifier));
        this.setVertexLocationHint(vertexLocationHint);
        this.writeLock.lock();
        try {
            if (this.parallelismSet) {
                String msg = "Parallelism can only be set dynamically once per vertex: " + this.logIdentifier;
                LOG.info((Object)msg);
                throw new TezUncheckedException(msg);
            }
            if (fromVertexManager && this.canInitVertex()) {
                Preconditions.checkState((boolean)this.vertexToBeReconfiguredByManager, (Object)"Vertex is fully configured but still the reconfiguration API has been called. VertexManager must notify the framework using  context.vertexReconfigurationPlanned() before re-configuring the vertex.");
            }
            this.parallelismSet = true;
            if (this.numTasks == -1) {
                // Task count not yet known: this call defines it for the first time.
                if (this.getState() != VertexState.INITIALIZING) {
                    throw new TezUncheckedException("Vertex state is not Initializing. Value: " + (Object)((Object)this.getState()) + " for vertex: " + this.logIdentifier);
                }
                if (sourceEdgeManagers != null) {
                    for (Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
                        LOG.info((Object)("Replacing edge manager for source:" + entry.getKey() + " destination: " + this.getVertexId()));
                        Vertex sourceVertex = this.appContext.getCurrentDAG().getVertex(entry.getKey());
                        Edge edge = this.sourceVertices.get(sourceVertex);
                        try {
                            edge.setCustomEdgeManager(entry.getValue());
                        }
                        catch (Exception e) {
                            throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,sourceVertex:" + edge.getSourceVertexName() + "destinationVertex:" + edge.getDestinationVertexName(), (Throwable)e);
                        }
                    }
                }
                if (rootInputSpecUpdates != null) {
                    LOG.info((Object)("Got updated RootInputsSpecs: " + rootInputSpecUpdates.toString()));
                    // Entries of this map carry InputSpecUpdate values (see the parameter type).
                    for (Map.Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) {
                        InputSpecUpdate update = entry.getValue();
                        boolean enoughUpdates = update.isForAllWorkUnits() || (update.getAllNumPhysicalInputs() != null && update.getAllNumPhysicalInputs().size() == parallelism);
                        Preconditions.checkState(enoughUpdates, (Object)("Not enough input spec updates for root input named " + entry.getKey()));
                    }
                    this.rootInputSpecs.putAll(rootInputSpecUpdates);
                }
                int oldNumTasks = this.numTasks;
                this.numTasks = parallelism;
                this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
                this.createTasks();
                LOG.info((Object)("Vertex " + this.getVertexId() + " parallelism set to " + parallelism));
                if (this.canInitVertex()) {
                    this.getEventHandler().handle((Event)new VertexEvent(this.getVertexId(), VertexEventType.V_READY_TO_INIT));
                }
            } else {
                Preconditions.checkState(rootInputSpecUpdates == null, (Object)"Root Input specs can only be updated when the vertex is configured with -1 tasks");
                // Only a strict increase is rejected. Equal parallelism is a legal request
                // (used to swap edge managers) and is handled by the branch below.
                if (parallelism > this.numTasks) {
                    String msg = "Increasing parallelism is not supported, vertexId=" + this.logIdentifier;
                    LOG.warn((Object)msg);
                    throw new TezUncheckedException(msg);
                }
                if (parallelism == this.numTasks) {
                    LOG.info((Object)("setParallelism same as current value: " + parallelism + " for vertex: " + this.logIdentifier));
                    Preconditions.checkArgument(sourceEdgeManagers != null, (Object)"Source edge managers or RootInputSpecs must be set when not changing parallelism");
                } else {
                    LOG.info((Object)("Resetting vertex location hints due to change in parallelism for vertex: " + this.logIdentifier));
                    vertexLocationHint = null;
                }
                for (Edge edge : this.sourceVertices.values()) {
                    edge.startEventBuffering();
                }
                LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
                Iterator<Map.Entry<TezTaskID, Task>> iterator = currentTasks.entrySet().iterator();
                int i = 0;
                while (iterator.hasNext()) {
                    ++i;
                    Map.Entry<TezTaskID, Task> entry = iterator.next();
                    Task task = entry.getValue();
                    if (task.getState() != TaskState.NEW) {
                        String msg = "All tasks must be in initial state when changing parallelism for vertex: " + this.getVertexId() + " name: " + this.getName();
                        LOG.warn((Object)msg);
                        throw new TezUncheckedException(msg);
                    }
                    // Keep the first `parallelism` tasks (iteration order), drop the rest.
                    if (i <= parallelism) continue;
                    LOG.info((Object)("Removing task: " + entry.getKey()));
                    iterator.remove();
                }
                LOG.info((Object)("Vertex " + this.logIdentifier + " parallelism set to " + parallelism + " from " + this.numTasks));
                int oldNumTasks = this.numTasks;
                this.numTasks = parallelism;
                this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
                assert (this.tasks.size() == this.numTasks);
                if (sourceEdgeManagers != null) {
                    for (Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
                        LOG.info((Object)("Replacing edge manager for source:" + entry.getKey() + " destination: " + this.getVertexId()));
                        Vertex sourceVertex = this.appContext.getCurrentDAG().getVertex(entry.getKey());
                        Edge edge = this.sourceVertices.get(sourceVertex);
                        try {
                            edge.setCustomEdgeManager(entry.getValue());
                        }
                        catch (Exception e) {
                            throw new TezUncheckedException((Throwable)e);
                        }
                    }
                }
                VertexParallelismUpdatedEvent parallelismUpdatedEvent = new VertexParallelismUpdatedEvent(this.vertexId, this.numTasks, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, oldNumTasks);
                this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.getDAGId(), parallelismUpdatedEvent));
                for (Edge edge : this.sourceVertices.values()) {
                    edge.stopEventBuffering();
                }
            }
            // Tell every ONE_TO_ONE target about the (possibly changed) task count.
            for (Map.Entry<Vertex, Edge> entry : this.targetVertices.entrySet()) {
                Edge edge = entry.getValue();
                if (edge.getEdgeProperty().getDataMovementType() != EdgeProperty.DataMovementType.ONE_TO_ONE) continue;
                VertexEventOneToOneSourceSplit event = new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(), this.getVertexId(), this.originalOneToOneSplitSource != null ? this.originalOneToOneSplitSource : this.getVertexId(), this.numTasks);
                this.getEventHandler().handle((Event)event);
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Applies the given vertex location hint under the write lock, logging the hints
     * first when debug logging is enabled.
     *
     * @param vertexLocationHint hint to apply; passed on to {@code setTaskLocationHints}
     */
    @Override
    public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
        writeLock.lock();
        try {
            boolean debug = LOG.isDebugEnabled();
            if (debug) {
                VertexImpl.logLocationHints(vertexName, vertexLocationHint);
            }
            setTaskLocationHints(vertexLocationHint);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Announces a planned reconfiguration; delegates to the overload with the test
     * override disabled.
     */
    @Override
    public void vertexReconfigurationPlanned() {
        final boolean testOverride = false;
        vertexReconfigurationPlanned(testOverride);
    }

    /**
     * Marks the vertex as "to be reconfigured by its manager" under the write lock.
     *
     * <p>Without the test override, the call is only legal before the vertex manager is
     * initialized and before the configured notification was sent. With the override,
     * both of those must already have happened.
     *
     * @param testOverride selects the inverted precondition described above
     * @throws IllegalStateException when the applicable precondition does not hold
     */
    public void vertexReconfigurationPlanned(boolean testOverride) {
        writeLock.lock();
        try {
            if (!testOverride) {
                Preconditions.checkState(!vmIsInitialized.get(), (Object)"context.vertexReconfigurationPlanned() cannot be called after initialize()");
                Preconditions.checkState(!completelyConfiguredSent.get(), (Object)"vertexReconfigurationPlanned()  cannot be invoked after the vertex has been configured.");
            } else {
                boolean alreadyConfigured = vmIsInitialized.get() && completelyConfiguredSent.get();
                Preconditions.checkState(alreadyConfigured, (Object)"test should override only failed cases");
            }
            vertexToBeReconfiguredByManager = true;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Ends a planned reconfiguration under the write lock and, the first time the
     * "configured sent" flag flips, notifies listeners with the CONFIGURED state.
     *
     * @throws IllegalStateException if no reconfiguration had been planned
     */
    @Override
    public void doneReconfiguringVertex() {
        writeLock.lock();
        try {
            Preconditions.checkState(vertexToBeReconfiguredByManager, (Object)"doneReconfiguringVertex() can be invoked only after vertexReconfigurationPlanned() is invoked");
            vertexToBeReconfiguredByManager = false;
            boolean firstNotification = completelyConfiguredSent.compareAndSet(false, true);
            if (firstNotification) {
                VertexStateUpdate configured = new VertexStateUpdate(vertexName, org.apache.tez.dag.api.event.VertexState.CONFIGURED);
                stateChangeNotifier.stateChanged(vertexId, configured);
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Feeds an event into the vertex state machine while holding the write lock.
     *
     * <p>An invalid transition is logged, recorded as a diagnostic and answered with a
     * {@code V_INTERNAL_ERROR} event. A state change is logged at info level.
     *
     * @param event the vertex event to process
     */
    public void handle(VertexEvent event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Processing VertexEvent " + event.getVertexId() + " of type " + event.getType() + " while in state " + (Object)((Object)this.getInternalState()) + ". Event: " + (Object)((Object)event)));
        }
        // Acquire before entering the try: if lock() itself failed inside the try, the
        // finally would call unlock() on a lock that was never taken and mask the failure.
        this.writeLock.lock();
        try {
            VertexState oldState = this.getInternalState();
            try {
                this.getStateMachine().doTransition(event.getType(), (Object)event);
            }
            catch (InvalidStateTransitonException e) {
                String message = "Invalid event " + event.getType() + " on vertex " + this.vertexName + " with vertexId " + this.vertexId + " at current state " + (Object)((Object)oldState);
                LOG.error((Object)("Can't handle " + message), (Throwable)e);
                this.addDiagnostic(message);
                this.eventHandler.handle((Event)new VertexEvent(this.vertexId, VertexEventType.V_INTERNAL_ERROR));
            }
            if (oldState != this.getInternalState()) {
                LOG.info((Object)(this.logIdentifier + " transitioned from " + (Object)((Object)oldState) + " to " + (Object)((Object)this.getInternalState()) + " due to event " + event.getType()));
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Reads the state machine's current state under the read lock.
     *
     * @return the current state of the vertex state machine
     */
    private VertexState getInternalState() {
        readLock.lock();
        try {
            return (VertexState) getStateMachine().getCurrentState();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Adds a task to the task map.
     *
     * <p>If {@code lazyTasksCopyNeeded} is set, the map is first replaced by a copy
     * (inside the {@code tasksSyncHandle} monitor) and the flag is cleared. The
     * insertion itself happens after the monitor has been released.
     *
     * @param task task to register under its own task id
     */
    protected void addTask(Task task) {
        synchronized (tasksSyncHandle) {
            if (lazyTasksCopyNeeded) {
                LinkedHashMap<TezTaskID, Task> copy = new LinkedHashMap<TezTaskID, Task>();
                copy.putAll(tasks);
                tasks = copy;
                lazyTasksCopyNeeded = false;
            }
        }
        TezTaskID key = task.getTaskId();
        tasks.put(key, task);
    }

    /**
     * Stores the clock's current time as the finish time of this vertex.
     */
    void setFinishTime() {
        long now = clock.getTime();
        finishTime = now;
    }

    /**
     * Builds a {@code VertexInitializedEvent} from the vertex's current fields and
     * passes it, wrapped in a {@code DAGHistoryEvent}, to the history handler.
     */
    void logJobHistoryVertexInitializedEvent() {
        VertexInitializedEvent initializedEvent = new VertexInitializedEvent(
                this.vertexId,
                this.vertexName,
                this.initTimeRequested,
                this.initedTime,
                this.numTasks,
                this.getProcessorName(),
                this.getAdditionalInputs());
        this.appContext.getHistoryHandler().handle(
                new DAGHistoryEvent(this.getDAGId(), initializedEvent));
    }

    /**
     * Builds a {@code VertexStartedEvent} from the requested and actual start times
     * and passes it, wrapped in a {@code DAGHistoryEvent}, to the history handler.
     */
    void logJobHistoryVertexStartedEvent() {
        VertexStartedEvent startedEvent = new VertexStartedEvent(
                this.vertexId,
                this.startTimeRequested,
                this.startedTime);
        this.appContext.getHistoryHandler().handle(
                new DAGHistoryEvent(this.getDAGId(), startedEvent));
    }

    /**
     * Records the finish time and writes a SUCCEEDED completion record with empty
     * diagnostics.
     *
     * @throws IOException propagated from the completion helper
     */
    void logJobHistoryVertexFinishedEvent() throws IOException {
        setFinishTime();
        logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, this.finishTime, "");
    }

    /**
     * Writes a completion record for a non-successful final state, using the
     * clock's current time and the vertex diagnostics joined by
     * {@code LINE_SEPARATOR}.
     *
     * @param state the final state to record
     * @throws IOException propagated from the completion helper
     */
    void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
        long now = this.clock.getTime();
        String joinedDiagnostics = StringUtils.join(this.getDiagnostics(), (String)LINE_SEPARATOR);
        this.logJobHistoryVertexCompletedHelper(state, now, joinedDiagnostics);
    }

    /**
     * Assembles a {@code VertexFinishedEvent} (timings, final state, diagnostics,
     * counters, vertex stats and per-vertex task counts) and submits it to the
     * history handler through {@code handleCriticalEvent}.
     *
     * @param finalState  the final state to record
     * @param finishTime  the finish timestamp to record (shadows the field of the same name)
     * @param diagnostics diagnostics text to record
     * @throws IOException propagated from {@code handleCriticalEvent}
     */
    private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime, String diagnostics) throws IOException {
        HashMap<String, Integer> taskCounts = new HashMap<String, Integer>();
        taskCounts.put("numCompletedTasks", this.completedTaskCount);
        taskCounts.put("numSucceededTasks", this.succeededTaskCount);
        taskCounts.put("numFailedTasks", this.failedTaskCount);
        taskCounts.put("numKilledTasks", this.killedTaskCount);
        taskCounts.put("numFailedTaskAttempts", this.failedTaskAttemptCount.get());
        taskCounts.put("numKilledTaskAttempts", this.killedTaskAttemptCount.get());

        VertexFinishedEvent finishedEvent = new VertexFinishedEvent(
                this.vertexId,
                this.vertexName,
                this.initTimeRequested,
                this.initedTime,
                this.startTimeRequested,
                this.startedTime,
                finishTime,
                finalState,
                diagnostics,
                this.getAllCounters(),
                this.getVertexStats(),
                taskCounts);
        this.appContext.getHistoryHandler().handleCriticalEvent(
                new DAGHistoryEvent(this.getDAGId(), finishedEvent));
    }

    /**
     * Decides whether the vertex has reached a terminal state and, if so, performs
     * the matching finalization.
     *
     * <p>Nothing terminal happens until {@code completedTaskCount} equals the number
     * of registered tasks; in that case the current internal state is returned.
     * Once all tasks are complete:
     * <ul>
     *   <li>if every task succeeded and no termination cause is set, the output
     *       committers are invoked (when {@code commitVertexOutputs} is set and the
     *       vertex has not been committed yet) and the vertex finishes SUCCEEDED;
     *       a commit failure finishes it FAILED with COMMIT_FAILURE;</li>
     *   <li>otherwise the recorded termination cause selects KILLED or FAILED, the
     *       vertex outputs are aborted and the vertex is finished.</li>
     * </ul>
     *
     * @param vertex the vertex to evaluate
     * @return the resulting vertex state
     * @throws TezUncheckedException if all tasks are complete but the termination
     *         cause does not match any handled case
     */
    static VertexState checkVertexForCompletion(final VertexImpl vertex) {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Checking for vertex completion for " + vertex.logIdentifier + ", numTasks=" + vertex.numTasks + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount + ", completedTaskCount=" + vertex.completedTaskCount + ", terminationCause=" + (Object)((Object)vertex.terminationCause)));
        }
        // More completions than tasks indicates a bookkeeping problem; it is only logged.
        if (vertex.completedTaskCount > vertex.tasks.size()) {
            LOG.error((Object)("task completion accounting issue: completedTaskCount > nTasks: for vertex " + vertex.logIdentifier + ", numTasks=" + vertex.numTasks + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount + ", completedTaskCount=" + vertex.completedTaskCount + ", terminationCause=" + (Object)((Object)vertex.terminationCause)));
        }
        if (vertex.completedTaskCount == vertex.tasks.size()) {
            if (vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
                LOG.info((Object)("Vertex succeeded: " + vertex.logIdentifier));
                try {
                    // getAndSet guarantees the commit path runs at most once per vertex.
                    if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
                        LOG.info((Object)("Invoking committer commit for vertex, vertexId=" + vertex.logIdentifier));
                        if (vertex.outputCommitters != null && !vertex.outputCommitters.isEmpty()) {
                            boolean firstCommit = true;
                            for (Map.Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
                                final OutputCommitter committer = entry.getValue();
                                final String outputName = entry.getKey();
                                // Outputs listed in sharedOutputs are not committed here.
                                if (vertex.sharedOutputs.contains(outputName)) continue;
                                if (firstCommit) {
                                    // Persist the commit-started event once, right before the first
                                    // actual commit. The flag must be cleared here: clearing it in
                                    // an else-branch can never take effect, which would re-log the
                                    // event for every committer.
                                    try {
                                        vertex.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(vertex.getDAGId(), new VertexCommitStartedEvent(vertex.vertexId, vertex.clock.getTime())));
                                    }
                                    catch (IOException e) {
                                        LOG.error((Object)("Failed to persist commit start event to recovery, vertex=" + vertex.logIdentifier), (Throwable)e);
                                        vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
                                        return vertex.finished(VertexState.FAILED);
                                    }
                                    firstCommit = false;
                                }
                                vertex.dagUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                                    @Override
                                    public Void run() throws Exception {
                                        LOG.info((Object)("Invoking committer commit for output=" + outputName + ", vertexId=" + vertex.logIdentifier));
                                        committer.commitOutput();
                                        return null;
                                    }
                                });
                            }
                        }
                    }
                }
                catch (Exception e) {
                    LOG.error((Object)("Failed to do commit on vertex, vertexId=" + vertex.logIdentifier), (Throwable)e);
                    vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
                    return vertex.finished(VertexState.FAILED);
                }
                return vertex.finished(VertexState.SUCCEEDED);
            }
            if (vertex.terminationCause == VertexTerminationCause.DAG_KILL) {
                vertex.setFinishTime();
                String diagnosticMsg = "Vertex killed due to user-initiated job kill. failedTasks:" + vertex.failedTaskCount;
                LOG.info((Object)diagnosticMsg);
                vertex.addDiagnostic(diagnosticMsg);
                vertex.abortVertex(VertexStatus.State.KILLED);
                return vertex.finished(VertexState.KILLED);
            }
            if (vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE) {
                vertex.setFinishTime();
                String diagnosticMsg = "Vertex killed as other vertex failed. failedTasks:" + vertex.failedTaskCount;
                LOG.info((Object)diagnosticMsg);
                vertex.addDiagnostic(diagnosticMsg);
                vertex.abortVertex(VertexStatus.State.KILLED);
                return vertex.finished(VertexState.KILLED);
            }
            if (vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE) {
                if (vertex.failedTaskCount == 0) {
                    LOG.error((Object)"task failure accounting error.  terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
                }
                vertex.setFinishTime();
                String diagnosticMsg = "Vertex failed as one or more tasks failed. failedTasks:" + vertex.failedTaskCount;
                LOG.info((Object)diagnosticMsg);
                vertex.addDiagnostic(diagnosticMsg);
                vertex.abortVertex(VertexStatus.State.FAILED);
                return vertex.finished(VertexState.FAILED);
            }
            // The remaining causes log their message but do not add it to the diagnostics.
            if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
                vertex.setFinishTime();
                String diagnosticMsg = "Vertex failed/killed due to internal error. failedTasks:" + vertex.failedTaskCount + " killedTasks:" + vertex.killedTaskCount;
                LOG.info((Object)diagnosticMsg);
                vertex.abortVertex(VertexStatus.State.FAILED);
                return vertex.finished(VertexState.FAILED);
            }
            if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) {
                vertex.setFinishTime();
                String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. failedTasks:" + vertex.failedTaskCount + " killedTasks:" + vertex.killedTaskCount;
                LOG.info((Object)diagnosticMsg);
                vertex.abortVertex(VertexStatus.State.FAILED);
                return vertex.finished(VertexState.FAILED);
            }
            if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) {
                vertex.setFinishTime();
                String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. failedTasks:" + vertex.failedTaskCount + " killedTasks:" + vertex.killedTaskCount;
                LOG.info((Object)diagnosticMsg);
                vertex.abortVertex(VertexStatus.State.FAILED);
                return vertex.finished(VertexState.FAILED);
            }
            throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount + ", completedTaskCount=" + vertex.completedTaskCount + ", terminationCause=" + (Object)((Object)vertex.terminationCause));
        }
        // Tasks are still outstanding: the vertex stays in its current state.
        return vertex.getInternalState();
    }

    /**
     * Attempts to record {@code trigger} as the termination cause and, only when
     * that succeeds, sends a termination event to every task of this vertex.
     *
     * @param trigger              the vertex-level termination cause to set
     * @param taskterminationCause the cause passed along in each task termination event
     */
    void tryEnactKill(VertexTerminationCause trigger, TaskTerminationCause taskterminationCause) {
        if (!this.trySetTerminationCause(trigger)) {
            return;
        }
        LOG.info("Killing tasks in vertex: " + this.logIdentifier + " due to trigger: " + trigger);
        for (Task t : this.tasks.values()) {
            TaskEventTermination termination = new TaskEventTermination(t.getTaskId(), taskterminationCause);
            this.eventHandler.handle((Event)termination);
        }
    }

    /**
     * Performs end-of-life bookkeeping for the vertex and notifies the DAG.
     *
     * <p>Sets the finish time if it is still zero, tries to record the supplied
     * termination cause (when non-null), shuts down and clears the root input
     * initializer manager (when present), and then acts on {@code finalState}:
     * <ul>
     *   <li>ERROR: adds diagnostics, sends a DAG INTERNAL_ERROR event, writes the
     *       failed-vertex history record;</li>
     *   <li>KILLED / FAILED: adds diagnostics, sends a vertex-completed event with
     *       the cause, writes the failed-vertex history record;</li>
     *   <li>SUCCEEDED: writes the finished-vertex history record and sends a
     *       vertex-completed event; if writing the record throws, the state is
     *       downgraded to FAILED with INTERNAL_ERROR and that is reported instead.</li>
     * </ul>
     *
     * @param finalState       the state the vertex is finishing in
     * @param terminationCause cause to record, or {@code null} to leave it unchanged
     * @param diag             optional extra diagnostic text; ignored when empty
     * @return the state actually reported (may differ from {@code finalState} for SUCCEEDED)
     * @throws TezUncheckedException for any other {@code finalState}
     */
    VertexState finished(VertexState finalState, VertexTerminationCause terminationCause, String diag) {
        if (this.finishTime == 0L) {
            this.setFinishTime();
        }
        if (terminationCause != null) {
            this.trySetTerminationCause(terminationCause);
        }
        if (this.rootInputInitializerManager != null) {
            this.rootInputInitializerManager.shutdown();
            this.rootInputInitializerManager = null;
        }

        switch (finalState) {
            case ERROR:
                this.addDiagnostic("Vertex: " + this.logIdentifier + " error due to:" + terminationCause);
                if (!StringUtils.isEmpty(diag)) {
                    this.addDiagnostic(diag);
                }
                this.eventHandler.handle((Event)new DAGEvent(this.getDAGId(), DAGEventType.INTERNAL_ERROR));
                try {
                    this.logJobHistoryVertexFailedEvent(finalState);
                }
                catch (IOException historyFailure) {
                    // A failed history write is logged only; the ERROR outcome stands.
                    LOG.error("Failed to send vertex finished event to recovery", historyFailure);
                }
                break;

            case KILLED:
            case FAILED:
                this.addDiagnostic("Vertex " + this.logIdentifier + " killed/failed due to:" + terminationCause);
                if (!StringUtils.isEmpty(diag)) {
                    this.addDiagnostic(diag);
                }
                this.eventHandler.handle((Event)new DAGEventVertexCompleted(this.getVertexId(), finalState, terminationCause));
                try {
                    this.logJobHistoryVertexFailedEvent(finalState);
                }
                catch (IOException historyFailure) {
                    // A failed history write is logged only; the outcome stands.
                    LOG.error("Failed to send vertex finished event to recovery", historyFailure);
                }
                break;

            case SUCCEEDED:
                try {
                    this.logJobHistoryVertexFinishedEvent();
                    this.eventHandler.handle((Event)new DAGEventVertexCompleted(this.getVertexId(), finalState));
                }
                catch (IOException historyFailure) {
                    // Success could not be persisted: report the vertex as FAILED instead.
                    LOG.error("Failed to send vertex finished event to recovery", historyFailure);
                    finalState = VertexState.FAILED;
                    this.terminationCause = VertexTerminationCause.INTERNAL_ERROR;
                    this.eventHandler.handle((Event)new DAGEventVertexCompleted(this.getVertexId(), finalState));
                }
                break;

            default:
                throw new TezUncheckedException("Unexpected VertexState: " + finalState);
        }
        return finalState;
    }

    /**
     * Finishes the vertex in {@code finalState} without supplying a termination
     * cause or extra diagnostics.
     *
     * @param finalState the state the vertex is finishing in
     * @return the state reported by the three-argument overload
     */
    VertexState finished(VertexState finalState) {
        return finished(finalState, null, null);
    }

    /**
     * Instantiates, initializes and sets up an output committer for each additional
     * output that names a committer class.
     *
     * <p>Does nothing when {@code additionalOutputSpecs} is empty. Outputs without a
     * controller descriptor or without a committer class name are skipped. Each
     * committer is created and driven inside {@code dagUgi.doAs}, and is stored in
     * {@code outputCommitters} after {@code initialize()} and before
     * {@code setupOutput()}.
     *
     * @throws Exception anything raised while creating or running a committer
     */
    private void initializeCommitters() throws Exception {
        if (this.additionalOutputSpecs.isEmpty()) {
            return;
        }
        LOG.info("Invoking committer inits for vertex, vertexId=" + this.logIdentifier);
        for (Map.Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> outputEntry : this.additionalOutputs.entrySet()) {
            final String outputName = outputEntry.getKey();
            final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output = outputEntry.getValue();

            boolean committerSpecified = output.getControllerDescriptor() != null
                    && ((OutputCommitterDescriptor)output.getControllerDescriptor()).getClassName() != null;
            if (!committerSpecified) {
                LOG.info("Ignoring committer as none specified for output=" + outputName + ", vertexId=" + this.logIdentifier);
                continue;
            }

            LOG.info("Instantiating committer for output=" + outputName + ", vertexId=" + this.logIdentifier + ", committerClass=" + ((OutputCommitterDescriptor)output.getControllerDescriptor()).getClassName());
            this.dagUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                @Override
                public Void run() throws Exception {
                    OutputCommitterContextImpl committerContext = new OutputCommitterContextImpl(
                            VertexImpl.this.appContext.getApplicationID(),
                            VertexImpl.this.appContext.getApplicationAttemptId().getAttemptId(),
                            VertexImpl.this.appContext.getCurrentDAG().getName(),
                            VertexImpl.this.vertexName,
                            (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>)output,
                            VertexImpl.this.vertexId.getId());
                    OutputCommitter committer = (OutputCommitter)ReflectionUtils.createClazzInstance(
                            (String)((OutputCommitterDescriptor)output.getControllerDescriptor()).getClassName(),
                            (Class[])new Class[]{OutputCommitterContext.class},
                            (Object[])new Object[]{committerContext});
                    LOG.info("Invoking committer init for output=" + outputName + ", vertexId=" + VertexImpl.this.logIdentifier);
                    committer.initialize();
                    VertexImpl.this.outputCommitters.put(outputName, committer);
                    LOG.info("Invoking committer setup for output=" + outputName + ", vertexId=" + VertexImpl.this.logIdentifier);
                    committer.setupOutput();
                    return null;
                }
            });
        }
    }

    /**
     * Initializes the output committers and, on success, records the init time and
     * writes the vertex-initialized history event.
     *
     * <p>If committer initialization throws, the failure is logged and added to the
     * diagnostics, INIT_FAILURE is set as the termination cause, outputs are aborted
     * and the vertex is finished as FAILED.
     *
     * @return {@code true} when initialization succeeded, {@code false} otherwise
     */
    private boolean initializeVertex() {
        try {
            this.initializeCommitters();
        }
        catch (Exception initFailure) {
            LOG.warn("Vertex Committer init failed, vertex=" + this.logIdentifier, initFailure);
            this.addDiagnostic("Vertex init failed : " + ExceptionUtils.getStackTrace(initFailure));
            this.trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
            this.abortVertex(VertexStatus.State.FAILED);
            this.finished(VertexState.FAILED);
            return false;
        }

        this.initedTime = this.clock.getTime();
        this.logJobHistoryVertexInitializedEvent();
        return true;
    }

    /**
     * Hook called at the end of vertex setup; it currently performs no checks.
     */
    private void checkTaskLimits() {
    }

    /**
     * Returns the container context to use for the task at {@code taskIdx}.
     *
     * <p>When task-specific launch options apply to this vertex/task pair, a new
     * context is built with the task-specific JVM options; otherwise the shared
     * {@code containerContext} is returned.
     *
     * @param taskIdx index of the task within this vertex
     * @return the container context for that task
     */
    @VisibleForTesting
    ContainerContext getContainerContext(int taskIdx) {
        if (!this.taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(this.vertexName, taskIdx)) {
            return this.containerContext;
        }
        String taskJvmOpts = this.taskSpecificLaunchCmdOpts.getTaskSpecificOption(this.javaOpts, this.vertexName, taskIdx);
        return new ContainerContext(
                this.localResources,
                this.appContext.getCurrentDAG().getCredentials(),
                this.environment,
                taskJvmOpts);
    }

    /**
     * Creates {@code numTasks} task instances, one per index, and registers each
     * through {@link #addTask}.
     *
     * <p>The boolean passed to the task constructor is {@code true} when the vertex
     * has no target vertices (the map is null or empty).
     */
    private void createTasks() {
        for (int taskIndex = 0; taskIndex < this.numTasks; ++taskIndex) {
            ContainerContext taskContainerContext = this.getContainerContext(taskIndex);
            TaskImpl created = new TaskImpl(
                    this.getVertexId(),
                    taskIndex,
                    this.eventHandler,
                    this.conf,
                    this.taskAttemptListener,
                    this.clock,
                    this.taskHeartbeatHandler,
                    this.appContext,
                    this.targetVertices == null || this.targetVertices.isEmpty(),
                    this.taskResource,
                    taskContainerContext,
                    this.stateChangeNotifier);
            this.addTask(created);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created task for vertex " + this.logIdentifier + ": " + created.getTaskId());
            }
        }
    }

    /**
     * Sets up the vertex without a previously recorded initialized event.
     *
     * @return the state produced by {@code setupVertex(null)}
     */
    private VertexState setupVertex() {
        return setupVertex(null);
    }

    /**
     * Prepares the vertex for initialization: records init timestamps, builds group
     * input specs, determines which root inputs have initializers, assigns and
     * initializes the vertex manager, and resolves the task count.
     *
     * @param event a previously recorded initialized event whose values are reused
     *              (presumably supplied during recovery -- confirm against callers),
     *              or {@code null} to derive everything from the current plan
     * @return {@code VertexState.INITED} on success, {@code VertexState.FAILED} otherwise
     */
    private VertexState setupVertex(VertexInitializedEvent event) {
        // Timestamps come from the clock for a fresh setup, or from the supplied event.
        if (event == null) {
            this.initTimeRequested = this.clock.getTime();
        } else {
            this.initTimeRequested = event.getInitRequestedTime();
            this.initedTime = event.getInitedTime();
        }
        // Collect a GroupInputSpec for every vertex group that has a merged input for this vertex.
        if (this.dagVertexGroups != null && !this.dagVertexGroups.isEmpty()) {
            LinkedList groupSpecList = Lists.newLinkedList();
            for (DAGImpl.VertexGroupInfo groupInfo : this.dagVertexGroups.values()) {
                if (!groupInfo.edgeMergedInputs.containsKey(this.getName())) continue;
                InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(this.getName());
                groupSpecList.add(new GroupInputSpec(groupInfo.groupName, (List)Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
            }
            // groupInputSpecList is only assigned when at least one spec was found.
            if (!groupSpecList.isEmpty()) {
                this.groupInputSpecList = groupSpecList;
            }
        }
        if (event != null) {
            // With an event, the root input descriptors are taken from it and
            // inputsWithInitializers is not populated in this method.
            this.rootInputDescriptors = event.getAdditionalInputs();
        } else if (this.rootInputDescriptors != null) {
            LOG.info((Object)("Root Inputs exist for Vertex: " + this.getName() + " : " + this.rootInputDescriptors));
            for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : this.rootInputDescriptors.values()) {
                // Only inputs that name an initializer class are tracked.
                if (input.getControllerDescriptor() == null || ((InputInitializerDescriptor)input.getControllerDescriptor()).getClassName() == null) continue;
                if (this.inputsWithInitializers == null) {
                    this.inputsWithInitializers = Sets.newHashSet();
                }
                this.inputsWithInitializers.add(input.getName());
                // Despite the wording, this loop only records the input name; nothing is started here.
                LOG.info((Object)("Starting root input initializer for input: " + input.getName() + ", with class: [" + ((InputInitializerDescriptor)input.getControllerDescriptor()).getClassName() + "]"));
            }
        }
        // A vertex is "bipartite" here when any incoming edge is SCATTER_GATHER.
        boolean hasBipartite = false;
        if (this.sourceVertices != null) {
            for (Edge edge : this.sourceVertices.values()) {
                if (edge.getEdgeProperty().getDataMovementType() != EdgeProperty.DataMovementType.SCATTER_GATHER) continue;
                hasBipartite = true;
                break;
            }
        }
        // Combining a scatter-gather input with an initializer-backed root input is rejected.
        if (hasBipartite && this.inputsWithInitializers != null) {
            LOG.fatal((Object)"A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
            // With an event, FAILED is returned without calling finished().
            if (event != null) {
                return VertexState.FAILED;
            }
            return this.finished(VertexState.FAILED);
        }
        this.assignVertexManager();
        try {
            this.vertexManager.initialize();
            this.vmIsInitialized.set(true);
        }
        catch (AMUserCodeException e) {
            // A failure in the vertex manager's initialize() finishes the vertex as
            // FAILED with AM_USERCODE_FAILURE and the stack trace of the cause.
            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + this.logIdentifier;
            LOG.error((Object)msg, (Throwable)e);
            this.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
            return VertexState.FAILED;
        }
        if (event != null) {
            // Task count comes from the event; listeners are told about the parallelism change.
            int oldNumTasks = this.numTasks;
            this.numTasks = event.getNumTasks();
            this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
        } else {
            this.numTasks = this.getVertexPlan().getTaskConfig().getNumTasks();
        }
        // -1 is tolerated here; any other negative count is invalid.
        if (this.numTasks != -1 && this.numTasks < 0) {
            this.addDiagnostic("Invalid task count for vertex, numTasks=" + this.numTasks);
            this.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
            // NOTE(review): this branch calls abortVertex()/finished() only when an event is
            // present, the opposite of the bipartite failure path above -- confirm intended.
            if (event != null) {
                this.abortVertex(VertexStatus.State.FAILED);
                return this.finished(VertexState.FAILED);
            }
            return VertexState.FAILED;
        }
        this.checkTaskLimits();
        return VertexState.INITED;
    }

    /**
     * Chooses and installs the {@code VertexManager} for this vertex.
     *
     * <p>Selection order:
     * <ol>
     *   <li>the plugin named in the vertex plan, if one is present;</li>
     *   <li>{@code RootInputVertexManager} when {@code inputsWithInitializers} is non-null;</li>
     *   <li>{@code InputReadyVertexManager} when there is a ONE_TO_ONE source edge and no CUSTOM edge;</li>
     *   <li>{@code ShuffleVertexManager} when there is a SCATTER_GATHER source edge and no CUSTOM edge;</li>
     *   <li>{@code ImmediateStartVertexManager} otherwise.</li>
     * </ol>
     *
     * @throws TezUncheckedException if a source edge has an unrecognised data movement type
     */
    private void assignVertexManager() {
        boolean sawScatterGather = false;
        boolean sawOneToOne = false;
        boolean sawCustom = false;
        if (this.sourceVertices != null) {
            for (Edge edge : this.sourceVertices.values()) {
                switch (edge.getEdgeProperty().getDataMovementType()) {
                    case SCATTER_GATHER:
                        sawScatterGather = true;
                        break;
                    case ONE_TO_ONE:
                        sawOneToOne = true;
                        break;
                    case BROADCAST:
                        // Broadcast edges do not influence the choice.
                        break;
                    case CUSTOM:
                        sawCustom = true;
                        break;
                    default:
                        throw new TezUncheckedException("Unknown data movement type: " + edge.getEdgeProperty().getDataMovementType());
                }
            }
        }

        VertexManagerPluginDescriptor chosen;
        if (this.vertexPlan.hasVertexManagerPlugin()) {
            chosen = DagTypeConverters.convertVertexManagerPluginDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)this.vertexPlan.getVertexManagerPlugin());
            LOG.info("Setting user vertex manager plugin: " + chosen.getClassName() + " on vertex: " + this.getName());
        } else if (this.inputsWithInitializers != null) {
            LOG.info("Setting vertexManager to RootInputVertexManager for " + this.logIdentifier);
            chosen = VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName());
        } else if (sawOneToOne && !sawCustom) {
            LOG.info("Setting vertexManager to InputReadyVertexManager for " + this.logIdentifier);
            chosen = VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName());
        } else if (sawScatterGather && !sawCustom) {
            LOG.info("Setting vertexManager to ShuffleVertexManager for " + this.logIdentifier);
            chosen = ShuffleVertexManager.createConfigBuilder((Configuration)this.conf).build();
        } else {
            LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + this.logIdentifier);
            chosen = VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName());
        }
        this.vertexManager = new VertexManager(chosen, this, this.appContext, this.stateChangeNotifier);
    }

    /**
     * Notifies the vertex manager that the vertex has started, passing the pending
     * reported source completions, and then sends the configured event if due.
     *
     * @throws AMUserCodeException if the vertex manager callback fails
     */
    private void recoveryCodeSimulatingStart() throws AMUserCodeException {
        vertexManager.onVertexStarted(pendingReportedSrcCompletions);
        maybeSendConfiguredEvent();
    }

    /**
     * Re-dispatches a list of events by wrapping each one in a
     * {@code VertexEventRouteEvent} addressed to the appropriate vertex.
     *
     * <p>The third constructor argument is always {@code true}
     * (presumably a "recovered" flag -- confirm against {@code VertexEventRouteEvent}).
     *
     * @param vertexState state used to decide whether root-input data-information
     *                    events are still routed
     * @param tezEvents   the events to route
     * @throws TezUncheckedException if an event names a destination vertex for which
     *         this vertex has no entry in {@code targetVertices}
     */
    private void routeRecoveredEvents(VertexState vertexState, List<TezEvent> tezEvents) {
        for (TezEvent tezEvent : tezEvents) {
            EventMetaData sourceMeta = tezEvent.getSourceInfo();
            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
            // Data-movement style events get their version set to the source attempt id,
            // then fall through to edge-based routing below.
            if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                ((DataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                ((CompositeDataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
                ((InputFailedEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else {
                if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
                    // Routed back to this vertex, but only while it is RUNNING or INITED;
                    // in any other state the event is dropped.
                    if (vertexState != VertexState.RUNNING && vertexState != VertexState.INITED) continue;
                    this.eventHandler.handle((Event)new VertexEventRouteEvent(this.getVertexId(), Collections.singletonList(tezEvent), true));
                    continue;
                }
                if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) {
                    // Stamped with this vertex as the source and sent to the vertex named in the event.
                    InputInitializerEvent iiEvent = (InputInitializerEvent)tezEvent.getEvent();
                    iiEvent.setSourceVertexName(this.vertexName);
                    this.eventHandler.handle((Event)new VertexEventRouteEvent(this.getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(), Collections.singletonList(tezEvent), true));
                    continue;
                }
                // Any other event type falls through to edge-based routing unchanged.
            }
            // Edge-based routing: the destination is the vertex named in the source metadata.
            Vertex destVertex = this.getDAG().getVertex(sourceMeta.getEdgeVertexName());
            // NOTE(review): targetVertices is null-checked elsewhere in this class but not here -- confirm it cannot be null on this path.
            Edge destEdge = this.targetVertices.get(destVertex);
            if (destEdge == null) {
                throw new TezUncheckedException("Bad destination vertex: " + sourceMeta.getEdgeVertexName() + " for event vertex: " + this.getVertexId());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Routing recovered event, vertex=" + this.logIdentifier + ", eventType=" + tezEvent.getEventType() + ", sourceInfo=" + sourceMeta + ", destinationVertex" + destVertex.getName()));
            }
            this.eventHandler.handle((Event)new VertexEventRouteEvent(destVertex.getVertexId(), Collections.singletonList(tezEvent), true));
        }
    }

    /**
     * Factory for the root input initializer manager.
     *
     * <p>None of the parameters are used by this implementation; the manager is
     * built from this vertex, its app context, the DAG user and the state change
     * notifier.
     *
     * @return a new {@code RootInputInitializerManager} bound to this vertex
     */
    @VisibleForTesting
    protected RootInputInitializerManager createRootInputInitializerManager(String dagName, String vertexName, TezVertexID vertexID, EventHandler eventHandler, int numTasks, int numNodes, Resource vertexTaskResource, Resource totalResource) {
        RootInputInitializerManager manager =
                new RootInputInitializerManager(this, this.appContext, this.dagUgi, this.stateChangeNotifier);
        return manager;
    }

    /**
     * Delegates to {@link #initializeVertex()}.
     *
     * @return whether vertex initialization succeeded
     */
    private boolean initializeVertexInInitializingState() {
        return this.initializeVertex();
    }

    /**
     * Sends a {@code V_START} event to this vertex when a start signal is pending;
     * otherwise does nothing.
     */
    void startIfPossible() {
        if (!this.startSignalPending) {
            return;
        }
        LOG.info("Triggering start event for vertex: " + this.logIdentifier + " with distanceFromRoot: " + this.distanceFromRoot);
        VertexEvent startEvent = new VertexEvent(this.vertexId, VertexEventType.V_START);
        this.eventHandler.handle((Event)startEvent);
    }

    /**
     * Reports whether the vertex can be initialized: the task count is known
     * (non-negative), no edges remain uninitialized, and init is not waiting on
     * root initializers. Logs the blocking values when the answer is no.
     *
     * @return {@code true} if all three conditions hold
     */
    boolean canInitVertex() {
        boolean ready = this.numTasks >= 0
                && this.uninitializedEdges.isEmpty()
                && !this.initWaitsForRootInitializers;
        if (!ready) {
            LOG.info("Cannot init vertex: " + this.logIdentifier + " numTasks: " + this.numTasks + " numUnitializedEdges: " + this.uninitializedEdges.size() + " numInitializedInputs: " + this.numInitializedInputs + " initWaitsForRootInitializers: " + this.initWaitsForRootInitializers);
        }
        return ready;
    }

    /**
     * Publishes the CONFIGURED state update at most once, and only when the vertex
     * is not going to be reconfigured by its manager.
     *
     * @throws IllegalStateException if {@link #canInitVertex()} is false
     */
    private void maybeSendConfiguredEvent() {
        Preconditions.checkState(this.canInitVertex());
        if (this.vertexToBeReconfiguredByManager) {
            return;
        }
        // compareAndSet ensures only the first caller publishes the update.
        if (this.completelyConfiguredSent.compareAndSet(false, true)) {
            VertexStateUpdate configured = new VertexStateUpdate(this.vertexName, org.apache.tez.dag.api.event.VertexState.CONFIGURED);
            this.stateChangeNotifier.stateChanged(this.vertexId, configured);
        }
    }

    /**
     * Starts an inited vertex.
     *
     * <p>Records the start time and notifies the vertex manager. If the manager
     * throws, the failure is logged and added to the diagnostics, a kill is
     * attempted with AM_USERCODE_FAILURE, and TERMINATING is returned. Otherwise
     * the pending source completions are cleared, the started history event is
     * written, the configured event is sent if due, every target vertex is told
     * that this source has started, and a {@code V_COMPLETED} event is sent
     * immediately when the vertex has zero tasks.
     *
     * @return {@code RUNNING} on success, {@code TERMINATING} if the vertex manager failed
     * @throws IllegalStateException if the vertex is not in the INITED state
     */
    private VertexState startVertex() {
        boolean inited = this.getState() == VertexState.INITED;
        Preconditions.checkState(inited, (Object)("Vertex must be inited " + this.logIdentifier));
        this.startedTime = this.clock.getTime();

        try {
            this.vertexManager.onVertexStarted(this.pendingReportedSrcCompletions);
        }
        catch (AMUserCodeException userCodeFailure) {
            String failureMsg = "Exception in " + userCodeFailure.getSource() + ", vertex=" + this.logIdentifier;
            LOG.error(failureMsg, userCodeFailure);
            this.addDiagnostic(failureMsg + "," + ExceptionUtils.getStackTrace(userCodeFailure.getCause()));
            this.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
            return VertexState.TERMINATING;
        }

        this.pendingReportedSrcCompletions.clear();
        this.logJobHistoryVertexStartedEvent();
        this.maybeSendConfiguredEvent();

        if (this.targetVertices != null) {
            for (Vertex downstream : this.targetVertices.keySet()) {
                this.eventHandler.handle((Event)new VertexEventSourceVertexStarted(downstream.getVertexId(), this.getVertexId(), this.distanceFromRoot));
            }
        }
        // A vertex with zero tasks has nothing to wait for.
        if (this.numTasks == 0) {
            this.eventHandler.handle((Event)new VertexEvent(this.vertexId, VertexEventType.V_COMPLETED));
        }
        return VertexState.RUNNING;
    }

    /**
     * Aborts the outputs of every registered committer, at most once per vertex.
     *
     * <p>Repeat calls are logged and ignored. Each committer's abort runs inside
     * {@code dagUgi.doAs}; a failure in one committer is logged as a warning and
     * does not stop the others. The finish time is set afterwards if still zero.
     *
     * @param finalState the final vertex status passed to each committer's abort
     * @throws TezUncheckedException if the {@code doAs} call itself fails
     */
    private void abortVertex(final VertexStatus.State finalState) {
        boolean alreadyAborted = this.aborted.getAndSet(true);
        if (alreadyAborted) {
            LOG.info("Ignoring multiple aborts for vertex: " + this.logIdentifier);
            return;
        }
        LOG.info("Invoking committer abort for vertex, vertexId=" + this.logIdentifier);
        if (this.outputCommitters != null) {
            try {
                this.dagUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                    @Override
                    public Void run() {
                        for (Map.Entry<String, OutputCommitter> committerEntry : VertexImpl.this.outputCommitters.entrySet()) {
                            String outputName = committerEntry.getKey();
                            try {
                                LOG.info("Invoking committer abort for output=" + outputName + ", vertexId=" + VertexImpl.this.logIdentifier);
                                committerEntry.getValue().abortOutput(finalState);
                            }
                            catch (Exception abortFailure) {
                                // Best effort: keep aborting the remaining outputs.
                                LOG.warn("Could not abort committer for output=" + outputName + ", vertexId=" + VertexImpl.this.logIdentifier, abortFailure);
                            }
                        }
                        return null;
                    }
                });
            }
            catch (Exception doAsFailure) {
                throw new TezUncheckedException("Unknown error while attempting VertexCommitter(s) abort", doAsFailure);
            }
        }
        if (this.finishTime == 0L) {
            this.setFinishTime();
        }
    }

    /**
     * Builds the aggregated counters if they have not been built yet.
     *
     * <p>While holding the {@code fullCountersLock} monitor, calls
     * {@link #constructFinalFullcounters()} unless {@code fullCounters} is already non-null.
     */
    private void mayBeConstructFinalFullCounters() {
        synchronized (this.fullCountersLock) {
            if (this.fullCounters == null) {
                this.constructFinalFullcounters();
            }
        }
    }

    /**
     * Replaces {@code fullCounters} and {@code vertexStats} with fresh instances and fills them
     * by visiting every task: the task report is fed into the stats and the task counters are
     * added to the total.
     */
    @InterfaceAudience.Private
    public void constructFinalFullcounters() {
        this.fullCounters = new TezCounters();
        this.vertexStats = new VertexStats();
        for (Task task : this.tasks.values()) {
            this.vertexStats.updateStats(task.getReport());
            this.fullCounters.incrAllCounters((AbstractCounters)task.getCounters());
        }
    }

    /**
     * Appends one message to this vertex's diagnostics.
     *
     * @param diag the diagnostic text to record
     */
    private void addDiagnostic(String diag) {
        diagnostics.add(diag);
    }

    /**
     * Tells whether an event's source metadata names the given vertex.
     *
     * @param vertex the vertex to compare against
     * @param sourceMeta the event's source metadata
     * @return {@code true} when the metadata's task vertex name equals {@code vertex.getName()}
     */
    private static boolean isEventFromVertex(Vertex vertex, EventMetaData sourceMeta) {
        String originVertexName = sourceMeta.getTaskVertexName();
        return originVertexName.equals(vertex.getName());
    }

    /**
     * Asserts that the event described by {@code sourceMeta} originates from {@code vertex}.
     * This is a Java {@code assert}, so it is only evaluated when assertions are enabled.
     */
    private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
        assert isEventFromVertex(vertex, sourceMeta);
    }

    /**
     * Dispatches each event in {@code tezEvents} according to its {@link EventType}.
     *
     * <p>When recovery is enabled and the batch is not a replay ({@code recovered == false}),
     * the data-movement and root-input events that originate from {@code vertex} are first
     * handed to the history handler as one {@code VertexRecoverableEventsGeneratedEvent}.
     *
     * @param vertex the vertex that received the events
     * @param tezEvents the events to route, processed in list order
     * @param recovered whether the events are being replayed; suppresses the history record
     * @throws AMUserCodeException propagated from the calls made while routing
     * @throws TezUncheckedException if an event refers to an edge or initializer this vertex
     *         does not have, or if its type is not handled here
     */
    private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent> tezEvents, boolean recovered) throws AMUserCodeException {
        if (vertex.getAppContext().isRecoveryEnabled() && !recovered && !tezEvents.isEmpty()) {
            List<TezEvent> recoveryEvents = Lists.newArrayList();
            for (TezEvent tezEvent : tezEvents) {
                if (!VertexImpl.isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
                    continue;
                }
                EventType eventType = tezEvent.getEventType();
                if (eventType.equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
                        || eventType.equals(EventType.DATA_MOVEMENT_EVENT)
                        || eventType.equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
                        || eventType.equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
                    recoveryEvents.add(tezEvent);
                }
            }
            if (!recoveryEvents.isEmpty()) {
                VertexRecoverableEventsGeneratedEvent historyEvent = new VertexRecoverableEventsGeneratedEvent(vertex.vertexId, recoveryEvents);
                vertex.appContext.getHistoryHandler().handle(new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
            }
        }
        for (TezEvent tezEvent : tezEvents) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Vertex: " + vertex.getName() + " routing event: " + tezEvent.getEventType() + " Recovered:" + recovered);
            }
            EventMetaData sourceMeta = tezEvent.getSourceInfo();
            switch (tezEvent.getEventType()) {
                case INPUT_FAILED_EVENT:
                case DATA_MOVEMENT_EVENT:
                case COMPOSITE_DATA_MOVEMENT_EVENT: {
                    if (VertexImpl.isEventFromVertex(vertex, sourceMeta)) {
                        // Produced by one of this vertex's own tasks: stamp it with the
                        // attempt id and forward it to the destination vertex.
                        TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
                        if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                            ((DataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
                        } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                            ((CompositeDataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
                        } else {
                            ((InputFailedEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
                        }
                        Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
                        Edge destEdge = vertex.targetVertices.get(destVertex);
                        if (destEdge == null) {
                            throw new TezUncheckedException("Bad destination vertex: " + sourceMeta.getEdgeVertexName() + " for event vertex: " + vertex.getVertexId());
                        }
                        vertex.eventHandler.handle((Event)new VertexEventRouteEvent(destVertex.getVertexId(), Collections.singletonList(tezEvent)));
                        break;
                    }
                    // Produced by another vertex: hold it until tasks are scheduled,
                    // otherwise deliver it through the matching incoming edge.
                    if (vertex.tasksNotYetScheduled) {
                        vertex.pendingTaskEvents.add(tezEvent);
                        break;
                    }
                    Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(sourceMeta.getTaskVertexName()));
                    if (srcEdge == null) {
                        throw new TezUncheckedException("Bad source vertex: " + sourceMeta.getTaskVertexName() + " for destination vertex: " + vertex.getVertexId());
                    }
                    srcEdge.sendTezEventToDestinationTasks(tezEvent);
                    break;
                }
                case ROOT_INPUT_DATA_INFORMATION_EVENT: {
                    if (vertex.tasksNotYetScheduled) {
                        vertex.pendingTaskEvents.add(tezEvent);
                        break;
                    }
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    // The payload is an InputDataInformationEvent; declare it as such so that
                    // getTargetIndex() resolves against the right type.
                    InputDataInformationEvent dataInfoEvent = (InputDataInformationEvent)tezEvent.getEvent();
                    Task targetTask = vertex.getTask(dataInfoEvent.getTargetIndex());
                    targetTask.registerTezEvent(tezEvent);
                    break;
                }
                case VERTEX_MANAGER_EVENT: {
                    VertexManagerEvent vmEvent = (VertexManagerEvent)tezEvent.getEvent();
                    Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
                    Preconditions.checkArgument(target != null, (Object)("Event sent to unkown vertex: " + vmEvent.getTargetVertexName()));
                    if (target == vertex) {
                        vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
                        break;
                    }
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    vertex.eventHandler.handle((Event)new VertexEventRouteEvent(target.getVertexId(), Collections.singletonList(tezEvent)));
                    break;
                }
                case ROOT_INPUT_INITIALIZER_EVENT: {
                    InputInitializerEvent riEvent = (InputInitializerEvent)tezEvent.getEvent();
                    Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
                    Preconditions.checkArgument(target != null, (Object)("Event sent to unknown vertex: " + riEvent.getTargetVertexName()));
                    riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
                    if (target == vertex) {
                        if (vertex.rootInputDescriptors == null || !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
                            throw new TezUncheckedException("InputInitializerEvent targeted at unknown initializer on vertex " + vertex.logIdentifier + ", Event=" + riEvent);
                        }
                        if (vertex.getState() == VertexState.NEW) {
                            // Initializers are not running yet; keep the event for later.
                            vertex.pendingInitializerEvents.add(tezEvent);
                            break;
                        }
                        if (vertex.getState() == VertexState.INITIALIZING) {
                            vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
                            break;
                        }
                        // Any other state: the event is dropped.
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState());
                        }
                        break;
                    }
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    vertex.eventHandler.handle((Event)new VertexEventRouteEvent(target.getVertexId(), Collections.singletonList(tezEvent)));
                    break;
                }
                case INPUT_READ_ERROR_EVENT: {
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName()));
                    // Fail with a descriptive error, as the other edge lookups do, instead of
                    // a bare NullPointerException when the named vertex is not a source.
                    if (srcEdge == null) {
                        throw new TezUncheckedException("Bad source vertex: " + sourceMeta.getEdgeVertexName() + " for destination vertex: " + vertex.getVertexId());
                    }
                    srcEdge.sendTezEventToSourceTasks(tezEvent);
                    break;
                }
                case TASK_STATUS_UPDATE_EVENT: {
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    TaskStatusUpdateEvent sEvent = (TaskStatusUpdateEvent)tezEvent.getEvent();
                    vertex.getEventHandler().handle((Event)new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(), sEvent));
                    break;
                }
                case TASK_ATTEMPT_COMPLETED_EVENT: {
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    vertex.getEventHandler().handle((Event)new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
                    break;
                }
                case TASK_ATTEMPT_FAILED_EVENT: {
                    VertexImpl.checkEventSourceMetadata(vertex, sourceMeta);
                    TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent)tezEvent.getEvent();
                    vertex.getEventHandler().handle((Event)new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_FAILED, "Error: " + taskFailedEvent.getDiagnostics()));
                    break;
                }
                default: {
                    throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
                }
            }
        }
    }

    /**
     * Creates the root input initializer manager, starts the initializers for every input
     * listed in {@code inputsWithInitializers}, and then hands over (and clears) the
     * initializer events collected in {@code pendingInitializerEvents}.
     * Also sets {@code initWaitsForRootInitializers} to {@code true}.
     */
    private void setupInputInitializerManager() {
        this.rootInputInitializerManager = this.createRootInputInitializerManager(
                this.getDAG().getName(),
                this.getName(),
                this.getVertexId(),
                this.eventHandler,
                this.getTotalTasks(),
                this.appContext.getTaskScheduler().getNumClusterNodes(),
                this.getTaskResource(),
                this.appContext.getTaskScheduler().getTotalResources());

        int initializerCount = this.inputsWithInitializers.size();
        List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputsToInitialize = Lists.newArrayListWithCapacity(initializerCount);
        for (String inputName : this.inputsWithInitializers) {
            inputsToInitialize.add(this.rootInputDescriptors.get(inputName));
        }

        LOG.info("Vertex will initialize via inputInitializers " + this.logIdentifier + ". Starting root input initializers: " + this.inputsWithInitializers.size());
        this.initWaitsForRootInitializers = true;
        this.rootInputInitializerManager.runInputInitializers(inputsToInitialize);
        // Events that arrived before the manager existed are delivered now.
        this.rootInputInitializerManager.handleInitializerEvents(this.pendingInitializerEvents);
        this.pendingInitializerEvents.clear();
    }

    /**
     * Stores the given map, by reference, as this vertex's source vertices.
     *
     * @param inVertices source vertex to connecting edge
     */
    @Override
    public void setInputVertices(Map<Vertex, Edge> inVertices) {
        sourceVertices = inVertices;
    }

    /**
     * Stores the given map, by reference, as this vertex's target vertices.
     *
     * @param outVertices target vertex to connecting edge
     */
    @Override
    public void setOutputVertices(Map<Vertex, Edge> outVertices) {
        targetVertices = outVertices;
    }

    /**
     * Rebuilds {@code rootInputDescriptors} from the given plan entries.
     *
     * <p>Each entry is converted to an input descriptor plus, when the entry has a controller
     * descriptor, an initializer descriptor (otherwise {@code null}). Every input name is also
     * registered in {@code rootInputSpecs} with {@code DEFAULT_ROOT_INPUT_SPECS}.
     *
     * @param inputs the root input entries of the DAG plan
     */
    @Override
    public void setAdditionalInputs(List<DAGProtos.RootInputLeafOutputProto> inputs) {
        this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
        for (DAGProtos.RootInputLeafOutputProto proto : inputs) {
            String inputName = proto.getName();
            InputDescriptor descriptor = DagTypeConverters.convertInputDescriptorFromDAGPlan(proto.getIODescriptor());
            InputInitializerDescriptor initializer = null;
            if (proto.hasControllerDescriptor()) {
                initializer = DagTypeConverters.convertInputInitializerDescriptorFromDAGPlan(proto.getControllerDescriptor());
            }
            this.rootInputDescriptors.put(inputName, new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(inputName, descriptor, initializer));
            this.rootInputSpecs.put(inputName, DEFAULT_ROOT_INPUT_SPECS);
        }
    }

    /**
     * @return the map of output name to committer held by this vertex (the field itself,
     *         not a copy); may be {@code null}
     */
    @Override
    @Nullable
    public Map<String, OutputCommitter> getOutputCommitters() {
        return outputCommitters;
    }

    /**
     * Looks up the committer registered for one output.
     *
     * @param outputName name of the output
     * @return the committer, or {@code null} if there is no committer map or no entry for the name
     */
    @Nullable
    @InterfaceAudience.Private
    @VisibleForTesting
    public OutputCommitter getOutputCommitter(String outputName) {
        if (this.outputCommitters == null) {
            return null;
        }
        return this.outputCommitters.get(outputName);
    }

    /**
     * Rebuilds {@code additionalOutputs} from the given plan entries and resets
     * {@code outputCommitters} to an empty map.
     *
     * <p>Each entry is converted to an output descriptor plus, when the entry has a controller
     * descriptor, a committer descriptor (otherwise {@code null}). An {@code OutputSpec} with a
     * count of {@code 0} is appended to {@code additionalOutputSpecs} for every entry.
     *
     * @param outputs the leaf output entries of the DAG plan
     */
    @Override
    public void setAdditionalOutputs(List<DAGProtos.RootInputLeafOutputProto> outputs) {
        LOG.info("setting additional outputs for vertex " + this.vertexName);
        int expectedSize = outputs.size();
        this.additionalOutputs = Maps.newHashMapWithExpectedSize(expectedSize);
        this.outputCommitters = Maps.newHashMapWithExpectedSize(expectedSize);
        for (DAGProtos.RootInputLeafOutputProto proto : outputs) {
            String outputName = proto.getName();
            OutputDescriptor descriptor = DagTypeConverters.convertOutputDescriptorFromDAGPlan(proto.getIODescriptor());
            OutputCommitterDescriptor committer = null;
            if (proto.hasControllerDescriptor()) {
                committer = DagTypeConverters.convertOutputCommitterDescriptorFromDAGPlan(proto.getControllerDescriptor());
            }
            this.additionalOutputs.put(outputName, new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(outputName, descriptor, committer));
            this.additionalOutputSpecs.add(new OutputSpec(outputName, descriptor, 0));
        }
    }

    /**
     * @return the root input descriptors keyed by input name (the field itself, not a copy);
     *         may be {@code null}
     */
    @Override
    @Nullable
    public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> getAdditionalInputs() {
        return rootInputDescriptors;
    }

    /**
     * @return the additional outputs keyed by output name (the field itself, not a copy);
     *         may be {@code null}
     */
    @Override
    @Nullable
    public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> getAdditionalOutputs() {
        return additionalOutputs;
    }

    /**
     * Orders vertices by their vertex id.
     *
     * @param other the vertex to compare with
     * @return the result of comparing this vertex's id with {@code other}'s id
     */
    @Override
    public int compareTo(Vertex other) {
        TezID otherId = other.getVertexId();
        return this.vertexId.compareTo(otherId);
    }

    /**
     * Two vertices are equal when they are of exactly the same class and have equal vertex ids.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is this instance or an equal vertex
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || this.getClass() != obj.getClass()) {
            return false;
        }
        return this.vertexId.equals(((Vertex)obj).getVertexId());
    }

    /**
     * @return a hash derived solely from the vertex id
     */
    @Override
    public int hashCode() {
        final int prime = 11239;
        return prime + prime * this.vertexId.hashCode();
    }

    /**
     * @return a read-only view of the source vertices and their edges
     */
    @Override
    public Map<Vertex, Edge> getInputVertices() {
        Map<Vertex, Edge> readOnlyView = Collections.unmodifiableMap(this.sourceVertices);
        return readOnlyView;
    }

    /**
     * @return a read-only view of the target vertices and their edges
     */
    @Override
    public Map<Vertex, Edge> getOutputVertices() {
        Map<Vertex, Edge> readOnlyView = Collections.unmodifiableMap(this.targetVertices);
        return readOnlyView;
    }

    /**
     * @return the number of entries in the source vertex map
     */
    @Override
    public int getInputVerticesCount() {
        return sourceVertices.size();
    }

    /**
     * @return the number of entries in the target vertex map
     */
    @Override
    public int getOutputVerticesCount() {
        return targetVertices.size();
    }

    /**
     * @return the processor descriptor held by this vertex
     */
    @Override
    public ProcessorDescriptor getProcessorDescriptor() {
        return processorDescriptor;
    }

    /**
     * @return the DAG that the application context currently reports as running
     */
    @Override
    public DAG getDAG() {
        return appContext.getCurrentDAG();
    }

    /**
     * @return the id of the DAG returned by {@link #getDAG()}
     */
    private TezDAGID getDAGId() {
        DAG dag = this.getDAG();
        return dag.getID();
    }

    /**
     * @return the resource held in {@code taskResource}
     */
    @Override
    public Resource getTaskResource() {
        return taskResource;
    }

    /**
     * @return the class name recorded in the processor descriptor
     */
    @VisibleForTesting
    String getProcessorName() {
        return processorDescriptor.getClassName();
    }

    /**
     * @return the value of the {@code javaOpts} field
     */
    @VisibleForTesting
    String getJavaOpts() {
        return javaOpts;
    }

    /**
     * @return the task location hint array held by this vertex (the array itself, not a copy)
     */
    @VisibleForTesting
    TaskLocationHint[] getTaskLocationHints() {
        return taskLocationHints;
    }

    /**
     * Builds the input specs for one task: first one spec per root input, then one spec per
     * incoming edge.
     *
     * <p>Root input specs take their physical input count from the matching entry in
     * {@code rootInputSpecs}; edge specs come from {@code Edge.getDestinationSpec}.
     *
     * @param taskIndex index of the task the specs are built for
     * @return a new list containing the specs in the order described above
     * @throws AMUserCodeException propagated from the calls made while building the specs
     */
    @Override
    public synchronized List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
        int rootInputCount = this.rootInputDescriptors == null ? 0 : this.rootInputDescriptors.size();
        List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + rootInputCount);
        if (this.rootInputDescriptors != null) {
            // Entries are iterated with their real types, so no unchecked casts are needed.
            for (Map.Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> rootInput : this.rootInputDescriptors.entrySet()) {
                String inputName = rootInput.getKey();
                int physicalInputs = this.rootInputSpecs.get(inputName).getNumPhysicalInputsForWorkUnit(taskIndex);
                inputSpecList.add(new InputSpec(inputName, rootInput.getValue().getIODescriptor(), physicalInputs));
            }
        }
        for (Map.Entry<Vertex, Edge> source : this.getInputVertices().entrySet()) {
            InputSpec inputSpec = source.getValue().getDestinationSpec(taskIndex);
            if (LOG.isDebugEnabled()) {
                LOG.debug("For vertex : " + this.getName() + ", Using InputSpec : " + inputSpec);
            }
            inputSpecList.add(inputSpec);
        }
        return inputSpecList;
    }

    /**
     * Builds the output specs for one task: all entries of {@code additionalOutputSpecs}
     * followed by one spec per outgoing edge, obtained from {@code Edge.getSourceSpec}.
     *
     * @param taskIndex index of the task the specs are built for
     * @return a new list containing the specs
     * @throws AMUserCodeException propagated from the calls made while building the specs
     */
    @Override
    public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
        int expectedSize = this.getOutputVerticesCount() + this.additionalOutputSpecs.size();
        ArrayList<OutputSpec> specs = new ArrayList<OutputSpec>(expectedSize);
        specs.addAll(this.additionalOutputSpecs);
        for (Edge outgoingEdge : this.getOutputVertices().values()) {
            specs.add(outgoingEdge.getSourceSpec(taskIndex));
        }
        return specs;
    }

    /**
     * @param taskIndex not used by this implementation; the same list is returned for every task
     * @return the group input spec list held by this vertex (the field itself, not a copy)
     */
    @Override
    public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
        return groupInputSpecList;
    }

    /**
     * Adds every name in {@code outputs} to this vertex's shared outputs.
     *
     * @param outputs output names to add
     */
    @Override
    public synchronized void addSharedOutputs(Set<String> outputs) {
        sharedOutputs.addAll(outputs);
    }

    /**
     * @return the shared output names held by this vertex (the field itself, not a copy)
     */
    @Override
    public synchronized Set<String> getSharedOutputs() {
        return sharedOutputs;
    }

    /**
     * @return the vertex manager held by this vertex
     */
    @VisibleForTesting
    VertexManager getVertexManager() {
        return vertexManager;
    }

    /**
     * Writes a vertex's location hints to the debug log: one line per task hint listing its
     * hosts and racks, followed by per-host and per-rack occurrence counts.
     *
     * <p>All output goes to {@code LOG.debug}, so the method returns immediately when debug
     * logging is disabled instead of building strings and multisets that would be discarded.
     *
     * @param vertexName name used as prefix in every log line
     * @param locationHint the hints to log; {@code null} produces a single "not specified" line
     */
    private static void logLocationHints(String vertexName, VertexLocationHint locationHint) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        if (locationHint == null) {
            LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName);
            return;
        }
        Multiset<String> hosts = HashMultiset.create();
        Multiset<String> racks = HashMultiset.create();
        int counter = 0;
        for (TaskLocationHint taskLocationHint : locationHint.getTaskLocationHints()) {
            StringBuilder sb = new StringBuilder();
            if (taskLocationHint.getHosts() == null) {
                sb.append("No Hosts");
            } else {
                sb.append("Hosts: ");
                for (String host : taskLocationHint.getHosts()) {
                    hosts.add(host);
                    sb.append(host).append(", ");
                }
            }
            if (taskLocationHint.getRacks() == null) {
                sb.append("No Racks");
            } else {
                sb.append("Racks: ");
                for (String rack : taskLocationHint.getRacks()) {
                    racks.add(rack);
                    sb.append(rack).append(", ");
                }
            }
            LOG.debug("Vertex: " + vertexName + ", Location: " + counter + " : " + sb.toString());
            ++counter;
        }
        LOG.debug("Vertex: " + vertexName + ", Host Counts");
        for (Multiset.Entry<String> host : hosts.entrySet()) {
            LOG.debug("Vertex: " + vertexName + ", host: " + host.toString());
        }
        LOG.debug("Vertex: " + vertexName + ", Rack Counts");
        for (Multiset.Entry<String> rack : racks.entrySet()) {
            LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
        }
    }

    /**
     * State-machine callback that forwards a vertex's state change to its
     * {@code stateChangeNotifier}, translated into the public
     * {@code org.apache.tez.dag.api.event.VertexState} enum.
     */
    private static class VertexStateChangedCallback
    implements OnStateChangedCallback<VertexState, VertexImpl> {
        private VertexStateChangedCallback() {
        }

        /**
         * Notifies the vertex's state change notifier about the new state.
         *
         * @throws TezUncheckedException if {@code vertexState} has no public counterpart
         */
        @Override
        public void onStateChanged(VertexImpl vertex, VertexState vertexState) {
            TezVertexID vertexId = vertex.getVertexId();
            org.apache.tez.dag.api.event.VertexState publicState = this.convertInternalState(vertexState, vertexId);
            vertex.stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertex.getName(), publicState));
        }

        /**
         * Maps RUNNING, SUCCEEDED, FAILED and KILLED to the public constants of the same name;
         * any other internal state is rejected.
         */
        private org.apache.tez.dag.api.event.VertexState convertInternalState(VertexState vertexState, TezVertexID vertexId) {
            switch (vertexState) {
                case RUNNING:
                    return org.apache.tez.dag.api.event.VertexState.RUNNING;
                case SUCCEEDED:
                    return org.apache.tez.dag.api.event.VertexState.SUCCEEDED;
                case FAILED:
                    return org.apache.tez.dag.api.event.VertexState.FAILED;
                case KILLED:
                    return org.apache.tez.dag.api.event.VertexState.KILLED;
                default:
                    throw new TezUncheckedException("Not expecting state updates for state: " + vertexState + ", VertexID: " + vertexId);
            }
        }
    }

    private static class InternalErrorTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private InternalErrorTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            LOG.error((Object)("Invalid event " + event.getType() + " on Vertex " + vertex.getVertexId()));
            vertex.eventHandler.handle((Event)new DAGEventDiagnosticsUpdate(vertex.getDAGId(), "Invalid event " + event.getType() + " on Vertex " + vertex.getVertexId()));
            vertex.setFinishTime();
            vertex.finished(VertexState.ERROR);
        }
    }

    private static class RouteEventTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private RouteEventTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventRouteEvent rEvent = (VertexEventRouteEvent)event;
            boolean recovered = rEvent.isRecovered();
            List<TezEvent> tezEvents = rEvent.getEvents();
            try {
                VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered);
            }
            catch (AMUserCodeException e) {
                String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + vertex.getLogIdentifier();
                LOG.error((Object)msg, (Throwable)e);
                if (vertex.getState() == VertexState.RUNNING) {
                    vertex.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                    vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                    return VertexState.TERMINATING;
                }
                vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                return VertexState.FAILED;
            }
            return vertex.getState();
        }
    }

    private static class TaskRescheduledAfterVertexSuccessTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private TaskRescheduledAfterVertexSuccessTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            if (vertex.outputCommitters == null || vertex.outputCommitters.isEmpty() || !vertex.commitVertexOutputs) {
                LOG.info((Object)(vertex.getVertexId() + " back to running due to rescheduling " + ((VertexEventTaskReschedule)event).getTaskID()));
                new TaskRescheduledTransition().transition(vertex, event);
                vertex.eventHandler.handle((Event)new DAGEventVertexReRunning(vertex.getVertexId()));
                return VertexState.RUNNING;
            }
            String diagnosticMsg = vertex.getVertexId() + " failed due to post-commit rescheduling of " + ((VertexEventTaskReschedule)event).getTaskID();
            LOG.info((Object)diagnosticMsg);
            vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OWN_TASK_FAILURE);
            vertex.abortVertex(VertexStatus.State.FAILED);
            vertex.finished(VertexState.FAILED, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
            return VertexState.FAILED;
        }
    }

    private static class TaskCompletedAfterVertexSuccessTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private TaskCompletedAfterVertexSuccessTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            String diagnosticMsg;
            VertexStatus.State finalStatus;
            VertexState finalState;
            VertexEventTaskCompleted vEvent = (VertexEventTaskCompleted)event;
            if (vEvent.getState() == TaskState.FAILED) {
                finalState = VertexState.FAILED;
                finalStatus = VertexStatus.State.FAILED;
                diagnosticMsg = "Vertex " + vertex.logIdentifier + " failed as task " + vEvent.getTaskID() + " failed after vertex succeeded.";
            } else {
                finalState = VertexState.ERROR;
                finalStatus = VertexStatus.State.ERROR;
                diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + " completed with state " + (Object)((Object)vEvent.getState()) + " after vertex succeeded.";
            }
            LOG.info((Object)diagnosticMsg);
            vertex.abortVertex(finalStatus);
            vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
            return finalState;
        }
    }

    private static class VertexNoTasksCompletedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private VertexNoTasksCompletedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            return VertexImpl.checkVertexForCompletion(vertex);
        }
    }

    private static class TaskRescheduledTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TaskRescheduledTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            --vertex.completedTaskCount;
            --vertex.succeededTaskCount;
        }
    }

    private static class TaskCompletedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private TaskCompletedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            boolean forceTransitionToKillWait = false;
            ++vertex.completedTaskCount;
            LOG.info((Object)("Num completed Tasks for " + vertex.logIdentifier + " : " + vertex.completedTaskCount));
            VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted)event;
            Task task = vertex.tasks.get(taskEvent.getTaskID());
            if (taskEvent.getState() == TaskState.SUCCEEDED) {
                this.taskSucceeded(vertex, task);
            } else if (taskEvent.getState() == TaskState.FAILED) {
                LOG.info((Object)("Failing vertex: " + vertex.logIdentifier + " because task failed: " + taskEvent.getTaskID()));
                vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
                forceTransitionToKillWait = true;
                this.taskFailed(vertex, task);
            } else if (taskEvent.getState() == TaskState.KILLED) {
                this.taskKilled(vertex, task);
            }
            VertexState state = VertexImpl.checkVertexForCompletion(vertex);
            if (state == VertexState.RUNNING && forceTransitionToKillWait) {
                return VertexState.TERMINATING;
            }
            return state;
        }

        private void taskSucceeded(VertexImpl vertex, Task task) {
            ++vertex.succeededTaskCount;
        }

        private void taskFailed(VertexImpl vertex, Task task) {
            ++vertex.failedTaskCount;
            vertex.addDiagnostic("Task failed, taskId=" + task.getTaskId() + ", diagnostics=" + task.getDiagnostics());
        }

        private void taskKilled(VertexImpl vertex, Task task) {
            ++vertex.killedTaskCount;
        }
    }

    private static class TaskAttemptCompletedEventTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TaskAttemptCompletedEventTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTaskAttemptCompleted completionEvent = (VertexEventTaskAttemptCompleted)event;
            if (vertex.targetVertices != null) {
                for (Vertex targetVertex : vertex.targetVertices.keySet()) {
                    vertex.eventHandler.handle((Event)new VertexEventSourceTaskAttemptCompleted(targetVertex.getVertexId(), completionEvent));
                }
            }
        }
    }

    private static class SourceTaskAttemptCompletedEventTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private SourceTaskAttemptCompletedEventTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTaskAttemptCompleted completionEvent = ((VertexEventSourceTaskAttemptCompleted)event).getCompletionEvent();
            LOG.info((Object)("Source task attempt completed for vertex: " + vertex.getVertexId() + " attempt: " + completionEvent.getTaskAttemptId() + " with state: " + (Object)((Object)completionEvent.getTaskAttemptState()) + " vertexState: " + (Object)((Object)vertex.getState())));
            if (TaskAttemptStateInternal.SUCCEEDED.equals((Object)completionEvent.getTaskAttemptState())) {
                ++vertex.numSuccessSourceAttemptCompletions;
                if (vertex.getState() == VertexState.RUNNING) {
                    try {
                        vertex.vertexManager.onSourceTaskCompleted(completionEvent.getTaskAttemptId().getTaskID());
                    }
                    catch (AMUserCodeException e) {
                        String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                        LOG.error((Object)msg, (Throwable)e);
                        vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                        vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                        return VertexState.TERMINATING;
                    }
                } else {
                    vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
                }
            }
            return vertex.getState();
        }
    }

    private static class VertexManagerUserCodeErrorTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private VertexManagerUserCodeErrorTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventManagerUserCodeError errEvent = (VertexEventManagerUserCodeError)event;
            AMUserCodeException e = errEvent.getError();
            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
            LOG.error((Object)msg, (Throwable)e);
            if (vertex.getState() == VertexState.RECOVERING) {
                LOG.info((Object)"Received a user code error during recovering, setting recovered state to FAILED");
                vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE;
                vertex.recoveredState = VertexState.FAILED;
                return VertexState.RECOVERING;
            }
            if (vertex.getState() == VertexState.RUNNING) {
                vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                return VertexState.TERMINATING;
            }
            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
            return VertexState.FAILED;
        }
    }

    /**
     * Handles a termination event by recording a diagnostic and asking the vertex to
     * kill itself, translating the vertex-level termination cause into the cause
     * passed along for its tasks.
     */
    private static class VertexKilledTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private VertexKilledTransition() {
        }

        /**
         * @param vertex the vertex being killed
         * @param event  expected to be a {@code VertexEventTermination}
         * @throws TezUncheckedException if the termination cause has no task-level mapping
         */
        public void transition(VertexImpl vertex, VertexEvent event) {
            vertex.addDiagnostic("Vertex received Kill while in RUNNING state.");
            VertexTerminationCause trigger = ((VertexEventTermination)event).getTerminationCause();
            // The mapping is resolved first so an unexpected cause fails before any kill is enacted.
            TaskTerminationCause taskCause = taskCauseFor(trigger);
            vertex.tryEnactKill(trigger, taskCause);
        }

        /** Maps a vertex termination cause onto the cause reported for the vertex's tasks. */
        private static TaskTerminationCause taskCauseFor(VertexTerminationCause trigger) {
            switch (trigger) {
                case DAG_KILL:
                    return TaskTerminationCause.DAG_KILL;
                case OWN_TASK_FAILURE:
                    return TaskTerminationCause.OTHER_TASK_FAILURE;
                case ROOT_INPUT_INIT_FAILURE:
                case COMMIT_FAILURE:
                case INVALID_NUM_OF_TASKS:
                case INIT_FAILURE:
                case INTERNAL_ERROR:
                case AM_USERCODE_FAILURE:
                case OTHER_VERTEX_FAILURE:
                    return TaskTerminationCause.OTHER_VERTEX_FAILURE;
                default:
                    throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
            }
        }
    }

    /**
     * Termination transition variant that currently adds nothing of its own: it runs
     * exactly the steps of {@code TerminateInitedVertexTransition}.
     */
    private static class TerminateInitingVertexTransition
    extends TerminateInitedVertexTransition {
        private TerminateInitingVertexTransition() {
        }

        /**
         * Delegates unchanged to the parent transition.
         *
         * @param vertex the vertex being terminated
         * @param event  the termination event, forwarded as-is
         */
        @Override
        public void transition(final VertexImpl vertex, final VertexEvent event) {
            super.transition(vertex, event);
        }
    }

    /**
     * Terminates a vertex that has not started running: records the termination cause,
     * aborts the vertex as KILLED, adds a diagnostic and finishes the vertex as KILLED.
     *
     * <p>This class is also reused through {@code TerminateInitingVertexTransition}, so
     * the diagnostic reports the state the vertex is actually in rather than a
     * hard-coded "INITED". When the vertex is INITED the text is unchanged.
     */
    private static class TerminateInitedVertexTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TerminateInitedVertexTransition() {
        }

        /**
         * @param vertex the vertex being terminated
         * @param event  expected to be a {@code VertexEventTermination}
         */
        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTermination vet = (VertexEventTermination)event;
            vertex.trySetTerminationCause(vet.getTerminationCause());
            vertex.abortVertex(VertexStatus.State.KILLED);
            // Use the live state so subclasses sharing this logic do not mislabel the diagnostic.
            vertex.addDiagnostic("Vertex received Kill in " + vertex.getState() + " state.");
            vertex.finished(VertexState.KILLED);
        }
    }

    /**
     * Terminates a vertex on a termination event: stores the termination cause, stamps
     * the finish time, adds a diagnostic and finishes the vertex as KILLED.
     */
    private static class TerminateNewVertexTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TerminateNewVertexTransition() {
        }

        /**
         * @param vertex the vertex being terminated
         * @param event  expected to be a {@code VertexEventTermination}
         */
        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexTerminationCause cause = ((VertexEventTermination)event).getTerminationCause();
            vertex.trySetTerminationCause(cause);
            vertex.setFinishTime();
            vertex.addDiagnostic("Vertex received Kill in NEW state.");
            vertex.finished(VertexState.KILLED);
        }
    }

    private static class RootInputInitFailedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private RootInputInitFailedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventRootInputFailed fe = (VertexEventRootInputFailed)event;
            String msg = "Vertex Input: " + fe.getInputName() + " initializer failed, vertex=" + vertex.getLogIdentifier();
            LOG.error((Object)msg, (Throwable)fe.getError());
            if (vertex.getState() == VertexState.RUNNING) {
                vertex.addDiagnostic(msg + ", " + ExceptionUtils.getStackTrace((Throwable)fe.getError().getCause()));
                vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                return VertexState.TERMINATING;
            }
            vertex.finished(VertexState.FAILED, VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg + ", " + ExceptionUtils.getStackTrace((Throwable)fe.getError().getCause()));
            return VertexState.FAILED;
        }
    }

    /**
     * Starts a vertex that is in the INITED state: records when the start was requested
     * and delegates to {@code startVertex()} for the resulting state.
     */
    public static class StartTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        /**
         * @param vertex the vertex to start
         * @param event  the triggering event (not inspected)
         * @return the state returned by {@code vertex.startVertex()}
         * @throws IllegalStateException if the vertex is not INITED
         */
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexState current = vertex.getState();
            Preconditions.checkState(current == VertexState.INITED, "Unexpected state " + current + " for " + vertex.logIdentifier);
            vertex.startTimeRequested = vertex.clock.getTime();
            return vertex.startVertex();
        }
    }

    /**
     * Remembers a start request instead of acting on it: the request time is recorded
     * and {@code startSignalPending} is raised. Only accepted for vertices that have
     * no source vertices.
     */
    public static class StartWhileInitializingTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        /**
         * @param vertex the vertex asked to start
         * @param event  the triggering event (not inspected)
         * @throws IllegalStateException if the vertex has at least one source vertex
         */
        public void transition(VertexImpl vertex, VertexEvent event) {
            boolean hasNoSources = vertex.sourceVertices == null || vertex.sourceVertices.isEmpty();
            Preconditions.checkState(hasNoSources, "Vertex: " + vertex.logIdentifier + " got invalid start event");
            vertex.startTimeRequested = vertex.clock.getTime();
            vertex.startSignalPending = true;
        }
    }

    /**
     * Tracks source vertices as they start. Once every source has started, the start
     * signal is marked pending and, if the vertex is already INITED, a start is attempted.
     */
    public static class SourceVertexStartedTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        /**
         * @param vertex the downstream vertex being notified
         * @param event  expected to be a {@code VertexEventSourceVertexStarted}
         * @throws IllegalStateException if all sources started and the vertex is INITED
         *         but its task count or edges are not fully defined
         */
        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventSourceVertexStarted sourceStarted = (VertexEventSourceVertexStarted)event;

            // This vertex sits one hop further from the root than the farthest started source.
            vertex.distanceFromRoot = Math.max(vertex.distanceFromRoot, sourceStarted.getSourceDistanceFromRoot() + 1);
            vertex.numStartedSourceVertices++;

            int numSources = vertex.sourceVertices.size();
            LOG.info("Source vertex started: " + sourceStarted.getSourceVertexId() + " for vertex: " + vertex.getVertexId() + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + numSources);
            if (vertex.numStartedSourceVertices < numSources) {
                LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + numSources);
                return;
            }

            vertex.startSignalPending = true;
            if (vertex.getState() == VertexState.INITED) {
                boolean fullyDefined = vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty();
                Preconditions.checkState(fullyDefined, "Cannot start vertex that is not completely defined. Vertex: " + vertex.logIdentifier + " numTasks: " + vertex.numTasks);
                vertex.startIfPossible();
            } else {
                LOG.info("Cannot start vertex. Not in inited state. " + vertex.logIdentifier + " . VertesState: " + vertex.getState() + " numTasks: " + vertex.numTasks + " Num uninitialized edges: " + vertex.uninitializedEdges.size());
            }
        }
    }

    public static class OneToOneSourceSplitTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventOneToOneSourceSplit splitEvent = (VertexEventOneToOneSourceSplit)event;
            TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
            if (vertex.originalOneToOneSplitSource != null) {
                VertexState state = vertex.getState();
                Preconditions.checkState((state == VertexState.INITIALIZING || state == VertexState.INITED || state == VertexState.RUNNING ? 1 : 0) != 0, (Object)(" Unexpected 1-1 split for vertex " + vertex.getVertexId() + " in state " + (Object)((Object)vertex.getState()) + " . Split in vertex " + originalSplitSource + " sent by vertex " + splitEvent.getSenderVertex() + " numTasks " + splitEvent.getNumTasks()));
                if (vertex.originalOneToOneSplitSource.equals((Object)originalSplitSource)) {
                    LOG.info((Object)("Ignoring split of vertex " + vertex.getVertexId() + " because of split in vertex " + originalSplitSource + " sent by vertex " + splitEvent.getSenderVertex() + " numTasks " + splitEvent.getNumTasks()));
                    return state;
                }
                throw new TezUncheckedException("Vertex: " + vertex.getVertexId() + " asked to split by: " + originalSplitSource + " but was already split by:" + vertex.originalOneToOneSplitSource);
            }
            LOG.info((Object)("Splitting vertex " + vertex.getVertexId() + " because of split in vertex " + originalSplitSource + " sent by vertex " + splitEvent.getSenderVertex() + " numTasks " + splitEvent.getNumTasks()));
            vertex.originalOneToOneSplitSource = originalSplitSource;
            try {
                vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false);
            }
            catch (Exception e) {
                LOG.error((Object)"Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,exception should not happen here", (Throwable)e);
            }
            if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.INITED) {
                return vertex.getState();
            }
            Preconditions.checkState((vertex.getState() == VertexState.INITIALIZING ? 1 : 0) != 0, (Object)(" Unexpected 1-1 split for vertex " + vertex.getVertexId() + " in state " + (Object)((Object)vertex.getState()) + " . Split in vertex " + originalSplitSource + " sent by vertex " + splitEvent.getSenderVertex() + " numTasks " + splitEvent.getNumTasks()));
            return vertex.getState();
        }
    }

    public static class RootInputInitializedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized)event;
            VertexState state = vertex.getState();
            if (state == VertexState.INITIALIZING) {
                try {
                    List<TezEvent> inputInfoEvents = vertex.vertexManager.onRootVertexInitialized(liInitEvent.getInputName(), (InputDescriptor)vertex.getAdditionalInputs().get(liInitEvent.getInputName()).getIODescriptor(), liInitEvent.getEvents());
                    if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
                        VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
                    }
                }
                catch (AMUserCodeException e) {
                    String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                    LOG.error((Object)msg, (Throwable)e);
                    vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                    return VertexState.FAILED;
                }
            }
            vertex.numInitializedInputs++;
            if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
                vertex.rootInputInitializerManager.shutdown();
                vertex.rootInputInitializerManager = null;
            }
            if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
                if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
                    vertex.initWaitsForRootInitializers = false;
                }
                if (vertex.canInitVertex()) {
                    Preconditions.checkState((vertex.numTasks >= 0 ? 1 : 0) != 0, (Object)("Parallelism should have been set by now for vertex: " + vertex.logIdentifier));
                    return VertexInitializedTransition.doTransition(vertex);
                }
            }
            return vertex.getState();
        }
    }

    /**
     * Completes initialization of a vertex that is ready to be initialized and, on
     * success, attempts to start it.
     */
    public static class VertexInitializedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        /**
         * Runs the initialization steps; shared with other transitions in this file.
         *
         * @param vertex the vertex to initialize
         * @return INITED on success, FAILED if
         *         {@code initializeVertexInInitializingState()} reports failure
         * @throws IllegalStateException if {@code vertex.canInitVertex()} is false
         */
        static VertexState doTransition(VertexImpl vertex) {
            Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier);
            if (vertex.initializeVertexInInitializingState()) {
                vertex.startIfPossible();
                return VertexState.INITED;
            }
            return VertexState.FAILED;
        }

        /**
         * @param vertex the vertex to initialize
         * @param event  the triggering event (not inspected)
         * @return see {@link #doTransition(VertexImpl)}
         */
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            return doTransition(vertex);
        }
    }

    /**
     * Handles an init event for a vertex.
     *
     * <p>Every event bumps {@code numInitedSourceVertices}. Initialization itself only
     * runs once enough sources have reported (see {@link #readyToInit(VertexImpl)});
     * until then the vertex stays in NEW. After a non-failed initialization, a
     * {@code V_INIT} event is sent to every target vertex.
     */
    public static class InitTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        /**
         * @param vertex the vertex being initialized
         * @param event  the init event
         * @return NEW while still waiting for sources, otherwise the state produced by
         *         the initialization logic (INITIALIZING, INITED or FAILED)
         */
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            ++vertex.numInitedSourceVertices;
            if (!readyToInit(vertex)) {
                return VertexState.NEW;
            }
            VertexState vertexState = this.handleInitEvent(vertex, event);
            boolean hasTargets = vertex.targetVertices != null && !vertex.targetVertices.isEmpty();
            if (vertexState != VertexState.FAILED && hasTargets) {
                for (Vertex target : vertex.targetVertices.keySet()) {
                    vertex.getEventHandler().handle((Event)new VertexEvent(target.getVertexId(), VertexEventType.V_INIT));
                }
            }
            return vertexState;
        }

        /**
         * Decides whether enough init signals have arrived to initialize the vertex.
         * A vertex with no sources is always ready. Otherwise the counter must equal the
         * number of sources, or exceed it by exactly one.
         */
        private static boolean readyToInit(VertexImpl vertex) {
            if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()) {
                return true;
            }
            int numSources = vertex.sourceVertices.size();
            // NOTE(review): the "+ 1" case presumably covers an extra V_INIT the vertex
            // sends to itself during recovery — confirm against RecoverTransition.
            return vertex.numInitedSourceVertices == numSources || vertex.numInitedSourceVertices == numSources + 1;
        }

        /**
         * Sets the vertex up and decides which state it enters.
         *
         * @return FAILED if setup or direct initialization fails; INITIALIZING when the
         *         vertex still depends on input initializers, 1-1 sources, a custom
         *         vertex manager or uninitialized edges; INITED otherwise
         * @throws TezUncheckedException if the task count is -1 and nothing is
         *         available to set it at runtime
         */
        private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
            VertexState state = vertex.setupVertex();
            if (state.equals(VertexState.FAILED)) {
                return state;
            }
            if (vertex.targetVertices != null) {
                collectUninitializedEdges(vertex, vertex.targetVertices.values());
            }
            if (vertex.sourceVertices != null) {
                collectUninitializedEdges(vertex, vertex.sourceVertices.values());
            }
            if (vertex.numTasks == -1) {
                LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split to set #tasks for the vertex " + vertex.getVertexId());
                if (vertex.inputsWithInitializers != null) {
                    LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
                    vertex.setupInputInitializerManager();
                    return VertexState.INITIALIZING;
                }
                boolean hasOneToOneUninitedSource = false;
                // sourceVertices is treated as nullable everywhere else in this class;
                // guard here too so a source-less vertex reaches the checks below
                // instead of failing with a NullPointerException.
                if (vertex.sourceVertices != null) {
                    for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
                        boolean oneToOne = entry.getValue().getEdgeProperty().getDataMovementType() == EdgeProperty.DataMovementType.ONE_TO_ONE;
                        if (oneToOne && entry.getKey().getTotalTasks() == -1) {
                            hasOneToOneUninitedSource = true;
                            break;
                        }
                    }
                }
                if (hasOneToOneUninitedSource) {
                    LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
                    return VertexState.INITIALIZING;
                }
                if (vertex.vertexPlan.hasVertexManagerPlugin()) {
                    LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
                    return VertexState.INITIALIZING;
                }
                throw new TezUncheckedException(vertex.getVertexId() + " has -1 tasks but does not have input initializers, " + "1-1 uninited sources or custom vertex manager to set it at runtime");
            }
            LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
            vertex.createTasks();
            if (vertex.inputsWithInitializers != null) {
                LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
                vertex.setupInputInitializerManager();
                return VertexState.INITIALIZING;
            }
            if (!vertex.uninitializedEdges.isEmpty()) {
                LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
                return VertexState.INITIALIZING;
            }
            LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
            vertex.maybeSendConfiguredEvent();
            return vertex.initializeVertex() ? VertexState.INITED : VertexState.FAILED;
        }

        /**
         * Registers every edge without an edge manager as uninitialized.
         *
         * @throws IllegalStateException if such an edge is not a CUSTOM data-movement edge
         */
        private static void collectUninitializedEdges(VertexImpl vertex, Collection<Edge> edges) {
            for (Edge e : edges) {
                if (e.getEdgeManager() != null) {
                    continue;
                }
                Preconditions.checkState(e.getEdgeProperty().getDataMovementType() == EdgeProperty.DataMovementType.CUSTOM, "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
                vertex.uninitializedEdges.add(e);
            }
        }
    }

    public static class IgnoreInitInInitedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            LOG.info((Object)("Received event during INITED state, vertex=" + vertex.logIdentifier + ", eventType=" + event.getType()));
            if (!vertex.vertexAlreadyInitialized) {
                LOG.error((Object)("Vertex not initialized but in INITED state, vertexId=" + vertex.logIdentifier));
                return vertex.finished(VertexState.ERROR);
            }
            return VertexState.INITED;
        }
    }

    public static class RecoverTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
            VertexEventSourceVertexRecovered sourceRecoveredEvent = (VertexEventSourceVertexRecovered)vertexEvent;
            int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1;
            if (vertex.distanceFromRoot < distanceFromRoot) {
                vertex.distanceFromRoot = distanceFromRoot;
            }
            ++vertex.numRecoveredSourceVertices;
            switch (sourceRecoveredEvent.getSourceVertexState()) {
                case NEW: {
                    break;
                }
                case INITED: {
                    ++vertex.numInitedSourceVertices;
                    break;
                }
                case SUCCEEDED: 
                case RUNNING: {
                    ++vertex.numInitedSourceVertices;
                    ++vertex.numStartedSourceVertices;
                    if (sourceRecoveredEvent.getCompletedTaskAttempts() == null) break;
                    vertex.pendingReportedSrcCompletions.addAll(sourceRecoveredEvent.getCompletedTaskAttempts());
                    break;
                }
                case ERROR: 
                case KILLED: 
                case FAILED: {
                    break;
                }
                default: {
                    LOG.warn((Object)("Received invalid SourceVertexRecovered event, vertex=" + vertex.logIdentifier + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID() + ", sourceVertexState=" + (Object)((Object)sourceRecoveredEvent.getSourceVertexState())));
                    return vertex.finished(VertexState.ERROR);
                }
            }
            if (vertex.numRecoveredSourceVertices != vertex.getInputVerticesCount()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Waiting for source vertices to recover, vertex=" + vertex.logIdentifier + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices + ", totalSourceVertices=" + vertex.getInputVerticesCount()));
                }
                return VertexState.RECOVERING;
            }
            VertexState endState = VertexState.NEW;
            LinkedList completedTaskAttempts = Lists.newLinkedList();
            switch (vertex.recoveredState) {
                case NEW: {
                    Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
                    while (iterator.hasNext()) {
                        if (!iterator.next().getEventType().equals((Object)EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) continue;
                        iterator.remove();
                    }
                    if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
                        vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_INIT));
                    }
                    if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
                        vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    }
                    endState = VertexState.NEW;
                    break;
                }
                case INITED: {
                    boolean successSetParallelism;
                    String msg2;
                    vertex.vertexAlreadyInitialized = true;
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg2 = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error((Object)msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    try {
                        vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false);
                        successSetParallelism = true;
                    }
                    catch (Exception e) {
                        successSetParallelism = false;
                    }
                    if (!successSetParallelism) {
                        msg2 = "Failed to recover edge managers, vertex=" + vertex.logIdentifier;
                        LOG.error((Object)msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.tasks != null) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                    }
                    if (vertex.numInitedSourceVertices != vertex.getInputVerticesCount()) {
                        LOG.info((Object)("Vertex already initialized but source vertices have not initialized, vertexId=" + vertex.logIdentifier + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices));
                    } else if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
                        vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    }
                    endState = VertexState.INITED;
                    break;
                }
                case RUNNING: {
                    boolean successSetParallelism;
                    String msg;
                    String msg2;
                    vertex.tasksNotYetScheduled = false;
                    if (vertex.recoveryCommitInProgress) {
                        LOG.info((Object)("Recovered vertex was in the middle of a commit, failing Vertex=" + vertex.logIdentifier));
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.COMMIT_FAILURE, null);
                        endState = VertexState.FAILED;
                        break;
                    }
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error((Object)msg);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
                        endState = VertexState.FAILED;
                        break;
                    }
                    try {
                        vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false);
                        successSetParallelism = true;
                    }
                    catch (Exception e) {
                        successSetParallelism = false;
                    }
                    if (!successSetParallelism) {
                        msg2 = "Failed to recover edge managers for vertex:" + vertex.logIdentifier;
                        LOG.error((Object)msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    assert (vertex.tasks.size() == vertex.numTasks);
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + vertex.getLogIdentifier();
                            LOG.error((Object)msg, (Throwable)e);
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = VertexState.SUCCEEDED;
                    vertex.finished(endState);
                    break;
                }
                case KILLED: 
                case FAILED: 
                case SUCCEEDED: {
                    vertex.tasksNotYetScheduled = false;
                    assert (vertex.tasks.size() == vertex.numTasks);
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        TaskState taskState = TaskState.KILLED;
                        switch (vertex.recoveredState) {
                            case SUCCEEDED: {
                                taskState = TaskState.SUCCEEDED;
                                break;
                            }
                            case KILLED: {
                                taskState = TaskState.KILLED;
                                break;
                            }
                            case FAILED: {
                                taskState = TaskState.FAILED;
                            }
                        }
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                            LOG.error((Object)msg, (Throwable)e);
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = vertex.recoveredState;
                    vertex.finished(endState);
                    break;
                }
                default: {
                    LOG.warn((Object)("Invalid recoveredState found when trying to recover vertex, recoveredState=" + (Object)((Object)vertex.recoveredState)));
                    vertex.finished(VertexState.ERROR);
                    endState = VertexState.ERROR;
                }
            }
            LOG.info((Object)("Recovered Vertex State, vertexId=" + vertex.logIdentifier + ", state=" + (Object)((Object)endState) + ", numInitedSourceVertices" + vertex.numInitedSourceVertices + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices + ", tasksIsNull=" + (vertex.tasks == null) + ", numTasks=" + (vertex.tasks == null ? 0 : vertex.tasks.size())));
            for (Map.Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
                vertex.eventHandler.handle((Event)new VertexEventSourceVertexRecovered(entry.getKey().getVertexId(), vertex.vertexId, endState, completedTaskAttempts, vertex.getDistanceFromRoot()));
            }
            if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED).contains((Object)endState)) {
                vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
                vertex.recoveredEvents.clear();
                if (!vertex.pendingRouteEvents.isEmpty()) {
                    try {
                        VertexImpl.handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false);
                        vertex.pendingRouteEvents.clear();
                    }
                    catch (AMUserCodeException e) {
                        String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + vertex.getLogIdentifier();
                        LOG.error((Object)msg, (Throwable)e);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                        endState = VertexState.FAILED;
                    }
                }
            } else if (!vertex.recoveredEvents.isEmpty()) {
                throw new RuntimeException("Invalid Vertex state, found non-zero recovered events in invalid state, recoveredState=" + (Object)((Object)endState) + ", recoveredEvents=" + vertex.recoveredEvents.size());
            }
            return endState;
        }
    }

    /**
     * Transition that buffers selected events received while the vertex is
     * still recovering, so they are recorded on the vertex instead of being
     * acted on immediately.
     *
     * <p>Per event type:
     * <ul>
     *   <li>{@code V_ROUTE_EVENT}: the carried events are appended to
     *       {@code vertex.pendingRouteEvents};</li>
     *   <li>{@code V_SOURCE_TASK_ATTEMPT_COMPLETED}: the completed attempt id is
     *       added to {@code vertex.pendingReportedSrcCompletions};</li>
     *   <li>{@code V_SOURCE_VERTEX_STARTED}: {@code vertex.distanceFromRoot} is
     *       raised to (source distance + 1) if that is larger, and
     *       {@code vertex.numStartedSourceVertices} is incremented;</li>
     *   <li>{@code V_INIT}: {@code vertex.numInitedSourceVertices} is
     *       incremented.</li>
     * </ul>
     * Any other event type is only logged.
     */
    public static class BufferDataRecoverTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        /**
         * Records the incoming event on the recovering vertex.
         *
         * @param vertex      the vertex that is recovering
         * @param vertexEvent the event received during recovery
         */
        public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
            LOG.info((Object)("Received upstream event while still recovering, vertexId=" + vertex.logIdentifier + ", vertexEventType=" + vertexEvent.getType()));
            VertexEventType kind = (VertexEventType)vertexEvent.getType();
            switch (kind) {
                case V_ROUTE_EVENT: {
                    VertexEventRouteEvent routed = (VertexEventRouteEvent)vertexEvent;
                    vertex.pendingRouteEvents.addAll(routed.getEvents());
                    break;
                }
                case V_SOURCE_TASK_ATTEMPT_COMPLETED: {
                    VertexEventSourceTaskAttemptCompleted completed = (VertexEventSourceTaskAttemptCompleted)vertexEvent;
                    vertex.pendingReportedSrcCompletions.add(completed.getCompletionEvent().getTaskAttemptId());
                    break;
                }
                case V_SOURCE_VERTEX_STARTED: {
                    VertexEventSourceVertexStarted started = (VertexEventSourceVertexStarted)vertexEvent;
                    // This vertex sits one hop further from the root than the source that just started.
                    int candidateDistance = started.getSourceDistanceFromRoot() + 1;
                    if (candidateDistance > vertex.distanceFromRoot) {
                        vertex.distanceFromRoot = candidateDistance;
                    }
                    vertex.numStartedSourceVertices++;
                    break;
                }
                case V_INIT: {
                    vertex.numInitedSourceVertices++;
                    break;
                }
                default: {
                    // Nothing to buffer for other event types; the log line above is the only effect.
                    break;
                }
            }
        }
    }

    /**
     * Transition handling a {@link VertexEventNullEdgeInitialized}: removes the
     * edge carried by the event from {@code vertex.uninitializedEdges} and, if
     * the vertex is {@code INITIALIZING} and {@code canInitVertex()} reports
     * true, completes initialization via
     * {@code VertexInitializedTransition.doTransition(vertex)}.
     */
    public static class NullEdgeInitializedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        /**
         * @param vertex      the vertex whose edge has been initialized
         * @param vertexEvent expected to be a {@link VertexEventNullEdgeInitialized}
         * @return the state produced by the initialized transition when the
         *         vertex can now initialize, otherwise the vertex's current state
         * @throws IllegalStateException (from {@code Preconditions.checkState}) if
         *         the vertex is neither {@code NEW} nor {@code INITIALIZING}, or if
         *         the connectivity check against the other vertex fails
         */
        public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
            VertexEventNullEdgeInitialized edgeEvent = (VertexEventNullEdgeInitialized)vertexEvent;
            Edge initializedEdge = edgeEvent.getEdge();
            Vertex peer = edgeEvent.getVertex();

            // The state is read once; it is used for both the sanity check and the final decision.
            VertexState current = vertex.getState();
            boolean stateAcceptable = current == VertexState.NEW || current == VertexState.INITIALIZING;
            String badStateMsg = "Unexpected state " + current + " for vertex: " + vertex.logIdentifier;
            Preconditions.checkState(stateAcceptable, (Object)badStateMsg);

            // Passes when either adjacency map is absent, or when the peer is a known source or target.
            boolean connected = vertex.sourceVertices == null
                || vertex.sourceVertices.containsKey(peer)
                || vertex.targetVertices == null
                || vertex.targetVertices.containsKey(peer);
            String notConnectedMsg = "Not connected to vertex " + peer.getName() + " from vertex: " + vertex.logIdentifier;
            Preconditions.checkState(connected, (Object)notConnectedMsg);

            LOG.info((Object)("Edge initialized for connection to vertex " + peer.getName() + " at vertex : " + vertex.logIdentifier));
            vertex.uninitializedEdges.remove(initializedEdge);

            if (current != VertexState.INITIALIZING || !vertex.canInitVertex()) {
                return current;
            }
            return VertexInitializedTransition.doTransition(vertex);
        }
    }

    /**
     * Transition used when a terminate is received while the vertex is still
     * recovering (per its log message). It does not finish the vertex itself;
     * it only overrides {@code vertex.recoveredState} with {@code KILLED}.
     */
    public static class TerminateDuringRecoverTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        /**
         * @param vertex      the recovering vertex whose recovered state is overridden
         * @param vertexEvent the triggering event; not inspected
         */
        public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
            final String notice = "Received a terminate during recovering, setting recovered state to KILLED";
            LOG.info((Object)notice);
            vertex.recoveredState = VertexState.KILLED;
        }
    }

    public static class StartRecoverTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
            VertexState endState;
            VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex)vertexEvent;
            VertexState desiredState = recoverEvent.getDesiredState();
            switch (desiredState) {
                case RUNNING: {
                    break;
                }
                case ERROR: 
                case KILLED: 
                case FAILED: 
                case SUCCEEDED: {
                    switch (desiredState) {
                        case SUCCEEDED: {
                            vertex.succeededTaskCount = vertex.numTasks;
                            vertex.completedTaskCount = vertex.numTasks;
                            break;
                        }
                        case KILLED: {
                            vertex.killedTaskCount = vertex.numTasks;
                            break;
                        }
                        case ERROR: 
                        case FAILED: {
                            vertex.failedTaskCount = vertex.numTasks;
                        }
                    }
                    if (vertex.tasks != null) {
                        TaskState taskState = TaskState.KILLED;
                        switch (desiredState) {
                            case SUCCEEDED: {
                                taskState = TaskState.SUCCEEDED;
                                break;
                            }
                            case KILLED: {
                                taskState = TaskState.KILLED;
                                break;
                            }
                            case ERROR: 
                            case FAILED: {
                                taskState = TaskState.FAILED;
                            }
                        }
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState, false));
                        }
                    }
                    LOG.info((Object)("DAG informed Vertex of its final completed state, vertex=" + vertex.logIdentifier + ", state=" + (Object)((Object)desiredState)));
                    return desiredState;
                }
                default: {
                    LOG.info((Object)("Unhandled desired state provided by DAG, vertex=" + vertex.logIdentifier + ", state=" + (Object)((Object)desiredState)));
                    vertex.finished(VertexState.ERROR);
                }
            }
            switch (vertex.recoveredState) {
                case NEW: {
                    Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
                    while (iterator.hasNext()) {
                        if (!iterator.next().getEventType().equals((Object)EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) continue;
                        iterator.remove();
                    }
                    vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_INIT));
                    vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    endState = VertexState.NEW;
                    break;
                }
                case INITED: {
                    String msg2;
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg2 = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error((Object)msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.tasks != null) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                    }
                    vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    if (vertex.getInputVertices().isEmpty()) {
                        endState = VertexState.INITED;
                        break;
                    }
                    endState = VertexState.RECOVERING;
                    break;
                }
                case RUNNING: {
                    String msg;
                    String msg2;
                    vertex.tasksNotYetScheduled = false;
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg2 = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error((Object)msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.recoveryCommitInProgress) {
                        msg = "Recovered vertex was in the middle of a commit, failing Vertex=" + vertex.logIdentifier;
                        LOG.warn((Object)msg);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.COMMIT_FAILURE, msg);
                        endState = VertexState.FAILED;
                        break;
                    }
                    assert (vertex.tasks.size() == vertex.numTasks);
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            msg2 = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                            LOG.error((Object)msg2, (Throwable)e);
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg2 + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = VertexState.SUCCEEDED;
                    vertex.finished(endState);
                    break;
                }
                case KILLED: 
                case FAILED: 
                case SUCCEEDED: {
                    String msg;
                    if (vertex.recoveredState == VertexState.SUCCEEDED && vertex.hasCommitter && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
                        msg = "Cannot recover vertex as all recovery events not found, vertex=" + vertex.logIdentifier + ", hasCommitters=" + vertex.hasCommitter + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen + ", finalCompletionSeen=" + vertex.vertexCompleteSeen;
                        LOG.warn((Object)msg);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.COMMIT_FAILURE, msg);
                        endState = VertexState.FAILED;
                        break;
                    }
                    vertex.tasksNotYetScheduled = false;
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        TaskState taskState = TaskState.KILLED;
                        switch (vertex.recoveredState) {
                            case SUCCEEDED: {
                                taskState = TaskState.SUCCEEDED;
                                break;
                            }
                            case KILLED: {
                                taskState = TaskState.KILLED;
                                break;
                            }
                            case FAILED: {
                                taskState = TaskState.FAILED;
                            }
                        }
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            String msg3 = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                            LOG.error((Object)msg3, (Throwable)e);
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg3 + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = vertex.recoveredState;
                    vertex.finished(endState);
                    break;
                }
                default: {
                    LOG.warn((Object)("Invalid recoveredState found when trying to recover vertex, vertex=" + vertex.logIdentifier + ", recoveredState=" + (Object)((Object)vertex.recoveredState)));
                    vertex.finished(VertexState.ERROR);
                    endState = VertexState.ERROR;
                }
            }
            if (!endState.equals((Object)VertexState.RECOVERING)) {
                LOG.info((Object)("Recovered Vertex State, vertexId=" + vertex.logIdentifier + ", state=" + (Object)((Object)endState) + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices + ", recoveredEvents=" + (vertex.recoveredEvents == null ? "null" : Integer.valueOf(vertex.recoveredEvents.size())) + ", tasksIsNull=" + (vertex.tasks == null) + ", numTasks=" + (vertex.tasks == null ? "null" : Integer.valueOf(vertex.tasks.size()))));
                for (Map.Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
                    vertex.eventHandler.handle((Event)new VertexEventSourceVertexRecovered(entry.getKey().getVertexId(), vertex.vertexId, endState, null, vertex.getDistanceFromRoot()));
                }
            }
            if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED).contains((Object)endState)) {
                vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
                vertex.recoveredEvents.clear();
            } else if (!vertex.recoveredEvents.isEmpty()) {
                throw new RuntimeException("Invalid Vertex state, found non-zero recovered events in invalid state, vertex=" + vertex.logIdentifier + ", recoveredState=" + (Object)((Object)endState) + ", recoveredEvents=" + vertex.recoveredEvents.size());
            }
            return endState;
        }
    }
}

