/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.AbstractCounters;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EntityDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
import org.apache.tez.dag.app.dag.impl.OutputCommitterContextImpl;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexManager;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.InputStatistics;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.OutputStatistics;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VertexImpl
implements Vertex,
EventHandler<VertexEvent> {
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    private static final Logger LOG = LoggerFactory.getLogger(VertexImpl.class);
    private final Clock clock;
    private final Lock readLock;
    private final Lock writeLock;
    private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private final TaskHeartbeatHandler taskHeartbeatHandler;
    private final Object tasksSyncHandle = new Object();
    private final EventHandler eventHandler;
    private final AppContext appContext;
    private final DAG dag;
    private boolean lazyTasksCopyNeeded = false;
    volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap();
    private Object fullCountersLock = new Object();
    private TezCounters fullCounters = null;
    private Resource taskResource;
    private Configuration vertexConf;
    private final boolean isSpeculationEnabled;
    @VisibleForTesting
    final int taskSchedulerIdentifier;
    @VisibleForTesting
    final int containerLauncherIdentifier;
    @VisibleForTesting
    final int taskCommunicatorIdentifier;
    @VisibleForTesting
    int numStartedSourceVertices = 0;
    @VisibleForTesting
    int numInitedSourceVertices = 0;
    @VisibleForTesting
    int numRecoveredSourceVertices = 0;
    private int distanceFromRoot = 0;
    private final List<String> diagnostics = new ArrayList<String>();
    protected final StateChangeNotifier stateChangeNotifier;
    @VisibleForTesting
    int numSuccessSourceAttemptCompletions = 0;
    List<GroupInputSpec> groupInputSpecList;
    Set<String> sharedOutputs = Sets.newHashSet();
    private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
    private static final RouteEventTransition ROUTE_EVENT_TRANSITION = new RouteEventTransition();
    private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new TaskAttemptCompletedEventTransition();
    private static final SourceTaskAttemptCompletedEventTransition SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new SourceTaskAttemptCompletedEventTransition();
    private static final CommitCompletedTransition COMMIT_COMPLETED_TRANSITION = new CommitCompletedTransition();
    private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK = new VertexStateChangedCallback();
    private VertexState recoveredState = VertexState.NEW;
    @VisibleForTesting
    List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
    private boolean vertexAlreadyInitialized = false;
    @VisibleForTesting
    final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
    LegacySpeculator speculator;
    @VisibleForTesting
    Map<String, ListenableFuture<Void>> commitFutures = new ConcurrentHashMap<String, ListenableFuture<Void>>();
    protected static final StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent> stateMachineFactory = new StateMachineFactory((Enum)VertexState.NEW).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW, VertexState.INITED, VertexState.INITIALIZING, VertexState.FAILED), (Enum)VertexEventType.V_INIT, (MultipleArcTransition)new InitTransition()).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW), (Enum)VertexEventType.V_NULL_EDGE_INITIALIZED, (MultipleArcTransition)new NullEdgeInitializedTransition()).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW, new VertexState[]{VertexState.INITED, VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, VertexState.RECOVERING}), (Enum)VertexEventType.V_RECOVER, (MultipleArcTransition)new StartRecoverTransition()).addTransition((Enum)VertexState.NEW, EnumSet.of(VertexState.NEW, new VertexState[]{VertexState.INITED, VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, VertexState.RECOVERING}), (Enum)VertexEventType.V_SOURCE_VERTEX_RECOVERED, (MultipleArcTransition)new RecoverTransition()).addTransition((Enum)VertexState.NEW, (Enum)VertexState.NEW, (Enum)VertexEventType.V_SOURCE_VERTEX_STARTED, (SingleArcTransition)new SourceVertexStartedTransition()).addTransition((Enum)VertexState.NEW, (Enum)VertexState.KILLED, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateNewVertexTransition()).addTransition((Enum)VertexState.NEW, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.RECOVERING, EnumSet.of(VertexState.NEW, new VertexState[]{VertexState.INITED, VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, VertexState.RECOVERING}), (Enum)VertexEventType.V_SOURCE_VERTEX_RECOVERED, (MultipleArcTransition)new RecoverTransition()).addTransition((Enum)VertexState.RECOVERING, (Enum)VertexState.RECOVERING, EnumSet.of(VertexEventType.V_INIT, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED), (SingleArcTransition)new BufferDataRecoverTransition()).addTransition((Enum)VertexState.RECOVERING, (Enum)VertexState.RECOVERING, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateDuringRecoverTransition()).addTransition((Enum)VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_ROOT_INPUT_INITIALIZED, (MultipleArcTransition)new RootInputInitializedTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_INPUT_DATA_INFORMATION, (MultipleArcTransition)new InputDataInformationTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_READY_TO_INIT, (MultipleArcTransition)new VertexInitializedTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_ROOT_INPUT_FAILED, (MultipleArcTransition)new RootInputInitFailedTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.INITIALIZING, (Enum)VertexEventType.V_START, (SingleArcTransition)new StartWhileInitializingTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.INITIALIZING, (Enum)VertexEventType.V_SOURCE_VERTEX_STARTED, (SingleArcTransition)new SourceVertexStartedTransition()).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.FAILED), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.KILLED, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateInitingVertexTransition()).addTransition((Enum)VertexState.INITIALIZING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_NULL_EDGE_INITIALIZED, (MultipleArcTransition)new NullEdgeInitializedTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_ROOT_INPUT_FAILED, (MultipleArcTransition)new RootInputInitFailedTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED, VertexState.ERROR), (Enum)VertexEventType.V_INIT, (MultipleArcTransition)new IgnoreInitInInitedTransition()).addTransition((Enum)VertexState.INITED, (Enum)VertexState.INITED, (Enum)VertexEventType.V_SOURCE_VERTEX_STARTED, (SingleArcTransition)new SourceVertexStartedTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.RUNNING, VertexState.INITED, VertexState.TERMINATING), (Enum)VertexEventType.V_START, (MultipleArcTransition)new StartTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.INITED, VertexState.FAILED), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.INITED, (Enum)VertexState.KILLED, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new TerminateInitedVertexTransition()).addTransition((Enum)VertexState.INITED, EnumSet.of(VertexState.FAILED), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.INITED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING), (Enum)VertexEventType.V_ROOT_INPUT_FAILED, (MultipleArcTransition)new RootInputInitFailedTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.RUNNING, (Enum)VertexEventType.V_TASK_ATTEMPT_COMPLETED, (SingleArcTransition)TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), (Enum)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, (MultipleArcTransition)SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, new VertexState[]{VertexState.COMMITTING, VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED, VertexState.ERROR}), (Enum)VertexEventType.V_TASK_COMPLETED, (MultipleArcTransition)new TaskCompletedTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.TERMINATING, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new VertexKilledTransition()).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.RUNNING, (Enum)VertexEventType.V_TASK_RESCHEDULED, (SingleArcTransition)new TaskRescheduledTransition()).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED), (Enum)VertexEventType.V_COMPLETED, (MultipleArcTransition)new VertexNoTasksCompletedTransition()).addTransition((Enum)VertexState.RUNNING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.RUNNING, EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.COMMITTING, EnumSet.of(VertexState.COMMITTING, VertexState.TERMINATING, VertexState.SUCCEEDED, VertexState.FAILED), (Enum)VertexEventType.V_COMMIT_COMPLETED, (MultipleArcTransition)COMMIT_COMPLETED_TRANSITION).addTransition((Enum)VertexState.COMMITTING, (Enum)VertexState.TERMINATING, (Enum)VertexEventType.V_TERMINATE, (SingleArcTransition)new VertexKilledWhileCommittingTransition()).addTransition((Enum)VertexState.COMMITTING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.COMMITTING, EnumSet.of(VertexState.COMMITTING, VertexState.TERMINATING), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.COMMITTING, (Enum)VertexState.TERMINATING, (Enum)VertexEventType.V_TASK_RESCHEDULED, (SingleArcTransition)new TaskRescheduledWhileCommittingTransition()).addTransition((Enum)VertexState.COMMITTING, EnumSet.of(VertexState.TERMINATING), (Enum)VertexEventType.V_MANAGER_USER_CODE_ERROR, (MultipleArcTransition)new VertexManagerUserCodeErrorTransition()).addTransition((Enum)VertexState.TERMINATING, EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED), (Enum)VertexEventType.V_TASK_COMPLETED, (MultipleArcTransition)new TaskCompletedTransition()).addTransition((Enum)VertexState.TERMINATING, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.TERMINATING, EnumSet.of(VertexState.TERMINATING, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR), (Enum)VertexEventType.V_COMMIT_COMPLETED, (MultipleArcTransition)COMMIT_COMPLETED_TRANSITION).addTransition((Enum)VertexState.TERMINATING, (Enum)VertexState.TERMINATING, EnumSet.of(VertexEventType.V_TERMINATE, new VertexEventType[]{VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_RESCHEDULED})).addTransition((Enum)VertexState.SUCCEEDED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.SUCCEEDED, EnumSet.of(VertexState.RUNNING, VertexState.FAILED), (Enum)VertexEventType.V_TASK_RESCHEDULED, (MultipleArcTransition)new TaskRescheduledAfterVertexSuccessTransition()).addTransition((Enum)VertexState.SUCCEEDED, EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED), (Enum)VertexEventType.V_ROUTE_EVENT, (MultipleArcTransition)ROUTE_EVENT_TRANSITION).addTransition((Enum)VertexState.SUCCEEDED, EnumSet.of(VertexState.FAILED, VertexState.ERROR), (Enum)VertexEventType.V_TASK_COMPLETED, (MultipleArcTransition)new TaskCompletedAfterVertexSuccessTransition()).addTransition((Enum)VertexState.SUCCEEDED, (Enum)VertexState.SUCCEEDED, EnumSet.of(VertexEventType.V_TERMINATE, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)).addTransition((Enum)VertexState.SUCCEEDED, (Enum)VertexState.SUCCEEDED, (Enum)VertexEventType.V_TASK_ATTEMPT_COMPLETED, (SingleArcTransition)new TaskAttemptCompletedEventTransition()).addTransition((Enum)VertexState.FAILED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.FAILED, (Enum)VertexState.FAILED, EnumSet.of(VertexEventType.V_TERMINATE, new VertexEventType[]{VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_RECOVERED})).addTransition((Enum)VertexState.KILLED, (Enum)VertexState.ERROR, (Enum)VertexEventType.V_INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)VertexState.KILLED, (Enum)VertexState.KILLED, EnumSet.of(VertexEventType.V_TERMINATE, new VertexEventType[]{VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_INIT, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_RECOVERED})).addTransition((Enum)VertexState.ERROR, (Enum)VertexState.ERROR, EnumSet.of(VertexEventType.V_INIT, new VertexEventType[]{VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TERMINATE, VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_INTERNAL_ERROR, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_RECOVERED})).installTopology();
    private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;
    @VisibleForTesting
    int numTasks;
    @VisibleForTesting
    int completedTaskCount = 0;
    @VisibleForTesting
    int succeededTaskCount = 0;
    @VisibleForTesting
    int failedTaskCount = 0;
    @VisibleForTesting
    int killedTaskCount = 0;
    @VisibleForTesting
    AtomicInteger failedTaskAttemptCount = new AtomicInteger(0);
    @VisibleForTesting
    AtomicInteger killedTaskAttemptCount = new AtomicInteger(0);
    @VisibleForTesting
    long initTimeRequested;
    @VisibleForTesting
    long initedTime;
    @VisibleForTesting
    long startTimeRequested;
    @VisibleForTesting
    long startedTime;
    @VisibleForTesting
    long finishTime;
    private float progress;
    private final TezVertexID vertexId;
    private final DAGProtos.VertexPlan vertexPlan;
    private boolean initWaitsForRootInitializers = false;
    private final String vertexName;
    private final ProcessorDescriptor processorDescriptor;
    private boolean vertexToBeReconfiguredByManager = false;
    AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
    AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
    @VisibleForTesting
    Map<Vertex, Edge> sourceVertices;
    private Map<Vertex, Edge> targetVertices;
    Set<Edge> uninitializedEdges = Sets.newHashSet();
    LinkedHashMap<String, Integer> ioIndices = Maps.newLinkedHashMap();
    private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> rootInputDescriptors;
    private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs;
    private Map<String, OutputCommitter> outputCommitters;
    private Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
    private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate();
    private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
    private Set<String> inputsWithInitializers;
    private int numInitializedInputs;
    @VisibleForTesting
    int numInitializerCompletionsHandled = 0;
    private boolean startSignalPending = false;
    List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
    private boolean tasksNotYetScheduled = true;
    private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity((int)1000);
    private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
    private final Lock onDemandRouteEventsReadLock = this.onDemandRouteEventsReadWriteLock.readLock();
    private final Lock onDemandRouteEventsWriteLock = this.onDemandRouteEventsReadWriteLock.writeLock();
    List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
    List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
    private RootInputInitializerManager rootInputInitializerManager;
    VertexManager vertexManager;
    private final UserGroupInformation dagUgi;
    private AtomicBoolean committed = new AtomicBoolean(false);
    private AtomicBoolean aborted = new AtomicBoolean(false);
    private AtomicBoolean commitCanceled = new AtomicBoolean(false);
    private boolean commitVertexOutputs = false;
    private Map<String, DAGImpl.VertexGroupInfo> dagVertexGroups;
    private TaskLocationHint[] taskLocationHints;
    private Map<String, LocalResource> localResources;
    private final Map<String, String> environment;
    private final Map<String, String> environmentTaskSpecific;
    private final String javaOptsTaskSpecific;
    private final String javaOpts;
    private final ContainerContext containerContext;
    private VertexTerminationCause terminationCause;
    private String logIdentifier;
    @VisibleForTesting
    boolean recoveryCommitInProgress = false;
    private boolean summaryCompleteSeen = false;
    @VisibleForTesting
    boolean hasCommitter = false;
    private boolean vertexCompleteSeen = false;
    private Map<String, EdgeProperty> recoveredSourceEdgeProperties = null;
    private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null;
    boolean recoveryInitEventSeen = false;
    boolean recoveryStartEventSeen = false;
    private VertexStats vertexStats = null;
    private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
    @VisibleForTesting
    VertexStatisticsImpl completedTasksStatsCache;
    private VertexStatisticsImpl finalStatistics;

    private void augmentStateMachine() {
        this.stateMachine.registerStateEnteredCallback(VertexState.SUCCEEDED, STATE_CHANGED_CALLBACK).registerStateEnteredCallback(VertexState.FAILED, STATE_CHANGED_CALLBACK).registerStateEnteredCallback(VertexState.KILLED, STATE_CHANGED_CALLBACK).registerStateEnteredCallback(VertexState.RUNNING, STATE_CHANGED_CALLBACK);
    }

    void resetCompletedTaskStatsCache(boolean recompute) {
        this.completedTasksStatsCache = new VertexStatisticsImpl();
        if (recompute) {
            for (Task t : this.getTasks().values()) {
                if (t.getState() != TaskState.SUCCEEDED) continue;
                this.completedTasksStatsCache.mergeFrom(((TaskImpl)t).getStatistics());
            }
        }
    }

    public VertexImpl(TezVertexID vertexId, DAGProtos.VertexPlan vertexPlan, String vertexName, Configuration dagConf, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, boolean commitVertexOutputs, AppContext appContext, VertexLocationHint vertexLocationHint, Map<String, DAGImpl.VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption, StateChangeNotifier entityStatusTracker) {
        this.vertexId = vertexId;
        this.vertexPlan = vertexPlan;
        this.vertexName = StringInterner.weakIntern((String)vertexName);
        this.vertexConf = new Configuration(dagConf);
        if (vertexPlan.hasVertexConf()) {
            DAGProtos.ConfigurationProto confProto = vertexPlan.getVertexConf();
            for (DAGProtos.PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
                TezConfiguration.validateProperty((String)keyValuePair.getKey(), (Scope)Scope.VERTEX);
                this.vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
            }
        }
        this.clock = clock;
        this.appContext = appContext;
        this.commitVertexOutputs = commitVertexOutputs;
        this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
        this.taskHeartbeatHandler = thh;
        this.eventHandler = eventHandler;
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        if (LOG.isDebugEnabled()) {
            VertexImpl.logLocationHints(this.vertexName, vertexLocationHint);
        }
        this.setTaskLocationHints(vertexLocationHint);
        this.dagUgi = appContext.getCurrentDAG().getDagUGI();
        this.dag = appContext.getCurrentDAG();
        this.taskResource = DagTypeConverters.createResourceRequestFromTaskConfig((DAGProtos.PlanTaskConfiguration)vertexPlan.getTaskConfig());
        this.processorDescriptor = DagTypeConverters.convertProcessorDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)vertexPlan.getProcessorDescriptor());
        this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan((List)vertexPlan.getTaskConfig().getLocalResourceList());
        this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan((List)vertexPlan.getTaskConfig().getEnvironmentSettingList());
        this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption;
        String javaOptsWithoutLoggerMods = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
        String logString = this.vertexConf.get("tez.task.log.level", "INFO");
        String[] taskLogParams = TezClientUtils.parseLogParams((String)logString);
        this.javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts((String)taskLogParams[0], (String)javaOptsWithoutLoggerMods);
        if (this.taskSpecificLaunchCmdOpts.hasModifiedLogProperties()) {
            String[] taskLogParamsTaskSpecific = taskSpecificLaunchCmdOption.getTaskSpecificLogParams();
            this.javaOptsTaskSpecific = TezClientUtils.maybeAddDefaultLoggingJavaOpts((String)taskLogParamsTaskSpecific[0], (String)javaOptsWithoutLoggerMods);
            this.environmentTaskSpecific = new HashMap<String, String>(this.environment.size());
            this.environmentTaskSpecific.putAll(this.environment);
            if (taskLogParamsTaskSpecific.length == 2 && !Strings.isNullOrEmpty((String)taskLogParamsTaskSpecific[1])) {
                TezClientUtils.addLogParamsToEnv(this.environmentTaskSpecific, (String[])taskLogParamsTaskSpecific);
            }
        } else {
            this.javaOptsTaskSpecific = null;
            this.environmentTaskSpecific = null;
        }
        TezClientUtils.addLogParamsToEnv(this.environment, (String[])taskLogParams);
        this.containerContext = new ContainerContext(this.localResources, appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
        if (vertexPlan.getInputsCount() > 0) {
            this.setAdditionalInputs(vertexPlan.getInputsList());
        }
        if (vertexPlan.getOutputsCount() > 0) {
            this.setAdditionalOutputs(vertexPlan.getOutputsList());
        }
        this.stateChangeNotifier = entityStatusTracker;
        this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
        this.dagVertexGroups = dagVertexGroups;
        this.isSpeculationEnabled = this.vertexConf.getBoolean("tez.am.speculation.enabled", false);
        if (this.isSpeculationEnabled()) {
            this.speculator = new LegacySpeculator(this.vertexConf, this.getAppContext(), (Vertex)this);
        }
        this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
        boolean isLocal = this.vertexConf.getBoolean("tez.local.mode", false);
        String tezDefaultComponentName = isLocal ? TezConstants.getTezUberServicePluginName() : TezConstants.getTezYarnServicePluginName();
        Vertex.VertexExecutionContext execContext = this.dag.getDefaultExecutionContext();
        if (vertexPlan.hasExecutionContext()) {
            execContext = DagTypeConverters.convertFromProto((DAGProtos.VertexExecutionContextProto)vertexPlan.getExecutionContext());
            LOG.info("Using ExecutionContext from Vertex for Vertex {}", (Object)vertexName);
        } else if (execContext != null) {
            LOG.info("Using ExecutionContext from DAG for Vertex {}", (Object)vertexName);
        }
        if (execContext != null && execContext.shouldExecuteInAm()) {
            tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
        }
        String taskSchedulerName = tezDefaultComponentName;
        String containerLauncherName = tezDefaultComponentName;
        String taskCommName = tezDefaultComponentName;
        if (execContext != null) {
            if (execContext.getTaskSchedulerName() != null) {
                taskSchedulerName = execContext.getTaskSchedulerName();
            }
            if (execContext.getContainerLauncherName() != null) {
                containerLauncherName = execContext.getContainerLauncherName();
            }
            if (execContext.getTaskCommName() != null) {
                taskCommName = execContext.getTaskCommName();
            }
        }
        LOG.info("Vertex: " + this.logIdentifier + " configured with TaskScheduler=" + taskSchedulerName + ", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName);
        this.taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
        this.taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
        this.containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName);
        Preconditions.checkNotNull((Object)this.taskSchedulerIdentifier, (Object)("Unknown taskScheduler: " + taskSchedulerName));
        Preconditions.checkNotNull((Object)this.taskCommunicatorIdentifier, (Object)("Unknown taskCommunicator: " + containerLauncherName));
        Preconditions.checkNotNull((Object)this.containerLauncherIdentifier, (Object)("Unknown containerLauncher: " + taskCommName));
        StringBuilder sb = new StringBuilder();
        sb.append("Running vertex: ").append(this.logIdentifier).append(" : ").append("TaskScheduler=").append(this.taskSchedulerIdentifier).append(":").append(taskSchedulerName).append(", ContainerLauncher=").append(this.containerLauncherIdentifier).append(":").append(containerLauncherName).append(", TaskCommunicator=").append(this.taskCommunicatorIdentifier).append(":").append(taskCommName);
        LOG.info(sb.toString());
        this.stateMachine = new StateMachineTez(stateMachineFactory.make((Object)this), this);
        this.augmentStateMachine();
    }

    @Override
    public Configuration getConf() {
        return this.vertexConf;
    }

    @Override
    public int getTaskSchedulerIdentifier() {
        return this.taskSchedulerIdentifier;
    }

    @Override
    public int getContainerLauncherIdentifier() {
        return this.containerLauncherIdentifier;
    }

    @Override
    public int getTaskCommunicatorIdentifier() {
        return this.taskCommunicatorIdentifier;
    }

    private boolean isSpeculationEnabled() {
        return this.isSpeculationEnabled;
    }

    protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
        return this.stateMachine;
    }

    @Override
    public TezVertexID getVertexId() {
        return this.vertexId;
    }

    @Override
    public DAGProtos.VertexPlan getVertexPlan() {
        return this.vertexPlan;
    }

    @Override
    public int getDistanceFromRoot() {
        return this.distanceFromRoot;
    }

    @Override
    public LinkedHashMap<String, Integer> getIOIndices() {
        return this.ioIndices;
    }

    @Override
    public String getName() {
        return this.vertexName;
    }

    EventHandler getEventHandler() {
        return this.eventHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Task getTask(TezTaskID taskID) {
        this.readLock.lock();
        try {
            Task task = this.tasks.get(taskID);
            return task;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public Task getTask(int taskIndex) {
        return this.getTask(TezTaskID.getInstance((TezVertexID)this.vertexId, (int)taskIndex));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getTotalTasks() {
        this.readLock.lock();
        try {
            int n = this.numTasks;
            return n;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getCompletedTasks() {
        this.readLock.lock();
        try {
            int n = this.succeededTaskCount + this.failedTaskCount + this.killedTaskCount;
            return n;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getSucceededTasks() {
        this.readLock.lock();
        try {
            int n = this.succeededTaskCount;
            return n;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getRunningTasks() {
        this.readLock.lock();
        try {
            int num = 0;
            for (Task task : this.tasks.values()) {
                if (task.getState() != TaskState.RUNNING) continue;
                ++num;
            }
            int n = num;
            return n;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TezCounters getAllCounters() {
        this.readLock.lock();
        try {
            if (this.inTerminalState()) {
                this.mayBeConstructFinalFullCounters();
                TezCounters tezCounters = this.fullCounters;
                return tezCounters;
            }
            TezCounters counters = new TezCounters();
            TezCounters tezCounters = VertexImpl.incrTaskCounters(counters, this.tasks.values());
            return tezCounters;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public VertexStats getVertexStats() {
        this.readLock.lock();
        try {
            if (this.inTerminalState()) {
                this.mayBeConstructFinalFullCounters();
                VertexStats vertexStats = this.vertexStats;
                return vertexStats;
            }
            VertexStats stats = new VertexStats();
            VertexStats vertexStats = VertexImpl.updateVertexStats(stats, this.tasks.values());
            return vertexStats;
        }
        finally {
            this.readLock.unlock();
        }
    }

    boolean inTerminalState() {
        VertexState state = this.getInternalState();
        return state == VertexState.ERROR || state == VertexState.FAILED || state == VertexState.KILLED || state == VertexState.SUCCEEDED;
    }

    public static TezCounters incrTaskCounters(TezCounters counters, Collection<Task> tasks) {
        for (Task task : tasks) {
            counters.incrAllCounters((AbstractCounters)task.getCounters());
        }
        return counters;
    }

    public static VertexStats updateVertexStats(VertexStats stats, Collection<Task> tasks) {
        for (Task task : tasks) {
            stats.updateStats(task.getReport());
        }
        return stats;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<String> getDiagnostics() {
        this.readLock.lock();
        try {
            List<String> list = this.diagnostics;
            return list;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public float getProgress() {
        this.readLock.lock();
        try {
            this.computeProgress();
            float f = this.progress;
            return f;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ProgressBuilder getVertexProgress() {
        this.readLock.lock();
        try {
            ProgressBuilder progress = new ProgressBuilder();
            progress.setTotalTaskCount(this.numTasks);
            progress.setSucceededTaskCount(this.succeededTaskCount);
            progress.setRunningTaskCount(this.getRunningTasks());
            progress.setFailedTaskCount(this.failedTaskCount);
            progress.setKilledTaskCount(this.killedTaskCount);
            progress.setFailedTaskAttemptCount(this.failedTaskAttemptCount.get());
            progress.setKilledTaskAttemptCount(this.killedTaskAttemptCount.get());
            ProgressBuilder progressBuilder = progress;
            return progressBuilder;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions) {
        this.readLock.lock();
        try {
            VertexStatusBuilder status = new VertexStatusBuilder();
            status.setState(this.getInternalState());
            status.setDiagnostics(this.diagnostics);
            status.setProgress(this.getVertexProgress());
            if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
                status.setVertexCounters(this.getAllCounters());
            }
            VertexStatusBuilder vertexStatusBuilder = status;
            return vertexStatusBuilder;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TaskLocationHint getTaskLocationHint(TezTaskID taskId) {
        this.readLock.lock();
        try {
            if (this.taskLocationHints == null || this.taskLocationHints.length <= taskId.getId()) {
                TaskLocationHint taskLocationHint = null;
                return taskLocationHint;
            }
            TaskLocationHint taskLocationHint = this.taskLocationHints[taskId.getId()];
            return taskLocationHint;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @VisibleForTesting
    List<EventInfo> getOnDemandRouteEvents() {
        return this.onDemandRouteEvents;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void computeProgress() {
        this.readLock.lock();
        try {
            float progress = 0.0f;
            for (Task task : this.tasks.values()) {
                progress += task.isFinished() ? 1.0f : task.getProgress();
            }
            if (this.numTasks != 0) {
                progress /= (float)this.numTasks;
            }
            this.progress = progress;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TezTaskID, Task> getTasks() {
        Object object = this.tasksSyncHandle;
        synchronized (object) {
            this.lazyTasksCopyNeeded = true;
            return Collections.unmodifiableMap(this.tasks);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VertexState getState() {
        this.readLock.lock();
        try {
            VertexState vertexState = (VertexState)this.getStateMachine().getCurrentState();
            return vertexState;
        }
        finally {
            this.readLock.unlock();
        }
    }

    boolean trySetTerminationCause(VertexTerminationCause trigger) {
        if (this.terminationCause == null) {
            this.terminationCause = trigger;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VertexTerminationCause getTerminationCause() {
        this.readLock.lock();
        try {
            VertexTerminationCause vertexTerminationCause = this.terminationCause;
            return vertexTerminationCause;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public AppContext getAppContext() {
        return this.appContext;
    }

    private void handleParallelismUpdate(int newParallelism, Map<String, EdgeProperty> sourceEdgeProperties, Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) {
        Preconditions.checkArgument((oldParallelism != -1 ? 1 : 0) != 0, (Object)this.getLogIdentifier());
        if (oldParallelism < newParallelism) {
            this.addTasks(newParallelism);
        } else if (oldParallelism > newParallelism) {
            this.removeTasks(newParallelism);
        }
        Preconditions.checkState((this.numTasks == newParallelism ? 1 : 0) != 0, (Object)this.getLogIdentifier());
        this.recoveredSourceEdgeProperties = sourceEdgeProperties;
        this.recoveredRootInputSpecUpdates = rootInputSpecUpdates;
    }

    @Override
    public VertexState restoreFromEvent(HistoryEvent historyEvent) {
        this.writeLock.lock();
        try {
            switch (historyEvent.getEventType()) {
                case VERTEX_INITIALIZED: {
                    this.recoveryInitEventSeen = true;
                    this.recoveredState = this.setupVertex((VertexInitializedEvent)historyEvent);
                    this.createTasks();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Recovered state for vertex after Init event, vertex=" + this.logIdentifier + ", recoveredState=" + (Object)((Object)this.recoveredState));
                    }
                    VertexState vertexState = this.recoveredState;
                    return vertexState;
                }
                case VERTEX_STARTED: {
                    if (!this.recoveryInitEventSeen) {
                        throw new RuntimeException("Started Event seen but no Init Event was encountered earlier");
                    }
                    this.recoveryStartEventSeen = true;
                    VertexStartedEvent startedEvent = (VertexStartedEvent)historyEvent;
                    this.startTimeRequested = startedEvent.getStartRequestedTime();
                    this.startedTime = startedEvent.getStartTime();
                    this.recoveredState = VertexState.RUNNING;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Recovered state for vertex after Started event, vertex=" + this.logIdentifier + ", recoveredState=" + (Object)((Object)this.recoveredState));
                    }
                    VertexState vertexState = this.recoveredState;
                    return vertexState;
                }
                case VERTEX_PARALLELISM_UPDATED: {
                    VertexParallelismUpdatedEvent updatedEvent = (VertexParallelismUpdatedEvent)historyEvent;
                    int oldNumTasks = this.numTasks;
                    int newNumTasks = updatedEvent.getNumTasks();
                    this.handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(), updatedEvent.getRootInputSpecUpdates(), oldNumTasks);
                    Preconditions.checkState((this.numTasks == newNumTasks ? 1 : 0) != 0, (Object)this.getLogIdentifier());
                    if (updatedEvent.getVertexLocationHint() != null) {
                        this.setVertexLocationHint(updatedEvent.getVertexLocationHint());
                    }
                    this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Recovered state for vertex after parallelism updated event, vertex=" + this.logIdentifier + ", recoveredState=" + (Object)((Object)this.recoveredState));
                    }
                    VertexState vertexState = this.recoveredState;
                    return vertexState;
                }
                case VERTEX_COMMIT_STARTED: {
                    this.recoveryCommitInProgress = true;
                    this.hasCommitter = true;
                    VertexState vertexState = this.recoveredState;
                    return vertexState;
                }
                case VERTEX_FINISHED: {
                    VertexFinishedEvent finishedEvent = (VertexFinishedEvent)historyEvent;
                    if (finishedEvent.isFromSummary()) {
                        this.summaryCompleteSeen = true;
                    } else {
                        this.vertexCompleteSeen = true;
                    }
                    this.numTasks = finishedEvent.getNumTasks();
                    this.recoveryCommitInProgress = false;
                    this.recoveredState = finishedEvent.getState();
                    this.diagnostics.add(finishedEvent.getDiagnostics());
                    this.finishTime = finishedEvent.getFinishTime();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Recovered state for vertex after finished event, vertex=" + this.logIdentifier + ", recoveredState=" + (Object)((Object)this.recoveredState));
                    }
                    VertexState vertexState = this.recoveredState;
                    return vertexState;
                }
                case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: {
                    VertexRecoverableEventsGeneratedEvent vEvent = (VertexRecoverableEventsGeneratedEvent)historyEvent;
                    this.recoveredEvents.addAll(vEvent.getTezEvents());
                    VertexState vertexState = this.recoveredState;
                    return vertexState;
                }
            }
            throw new RuntimeException("Unexpected event received for restoring state, eventType=" + (Object)((Object)historyEvent.getEventType()));
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public String getLogIdentifier() {
        return this.logIdentifier;
    }

    @Override
    public void incrementFailedTaskAttemptCount() {
        this.failedTaskAttemptCount.incrementAndGet();
    }

    @Override
    public void incrementKilledTaskAttemptCount() {
        this.killedTaskAttemptCount.incrementAndGet();
    }

    @Override
    public int getFailedTaskAttemptCount() {
        return this.failedTaskAttemptCount.get();
    }

    @Override
    public int getKilledTaskAttemptCount() {
        return this.killedTaskAttemptCount.get();
    }

    private void setTaskLocationHints(VertexLocationHint vertexLocationHint) {
        if (vertexLocationHint != null && vertexLocationHint.getTaskLocationHints() != null && !vertexLocationHint.getTaskLocationHints().isEmpty()) {
            List locHints = vertexLocationHint.getTaskLocationHints();
            this.taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void scheduleSpeculativeTask(TezTaskID taskId) {
        this.readLock.lock();
        try {
            Preconditions.checkState((taskId.getId() < this.numTasks ? 1 : 0) != 0);
            this.eventHandler.handle((Event)new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
        }
        finally {
            this.readLock.unlock();
        }
    }

    void setupEdgeRouting() throws AMUserCodeException {
        for (Edge e : this.sourceVertices.values()) {
            e.routingToBegin();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unsetTasksNotYetScheduled() throws AMUserCodeException {
        block7: {
            if (this.tasksNotYetScheduled) {
                this.setupEdgeRouting();
                this.writeLock.lock();
                try {
                    this.tasksNotYetScheduled = false;
                    if (this.pendingTaskEvents.isEmpty()) break block7;
                    LOG.info("Routing pending task events for vertex: " + this.logIdentifier);
                    try {
                        this.handleRoutedTezEvents(this.pendingTaskEvents, false, true);
                    }
                    catch (AMUserCodeException e) {
                        String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + this.logIdentifier;
                        LOG.error(msg, (Throwable)((Object)e));
                        this.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                        this.eventHandler.handle((Event)new VertexEventTermination(this.vertexId, VertexTerminationCause.AM_USERCODE_FAILURE));
                        this.writeLock.unlock();
                        return;
                    }
                    this.pendingTaskEvents.clear();
                }
                finally {
                    this.writeLock.unlock();
                }
            }
        }
    }

    TaskSpec createRemoteTaskSpec(int taskIndex) throws AMUserCodeException {
        return TaskSpec.createBaseTaskSpec((String)this.getDAG().getName(), (String)this.getName(), (int)this.getTotalTasks(), (ProcessorDescriptor)this.getProcessorDescriptor(), this.getInputSpecList(taskIndex), this.getOutputSpecList(taskIndex), this.getGroupInputSpecList(taskIndex));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void scheduleTasks(List<VertexManagerPluginContext.ScheduleTaskRequest> tasksToSchedule) {
        try {
            this.unsetTasksNotYetScheduled();
            this.writeLock.lock();
            try {
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasksToSchedule) {
                    if (this.numTasks <= task.getTaskIndex()) {
                        throw new TezUncheckedException("Invalid taskId: " + task.getTaskIndex() + " for vertex: " + this.logIdentifier);
                    }
                    TaskLocationHint locationHint = task.getTaskLocationHint();
                    if (locationHint == null) continue;
                    if (this.taskLocationHints == null) {
                        this.taskLocationHints = new TaskLocationHint[this.numTasks];
                    }
                    this.taskLocationHints[task.getTaskIndex()] = locationHint;
                }
            }
            finally {
                this.writeLock.unlock();
            }
            this.readLock.lock();
            try {
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasksToSchedule) {
                    TezTaskID taskId = TezTaskID.getInstance((TezVertexID)this.vertexId, (int)task.getTaskIndex());
                    TaskSpec baseTaskSpec = this.createRemoteTaskSpec(taskId.getId());
                    this.eventHandler.handle((Event)new TaskEventScheduleTask(taskId, baseTaskSpec, this.getTaskLocationHint(taskId)));
                }
            }
            finally {
                this.readLock.unlock();
            }
        }
        catch (AMUserCodeException e) {
            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + this.getLogIdentifier();
            LOG.error(msg, (Throwable)((Object)e));
            this.eventHandler.handle((Event)new VertexEventManagerUserCodeError(this.getVertexId(), e));
            throw new TezUncheckedException((Throwable)((Object)e));
        }
    }

    @Override
    public void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint, @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException {
        this.setParallelism(parallelism, locationHint, sourceEdgeProperties, null, false, true);
    }

    @Override
    public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate, int parallelism, @Nullable VertexLocationHint locationHint) throws AMUserCodeException {
        this.setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, false, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager) throws AMUserCodeException {
        HashMap sourceEdgeProperties = Maps.newHashMap();
        this.readLock.lock();
        try {
            if (sourceEdgeManagers != null && !sourceEdgeManagers.isEmpty()) {
                for (Edge e : this.sourceVertices.values()) {
                    EdgeManagerPluginDescriptor newEdge = sourceEdgeManagers.get(e.getSourceVertexName());
                    EdgeProperty oldEdge = e.getEdgeProperty();
                    if (newEdge == null) continue;
                    sourceEdgeProperties.put(e.getSourceVertexName(), EdgeProperty.create((EdgeManagerPluginDescriptor)newEdge, (EdgeProperty.DataSourceType)oldEdge.getDataSourceType(), (EdgeProperty.SchedulingType)oldEdge.getSchedulingType(), (OutputDescriptor)oldEdge.getEdgeSource(), (InputDescriptor)oldEdge.getEdgeDestination()));
                }
            }
        }
        finally {
            this.readLock.unlock();
        }
        this.setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, false, fromVertexManager);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeProperty> sourceEdgeProperties, Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean recovering, boolean fromVertexManager) throws AMUserCodeException {
        if (recovering) {
            this.writeLock.lock();
            try {
                if (sourceEdgeProperties != null) {
                    for (Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
                        LOG.info("Recovering edge manager for source:" + entry.getKey() + " destination: " + this.getLogIdentifier());
                        Vertex vertex = this.appContext.getCurrentDAG().getVertex(entry.getKey());
                        Edge edge = this.sourceVertices.get(vertex);
                        try {
                            edge.setEdgeProperty(entry.getValue());
                        }
                        catch (Exception e) {
                            throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,sourceVertex:" + edge.getSourceVertexName() + "destinationVertex:" + edge.getDestinationVertexName(), (Throwable)e);
                        }
                    }
                }
                if (rootInputSpecUpdates != null) {
                    LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString());
                    this.rootInputSpecs.putAll(rootInputSpecUpdates);
                }
                return;
            }
            finally {
                this.writeLock.unlock();
            }
        }
        Preconditions.checkArgument((parallelism >= 0 ? 1 : 0) != 0, (Object)("Parallelism must be >=0. Value: " + parallelism + " for vertex: " + this.logIdentifier));
        this.writeLock.lock();
        try {
            if (!this.tasksNotYetScheduled) {
                String msg = "setParallelism cannot be called after scheduling tasks. Vertex: " + this.getLogIdentifier();
                LOG.info(msg);
                throw new TezUncheckedException(msg);
            }
            if (fromVertexManager && this.canInitVertex()) {
                Preconditions.checkState((boolean)this.vertexToBeReconfiguredByManager, (Object)"Vertex is fully configured but still the reconfiguration API has been called. VertexManager must notify the framework using  context.vertexReconfigurationPlanned() before re-configuring the vertex.");
            }
            if (this.numTasks == -1) {
                if (this.getState() != VertexState.INITIALIZING) {
                    throw new TezUncheckedException("Vertex state is not Initializing. Value: " + (Object)((Object)this.getState()) + " for vertex: " + this.logIdentifier);
                }
                if (sourceEdgeProperties != null) {
                    for (Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
                        LOG.info("Replacing edge manager for source:" + entry.getKey() + " destination: " + this.getLogIdentifier());
                        Vertex vertex = this.appContext.getCurrentDAG().getVertex(entry.getKey());
                        Edge edge = this.sourceVertices.get(vertex);
                        try {
                            edge.setEdgeProperty(entry.getValue());
                        }
                        catch (Exception e) {
                            throw new TezUncheckedException("Fail to update EdgeProperty for Edge,sourceVertex:" + edge.getSourceVertexName() + "destinationVertex:" + edge.getDestinationVertexName(), (Throwable)e);
                        }
                    }
                }
                if (rootInputSpecUpdates != null) {
                    LOG.info("Got updated RootInputsSpecs: " + rootInputSpecUpdates.toString());
                    for (Map.Entry<String, EdgeProperty> entry : rootInputSpecUpdates.entrySet()) {
                        Preconditions.checkState((((InputSpecUpdate)entry.getValue()).isForAllWorkUnits() || ((InputSpecUpdate)entry.getValue()).getAllNumPhysicalInputs() != null && ((InputSpecUpdate)entry.getValue()).getAllNumPhysicalInputs().size() == parallelism ? 1 : 0) != 0, (Object)("Not enough input spec updates for root input named " + entry.getKey()));
                    }
                    this.rootInputSpecs.putAll(rootInputSpecUpdates);
                }
                int oldNumTasks = this.numTasks;
                this.numTasks = parallelism;
                this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
                this.createTasks();
                this.setVertexLocationHint(vertexLocationHint);
                LOG.info("Vertex " + this.getLogIdentifier() + " parallelism set to " + parallelism);
                if (this.canInitVertex()) {
                    this.getEventHandler().handle((Event)new VertexEvent(this.getVertexId(), VertexEventType.V_READY_TO_INIT));
                }
            } else {
                Preconditions.checkState((rootInputSpecUpdates == null ? 1 : 0) != 0, (Object)"Root Input specs can only be updated when the vertex is configured with -1 tasks");
                int oldNumTasks = this.numTasks;
                for (Edge edge : this.sourceVertices.values()) {
                    edge.startEventBuffering();
                }
                if (parallelism == this.numTasks) {
                    LOG.info("setParallelism same as current value: " + parallelism + " for vertex: " + this.logIdentifier);
                    Preconditions.checkArgument((sourceEdgeProperties != null ? 1 : 0) != 0, (Object)"Source edge managers or RootInputSpecs must be set when not changing parallelism");
                } else {
                    LOG.info("Resetting vertex location hints due to change in parallelism for vertex: " + this.logIdentifier);
                    vertexLocationHint = null;
                    if (parallelism > this.numTasks) {
                        this.addTasks(parallelism);
                    } else if (parallelism < this.numTasks) {
                        this.removeTasks(parallelism);
                    }
                }
                Preconditions.checkState((this.numTasks == parallelism ? 1 : 0) != 0, (Object)this.getLogIdentifier());
                this.setVertexLocationHint(vertexLocationHint);
                LOG.info("Vertex " + this.getLogIdentifier() + " parallelism set to " + parallelism + " from " + oldNumTasks);
                this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
                assert (this.tasks.size() == this.numTasks);
                if (sourceEdgeProperties != null) {
                    for (Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
                        LOG.info("Replacing edge manager for source:" + entry.getKey() + " destination: " + this.getLogIdentifier());
                        Vertex sourceVertex = this.appContext.getCurrentDAG().getVertex(entry.getKey());
                        Edge edge = this.sourceVertices.get(sourceVertex);
                        try {
                            edge.setEdgeProperty(entry.getValue());
                        }
                        catch (Exception e) {
                            throw new TezUncheckedException((Throwable)e);
                        }
                    }
                }
                VertexParallelismUpdatedEvent vertexParallelismUpdatedEvent = new VertexParallelismUpdatedEvent(this.vertexId, this.numTasks, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, oldNumTasks);
                this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.getDAGId(), vertexParallelismUpdatedEvent));
                for (Edge edge : this.sourceVertices.values()) {
                    edge.stopEventBuffering();
                }
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
        this.writeLock.lock();
        try {
            if (LOG.isDebugEnabled()) {
                VertexImpl.logLocationHints(this.vertexName, vertexLocationHint);
            }
            this.setTaskLocationHints(vertexLocationHint);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void vertexReconfigurationPlanned() {
        this.writeLock.lock();
        try {
            Preconditions.checkState((!this.vmIsInitialized.get() ? 1 : 0) != 0, (Object)"context.vertexReconfigurationPlanned() cannot be called after initialize()");
            Preconditions.checkState((!this.completelyConfiguredSent.get() ? 1 : 0) != 0, (Object)"vertexReconfigurationPlanned()  cannot be invoked after the vertex has been configured.");
            this.vertexToBeReconfiguredByManager = true;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void doneReconfiguringVertex() {
        this.writeLock.lock();
        try {
            Preconditions.checkState((boolean)this.vertexToBeReconfiguredByManager, (Object)"doneReconfiguringVertex() can be invoked only after vertexReconfigurationPlanned() is invoked");
            this.vertexToBeReconfiguredByManager = false;
            if (this.canInitVertex()) {
                this.maybeSendConfiguredEvent();
            } else {
                Preconditions.checkState((this.getInternalState() == VertexState.INITIALIZING ? 1 : 0) != 0, (Object)("Vertex: " + this.getLogIdentifier()));
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handle(VertexEvent event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing VertexEvent " + event.getVertexId() + " of type " + event.getType() + " while in state " + (Object)((Object)this.getInternalState()) + ". Event: " + (Object)((Object)event));
        }
        try {
            this.writeLock.lock();
            VertexState oldState = this.getInternalState();
            try {
                this.getStateMachine().doTransition(event.getType(), (Object)event);
            }
            catch (InvalidStateTransitonException e) {
                String message = "Invalid event " + event.getType() + " on vertex " + this.vertexName + " with vertexId " + this.vertexId + " at current state " + (Object)((Object)oldState);
                LOG.error("Can't handle " + message, (Throwable)e);
                this.addDiagnostic(message);
                this.eventHandler.handle((Event)new VertexEvent(this.vertexId, VertexEventType.V_INTERNAL_ERROR));
            }
            if (oldState != this.getInternalState()) {
                LOG.info(this.logIdentifier + " transitioned from " + (Object)((Object)oldState) + " to " + (Object)((Object)this.getInternalState()) + " due to event " + event.getType());
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private VertexState getInternalState() {
        this.readLock.lock();
        try {
            VertexState vertexState = (VertexState)this.getStateMachine().getCurrentState();
            return vertexState;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addTask(Task task) {
        Object object = this.tasksSyncHandle;
        synchronized (object) {
            if (this.lazyTasksCopyNeeded) {
                LinkedHashMap<TezTaskID, Task> newTasks = new LinkedHashMap<TezTaskID, Task>();
                newTasks.putAll(this.tasks);
                this.tasks = newTasks;
                this.lazyTasksCopyNeeded = false;
            }
        }
        this.tasks.put(task.getTaskId(), task);
    }

    void setFinishTime() {
        this.finishTime = this.clock.getTime();
    }

    void logJobHistoryVertexInitializedEvent() {
        VertexInitializedEvent initEvt = new VertexInitializedEvent(this.vertexId, this.vertexName, this.initTimeRequested, this.initedTime, this.numTasks, this.getProcessorName(), this.getAdditionalInputs());
        this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.getDAGId(), initEvt));
    }

    void logJobHistoryVertexStartedEvent() {
        VertexStartedEvent startEvt = new VertexStartedEvent(this.vertexId, this.startTimeRequested, this.startedTime);
        this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.getDAGId(), startEvt));
    }

    void logJobHistoryVertexFinishedEvent() throws IOException {
        this.setFinishTime();
        this.logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, this.finishTime, "");
    }

    void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
        this.logJobHistoryVertexCompletedHelper(state, this.clock.getTime(), StringUtils.join(this.getDiagnostics(), (String)LINE_SEPARATOR));
    }

    private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime, String diagnostics) throws IOException {
        HashMap<String, Integer> taskStats = new HashMap<String, Integer>();
        taskStats.put("numCompletedTasks", this.completedTaskCount);
        taskStats.put("numSucceededTasks", this.succeededTaskCount);
        taskStats.put("numFailedTasks", this.failedTaskCount);
        taskStats.put("numKilledTasks", this.killedTaskCount);
        taskStats.put("numFailedTaskAttempts", this.failedTaskAttemptCount.get());
        taskStats.put("numKilledTaskAttempts", this.killedTaskAttemptCount.get());
        VertexFinishedEvent finishEvt = new VertexFinishedEvent(this.vertexId, this.vertexName, this.numTasks, this.initTimeRequested, this.initedTime, this.startTimeRequested, this.startedTime, finishTime, finalState, diagnostics, this.getAllCounters(), this.getVertexStats(), taskStats);
        this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getDAGId(), finishEvt));
    }

    private static VertexState commitOrFinish(final VertexImpl vertex) {
        if (vertex.outputCommitters != null && !vertex.outputCommitters.isEmpty()) {
            boolean firstCommit = true;
            for (Map.Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
                final OutputCommitter committer = entry.getValue();
                final String outputName = entry.getKey();
                if (vertex.sharedOutputs.contains(outputName)) continue;
                if (firstCommit) {
                    LOG.info("Invoking committer commit for vertex, vertexId=" + vertex.logIdentifier);
                    try {
                        vertex.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(vertex.getDAGId(), new VertexCommitStartedEvent(vertex.vertexId, vertex.clock.getTime())));
                    }
                    catch (IOException e) {
                        LOG.error("Failed to persist commit start event to recovery, vertex=" + vertex.logIdentifier, (Throwable)e);
                        vertex.trySetTerminationCause(VertexTerminationCause.RECOVERY_ERROR);
                        return vertex.finished(VertexState.FAILED);
                    }
                    firstCommit = false;
                }
                VertexCommitCallback commitCallback = new VertexCommitCallback(vertex, outputName);
                CallableEvent commitCallableEvent = new CallableEvent(commitCallback){

                    @Override
                    public Void call() throws Exception {
                        vertex.dagUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                            @Override
                            public Void run() throws Exception {
                                LOG.info("Invoking committer commit for output=" + outputName + ", vertexId=" + vertex.logIdentifier);
                                committer.commitOutput();
                                return null;
                            }
                        });
                        return null;
                    }
                };
                ListenableFuture commitFuture = vertex.getAppContext().getExecService().submit((Callable)commitCallableEvent);
                Futures.addCallback((ListenableFuture)commitFuture, commitCallableEvent.getCallback());
                vertex.commitFutures.put(outputName, (ListenableFuture<Void>)commitFuture);
            }
        }
        if (vertex.commitFutures.isEmpty()) {
            return vertex.finished(VertexState.SUCCEEDED);
        }
        return VertexState.COMMITTING;
    }

    static VertexState checkTasksForCompletion(VertexImpl vertex) {
        LOG.info("Checking tasks for vertex completion for " + vertex.logIdentifier + ", numTasks=" + vertex.numTasks + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount + ", completedTaskCount=" + vertex.completedTaskCount + ", commitInProgress=" + vertex.commitFutures.size() + ", terminationCause=" + (Object)((Object)vertex.terminationCause));
        if (vertex.completedTaskCount > vertex.tasks.size()) {
            LOG.error("task completion accounting issue: completedTaskCount > nTasks: for vertex " + vertex.logIdentifier + ", numTasks=" + vertex.numTasks + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount + ", completedTaskCount=" + vertex.completedTaskCount + ", terminationCause=" + (Object)((Object)vertex.terminationCause));
        }
        if (vertex.completedTaskCount == vertex.tasks.size()) {
            vertex.finalStatistics = vertex.constructStatistics();
            if (vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
                LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
                if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
                    return VertexImpl.commitOrFinish(vertex);
                }
                return vertex.finished(VertexState.SUCCEEDED);
            }
            return VertexImpl.finishWithTerminationCause(vertex);
        }
        return vertex.getInternalState();
    }

    static VertexState checkCommitsForCompletion(VertexImpl vertex) {
        LOG.info("Checking commits for vertex completion for " + vertex.logIdentifier + ", numTasks=" + vertex.numTasks + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount + ", completedTaskCount=" + vertex.completedTaskCount + ", commitInProgress=" + vertex.commitFutures.size() + ", terminationCause=" + (Object)((Object)vertex.terminationCause));
        if (vertex.terminationCause == null) {
            Preconditions.checkState((vertex.getState() == VertexState.COMMITTING ? 1 : 0) != 0, (Object)("Vertex should be in COMMITTING state, but in " + (Object)((Object)vertex.getState()) + ", vertex:" + vertex.getLogIdentifier()));
            if (vertex.commitFutures.isEmpty()) {
                return vertex.finished(VertexState.SUCCEEDED);
            }
            return VertexState.COMMITTING;
        }
        if (!vertex.commitFutures.isEmpty()) {
            return VertexState.TERMINATING;
        }
        return VertexImpl.finishWithTerminationCause(vertex);
    }

    private static VertexState finishWithTerminationCause(VertexImpl vertex) {
        Preconditions.checkArgument((vertex.getTerminationCause() != null ? 1 : 0) != 0, (Object)"TerminationCause is not set");
        String diagnosticMsg = "Vertex did not succeed due to " + (Object)((Object)vertex.getTerminationCause()) + ", failedTasks:" + vertex.failedTaskCount + " killedTasks:" + vertex.killedTaskCount;
        LOG.info(diagnosticMsg);
        vertex.addDiagnostic(diagnosticMsg);
        return vertex.finished(vertex.getTerminationCause().getFinishedState());
    }

    void tryEnactKill(VertexTerminationCause trigger, TaskTerminationCause taskterminationCause) {
        TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
        if (taskterminationCause == TaskTerminationCause.DAG_KILL) {
            errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
        }
        if (this.trySetTerminationCause(trigger)) {
            String msg = "Killing tasks in vertex: " + this.logIdentifier + " due to trigger: " + (Object)((Object)trigger);
            LOG.info(msg);
            for (Task task : this.tasks.values()) {
                this.eventHandler.handle((Event)new TaskEventTermination(task.getTaskId(), errCause, msg));
            }
        }
    }

    VertexState finished(VertexState finalState, VertexTerminationCause terminationCause, String diag) {
        if (this.finishTime == 0L) {
            this.setFinishTime();
        }
        if (terminationCause != null) {
            this.trySetTerminationCause(terminationCause);
        }
        if (this.rootInputInitializerManager != null) {
            this.rootInputInitializerManager.shutdown();
            this.rootInputInitializerManager = null;
        }
        switch (finalState) {
            case ERROR: {
                this.addDiagnostic("Vertex: " + this.logIdentifier + " error due to:" + (Object)((Object)terminationCause));
                if (!StringUtils.isEmpty((String)diag)) {
                    this.addDiagnostic(diag);
                }
                this.abortVertex(VertexStatus.State.valueOf((String)finalState.name()));
                this.eventHandler.handle((Event)new DAGEvent(this.getDAGId(), DAGEventType.INTERNAL_ERROR));
                try {
                    this.logJobHistoryVertexFailedEvent(finalState);
                }
                catch (IOException e) {
                    LOG.error("Failed to send vertex finished event to recovery", (Throwable)e);
                }
                break;
            }
            case KILLED: 
            case FAILED: {
                this.addDiagnostic("Vertex " + this.logIdentifier + " killed/failed due to:" + (Object)((Object)terminationCause));
                if (!StringUtils.isEmpty((String)diag)) {
                    this.addDiagnostic(diag);
                }
                this.abortVertex(VertexStatus.State.valueOf((String)finalState.name()));
                this.eventHandler.handle((Event)new DAGEventVertexCompleted(this.getVertexId(), finalState, terminationCause));
                try {
                    this.logJobHistoryVertexFailedEvent(finalState);
                }
                catch (IOException e) {
                    LOG.error("Failed to send vertex finished event to recovery", (Throwable)e);
                }
                break;
            }
            case SUCCEEDED: {
                try {
                    this.logJobHistoryVertexFinishedEvent();
                    this.eventHandler.handle((Event)new DAGEventVertexCompleted(this.getVertexId(), finalState));
                }
                catch (IOException e) {
                    LOG.error("Failed to send vertex finished event to recovery", (Throwable)e);
                    finalState = VertexState.FAILED;
                    this.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
                    this.eventHandler.handle((Event)new DAGEventVertexCompleted(this.getVertexId(), finalState));
                }
                break;
            }
            default: {
                throw new TezUncheckedException("Unexpected VertexState: " + (Object)((Object)finalState));
            }
        }
        return finalState;
    }

    VertexState finished(VertexState finalState) {
        return this.finished(finalState, null, null);
    }

    private void initializeCommitters() throws Exception {
        if (!this.additionalOutputSpecs.isEmpty()) {
            LOG.info("Invoking committer inits for vertex, vertexId=" + this.logIdentifier);
            for (Map.Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry : this.additionalOutputs.entrySet()) {
                final String outputName = entry.getKey();
                final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
                if (od.getControllerDescriptor() == null || ((OutputCommitterDescriptor)od.getControllerDescriptor()).getClassName() == null) {
                    LOG.info("Ignoring committer as none specified for output=" + outputName + ", vertexId=" + this.logIdentifier);
                    continue;
                }
                LOG.info("Instantiating committer for output=" + outputName + ", vertexId=" + this.logIdentifier + ", committerClass=" + ((OutputCommitterDescriptor)od.getControllerDescriptor()).getClassName());
                this.dagUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                    @Override
                    public Void run() throws Exception {
                        OutputCommitterContextImpl outputCommitterContext = new OutputCommitterContextImpl(VertexImpl.this.appContext.getApplicationID(), VertexImpl.this.appContext.getApplicationAttemptId().getAttemptId(), VertexImpl.this.appContext.getCurrentDAG().getName(), VertexImpl.this.vertexName, (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>)od, VertexImpl.this.vertexId.getId());
                        OutputCommitter outputCommitter = (OutputCommitter)ReflectionUtils.createClazzInstance((String)((OutputCommitterDescriptor)od.getControllerDescriptor()).getClassName(), (Class[])new Class[]{OutputCommitterContext.class}, (Object[])new Object[]{outputCommitterContext});
                        LOG.info("Invoking committer init for output=" + outputName + ", vertexId=" + VertexImpl.this.logIdentifier);
                        outputCommitter.initialize();
                        VertexImpl.this.outputCommitters.put(outputName, outputCommitter);
                        LOG.info("Invoking committer setup for output=" + outputName + ", vertexId=" + VertexImpl.this.logIdentifier);
                        outputCommitter.setupOutput();
                        return null;
                    }
                });
            }
        }
    }

    private boolean initializeVertex() {
        try {
            this.initializeCommitters();
        }
        catch (Exception e) {
            LOG.warn("Vertex Committer init failed, vertex=" + this.logIdentifier, (Throwable)e);
            this.addDiagnostic("Vertex init failed : " + ExceptionUtils.getStackTrace((Throwable)e));
            this.trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
            this.finished(VertexState.FAILED);
            return false;
        }
        this.initedTime = this.clock.getTime();
        this.logJobHistoryVertexInitializedEvent();
        return true;
    }

    private void checkTaskLimits() {
    }

    @VisibleForTesting
    ContainerContext getContainerContext(int taskIdx) {
        if (this.taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(this.vertexName, taskIdx)) {
            String jvmOpts;
            String string = jvmOpts = this.javaOptsTaskSpecific != null ? this.javaOptsTaskSpecific : this.javaOpts;
            if (this.taskSpecificLaunchCmdOpts.hasModifiedTaskLaunchOpts()) {
                jvmOpts = this.taskSpecificLaunchCmdOpts.getTaskSpecificOption(jvmOpts, this.vertexName, taskIdx);
            }
            ContainerContext context = new ContainerContext(this.localResources, this.appContext.getCurrentDAG().getCredentials(), this.environmentTaskSpecific != null ? this.environmentTaskSpecific : this.environment, jvmOpts);
            return context;
        }
        return this.containerContext;
    }

    private TaskImpl createTask(int taskIndex) {
        ContainerContext conContext = this.getContainerContext(taskIndex);
        return new TaskImpl(this.getVertexId(), taskIndex, this.eventHandler, this.vertexConf, this.taskCommunicatorManagerInterface, this.clock, this.taskHeartbeatHandler, this.appContext, this.targetVertices != null ? this.targetVertices.isEmpty() : true, this.taskResource, conContext, this.stateChangeNotifier, this);
    }

    private void createTasks() {
        for (int i = 0; i < this.numTasks; ++i) {
            TaskImpl task = this.createTask(i);
            this.addTask(task);
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Created task for vertex " + this.logIdentifier + ": " + task.getTaskId());
        }
    }

    private void addTasks(int newNumTasks) {
        int initialNumTasks;
        Preconditions.checkArgument((newNumTasks > this.numTasks ? 1 : 0) != 0, (Object)this.getLogIdentifier());
        for (int i = initialNumTasks = this.numTasks; i < newNumTasks; ++i) {
            TaskImpl task = this.createTask(i);
            this.addTask(task);
            ++this.numTasks;
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Created task for vertex " + this.logIdentifier + ": " + task.getTaskId());
        }
    }

    private void removeTasks(int newNumTasks) {
        Preconditions.checkArgument((newNumTasks < this.numTasks ? 1 : 0) != 0, (Object)this.getLogIdentifier());
        LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
        Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet().iterator();
        int i = 0;
        while (iter.hasNext()) {
            ++i;
            Map.Entry<TezTaskID, Task> entry = iter.next();
            Task task = entry.getValue();
            if (task.getState() != TaskState.NEW) {
                String msg = "All tasks must be in initial state when changing parallelism for vertex: " + this.getLogIdentifier();
                LOG.warn(msg);
                throw new TezUncheckedException(msg);
            }
            if (i <= newNumTasks) continue;
            LOG.info("Removing task: " + entry.getKey());
            iter.remove();
            --this.numTasks;
        }
    }

    private VertexState setupVertex() {
        return this.setupVertex(null);
    }

    private VertexState setupVertex(VertexInitializedEvent event) {
        if (event == null) {
            this.initTimeRequested = this.clock.getTime();
        } else {
            this.initTimeRequested = event.getInitRequestedTime();
            this.initedTime = event.getInitedTime();
        }
        if (this.dagVertexGroups != null && !this.dagVertexGroups.isEmpty()) {
            LinkedList groupSpecList = Lists.newLinkedList();
            for (DAGImpl.VertexGroupInfo groupInfo : this.dagVertexGroups.values()) {
                if (!groupInfo.edgeMergedInputs.containsKey(this.getName())) continue;
                InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(this.getName());
                groupSpecList.add(new GroupInputSpec(groupInfo.groupName, (List)Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
            }
            if (!groupSpecList.isEmpty()) {
                this.groupInputSpecList = groupSpecList;
            }
        }
        if (event != null) {
            this.rootInputDescriptors = event.getAdditionalInputs();
        } else if (this.rootInputDescriptors != null) {
            LOG.info("Root Inputs exist for Vertex: " + this.getName() + " : " + this.rootInputDescriptors);
            for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : this.rootInputDescriptors.values()) {
                if (input.getControllerDescriptor() == null || ((InputInitializerDescriptor)input.getControllerDescriptor()).getClassName() == null) continue;
                if (this.inputsWithInitializers == null) {
                    this.inputsWithInitializers = Sets.newHashSet();
                }
                this.inputsWithInitializers.add(input.getName());
                LOG.info("Starting root input initializer for input: " + input.getName() + ", with class: [" + ((InputInitializerDescriptor)input.getControllerDescriptor()).getClassName() + "]");
            }
        }
        boolean hasBipartite = false;
        if (this.sourceVertices != null) {
            for (Edge edge : this.sourceVertices.values()) {
                if (edge.getEdgeProperty().getDataMovementType() != EdgeProperty.DataMovementType.SCATTER_GATHER) continue;
                hasBipartite = true;
                break;
            }
        }
        if (hasBipartite && this.inputsWithInitializers != null) {
            LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
            if (event != null) {
                return VertexState.FAILED;
            }
            return this.finished(VertexState.FAILED);
        }
        this.assignVertexManager();
        try {
            this.vertexManager.initialize();
            this.vmIsInitialized.set(true);
        }
        catch (AMUserCodeException e) {
            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + this.logIdentifier;
            LOG.error(msg, (Throwable)((Object)e));
            this.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
            return VertexState.FAILED;
        }
        if (event != null) {
            int oldNumTasks = this.numTasks;
            this.numTasks = event.getNumTasks();
            this.stateChangeNotifier.stateChanged(this.vertexId, (VertexStateUpdate)new VertexStateUpdateParallelismUpdated(this.vertexName, this.numTasks, oldNumTasks));
        } else {
            this.numTasks = this.getVertexPlan().getTaskConfig().getNumTasks();
        }
        if (this.numTasks != -1 && this.numTasks < 0) {
            this.addDiagnostic("Invalid task count for vertex, numTasks=" + this.numTasks);
            this.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
            if (event != null) {
                return this.finished(VertexState.FAILED);
            }
            return VertexState.FAILED;
        }
        this.checkTaskLimits();
        return VertexState.INITED;
    }

    private void assignVertexManager() {
        boolean hasUserVertexManager;
        boolean hasBipartite = false;
        boolean hasOneToOne = false;
        boolean hasCustom = false;
        if (this.sourceVertices != null) {
            block6: for (Edge edge : this.sourceVertices.values()) {
                switch (edge.getEdgeProperty().getDataMovementType()) {
                    case SCATTER_GATHER: {
                        hasBipartite = true;
                        continue block6;
                    }
                    case ONE_TO_ONE: {
                        hasOneToOne = true;
                        continue block6;
                    }
                    case BROADCAST: {
                        continue block6;
                    }
                    case CUSTOM: {
                        hasCustom = true;
                        continue block6;
                    }
                }
                throw new TezUncheckedException("Unknown data movement type: " + edge.getEdgeProperty().getDataMovementType());
            }
        }
        if (hasUserVertexManager = this.vertexPlan.hasVertexManagerPlugin()) {
            VertexManagerPluginDescriptor pluginDesc = DagTypeConverters.convertVertexManagerPluginDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)this.vertexPlan.getVertexManagerPlugin());
            LOG.info("Setting user vertex manager plugin: " + pluginDesc.getClassName() + " on vertex: " + this.getLogIdentifier());
            this.vertexManager = new VertexManager(pluginDesc, this.dagUgi, this, this.appContext, this.stateChangeNotifier);
        } else if (this.inputsWithInitializers != null) {
            LOG.info("Setting vertexManager to RootInputVertexManager for " + this.logIdentifier);
            this.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create((String)RootInputVertexManager.class.getName()), this.dagUgi, this, this.appContext, this.stateChangeNotifier);
        } else if (hasOneToOne && !hasCustom) {
            LOG.info("Setting vertexManager to InputReadyVertexManager for " + this.logIdentifier);
            this.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create((String)InputReadyVertexManager.class.getName()), this.dagUgi, this, this.appContext, this.stateChangeNotifier);
        } else if (hasBipartite && !hasCustom) {
            LOG.info("Setting vertexManager to ShuffleVertexManager for " + this.logIdentifier);
            this.vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder((Configuration)this.vertexConf).build(), this.dagUgi, this, this.appContext, this.stateChangeNotifier);
        } else {
            LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + this.logIdentifier);
            this.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create((String)ImmediateStartVertexManager.class.getName()), this.dagUgi, this, this.appContext, this.stateChangeNotifier);
        }
    }

    private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag, List<TezTaskAttemptID> taIds) {
        ArrayList<TaskAttemptIdentifier> attempts = new ArrayList<TaskAttemptIdentifier>(taIds.size());
        String dagName = dag.getName();
        for (TezTaskAttemptID taId : taIds) {
            String vertexName = dag.getVertex(taId.getTaskID().getVertexID()).getName();
            attempts.add(VertexImpl.getTaskAttemptIdentifier(dagName, vertexName, taId));
        }
        return attempts;
    }

    private static TaskAttemptIdentifier getTaskAttemptIdentifier(String dagName, String vertexName, TezTaskAttemptID taId) {
        return new TaskAttemptIdentifierImpl(dagName, vertexName, taId);
    }

    private void recoveryCodeSimulatingStart() throws AMUserCodeException {
        this.vertexManager.onVertexStarted(VertexImpl.getTaskAttemptIdentifiers(this.dag, this.pendingReportedSrcCompletions));
        this.maybeSendConfiguredEvent();
    }

    private void routeRecoveredEvents(VertexState vertexState, List<TezEvent> tezEvents) {
        for (TezEvent tezEvent : tezEvents) {
            EventMetaData sourceMeta = tezEvent.getSourceInfo();
            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
            if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                ((DataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                ((CompositeDataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
                ((InputFailedEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else {
                if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
                    if (vertexState != VertexState.RUNNING && vertexState != VertexState.INITED) continue;
                    this.eventHandler.handle((Event)new VertexEventRouteEvent(this.getVertexId(), Collections.singletonList(tezEvent), true));
                    continue;
                }
                if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) {
                    InputInitializerEvent iiEvent = (InputInitializerEvent)tezEvent.getEvent();
                    iiEvent.setSourceVertexName(this.vertexName);
                    this.eventHandler.handle((Event)new VertexEventRouteEvent(this.getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(), Collections.singletonList(tezEvent), true));
                    continue;
                }
            }
            Vertex destVertex = this.getDAG().getVertex(sourceMeta.getEdgeVertexName());
            Edge destEdge = this.targetVertices.get(destVertex);
            if (destEdge == null) {
                throw new TezUncheckedException("Bad destination vertex: " + sourceMeta.getEdgeVertexName() + " for event vertex: " + this.getLogIdentifier());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Routing recovered event, vertex=" + this.logIdentifier + ", eventType=" + tezEvent.getEventType() + ", sourceInfo=" + sourceMeta + ", destinationVertex=" + destVertex.getLogIdentifier());
            }
            this.eventHandler.handle((Event)new VertexEventRouteEvent(destVertex.getVertexId(), Collections.singletonList(tezEvent), true));
        }
    }

    @VisibleForTesting
    protected RootInputInitializerManager createRootInputInitializerManager(String dagName, String vertexName, TezVertexID vertexID, EventHandler eventHandler, int numTasks, int numNodes, Resource vertexTaskResource, Resource totalResource) {
        return new RootInputInitializerManager(this, this.appContext, this.dagUgi, this.stateChangeNotifier);
    }

    private boolean initializeVertexInInitializingState() {
        boolean isInitialized = this.initializeVertex();
        return isInitialized;
    }

    void startIfPossible() {
        if (this.startSignalPending) {
            LOG.info("Triggering start event for vertex: " + this.logIdentifier + " with distanceFromRoot: " + this.distanceFromRoot);
            this.eventHandler.handle((Event)new VertexEvent(this.vertexId, VertexEventType.V_START));
        }
    }

    boolean canInitVertex() {
        if (this.numTasks >= 0 && this.uninitializedEdges.isEmpty() && !this.initWaitsForRootInitializers) {
            return true;
        }
        LOG.info("Cannot init vertex: " + this.logIdentifier + " numTasks: " + this.numTasks + " numUnitializedEdges: " + this.uninitializedEdges.size() + " numInitializedInputs: " + this.numInitializedInputs + " initWaitsForRootInitializers: " + this.initWaitsForRootInitializers);
        return false;
    }

    private void maybeSendConfiguredEvent() {
        Preconditions.checkState((boolean)this.canInitVertex(), (Object)("Vertex: " + this.getLogIdentifier()));
        if (!this.vertexToBeReconfiguredByManager && this.completelyConfiguredSent.compareAndSet(false, true)) {
            this.stateChangeNotifier.stateChanged(this.vertexId, new VertexStateUpdate(this.vertexName, org.apache.tez.dag.api.event.VertexState.CONFIGURED));
        }
    }

    private VertexState startVertex() {
        Preconditions.checkState((this.getState() == VertexState.INITED ? 1 : 0) != 0, (Object)("Vertex must be inited " + this.logIdentifier));
        this.startedTime = this.clock.getTime();
        try {
            this.vertexManager.onVertexStarted(VertexImpl.getTaskAttemptIdentifiers(this.dag, this.pendingReportedSrcCompletions));
        }
        catch (AMUserCodeException e) {
            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + this.logIdentifier;
            LOG.error(msg, (Throwable)((Object)e));
            this.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
            this.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
            return VertexState.TERMINATING;
        }
        this.pendingReportedSrcCompletions.clear();
        this.logJobHistoryVertexStartedEvent();
        this.maybeSendConfiguredEvent();
        if (this.targetVertices != null) {
            for (Vertex targetVertex : this.targetVertices.keySet()) {
                this.eventHandler.handle((Event)new VertexEventSourceVertexStarted(targetVertex.getVertexId(), this.getVertexId(), this.distanceFromRoot));
            }
        }
        if (this.numTasks == 0) {
            this.eventHandler.handle((Event)new VertexEvent(this.vertexId, VertexEventType.V_COMPLETED));
        }
        return VertexState.RUNNING;
    }

    void abortVertex(final VertexStatus.State finalState) {
        if (this.aborted.getAndSet(true)) {
            LOG.info("Ignoring multiple aborts for vertex: " + this.logIdentifier);
            return;
        }
        if (this.outputCommitters != null) {
            LOG.info("Invoking committer abort for vertex, vertexId=" + this.logIdentifier);
            try {
                this.dagUgi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                    @Override
                    public Void run() {
                        for (Map.Entry entry : VertexImpl.this.outputCommitters.entrySet()) {
                            try {
                                LOG.info("Invoking committer abort for output=" + (String)entry.getKey() + ", vertexId=" + VertexImpl.this.logIdentifier);
                                ((OutputCommitter)entry.getValue()).abortOutput(finalState);
                            }
                            catch (Exception e) {
                                LOG.warn("Could not abort committer for output=" + (String)entry.getKey() + ", vertexId=" + VertexImpl.this.logIdentifier, (Throwable)e);
                            }
                        }
                        return null;
                    }
                });
            }
            catch (Exception e) {
                throw new TezUncheckedException("Unknown error while attempting VertexCommitter(s) abort", (Throwable)e);
            }
        }
        if (this.finishTime == 0L) {
            this.setFinishTime();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mayBeConstructFinalFullCounters() {
        Object object = this.fullCountersLock;
        synchronized (object) {
            if (this.fullCounters != null) {
                return;
            }
            this.constructFinalFullcounters();
        }
    }

    private VertexStatisticsImpl constructStatistics() {
        return this.completedTasksStatsCache;
    }

    @InterfaceAudience.Private
    public void constructFinalFullcounters() {
        this.fullCounters = new TezCounters();
        this.vertexStats = new VertexStats();
        for (Task t : this.tasks.values()) {
            this.vertexStats.updateStats(t.getReport());
            TezCounters counters = t.getCounters();
            this.fullCounters.incrAllCounters((AbstractCounters)counters);
        }
    }

    private void commitCompleted(VertexEventCommitCompleted commitCompletedEvent) {
        Preconditions.checkState((this.commitFutures.remove(commitCompletedEvent.getOutputName()) != null ? 1 : 0) != 0, (Object)("Unknown commit:" + commitCompletedEvent.getOutputName() + ", vertex=" + this.logIdentifier));
        if (commitCompletedEvent.isSucceeded()) {
            LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputName() + ", vertexId=" + this.logIdentifier);
        } else {
            String diag = "Commit failed for output:" + commitCompletedEvent.getOutputName() + ", vertexId=" + this.logIdentifier + ", " + ExceptionUtils.getStackTrace((Throwable)commitCompletedEvent.getException());
            LOG.info(diag);
            this.addDiagnostic(diag);
            this.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
            this.cancelCommits();
        }
    }

    private void cancelCommits() {
        if (!this.commitCanceled.getAndSet(true)) {
            for (Map.Entry<String, ListenableFuture<Void>> entry : this.commitFutures.entrySet()) {
                LOG.info("Canceling commit of output:" + entry.getKey() + ", vertexId=" + this.logIdentifier);
                entry.getValue().cancel(true);
            }
        }
    }

    private void addDiagnostic(String diag) {
        this.diagnostics.add(diag);
    }

    private static boolean isEventFromVertex(Vertex vertex, EventMetaData sourceMeta) {
        return sourceMeta.getTaskVertexName().equals(vertex.getName());
    }

    private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
        assert (VertexImpl.isEventFromVertex(vertex, sourceMeta));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID, int fromEventId, int preRoutedFromEventId, int maxEvents) {
        Task task = this.getTask(attemptID.getTaskID());
        ArrayList events = task.getTaskAttemptTezEvents(attemptID, preRoutedFromEventId, maxEvents);
        int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
        int nextFromEventId = fromEventId;
        this.onDemandRouteEventsReadLock.lock();
        try {
            int currEventCount = this.onDemandRouteEvents.size();
            try {
                if (currEventCount > fromEventId) {
                    int numEventsSent;
                    if (events != TaskImpl.EMPTY_TASK_ATTEMPT_TEZ_EVENTS) {
                        events.ensureCapacity(maxEvents);
                    } else {
                        events = Lists.newArrayListWithCapacity((int)maxEvents);
                    }
                    int numPreRoutedEvents = events.size();
                    int taskIndex = attemptID.getTaskID().getId();
                    Preconditions.checkState((taskIndex < this.tasks.size() ? 1 : 0) != 0, (Object)("Invalid task index for TA: " + attemptID + " vertex: " + this.getLogIdentifier()));
                    boolean isFirstEvent = true;
                    boolean firstEventObsoleted = false;
                    for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
                        boolean earlyExit = false;
                        if (events.size() == maxEvents) break;
                        EventInfo eventInfo = this.onDemandRouteEvents.get(nextFromEventId);
                        if (eventInfo.isObsolete) {
                            firstEventObsoleted = true;
                            continue;
                        }
                        TezEvent tezEvent = eventInfo.tezEvent;
                        switch (tezEvent.getEventType()) {
                            case INPUT_FAILED_EVENT: 
                            case DATA_MOVEMENT_EVENT: 
                            case COMPOSITE_DATA_MOVEMENT_EVENT: {
                                int srcTaskIndex = eventInfo.eventTaskIndex;
                                Edge srcEdge = eventInfo.eventEdge;
                                Edge.PendingEventRouteMetadata pendingRoute = null;
                                if (isFirstEvent) {
                                    isFirstEvent = false;
                                    pendingRoute = srcEdge.removePendingEvents(attemptID);
                                    if (pendingRoute != null && tezEvent != pendingRoute.getTezEvent()) {
                                        Preconditions.checkState((boolean)firstEventObsoleted);
                                        pendingRoute = null;
                                    }
                                }
                                if (srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex, events, maxEvents, pendingRoute)) break;
                                earlyExit = true;
                                break;
                            }
                            case ROOT_INPUT_DATA_INFORMATION_EVENT: {
                                InputDataInformationEvent riEvent = (InputDataInformationEvent)tezEvent.getEvent();
                                if (riEvent.getTargetIndex() != taskIndex) break;
                                events.add(tezEvent);
                                break;
                            }
                            default: {
                                throw new TezUncheckedException("Unexpected event type for task: " + tezEvent.getEventType());
                            }
                        }
                        if (earlyExit) break;
                    }
                    if ((numEventsSent = events.size() - numPreRoutedEvents) > 0) {
                        StringBuilder builder = new StringBuilder();
                        builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent).append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId).append(" out of ").append(currEventCount).append(" on-demand events in vertex: ").append(this.getLogIdentifier());
                        LOG.info(builder.toString());
                    }
                }
            }
            catch (AMUserCodeException e) {
                String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + this.getLogIdentifier();
                LOG.error(msg, (Throwable)((Object)e));
                this.eventHandler.handle((Event)new VertexEventManagerUserCodeError(this.getVertexId(), e));
                nextFromEventId = fromEventId;
                events.clear();
            }
        }
        finally {
            this.onDemandRouteEventsReadLock.unlock();
        }
        if (!events.isEmpty()) {
            for (int i = events.size() - 1; i >= 0; --i) {
                TezEvent lastEvent = (TezEvent)events.get(i);
                EventType lastEventType = lastEvent.getEventType();
                if (lastEventType != EventType.COMPOSITE_DATA_MOVEMENT_EVENT && lastEventType != EventType.DATA_MOVEMENT_EVENT && lastEventType != EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) continue;
                task.getAttempt(attemptID).setLastEventSent(lastEvent);
                break;
            }
        }
        return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
    }

    private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
        if (this.getAppContext().isRecoveryEnabled() && !recovered && !isPendingEvents && !tezEvents.isEmpty()) {
            ArrayList recoveryEvents = Lists.newArrayList();
            for (TezEvent tezEvent : tezEvents) {
                if (!VertexImpl.isEventFromVertex(this, tezEvent.getSourceInfo()) || !tezEvent.getEventType().equals((Object)EventType.COMPOSITE_DATA_MOVEMENT_EVENT) && !tezEvent.getEventType().equals((Object)EventType.DATA_MOVEMENT_EVENT) && !tezEvent.getEventType().equals((Object)EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) && !tezEvent.getEventType().equals((Object)EventType.ROOT_INPUT_INITIALIZER_EVENT)) continue;
                recoveryEvents.add(tezEvent);
            }
            if (!recoveryEvents.isEmpty()) {
                VertexRecoverableEventsGeneratedEvent historyEvent = new VertexRecoverableEventsGeneratedEvent(this.vertexId, recoveryEvents);
                this.appContext.getHistoryHandler().handle(new DAGHistoryEvent(this.getDAGId(), historyEvent));
            }
        }
        block16: for (TezEvent tezEvent : tezEvents) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Vertex: " + this.getLogIdentifier() + " routing event: " + tezEvent.getEventType() + " Recovered:" + recovered);
            }
            EventMetaData sourceMeta = tezEvent.getSourceInfo();
            switch (tezEvent.getEventType()) {
                case INPUT_FAILED_EVENT: 
                case DATA_MOVEMENT_EVENT: 
                case COMPOSITE_DATA_MOVEMENT_EVENT: {
                    if (VertexImpl.isEventFromVertex(this, sourceMeta)) {
                        TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
                        if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                            ((DataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
                        } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                            ((CompositeDataMovementEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
                        } else {
                            ((InputFailedEvent)tezEvent.getEvent()).setVersion(srcTaId.getId());
                        }
                        Vertex destVertex = this.getDAG().getVertex(sourceMeta.getEdgeVertexName());
                        Edge destEdge = this.targetVertices.get(destVertex);
                        if (destEdge == null) {
                            throw new TezUncheckedException("Bad destination vertex: " + sourceMeta.getEdgeVertexName() + " for event vertex: " + this.getLogIdentifier());
                        }
                        this.eventHandler.handle((Event)new VertexEventRouteEvent(destVertex.getVertexId(), Collections.singletonList(tezEvent)));
                        break;
                    }
                    if (this.tasksNotYetScheduled) {
                        this.pendingTaskEvents.add(tezEvent);
                        break;
                    }
                    int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
                    Vertex edgeVertex = this.getDAG().getVertex(sourceMeta.getTaskVertexName());
                    Edge srcEdge = this.sourceVertices.get(edgeVertex);
                    if (srcEdge == null) {
                        throw new TezUncheckedException("Bad source vertex: " + sourceMeta.getTaskVertexName() + " for destination vertex: " + this.getLogIdentifier());
                    }
                    if (srcEdge.hasOnDemandRouting()) {
                        this.processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex);
                        break;
                    }
                    srcEdge.sendTezEventToDestinationTasks(tezEvent);
                    break;
                }
                case ROOT_INPUT_DATA_INFORMATION_EVENT: {
                    VertexImpl.checkEventSourceMetadata(this, sourceMeta);
                    if (this.tasksNotYetScheduled) {
                        this.pendingTaskEvents.add(tezEvent);
                        break;
                    }
                    InputInitializerEvent riEvent = (InputDataInformationEvent)tezEvent.getEvent();
                    Task targetTask = this.getTask(riEvent.getTargetIndex());
                    targetTask.registerTezEvent(tezEvent);
                    break;
                }
                case VERTEX_MANAGER_EVENT: {
                    VertexManagerEvent vmEvent = (VertexManagerEvent)tezEvent.getEvent();
                    Vertex target = this.getDAG().getVertex(vmEvent.getTargetVertexName());
                    Preconditions.checkArgument((target != null ? 1 : 0) != 0, (Object)("Event sent to unkown vertex: " + vmEvent.getTargetVertexName()));
                    TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
                    if (srcTaId.getTaskID().getVertexID().equals((Object)this.vertexId)) {
                        vmEvent.setProducerAttemptIdentifier(VertexImpl.getTaskAttemptIdentifier(this.dag.getName(), this.getName(), srcTaId));
                    }
                    if (target == this) {
                        this.vertexManager.onVertexManagerEventReceived(vmEvent);
                        break;
                    }
                    VertexImpl.checkEventSourceMetadata(this, sourceMeta);
                    this.eventHandler.handle((Event)new VertexEventRouteEvent(target.getVertexId(), Collections.singletonList(tezEvent)));
                    break;
                }
                case ROOT_INPUT_INITIALIZER_EVENT: {
                    InputInitializerEvent riEvent = (InputInitializerEvent)tezEvent.getEvent();
                    Vertex target = this.getDAG().getVertex(riEvent.getTargetVertexName());
                    Preconditions.checkArgument((target != null ? 1 : 0) != 0, (Object)("Event sent to unknown vertex: " + riEvent.getTargetVertexName()));
                    riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
                    if (target == this) {
                        if (this.rootInputDescriptors == null || !this.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
                            throw new TezUncheckedException("InputInitializerEvent targeted at unknown initializer on vertex " + this.logIdentifier + ", Event=" + riEvent);
                        }
                        if (this.getState() == VertexState.NEW) {
                            this.pendingInitializerEvents.add(tezEvent);
                            break;
                        }
                        if (this.getState() == VertexState.INITIALIZING) {
                            this.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
                            break;
                        }
                        if (!LOG.isDebugEnabled()) continue block16;
                        LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + this.getLogIdentifier() + ", state=" + (Object)((Object)this.getState()));
                        break;
                    }
                    VertexImpl.checkEventSourceMetadata(this, sourceMeta);
                    this.eventHandler.handle((Event)new VertexEventRouteEvent(target.getVertexId(), Collections.singletonList(tezEvent)));
                    break;
                }
                case INPUT_READ_ERROR_EVENT: {
                    VertexImpl.checkEventSourceMetadata(this, sourceMeta);
                    Edge srcEdge = this.sourceVertices.get(this.getDAG().getVertex(sourceMeta.getEdgeVertexName()));
                    srcEdge.sendTezEventToSourceTasks(tezEvent);
                    break;
                }
                case TASK_ATTEMPT_FAILED_EVENT: {
                    VertexImpl.checkEventSourceMetadata(this, sourceMeta);
                    TaskAttemptTerminationCause errCause = null;
                    switch (sourceMeta.getEventGenerator()) {
                        case INPUT: {
                            errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
                            break;
                        }
                        case PROCESSOR: {
                            errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
                            break;
                        }
                        case OUTPUT: {
                            errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
                            break;
                        }
                        case SYSTEM: {
                            errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
                            break;
                        }
                        default: {
                            throw new TezUncheckedException("Unknown EventProducerConsumerType: " + sourceMeta.getEventGenerator());
                        }
                    }
                    TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent)tezEvent.getEvent();
                    this.getEventHandler().handle((Event)new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_FAILED, "Error: " + taskFailedEvent.getDiagnostics(), errCause));
                    break;
                }
                case TASK_ATTEMPT_COMPLETED_EVENT: {
                    VertexImpl.checkEventSourceMetadata(this, sourceMeta);
                    this.getEventHandler().handle((Event)new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
                    break;
                }
                default: {
                    throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
        this.onDemandRouteEventsWriteLock.lock();
        try {
            this.onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
            if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
                for (EventInfo eventInfo : this.onDemandRouteEvents) {
                    if (eventInfo.eventEdge != srcEdge || !eventInfo.tezEvent.getSourceInfo().getTaskAttemptID().equals((Object)tezEvent.getSourceInfo().getTaskAttemptID()) || eventInfo.tezEvent.getEventType() != EventType.DATA_MOVEMENT_EVENT && eventInfo.tezEvent.getEventType() != EventType.COMPOSITE_DATA_MOVEMENT_EVENT) continue;
                    eventInfo.isObsolete = true;
                }
            }
        }
        finally {
            this.onDemandRouteEventsWriteLock.unlock();
        }
    }

    private void setupInputInitializerManager() {
        this.rootInputInitializerManager = this.createRootInputInitializerManager(this.getDAG().getName(), this.getName(), this.getVertexId(), this.eventHandler, this.getTotalTasks(), this.appContext.getTaskScheduler().getNumClusterNodes(), this.getTaskResource(), this.appContext.getTaskScheduler().getTotalResources(this.taskSchedulerIdentifier));
        ArrayList inputList = Lists.newArrayListWithCapacity((int)this.inputsWithInitializers.size());
        for (String inputName : this.inputsWithInitializers) {
            inputList.add(this.rootInputDescriptors.get(inputName));
        }
        LOG.info("Vertex will initialize via inputInitializers " + this.logIdentifier + ". Starting root input initializers: " + this.inputsWithInitializers.size());
        this.initWaitsForRootInitializers = true;
        this.rootInputInitializerManager.runInputInitializers(inputList);
        this.rootInputInitializerManager.handleInitializerEvents(this.pendingInitializerEvents);
        this.pendingInitializerEvents.clear();
    }

    @Override
    public void setInputVertices(Map<Vertex, Edge> inVertices) {
        this.sourceVertices = inVertices;
        for (Vertex vertex : this.sourceVertices.keySet()) {
            this.addIO(vertex.getName());
        }
    }

    @Override
    public void setOutputVertices(Map<Vertex, Edge> outVertices) {
        this.targetVertices = outVertices;
        for (Vertex vertex : this.targetVertices.keySet()) {
            this.addIO(vertex.getName());
        }
    }

    @Override
    public void setAdditionalInputs(List<DAGProtos.RootInputLeafOutputProto> inputs) {
        this.rootInputDescriptors = Maps.newHashMapWithExpectedSize((int)inputs.size());
        for (DAGProtos.RootInputLeafOutputProto input : inputs) {
            this.addIO(input.getName());
            InputDescriptor id = DagTypeConverters.convertInputDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)input.getIODescriptor());
            this.rootInputDescriptors.put(input.getName(), (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>)new RootInputLeafOutput(input.getName(), (EntityDescriptor)id, input.hasControllerDescriptor() ? DagTypeConverters.convertInputInitializerDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)input.getControllerDescriptor()) : null));
            this.rootInputSpecs.put(input.getName(), DEFAULT_ROOT_INPUT_SPECS);
        }
    }

    @Override
    public void handleSpeculatorEvent(SpeculatorEvent event) {
        if (this.isSpeculationEnabled()) {
            this.speculator.handle(event);
        }
    }

    @Override
    @Nullable
    public Map<String, OutputCommitter> getOutputCommitters() {
        return this.outputCommitters;
    }

    @Nullable
    @InterfaceAudience.Private
    @VisibleForTesting
    public OutputCommitter getOutputCommitter(String outputName) {
        if (this.outputCommitters != null) {
            return this.outputCommitters.get(outputName);
        }
        return null;
    }

    @Override
    public void setAdditionalOutputs(List<DAGProtos.RootInputLeafOutputProto> outputs) {
        LOG.info("setting additional outputs for vertex " + this.vertexName);
        this.additionalOutputs = Maps.newHashMapWithExpectedSize((int)outputs.size());
        this.outputCommitters = Maps.newHashMapWithExpectedSize((int)outputs.size());
        for (DAGProtos.RootInputLeafOutputProto output : outputs) {
            this.addIO(output.getName());
            OutputDescriptor od = DagTypeConverters.convertOutputDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)output.getIODescriptor());
            this.additionalOutputs.put(output.getName(), (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>)new RootInputLeafOutput(output.getName(), (EntityDescriptor)od, output.hasControllerDescriptor() ? DagTypeConverters.convertOutputCommitterDescriptorFromDAGPlan((DAGProtos.TezEntityDescriptorProto)output.getControllerDescriptor()) : null));
            OutputSpec outputSpec = new OutputSpec(output.getName(), od, 0);
            this.additionalOutputSpecs.add(outputSpec);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nullable
    public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> getAdditionalInputs() {
        this.readLock.lock();
        try {
            Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> map = this.rootInputDescriptors;
            return map;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    @Nullable
    public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> getAdditionalOutputs() {
        return this.additionalOutputs;
    }

    @Override
    public int compareTo(Vertex other) {
        return this.vertexId.compareTo((TezID)other.getVertexId());
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        Vertex other = (Vertex)obj;
        return this.vertexId.equals((Object)other.getVertexId());
    }

    public int hashCode() {
        int prime = 11239;
        return 11239 + 11239 * this.vertexId.hashCode();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<Vertex, Edge> getInputVertices() {
        this.readLock.lock();
        try {
            Map<Vertex, Edge> map = Collections.unmodifiableMap(this.sourceVertices);
            return map;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public Map<Vertex, Edge> getOutputVertices() {
        return Collections.unmodifiableMap(this.targetVertices);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VertexStatistics getStatistics() {
        this.readLock.lock();
        try {
            if (this.inTerminalState()) {
                Preconditions.checkState((this.finalStatistics != null ? 1 : 0) != 0);
                VertexStatisticsImpl vertexStatisticsImpl = this.finalStatistics;
                return vertexStatisticsImpl;
            }
            VertexStatisticsImpl vertexStatisticsImpl = this.constructStatistics();
            return vertexStatisticsImpl;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public int getInputVerticesCount() {
        return this.sourceVertices.size();
    }

    @Override
    public int getOutputVerticesCount() {
        return this.targetVertices.size();
    }

    @Override
    public ProcessorDescriptor getProcessorDescriptor() {
        return this.processorDescriptor;
    }

    @Override
    public DAG getDAG() {
        return this.dag;
    }

    private TezDAGID getDAGId() {
        return this.getDAG().getID();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Resource getTaskResource() {
        this.readLock.lock();
        try {
            Resource resource = this.taskResource;
            return resource;
        }
        finally {
            this.readLock.unlock();
        }
    }

    void addIO(String name) {
        this.ioIndices.put(StringInterner.weakIntern((String)name), this.ioIndices.size());
    }

    @VisibleForTesting
    String getProcessorName() {
        return this.processorDescriptor.getClassName();
    }

    @VisibleForTesting
    String getJavaOpts() {
        return this.javaOpts;
    }

    @VisibleForTesting
    TaskLocationHint[] getTaskLocationHints() {
        return this.taskLocationHints;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
        this.readLock.lock();
        try {
            ArrayList<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + (this.rootInputDescriptors == null ? 0 : this.rootInputDescriptors.size()));
            if (this.rootInputDescriptors != null) {
                for (Map.Entry entry : this.rootInputDescriptors.entrySet()) {
                    inputSpecList.add(new InputSpec((String)entry.getKey(), (InputDescriptor)((RootInputLeafOutput)entry.getValue()).getIODescriptor(), this.rootInputSpecs.get(entry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
                }
            }
            for (Vertex vertex : this.getInputVertices().keySet()) {
                InputSpec inputSpec = ((VertexImpl)vertex).getDestinationSpecFor(this, taskIndex);
                inputSpecList.add(inputSpec);
            }
            ArrayList<InputSpec> arrayList = inputSpecList;
            return arrayList;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
        this.readLock.lock();
        try {
            ArrayList<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() + this.additionalOutputSpecs.size());
            outputSpecList.addAll(this.additionalOutputSpecs);
            for (Vertex vertex : this.targetVertices.keySet()) {
                OutputSpec outputSpec = ((VertexImpl)vertex).getSourceSpecFor(this, taskIndex);
                outputSpecList.add(outputSpec);
            }
            ArrayList<OutputSpec> arrayList = outputSpecList;
            return arrayList;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws AMUserCodeException {
        this.readLock.lock();
        try {
            Edge edge = this.sourceVertices.get(vertex);
            Preconditions.checkState((edge != null ? 1 : 0) != 0, (Object)this.getLogIdentifier());
            OutputSpec outputSpec = edge.getSourceSpec(taskIndex);
            return outputSpec;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private InputSpec getDestinationSpecFor(VertexImpl vertex, int taskIndex) throws AMUserCodeException {
        this.readLock.lock();
        try {
            Edge edge = this.targetVertices.get(vertex);
            Preconditions.checkState((edge != null ? 1 : 0) != 0, (Object)this.getLogIdentifier());
            InputSpec inputSpec = edge.getDestinationSpec(taskIndex);
            return inputSpec;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
        this.readLock.lock();
        try {
            List<GroupInputSpec> list = this.groupInputSpecList;
            return list;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public synchronized void addSharedOutputs(Set<String> outputs) {
        this.sharedOutputs.addAll(outputs);
    }

    @Override
    public synchronized Set<String> getSharedOutputs() {
        return this.sharedOutputs;
    }

    @VisibleForTesting
    VertexManager getVertexManager() {
        return this.vertexManager;
    }

    private static void logLocationHints(String vertexName, VertexLocationHint locationHint) {
        if (locationHint == null) {
            LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName);
            return;
        }
        HashMultiset hosts = HashMultiset.create();
        HashMultiset racks = HashMultiset.create();
        int counter = 0;
        for (TaskLocationHint taskLocationHint : locationHint.getTaskLocationHints()) {
            StringBuilder sb = new StringBuilder();
            if (taskLocationHint.getHosts() == null) {
                sb.append("No Hosts");
            } else {
                sb.append("Hosts: ");
                for (String host : taskLocationHint.getHosts()) {
                    hosts.add((Object)host);
                    sb.append(host).append(", ");
                }
            }
            if (taskLocationHint.getRacks() == null) {
                sb.append("No Racks");
            } else {
                sb.append("Racks: ");
                for (String rack : taskLocationHint.getRacks()) {
                    racks.add((Object)rack);
                    sb.append(rack).append(", ");
                }
            }
            LOG.debug("Vertex: " + vertexName + ", Location: " + counter + " : " + sb.toString());
            ++counter;
        }
        LOG.debug("Vertex: " + vertexName + ", Host Counts");
        for (Multiset.Entry host : hosts.entrySet()) {
            LOG.debug("Vertex: " + vertexName + ", host: " + host.toString());
        }
        LOG.debug("Vertex: " + vertexName + ", Rack Counts");
        for (Multiset.Entry rack : racks.entrySet()) {
            LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
        }
    }

    private static class VertexCommitCallback
    implements FutureCallback<Void> {
        private String outputName;
        private VertexImpl vertex;

        public VertexCommitCallback(VertexImpl vertex, String outputName) {
            this.vertex = vertex;
            this.outputName = outputName;
        }

        public void onSuccess(Void result) {
            this.vertex.getEventHandler().handle((Event)new VertexEventCommitCompleted(this.vertex.vertexId, this.outputName, true, null));
        }

        public void onFailure(Throwable t) {
            this.vertex.getEventHandler().handle((Event)new VertexEventCommitCompleted(this.vertex.vertexId, this.outputName, false, t));
        }
    }

    private static class VertexStateChangedCallback
    implements OnStateChangedCallback<VertexState, VertexImpl> {
        private VertexStateChangedCallback() {
        }

        @Override
        public void onStateChanged(VertexImpl vertex, VertexState vertexState) {
            vertex.stateChangeNotifier.stateChanged(vertex.getVertexId(), new VertexStateUpdate(vertex.getName(), this.convertInternalState(vertexState, vertex.getVertexId())));
        }

        private org.apache.tez.dag.api.event.VertexState convertInternalState(VertexState vertexState, TezVertexID vertexId) {
            switch (vertexState) {
                case RUNNING: {
                    return org.apache.tez.dag.api.event.VertexState.RUNNING;
                }
                case SUCCEEDED: {
                    return org.apache.tez.dag.api.event.VertexState.SUCCEEDED;
                }
                case FAILED: {
                    return org.apache.tez.dag.api.event.VertexState.FAILED;
                }
                case KILLED: {
                    return org.apache.tez.dag.api.event.VertexState.KILLED;
                }
            }
            throw new TezUncheckedException("Not expecting state updates for state: " + (Object)((Object)vertexState) + ", VertexID: " + vertexId);
        }
    }

    private static class InternalErrorTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private InternalErrorTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            LOG.error("Invalid event " + event.getType() + " on Vertex " + vertex.getLogIdentifier());
            vertex.eventHandler.handle((Event)new DAGEventDiagnosticsUpdate(vertex.getDAGId(), "Invalid event " + event.getType() + " on Vertex " + vertex.getLogIdentifier()));
            vertex.setFinishTime();
            vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
            vertex.cancelCommits();
            vertex.finished(VertexState.ERROR);
        }
    }

    private static class RouteEventTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private RouteEventTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventRouteEvent rEvent = (VertexEventRouteEvent)event;
            boolean recovered = rEvent.isRecovered();
            List<TezEvent> tezEvents = rEvent.getEvents();
            try {
                vertex.handleRoutedTezEvents(tezEvents, recovered, false);
            }
            catch (AMUserCodeException e) {
                String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + vertex.getLogIdentifier();
                LOG.error(msg, (Throwable)((Object)e));
                if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) {
                    vertex.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                    vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                    vertex.cancelCommits();
                    return VertexState.TERMINATING;
                }
                vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                return VertexState.FAILED;
            }
            return vertex.getState();
        }
    }

    private static class CommitCompletedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private CommitCompletedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            vertex.commitCompleted((VertexEventCommitCompleted)event);
            return VertexImpl.checkCommitsForCompletion(vertex);
        }
    }

    private static class TaskRescheduledAfterVertexSuccessTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private TaskRescheduledAfterVertexSuccessTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            if (vertex.outputCommitters == null || vertex.outputCommitters.isEmpty() || !vertex.commitVertexOutputs) {
                LOG.info(vertex.getLogIdentifier() + " back to running due to rescheduling " + ((VertexEventTaskReschedule)event).getTaskID());
                new TaskRescheduledTransition().transition(vertex, event);
                vertex.eventHandler.handle((Event)new DAGEventVertexReRunning(vertex.getVertexId()));
                vertex.finalStatistics = null;
                return VertexState.RUNNING;
            }
            String diagnosticMsg = vertex.getLogIdentifier() + " failed due to post-commit rescheduling of " + ((VertexEventTaskReschedule)event).getTaskID();
            LOG.info(diagnosticMsg);
            vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OWN_TASK_FAILURE);
            vertex.finished(VertexState.FAILED, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
            return VertexState.FAILED;
        }
    }

    private static class TaskRescheduledWhileCommittingTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TaskRescheduledWhileCommittingTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            String diagnosticMsg = vertex.getLogIdentifier() + " failed due to in-committing rescheduling of " + ((VertexEventTaskReschedule)event).getTaskID();
            LOG.info(diagnosticMsg);
            vertex.addDiagnostic(diagnosticMsg);
            vertex.tryEnactKill(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, TaskTerminationCause.TASK_RESCHEDULE_IN_COMMITTING);
            vertex.cancelCommits();
        }
    }

    private static class TaskCompletedAfterVertexSuccessTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private TaskCompletedAfterVertexSuccessTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            String diagnosticMsg;
            VertexState finalState;
            VertexEventTaskCompleted vEvent = (VertexEventTaskCompleted)event;
            if (vEvent.getState() == TaskState.FAILED) {
                finalState = VertexState.FAILED;
                diagnosticMsg = "Vertex " + vertex.logIdentifier + " failed as task " + vEvent.getTaskID() + " failed after vertex succeeded.";
            } else {
                finalState = VertexState.ERROR;
                diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + " completed with state " + (Object)((Object)vEvent.getState()) + " after vertex succeeded.";
            }
            LOG.info(diagnosticMsg);
            vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
            return finalState;
        }
    }

    private static class VertexNoTasksCompletedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private VertexNoTasksCompletedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            return VertexImpl.checkTasksForCompletion(vertex);
        }
    }

    private static class TaskRescheduledTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TaskRescheduledTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            --vertex.completedTaskCount;
            --vertex.succeededTaskCount;
            vertex.resetCompletedTaskStatsCache(true);
        }
    }

    private static class TaskCompletedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private TaskCompletedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexState state;
            if (vertex.completedTasksStatsCache == null) {
                vertex.resetCompletedTaskStatsCache(false);
            }
            boolean forceTransitionToKillWait = false;
            ++vertex.completedTaskCount;
            LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : " + vertex.completedTaskCount);
            VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted)event;
            Task task = vertex.tasks.get(taskEvent.getTaskID());
            if (taskEvent.getState() == TaskState.SUCCEEDED) {
                this.taskSucceeded(vertex, task);
                if (!vertex.completedTasksStatsCache.containsTask(task.getTaskId())) {
                    vertex.completedTasksStatsCache.addTask(task.getTaskId());
                    vertex.completedTasksStatsCache.mergeFrom(((TaskImpl)task).getStatistics());
                }
            } else if (taskEvent.getState() == TaskState.FAILED) {
                LOG.info("Failing vertex: " + vertex.logIdentifier + " because task failed: " + taskEvent.getTaskID());
                vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
                forceTransitionToKillWait = true;
                this.taskFailed(vertex, task);
            } else if (taskEvent.getState() == TaskState.KILLED) {
                this.taskKilled(vertex, task);
            }
            if ((state = VertexImpl.checkTasksForCompletion(vertex)) == VertexState.RUNNING && forceTransitionToKillWait) {
                return VertexState.TERMINATING;
            }
            return state;
        }

        private void taskSucceeded(VertexImpl vertex, Task task) {
            ++vertex.succeededTaskCount;
        }

        private void taskFailed(VertexImpl vertex, Task task) {
            ++vertex.failedTaskCount;
            vertex.addDiagnostic("Task failed, taskId=" + task.getTaskId() + ", diagnostics=" + task.getDiagnostics());
        }

        private void taskKilled(VertexImpl vertex, Task task) {
            ++vertex.killedTaskCount;
        }
    }

    private static class TaskAttemptCompletedEventTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TaskAttemptCompletedEventTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTaskAttemptCompleted completionEvent = (VertexEventTaskAttemptCompleted)event;
            if (vertex.targetVertices != null) {
                for (Vertex targetVertex : vertex.targetVertices.keySet()) {
                    vertex.eventHandler.handle((Event)new VertexEventSourceTaskAttemptCompleted(targetVertex.getVertexId(), completionEvent));
                }
            }
        }
    }

    private static class SourceTaskAttemptCompletedEventTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private SourceTaskAttemptCompletedEventTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTaskAttemptCompleted completionEvent = ((VertexEventSourceTaskAttemptCompleted)event).getCompletionEvent();
            LOG.info("Source task attempt completed for vertex: " + vertex.getLogIdentifier() + " attempt: " + completionEvent.getTaskAttemptId() + " with state: " + (Object)((Object)completionEvent.getTaskAttemptState()) + " vertexState: " + (Object)((Object)vertex.getState()));
            if (TaskAttemptStateInternal.SUCCEEDED.equals((Object)completionEvent.getTaskAttemptState())) {
                ++vertex.numSuccessSourceAttemptCompletions;
                if (vertex.getState() == VertexState.RUNNING) {
                    try {
                        TezTaskAttemptID taId = completionEvent.getTaskAttemptId();
                        vertex.vertexManager.onSourceTaskCompleted(VertexImpl.getTaskAttemptIdentifier(vertex.dag.getName(), vertex.dag.getVertex(taId.getTaskID().getVertexID()).getName(), taId));
                    }
                    catch (AMUserCodeException e) {
                        String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                        LOG.error(msg, (Throwable)((Object)e));
                        vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                        vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                        return VertexState.TERMINATING;
                    }
                } else {
                    vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
                }
            }
            return vertex.getState();
        }
    }

    private static class VertexManagerUserCodeErrorTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private VertexManagerUserCodeErrorTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventManagerUserCodeError errEvent = (VertexEventManagerUserCodeError)event;
            AMUserCodeException e = errEvent.getError();
            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
            LOG.error(msg, (Throwable)((Object)e));
            if (vertex.getState() == VertexState.RECOVERING) {
                LOG.info("Received a user code error during recovering, setting recovered state to FAILED");
                vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                vertex.trySetTerminationCause(VertexTerminationCause.AM_USERCODE_FAILURE);
                vertex.recoveredState = VertexState.FAILED;
                return VertexState.RECOVERING;
            }
            if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) {
                vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                vertex.cancelCommits();
                return VertexState.TERMINATING;
            }
            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
            return VertexState.FAILED;
        }
    }

    private static class VertexKilledWhileCommittingTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private VertexKilledWhileCommittingTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTermination vet = (VertexEventTermination)event;
            VertexTerminationCause trigger = vet.getTerminationCause();
            String msg = "Vertex received Kill while in COMMITTING state, terminationCause=" + (Object)((Object)trigger) + ", vertex=" + vertex.logIdentifier;
            LOG.info(msg);
            vertex.addDiagnostic(msg);
            vertex.trySetTerminationCause(trigger);
            vertex.cancelCommits();
        }
    }

    private static class VertexKilledTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private VertexKilledTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            vertex.addDiagnostic("Vertex received Kill while in RUNNING state.");
            VertexEventTermination vet = (VertexEventTermination)event;
            VertexTerminationCause trigger = vet.getTerminationCause();
            switch (trigger) {
                case DAG_KILL: {
                    vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL);
                    break;
                }
                case OWN_TASK_FAILURE: {
                    vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE);
                    break;
                }
                case ROOT_INPUT_INIT_FAILURE: 
                case COMMIT_FAILURE: 
                case INVALID_NUM_OF_TASKS: 
                case INIT_FAILURE: 
                case INTERNAL_ERROR: 
                case AM_USERCODE_FAILURE: 
                case VERTEX_RERUN_IN_COMMITTING: 
                case VERTEX_RERUN_AFTER_COMMIT: 
                case OTHER_VERTEX_FAILURE: {
                    vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE);
                    break;
                }
                default: {
                    throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + (Object)((Object)trigger));
                }
            }
        }
    }

    private static class TerminateInitingVertexTransition
    extends TerminateInitedVertexTransition {
        private TerminateInitingVertexTransition() {
        }

        @Override
        public void transition(VertexImpl vertex, VertexEvent event) {
            super.transition(vertex, event);
        }
    }

    private static class TerminateInitedVertexTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TerminateInitedVertexTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTermination vet = (VertexEventTermination)event;
            vertex.trySetTerminationCause(vet.getTerminationCause());
            vertex.addDiagnostic("Vertex received Kill in INITED state.");
            vertex.finished(VertexState.KILLED);
        }
    }

    private static class TerminateNewVertexTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        private TerminateNewVertexTransition() {
        }

        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventTermination vet = (VertexEventTermination)event;
            vertex.trySetTerminationCause(vet.getTerminationCause());
            vertex.setFinishTime();
            vertex.addDiagnostic("Vertex received Kill in NEW state.");
            vertex.finished(VertexState.KILLED);
        }
    }

    private static class RootInputInitFailedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        private RootInputInitFailedTransition() {
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventRootInputFailed fe = (VertexEventRootInputFailed)event;
            String msg = "Vertex Input: " + fe.getInputName() + " initializer failed, vertex=" + vertex.getLogIdentifier();
            LOG.error(msg, (Throwable)((Object)fe.getError()));
            if (vertex.getState() == VertexState.RUNNING) {
                vertex.addDiagnostic(msg + ", " + ExceptionUtils.getStackTrace((Throwable)fe.getError().getCause()));
                vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
                return VertexState.TERMINATING;
            }
            vertex.finished(VertexState.FAILED, VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg + ", " + ExceptionUtils.getStackTrace((Throwable)fe.getError().getCause()));
            return VertexState.FAILED;
        }
    }

    public static class StartTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            Preconditions.checkState((vertex.getState() == VertexState.INITED ? 1 : 0) != 0, (Object)("Unexpected state " + (Object)((Object)vertex.getState()) + " for " + vertex.logIdentifier));
            if (!vertex.startSignalPending) {
                vertex.startTimeRequested = vertex.clock.getTime();
            }
            return vertex.startVertex();
        }
    }

    public static class StartWhileInitializingTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        public void transition(VertexImpl vertex, VertexEvent event) {
            Preconditions.checkState((vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ? 1 : 0) != 0, (Object)("Vertex: " + vertex.logIdentifier + " got invalid start event"));
            vertex.startSignalPending = true;
            vertex.startTimeRequested = vertex.clock.getTime();
        }
    }

    public static class SourceVertexStartedTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        public void transition(VertexImpl vertex, VertexEvent event) {
            VertexEventSourceVertexStarted startEvent = (VertexEventSourceVertexStarted)event;
            int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
            if (vertex.distanceFromRoot < distanceFromRoot) {
                vertex.distanceFromRoot = distanceFromRoot;
            }
            ++vertex.numStartedSourceVertices;
            vertex.startTimeRequested = vertex.clock.getTime();
            LOG.info("Source vertex started: " + startEvent.getSourceVertexId() + " for vertex: " + vertex.logIdentifier + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
            if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) {
                LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
                return;
            }
            vertex.startSignalPending = true;
            if (vertex.getState() != VertexState.INITED) {
                LOG.info("Cannot start vertex. Not in inited state. " + vertex.logIdentifier + " . VertesState: " + (Object)((Object)vertex.getState()) + " numTasks: " + vertex.numTasks + " Num uninitialized edges: " + vertex.uninitializedEdges.size());
                return;
            }
            Preconditions.checkState((vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty() ? 1 : 0) != 0, (Object)("Cannot start vertex that is not completely defined. Vertex: " + vertex.logIdentifier + " numTasks: " + vertex.numTasks));
            vertex.startIfPossible();
        }
    }

    public static class InputDataInformationTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            ++vertex.numInitializerCompletionsHandled;
            VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation)event;
            List<TezEvent> inputInfoEvents = iEvent.getEvents();
            try {
                if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
                    vertex.handleRoutedTezEvents(inputInfoEvents, false, false);
                }
            }
            catch (AMUserCodeException e) {
                String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                LOG.error(msg, (Throwable)((Object)e));
                vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                return VertexState.FAILED;
            }
            if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
                if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size() && vertex.numInitializerCompletionsHandled == vertex.inputsWithInitializers.size()) {
                    vertex.initWaitsForRootInitializers = false;
                }
                if (vertex.canInitVertex()) {
                    Preconditions.checkState((vertex.numTasks >= 0 ? 1 : 0) != 0, (Object)("Parallelism should have been set by now for vertex: " + vertex.logIdentifier));
                    return VertexInitializedTransition.doTransition(vertex);
                }
            }
            return vertex.getState();
        }
    }

    public static class RootInputInitializedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized)event;
            VertexState state = vertex.getState();
            if (state == VertexState.INITIALIZING) {
                try {
                    vertex.vertexManager.onRootVertexInitialized(liInitEvent.getInputName(), (InputDescriptor)vertex.getAdditionalInputs().get(liInitEvent.getInputName()).getIODescriptor(), liInitEvent.getEvents());
                }
                catch (AMUserCodeException e) {
                    String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                    LOG.error(msg, (Throwable)((Object)e));
                    vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                    return VertexState.FAILED;
                }
            }
            vertex.numInitializedInputs++;
            if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
                vertex.rootInputInitializerManager.shutdown();
                vertex.rootInputInitializerManager = null;
            }
            return vertex.getState();
        }
    }

    public static class VertexInitializedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        static VertexState doTransition(VertexImpl vertex) {
            Preconditions.checkState((boolean)vertex.canInitVertex(), (Object)("Vertex: " + vertex.logIdentifier));
            boolean isInitialized = vertex.initializeVertexInInitializingState();
            if (!isInitialized) {
                return VertexState.FAILED;
            }
            vertex.startIfPossible();
            return VertexState.INITED;
        }

        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            return VertexInitializedTransition.doTransition(vertex);
        }
    }

    public static class InitTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            VertexState vertexState = VertexState.NEW;
            ++vertex.numInitedSourceVertices;
            if ((vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() || vertex.numInitedSourceVertices == vertex.sourceVertices.size() || vertex.numInitedSourceVertices == vertex.sourceVertices.size() + 1) && (vertexState = this.handleInitEvent(vertex, event)) != VertexState.FAILED && vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
                for (Vertex target : vertex.targetVertices.keySet()) {
                    vertex.getEventHandler().handle((Event)new VertexEvent(target.getVertexId(), VertexEventType.V_INIT));
                }
            }
            return vertexState;
        }

        private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
            VertexState state = vertex.setupVertex();
            if (state.equals((Object)VertexState.FAILED)) {
                return state;
            }
            if (vertex.targetVertices != null) {
                for (Edge e : vertex.targetVertices.values()) {
                    if (e.getEdgeManager() != null) continue;
                    Preconditions.checkState((e.getEdgeProperty().getDataMovementType() == EdgeProperty.DataMovementType.CUSTOM ? 1 : 0) != 0, (Object)("Null edge manager allowed only for custom edge. " + vertex.logIdentifier));
                    vertex.uninitializedEdges.add(e);
                }
            }
            if (vertex.sourceVertices != null) {
                for (Edge e : vertex.sourceVertices.values()) {
                    if (e.getEdgeManager() != null) continue;
                    Preconditions.checkState((e.getEdgeProperty().getDataMovementType() == EdgeProperty.DataMovementType.CUSTOM ? 1 : 0) != 0, (Object)("Null edge manager allowed only for custom edge. " + vertex.logIdentifier));
                    vertex.uninitializedEdges.add(e);
                }
            }
            if (vertex.numTasks == -1) {
                LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split to set #tasks for the vertex " + vertex.getLogIdentifier());
                if (vertex.inputsWithInitializers != null) {
                    LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
                    vertex.setupInputInitializerManager();
                    return VertexState.INITIALIZING;
                }
                boolean hasOneToOneUninitedSource = false;
                for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
                    if (entry.getValue().getEdgeProperty().getDataMovementType() != EdgeProperty.DataMovementType.ONE_TO_ONE || entry.getKey().getTotalTasks() != -1) continue;
                    hasOneToOneUninitedSource = true;
                    break;
                }
                if (hasOneToOneUninitedSource) {
                    LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
                    return VertexState.INITIALIZING;
                }
                if (vertex.vertexPlan.hasVertexManagerPlugin()) {
                    LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
                    return VertexState.INITIALIZING;
                }
                throw new TezUncheckedException(vertex.getLogIdentifier() + " has -1 tasks but does not have input initializers, " + "1-1 uninited sources or custom vertex manager to set it at runtime");
            }
            LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
            vertex.createTasks();
            if (vertex.inputsWithInitializers != null) {
                LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
                vertex.setupInputInitializerManager();
                return VertexState.INITIALIZING;
            }
            if (!vertex.uninitializedEdges.isEmpty()) {
                LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
                return VertexState.INITIALIZING;
            }
            LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
            vertex.maybeSendConfiguredEvent();
            boolean isInitialized = vertex.initializeVertex();
            if (isInitialized) {
                return VertexState.INITED;
            }
            return VertexState.FAILED;
        }
    }

    public static class IgnoreInitInInitedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent event) {
            LOG.info("Received event during INITED state, vertex=" + vertex.logIdentifier + ", eventType=" + event.getType());
            if (!vertex.vertexAlreadyInitialized) {
                LOG.error("Vertex not initialized but in INITED state, vertexId=" + vertex.logIdentifier);
                return vertex.finished(VertexState.ERROR);
            }
            return VertexState.INITED;
        }
    }

    public static class RecoverTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
            VertexEventSourceVertexRecovered sourceRecoveredEvent = (VertexEventSourceVertexRecovered)vertexEvent;
            int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1;
            if (vertex.distanceFromRoot < distanceFromRoot) {
                vertex.distanceFromRoot = distanceFromRoot;
            }
            ++vertex.numRecoveredSourceVertices;
            switch (sourceRecoveredEvent.getSourceVertexState()) {
                case NEW: {
                    break;
                }
                case INITED: {
                    ++vertex.numInitedSourceVertices;
                    break;
                }
                case SUCCEEDED: 
                case RUNNING: {
                    ++vertex.numInitedSourceVertices;
                    ++vertex.numStartedSourceVertices;
                    if (sourceRecoveredEvent.getCompletedTaskAttempts() == null) break;
                    vertex.pendingReportedSrcCompletions.addAll(sourceRecoveredEvent.getCompletedTaskAttempts());
                    break;
                }
                case ERROR: 
                case KILLED: 
                case FAILED: {
                    break;
                }
                default: {
                    LOG.warn("Received invalid SourceVertexRecovered event, vertex=" + vertex.logIdentifier + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID() + ", sourceVertexState=" + (Object)((Object)sourceRecoveredEvent.getSourceVertexState()));
                    return vertex.finished(VertexState.ERROR);
                }
            }
            if (vertex.numRecoveredSourceVertices != vertex.getInputVerticesCount()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Waiting for source vertices to recover, vertex=" + vertex.logIdentifier + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices + ", totalSourceVertices=" + vertex.getInputVerticesCount());
                }
                return VertexState.RECOVERING;
            }
            VertexState endState = VertexState.NEW;
            LinkedList completedTaskAttempts = Lists.newLinkedList();
            switch (vertex.recoveredState) {
                case NEW: {
                    Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
                    while (iterator.hasNext()) {
                        if (!iterator.next().getEventType().equals((Object)EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) continue;
                        iterator.remove();
                    }
                    if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
                        vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_INIT));
                    }
                    if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
                        vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    }
                    endState = VertexState.NEW;
                    break;
                }
                case INITED: {
                    boolean successSetParallelism;
                    String msg2;
                    vertex.vertexAlreadyInitialized = true;
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg2 = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error(msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    try {
                        vertex.setParallelism(0, null, vertex.recoveredSourceEdgeProperties, vertex.recoveredRootInputSpecUpdates, true, false);
                        successSetParallelism = true;
                    }
                    catch (Exception e) {
                        successSetParallelism = false;
                    }
                    if (!successSetParallelism) {
                        msg2 = "Failed to recover edge managers, vertex=" + vertex.logIdentifier;
                        LOG.error(msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.tasks != null) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                    }
                    if (vertex.numInitedSourceVertices != vertex.getInputVerticesCount()) {
                        LOG.info("Vertex already initialized but source vertices have not initialized, vertexId=" + vertex.logIdentifier + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices);
                    } else if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
                        vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    }
                    endState = VertexState.INITED;
                    break;
                }
                case RUNNING: {
                    boolean successSetParallelism;
                    String msg;
                    String msg2;
                    if (vertex.recoveryCommitInProgress) {
                        LOG.info("Recovered vertex was in the middle of a commit, failing Vertex=" + vertex.logIdentifier);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.COMMIT_FAILURE, null);
                        endState = VertexState.FAILED;
                        break;
                    }
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error(msg);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
                        endState = VertexState.FAILED;
                        break;
                    }
                    try {
                        vertex.setParallelism(vertex.numTasks, null, vertex.recoveredSourceEdgeProperties, vertex.recoveredRootInputSpecUpdates, true, false);
                        successSetParallelism = true;
                    }
                    catch (Exception e) {
                        successSetParallelism = false;
                    }
                    if (!successSetParallelism) {
                        msg2 = "Failed to recover edge managers for vertex:" + vertex.logIdentifier;
                        LOG.error(msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    assert (vertex.tasks.size() == vertex.numTasks);
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            vertex.unsetTasksNotYetScheduled();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + vertex.getLogIdentifier();
                            LOG.error(msg, (Throwable)((Object)e));
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = VertexState.SUCCEEDED;
                    vertex.finished(endState);
                    break;
                }
                case SUCCEEDED: {
                    TaskState taskState;
                    assert (vertex.tasks.size() == vertex.numTasks);
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        taskState = TaskState.SUCCEEDED;
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            vertex.unsetTasksNotYetScheduled();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                            LOG.error(msg, (Throwable)((Object)e));
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = vertex.recoveredState;
                    vertex.finished(endState);
                    break;
                }
                case KILLED: 
                case FAILED: {
                    TaskState taskState;
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        taskState = TaskState.FAILED;
                        if (vertex.recoveredState == VertexState.KILLED) {
                            taskState = TaskState.KILLED;
                        }
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState, false));
                        }
                    }
                    endState = vertex.recoveredState;
                    vertex.finished(endState);
                    break;
                }
                default: {
                    LOG.warn("Invalid recoveredState found when trying to recover vertex, recoveredState=" + (Object)((Object)vertex.recoveredState));
                    vertex.finished(VertexState.ERROR);
                    endState = VertexState.ERROR;
                }
            }
            LOG.info("Recovered Vertex State, vertexId=" + vertex.logIdentifier + ", state=" + (Object)((Object)endState) + ", numInitedSourceVertices" + vertex.numInitedSourceVertices + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices + ", tasksIsNull=" + (vertex.tasks == null) + ", numTasks=" + (vertex.tasks == null ? 0 : vertex.tasks.size()));
            for (Map.Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
                vertex.eventHandler.handle((Event)new VertexEventSourceVertexRecovered(entry.getKey().getVertexId(), vertex.vertexId, endState, completedTaskAttempts, vertex.getDistanceFromRoot()));
            }
            if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED).contains((Object)endState)) {
                vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
                vertex.recoveredEvents.clear();
                if (!vertex.pendingRouteEvents.isEmpty()) {
                    try {
                        vertex.handleRoutedTezEvents(vertex.pendingRouteEvents, false, true);
                        vertex.pendingRouteEvents.clear();
                    }
                    catch (AMUserCodeException e) {
                        String msg = "Exception in " + (Object)((Object)e.getSource()) + ", vertex=" + vertex.getLogIdentifier();
                        LOG.error(msg, (Throwable)((Object)e));
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                        endState = VertexState.FAILED;
                    }
                }
            } else if (!vertex.recoveredEvents.isEmpty()) {
                throw new RuntimeException("Invalid Vertex state, found non-zero recovered events in invalid state, recoveredState=" + (Object)((Object)endState) + ", recoveredEvents=" + vertex.recoveredEvents.size());
            }
            return endState;
        }
    }

    public static class BufferDataRecoverTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
            LOG.info("Received upstream event while still recovering, vertexId=" + vertex.logIdentifier + ", vertexEventType=" + vertexEvent.getType());
            if (((VertexEventType)vertexEvent.getType()).equals((Object)VertexEventType.V_ROUTE_EVENT)) {
                VertexEventRouteEvent evt = (VertexEventRouteEvent)vertexEvent;
                vertex.pendingRouteEvents.addAll(evt.getEvents());
            } else if (((VertexEventType)vertexEvent.getType()).equals((Object)VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) {
                VertexEventSourceTaskAttemptCompleted evt = (VertexEventSourceTaskAttemptCompleted)vertexEvent;
                vertex.pendingReportedSrcCompletions.add(evt.getCompletionEvent().getTaskAttemptId());
            } else if (((VertexEventType)vertexEvent.getType()).equals((Object)VertexEventType.V_SOURCE_VERTEX_STARTED)) {
                VertexEventSourceVertexStarted startEvent = (VertexEventSourceVertexStarted)vertexEvent;
                int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
                if (vertex.distanceFromRoot < distanceFromRoot) {
                    vertex.distanceFromRoot = distanceFromRoot;
                }
                ++vertex.numStartedSourceVertices;
            } else if (((VertexEventType)vertexEvent.getType()).equals((Object)VertexEventType.V_INIT)) {
                ++vertex.numInitedSourceVertices;
            }
        }
    }

    public static class NullEdgeInitializedTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
            VertexEventNullEdgeInitialized event = (VertexEventNullEdgeInitialized)vertexEvent;
            Edge edge = event.getEdge();
            Vertex otherVertex = event.getVertex();
            Preconditions.checkState((vertex.getState() == VertexState.NEW || vertex.getState() == VertexState.INITIALIZING ? 1 : 0) != 0, (Object)("Unexpected state " + (Object)((Object)vertex.getState()) + " for vertex: " + vertex.logIdentifier));
            Preconditions.checkState((vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) || vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex) ? 1 : 0) != 0, (Object)("Not connected to vertex " + otherVertex.getLogIdentifier() + " from vertex: " + vertex.logIdentifier));
            LOG.info("Edge initialized for connection to vertex " + otherVertex.getLogIdentifier() + " at vertex : " + vertex.logIdentifier);
            vertex.uninitializedEdges.remove(edge);
            if (vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) {
                return VertexInitializedTransition.doTransition(vertex);
            }
            return vertex.getState();
        }
    }

    public static class TerminateDuringRecoverTransition
    implements SingleArcTransition<VertexImpl, VertexEvent> {
        public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
            LOG.info("Received a terminate during recovering, setting recovered state to KILLED");
            vertex.recoveredState = VertexState.KILLED;
        }
    }

    public static class StartRecoverTransition
    implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
        public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
            VertexState endState;
            VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex)vertexEvent;
            VertexState desiredState = recoverEvent.getDesiredState();
            switch (desiredState) {
                case RUNNING: {
                    break;
                }
                case ERROR: 
                case KILLED: 
                case FAILED: 
                case SUCCEEDED: {
                    if (desiredState == VertexState.SUCCEEDED) {
                        vertex.succeededTaskCount = vertex.numTasks;
                        vertex.completedTaskCount = vertex.numTasks;
                    } else if (desiredState == VertexState.KILLED) {
                        vertex.killedTaskCount = vertex.numTasks;
                    } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) {
                        vertex.failedTaskCount = vertex.numTasks;
                    }
                    if (vertex.tasks != null) {
                        TaskState taskState = TaskState.KILLED;
                        if (desiredState == VertexState.SUCCEEDED) {
                            taskState = TaskState.SUCCEEDED;
                        } else if (desiredState == VertexState.KILLED) {
                            taskState = TaskState.KILLED;
                        } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) {
                            taskState = TaskState.FAILED;
                        }
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState, false));
                        }
                    }
                    LOG.info("DAG informed Vertex of its final completed state, vertex=" + vertex.logIdentifier + ", state=" + (Object)((Object)desiredState));
                    return desiredState;
                }
                default: {
                    LOG.info("Unhandled desired state provided by DAG, vertex=" + vertex.logIdentifier + ", state=" + (Object)((Object)desiredState));
                    vertex.finished(VertexState.ERROR);
                }
            }
            switch (vertex.recoveredState) {
                case NEW: {
                    Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
                    while (iterator.hasNext()) {
                        if (!iterator.next().getEventType().equals((Object)EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) continue;
                        iterator.remove();
                    }
                    vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_INIT));
                    vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    endState = VertexState.NEW;
                    break;
                }
                case INITED: {
                    String msg2;
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg2 = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error(msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.tasks != null) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                    }
                    vertex.eventHandler.handle((Event)new VertexEvent(vertex.vertexId, VertexEventType.V_START));
                    if (vertex.getInputVertices().isEmpty()) {
                        endState = VertexState.INITED;
                        break;
                    }
                    endState = VertexState.RECOVERING;
                    break;
                }
                case RUNNING: {
                    String msg;
                    String msg2;
                    try {
                        vertex.initializeCommitters();
                    }
                    catch (Exception e) {
                        msg2 = "Failed to initialize committers, vertex=" + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace((Throwable)e);
                        LOG.error(msg2);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg2);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.recoveryCommitInProgress) {
                        msg = "Recovered vertex was in the middle of a commit, failing Vertex=" + vertex.logIdentifier;
                        LOG.warn(msg);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.COMMIT_FAILURE, msg);
                        endState = VertexState.FAILED;
                        break;
                    }
                    assert (vertex.tasks.size() == vertex.numTasks);
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId()));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            vertex.unsetTasksNotYetScheduled();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            msg2 = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                            LOG.error(msg2, (Throwable)((Object)e));
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg2 + ", " + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = VertexState.SUCCEEDED;
                    vertex.finished(endState);
                    break;
                }
                case SUCCEEDED: {
                    TaskState taskState;
                    String msg;
                    if (vertex.hasCommitter && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
                        msg = "Cannot recover vertex as all recovery events not found, vertex=" + vertex.logIdentifier + ", hasCommitters=" + vertex.hasCommitter + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen + ", finalCompletionSeen=" + vertex.vertexCompleteSeen;
                        LOG.warn(msg);
                        vertex.finished(VertexState.FAILED, VertexTerminationCause.COMMIT_FAILURE, msg);
                        endState = VertexState.FAILED;
                        break;
                    }
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        taskState = TaskState.SUCCEEDED;
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState));
                        }
                        try {
                            vertex.recoveryCodeSimulatingStart();
                            vertex.unsetTasksNotYetScheduled();
                            endState = VertexState.RUNNING;
                        }
                        catch (AMUserCodeException e) {
                            String msg3 = "Exception in " + (Object)((Object)e.getSource()) + ", vertex:" + vertex.getLogIdentifier();
                            LOG.error(msg3, (Throwable)((Object)e));
                            vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg3 + "," + ExceptionUtils.getStackTrace((Throwable)e.getCause()));
                            endState = VertexState.FAILED;
                        }
                        break;
                    }
                    endState = vertex.recoveredState;
                    vertex.finished(endState);
                    break;
                }
                case KILLED: 
                case FAILED: {
                    TaskState taskState;
                    if (vertex.tasks != null && vertex.numTasks != 0) {
                        taskState = TaskState.FAILED;
                        if (vertex.recoveredState == VertexState.KILLED) {
                            taskState = TaskState.KILLED;
                        }
                        for (Task task : vertex.tasks.values()) {
                            vertex.eventHandler.handle((Event)new TaskEventRecoverTask(task.getTaskId(), taskState, false));
                        }
                    }
                    endState = vertex.recoveredState;
                    vertex.finished(endState);
                    break;
                }
                default: {
                    LOG.warn("Invalid recoveredState found when trying to recover vertex, vertex=" + vertex.logIdentifier + ", recoveredState=" + (Object)((Object)vertex.recoveredState));
                    vertex.finished(VertexState.ERROR);
                    endState = VertexState.ERROR;
                }
            }
            if (!endState.equals((Object)VertexState.RECOVERING)) {
                LOG.info("Recovered Vertex State, vertexId=" + vertex.logIdentifier + ", state=" + (Object)((Object)endState) + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices + ", recoveredEvents=" + (vertex.recoveredEvents == null ? "null" : Integer.valueOf(vertex.recoveredEvents.size())) + ", tasksIsNull=" + (vertex.tasks == null) + ", numTasks=" + (vertex.tasks == null ? "null" : Integer.valueOf(vertex.tasks.size())));
                for (Map.Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
                    vertex.eventHandler.handle((Event)new VertexEventSourceVertexRecovered(entry.getKey().getVertexId(), vertex.vertexId, endState, null, vertex.getDistanceFromRoot()));
                }
            }
            if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED).contains((Object)endState)) {
                vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
                vertex.recoveredEvents.clear();
            } else if (!vertex.recoveredEvents.isEmpty()) {
                throw new RuntimeException("Invalid Vertex state, found non-zero recovered events in invalid state, vertex=" + vertex.logIdentifier + ", recoveredState=" + (Object)((Object)endState) + ", recoveredEvents=" + vertex.recoveredEvents.size());
            }
            return endState;
        }
    }

    class VertexStatisticsImpl
    implements VertexStatistics {
        final Map<String, IOStatisticsImpl> ioStats;
        final BitSet taskSet;

        public VertexStatisticsImpl() {
            this.ioStats = Maps.newHashMapWithExpectedSize((int)VertexImpl.this.ioIndices.size());
            this.taskSet = new BitSet();
            for (String name : VertexImpl.this.getIOIndices().keySet()) {
                this.ioStats.put(name, new IOStatisticsImpl());
            }
        }

        public IOStatisticsImpl getIOStatistics(String ioName) {
            return this.ioStats.get(ioName);
        }

        void mergeFrom(TaskStatistics taskStats) {
            if (taskStats == null) {
                return;
            }
            for (Map.Entry entry : taskStats.getIOStatistics().entrySet()) {
                String ioName = (String)entry.getKey();
                IOStatisticsImpl myIOStat = this.ioStats.get(ioName);
                Preconditions.checkState((myIOStat != null ? 1 : 0) != 0, (Object)("Unexpected IO name: " + ioName + " for vertex:" + VertexImpl.this.getLogIdentifier()));
                myIOStat.mergeFrom((IOStatistics)entry.getValue());
            }
        }

        public InputStatistics getInputStatistics(String inputName) {
            return this.getIOStatistics(inputName);
        }

        public OutputStatistics getOutputStatistics(String outputName) {
            return this.getIOStatistics(outputName);
        }

        void addTask(TezTaskID taskID) {
            this.taskSet.set(taskID.getId());
        }

        boolean containsTask(TezTaskID taskID) {
            return this.taskSet.get(taskID.getId());
        }
    }

    static class IOStatisticsImpl
    extends IOStatistics
    implements InputStatistics,
    OutputStatistics {
        IOStatisticsImpl() {
        }

        public long getDataSize() {
            return super.getDataSize();
        }

        public long getItemsProcessed() {
            return super.getItemsProcessed();
        }
    }

    static class EventInfo {
        final TezEvent tezEvent;
        final Edge eventEdge;
        final int eventTaskIndex;
        boolean isObsolete = false;

        EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) {
            this.tezEvent = tezEvent;
            this.eventEdge = eventEdge;
            this.eventTaskIndex = eventTaskIndex;
        }
    }
}

