/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.BasicStageExecutionStats;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.Location;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageExecutionInfo;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.StageExecutionStateMachine;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.RemoteTransactionHandle;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public final class SqlStageExecution {
    public static final Set<ErrorCode> RECOVERABLE_ERROR_CODES = ImmutableSet.of((Object)StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode(), (Object)StandardErrorCode.PAGE_TRANSPORT_ERROR.toErrorCode(), (Object)StandardErrorCode.PAGE_TRANSPORT_TIMEOUT.toErrorCode(), (Object)StandardErrorCode.REMOTE_TASK_MISMATCH.toErrorCode(), (Object)StandardErrorCode.REMOTE_TASK_ERROR.toErrorCode());
    private final Session session;
    private final StageExecutionStateMachine stateMachine;
    private final PlanFragment planFragment;
    private final RemoteTaskFactory remoteTaskFactory;
    private final NodeTaskMap nodeTaskMap;
    private final boolean summarizeTaskInfo;
    private final Executor executor;
    private final FailureDetector failureDetector;
    private final double maxFailedTaskPercentage;
    private final Map<PlanFragmentId, RemoteSourceNode> exchangeSources;
    private final TableWriteInfo tableWriteInfo;
    private final Map<InternalNode, Set<RemoteTask>> tasks = new ConcurrentHashMap<InternalNode, Set<RemoteTask>>();
    @GuardedBy(value="this")
    private final AtomicInteger nextTaskId = new AtomicInteger();
    @GuardedBy(value="this")
    private final Set<TaskId> allTasks = Sets.newConcurrentHashSet();
    @GuardedBy(value="this")
    private final Set<TaskId> finishedTasks = Sets.newConcurrentHashSet();
    @GuardedBy(value="this")
    private final Set<TaskId> failedTasks = Sets.newConcurrentHashSet();
    @GuardedBy(value="this")
    private final Set<TaskId> tasksWithFinalInfo = Sets.newConcurrentHashSet();
    private final Set<Lifespan> finishedLifespans = ConcurrentHashMap.newKeySet();
    private final int totalLifespans;
    @GuardedBy(value="this")
    private final AtomicBoolean splitsScheduled = new AtomicBoolean();
    @GuardedBy(value="this")
    private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create();
    @GuardedBy(value="this")
    private final Set<PlanNodeId> completeSources = Sets.newConcurrentHashSet();
    @GuardedBy(value="this")
    private final Set<PlanFragmentId> completeSourceFragments = Sets.newConcurrentHashSet();
    private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference();
    private final ListenerManager<Set<Lifespan>> completedLifespansChangeListeners = new ListenerManager();
    @GuardedBy(value="this")
    private Optional<StageTaskRecoveryCallback> stageTaskRecoveryCallback = Optional.empty();

    public static SqlStageExecution createSqlStageExecution(StageExecutionId stageExecutionId, PlanFragment fragment, RemoteTaskFactory remoteTaskFactory, Session session, boolean summarizeTaskInfo, NodeTaskMap nodeTaskMap, ExecutorService executor, FailureDetector failureDetector, SplitSchedulerStats schedulerStats, TableWriteInfo tableWriteInfo) {
        Objects.requireNonNull(stageExecutionId, "stageId is null");
        Objects.requireNonNull(fragment, "fragment is null");
        Objects.requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
        Objects.requireNonNull(session, "session is null");
        Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        Objects.requireNonNull(executor, "executor is null");
        Objects.requireNonNull(failureDetector, "failureDetector is null");
        Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        Objects.requireNonNull(tableWriteInfo, "tableWriteInfo is null");
        SqlStageExecution sqlStageExecution = new SqlStageExecution(session, new StageExecutionStateMachine(stageExecutionId, executor, schedulerStats, !fragment.getTableScanSchedulingOrder().isEmpty()), fragment, remoteTaskFactory, nodeTaskMap, summarizeTaskInfo, executor, failureDetector, SystemSessionProperties.getMaxFailedTaskPercentage(session), tableWriteInfo);
        sqlStageExecution.initialize();
        return sqlStageExecution;
    }

    private SqlStageExecution(Session session, StageExecutionStateMachine stateMachine, PlanFragment planFragment, RemoteTaskFactory remoteTaskFactory, NodeTaskMap nodeTaskMap, boolean summarizeTaskInfo, Executor executor, FailureDetector failureDetector, double maxFailedTaskPercentage, TableWriteInfo tableWriteInfo) {
        this.session = Objects.requireNonNull(session, "session is null");
        this.stateMachine = stateMachine;
        this.planFragment = Objects.requireNonNull(planFragment, "planFragment is null");
        this.remoteTaskFactory = Objects.requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
        this.nodeTaskMap = Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        this.summarizeTaskInfo = summarizeTaskInfo;
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.failureDetector = Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.tableWriteInfo = Objects.requireNonNull(tableWriteInfo);
        this.maxFailedTaskPercentage = maxFailedTaskPercentage;
        ImmutableMap.Builder fragmentToExchangeSource = ImmutableMap.builder();
        for (RemoteSourceNode remoteSourceNode : planFragment.getRemoteSourceNodes()) {
            for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
                fragmentToExchangeSource.put((Object)planFragmentId, (Object)remoteSourceNode);
            }
        }
        this.exchangeSources = fragmentToExchangeSource.build();
        this.totalLifespans = planFragment.getStageExecutionDescriptor().getTotalLifespans();
    }

    private void initialize() {
        this.stateMachine.addStateChangeListener(newState -> this.checkAllTaskFinal());
        this.completedLifespansChangeListeners.addListener(lifespans -> this.finishedLifespans.addAll((Collection<Lifespan>)lifespans));
    }

    public StageExecutionId getStageExecutionId() {
        return this.stateMachine.getStageExecutionId();
    }

    public StageExecutionState getState() {
        return this.stateMachine.getState();
    }

    public void addStateChangeListener(StateMachine.StateChangeListener<StageExecutionState> stateChangeListener) {
        this.stateMachine.addStateChangeListener(stateChangeListener);
    }

    public void addFinalStageInfoListener(StateMachine.StateChangeListener<StageExecutionInfo> stateChangeListener) {
        this.stateMachine.addFinalStageInfoListener(stateChangeListener);
    }

    public void addCompletedDriverGroupsChangedListener(Consumer<Set<Lifespan>> newlyCompletedDriverGroupConsumer) {
        this.completedLifespansChangeListeners.addListener(newlyCompletedDriverGroupConsumer);
    }

    public synchronized void registerStageTaskRecoveryCallback(StageTaskRecoveryCallback stageTaskRecoveryCallback) {
        Preconditions.checkState((!this.stageTaskRecoveryCallback.isPresent() ? 1 : 0) != 0, (Object)"stageTaskRecoveryCallback should be registered only once");
        this.stageTaskRecoveryCallback = Optional.of(Objects.requireNonNull(stageTaskRecoveryCallback, "stageTaskRecoveryCallback is null"));
    }

    public PlanFragment getFragment() {
        return this.planFragment;
    }

    public OutputBuffers getOutputBuffers() {
        return this.outputBuffers.get();
    }

    public void beginScheduling() {
        this.stateMachine.transitionToScheduling();
    }

    public synchronized void transitionToFinishedTaskScheduling() {
        this.stateMachine.transitionToFinishedTaskScheduling();
    }

    public synchronized void transitionToSchedulingSplits() {
        this.stateMachine.transitionToSchedulingSplits();
    }

    public synchronized void schedulingComplete() {
        if (!this.stateMachine.transitionToScheduled()) {
            return;
        }
        if (this.getAllTasks().stream().anyMatch(task -> this.getState() == StageExecutionState.RUNNING)) {
            this.stateMachine.transitionToRunning();
        }
        if (this.finishedTasks.size() == this.allTasks.size()) {
            this.stateMachine.transitionToFinished();
        }
        for (PlanNodeId tableScanPlanNodeId : this.planFragment.getTableScanSchedulingOrder()) {
            this.schedulingComplete(tableScanPlanNodeId);
        }
    }

    public synchronized void schedulingComplete(PlanNodeId partitionedSource) {
        for (RemoteTask task : this.getAllTasks()) {
            task.noMoreSplits(partitionedSource);
        }
        this.completeSources.add(partitionedSource);
    }

    public synchronized void cancel() {
        this.stateMachine.transitionToCanceled();
        this.getAllTasks().forEach(RemoteTask::cancel);
    }

    public synchronized void abort() {
        this.stateMachine.transitionToAborted();
        this.getAllTasks().forEach(RemoteTask::abort);
    }

    public long getUserMemoryReservation() {
        return this.stateMachine.getUserMemoryReservation();
    }

    public long getTotalMemoryReservation() {
        return this.stateMachine.getTotalMemoryReservation();
    }

    public synchronized Duration getTotalCpuTime() {
        long millis = this.getAllTasks().stream().mapToLong(task -> task.getTaskInfo().getStats().getTotalCpuTime().toMillis()).sum();
        return new Duration((double)millis, TimeUnit.MILLISECONDS);
    }

    public BasicStageExecutionStats getBasicStageStats() {
        return this.stateMachine.getBasicStageStats(this::getAllTaskInfo);
    }

    public StageExecutionInfo getStageExecutionInfo() {
        return this.stateMachine.getStageExecutionInfo(this::getAllTaskInfo, this.finishedLifespans.size(), this.totalLifespans);
    }

    private Iterable<TaskInfo> getAllTaskInfo() {
        return (Iterable)this.getAllTasks().stream().map(RemoteTask::getTaskInfo).collect(ImmutableList.toImmutableList());
    }

    public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> sourceTasks, boolean noMoreExchangeLocations) {
        Objects.requireNonNull(fragmentId, "fragmentId is null");
        Objects.requireNonNull(sourceTasks, "sourceTasks is null");
        RemoteSourceNode remoteSource = this.exchangeSources.get(fragmentId);
        Preconditions.checkArgument((remoteSource != null ? 1 : 0) != 0, (String)"Unknown remote source %s. Known sources are %s", (Object)fragmentId, this.exchangeSources.keySet());
        this.sourceTasks.putAll((Object)remoteSource.getId(), sourceTasks);
        for (RemoteTask task : this.getAllTasks()) {
            ImmutableMultimap.Builder newSplits = ImmutableMultimap.builder();
            for (RemoteTask sourceTask : sourceTasks) {
                TaskStatus sourceTaskStatus = sourceTask.getTaskStatus();
                newSplits.put((Object)remoteSource.getId(), (Object)SqlStageExecution.createRemoteSplitFor(task.getTaskId(), sourceTask.getRemoteTaskLocation(), sourceTask.getTaskId()));
            }
            task.addSplits((Multimap<PlanNodeId, Split>)newSplits.build());
        }
        if (noMoreExchangeLocations) {
            this.completeSourceFragments.add(fragmentId);
            if (this.completeSourceFragments.containsAll(remoteSource.getSourceFragmentIds())) {
                this.completeSources.add(remoteSource.getId());
                for (RemoteTask task : this.getAllTasks()) {
                    task.noMoreSplits(remoteSource.getId());
                }
            }
        }
    }

    public synchronized void setOutputBuffers(OutputBuffers outputBuffers) {
        OutputBuffers currentOutputBuffers;
        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        do {
            if ((currentOutputBuffers = this.outputBuffers.get()) == null) continue;
            if (outputBuffers.getVersion() <= currentOutputBuffers.getVersion()) {
                return;
            }
            currentOutputBuffers.checkValidTransition(outputBuffers);
        } while (!this.outputBuffers.compareAndSet(currentOutputBuffers, outputBuffers));
        for (RemoteTask task : this.getAllTasks()) {
            task.setOutputBuffers(outputBuffers);
        }
    }

    public boolean hasTasks() {
        return !this.tasks.isEmpty();
    }

    public List<RemoteTask> getAllTasks() {
        return (List)this.tasks.values().stream().flatMap(Collection::stream).collect(ImmutableList.toImmutableList());
    }

    public void removeRemoteSourceIfSingleTaskStage(TaskId remoteSourceTaskId) {
        List<RemoteTask> allTasks = this.getAllTasks();
        if (allTasks.size() > 1) {
            return;
        }
        ((RemoteTask)Iterables.getOnlyElement(allTasks)).removeRemoteSource(remoteSourceTaskId);
    }

    public synchronized Optional<RemoteTask> scheduleTask(InternalNode node, int partition) {
        Objects.requireNonNull(node, "node is null");
        if (this.stateMachine.getState().isDone()) {
            return Optional.empty();
        }
        Preconditions.checkState((!this.splitsScheduled.get() ? 1 : 0) != 0, (Object)"scheduleTask can not be called once splits have been scheduled");
        return Optional.of(this.scheduleTask(node, new TaskId(this.stateMachine.getStageExecutionId(), partition), (Multimap<PlanNodeId, Split>)ImmutableMultimap.of()));
    }

    public synchronized Set<RemoteTask> scheduleSplits(InternalNode node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification) {
        RemoteTask task;
        Objects.requireNonNull(node, "node is null");
        Objects.requireNonNull(splits, "splits is null");
        if (this.stateMachine.getState().isDone()) {
            return ImmutableSet.of();
        }
        this.splitsScheduled.set(true);
        Preconditions.checkArgument((boolean)this.planFragment.getTableScanSchedulingOrder().containsAll(splits.keySet()), (Object)"Invalid splits");
        ImmutableSet.Builder newTasks = ImmutableSet.builder();
        Collection tasks = this.tasks.get(node);
        if (tasks == null) {
            TaskId taskId = new TaskId(this.stateMachine.getStageExecutionId(), this.nextTaskId.getAndIncrement());
            task = this.scheduleTask(node, taskId, splits);
            newTasks.add((Object)task);
        } else {
            task = (RemoteTask)tasks.iterator().next();
            task.addSplits(splits);
        }
        if (noMoreSplitsNotification.size() > 1) {
            throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
        }
        for (Map.Entry entry : noMoreSplitsNotification.entries()) {
            task.noMoreSplits((PlanNodeId)entry.getKey(), (Lifespan)entry.getValue());
        }
        return newTasks.build();
    }

    private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits) {
        Preconditions.checkArgument((!this.allTasks.contains(taskId) ? 1 : 0) != 0, (String)"A task with id %s already exists", (Object)taskId);
        ImmutableMultimap.Builder initialSplits = ImmutableMultimap.builder();
        initialSplits.putAll(sourceSplits);
        this.sourceTasks.forEach((planNodeId, task) -> {
            TaskStatus status = task.getTaskStatus();
            if (status.getState() != TaskState.FINISHED) {
                initialSplits.put(planNodeId, (Object)SqlStageExecution.createRemoteSplitFor(taskId, task.getRemoteTaskLocation(), task.getTaskId()));
            }
        });
        OutputBuffers outputBuffers = this.outputBuffers.get();
        Preconditions.checkState((outputBuffers != null ? 1 : 0) != 0, (Object)"Initial output buffers must be set before a task can be scheduled");
        RemoteTask task2 = this.remoteTaskFactory.createRemoteTask(this.session, taskId, node, this.planFragment, (Multimap<PlanNodeId, Split>)initialSplits.build(), outputBuffers, this.nodeTaskMap.createPartitionedSplitCountTracker(node, taskId), this.summarizeTaskInfo, this.tableWriteInfo);
        this.completeSources.forEach(task2::noMoreSplits);
        this.allTasks.add(taskId);
        this.tasks.computeIfAbsent(node, key -> Sets.newConcurrentHashSet()).add(task2);
        this.nodeTaskMap.addTask(node, task2);
        task2.addStateChangeListener(new StageTaskListener(taskId));
        task2.addFinalTaskInfoListener(this::updateFinalTaskInfo);
        if (!this.stateMachine.getState().isDone()) {
            task2.start();
        } else {
            task2.abort();
        }
        return task2;
    }

    public Set<InternalNode> getScheduledNodes() {
        return ImmutableSet.copyOf(this.tasks.keySet());
    }

    public void recordGetSplitTime(long start) {
        this.stateMachine.recordGetSplitTime(start);
    }

    private static Split createRemoteSplitFor(TaskId taskId, URI remoteSourceTaskLocation, TaskId remoteSourceTaskId) {
        String splitLocation = remoteSourceTaskLocation.toASCIIString() + "/results/" + taskId.getId();
        return new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit(new Location(splitLocation), remoteSourceTaskId));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateTaskStatus(TaskId taskId, TaskStatus taskStatus) {
        try {
            StageExecutionState stageExecutionState = this.getState();
            if (stageExecutionState.isDone()) {
                return;
            }
            TaskState taskState = taskStatus.getState();
            if (taskState == TaskState.FAILED) {
                this.failedTasks.add(taskId);
                RuntimeException failure = taskStatus.getFailures().stream().findFirst().map(this::rewriteTransportFailure).map(ExecutionFailureInfo::toException).orElse((RuntimeException)((Object)new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason")));
                if (this.isRecoverable(taskStatus.getFailures())) {
                    try {
                        this.stageTaskRecoveryCallback.get().recover(taskId);
                        this.finishedTasks.add(taskId);
                    }
                    catch (Throwable t) {
                        failure.addSuppressed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_RECOVERY_ERROR, String.format("Encountered error when trying to recover task %s", taskId), t));
                        this.stateMachine.transitionToFailed(failure);
                    }
                } else {
                    this.stateMachine.transitionToFailed(failure);
                }
            } else if (taskState == TaskState.ABORTED) {
                this.stateMachine.transitionToFailed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "A task is in the ABORTED state but stage is " + (Object)((Object)stageExecutionState)));
            } else if (taskState == TaskState.FINISHED) {
                this.finishedTasks.add(taskId);
            }
            stageExecutionState = this.getState();
            if (stageExecutionState == StageExecutionState.SCHEDULED || stageExecutionState == StageExecutionState.RUNNING) {
                if (taskState == TaskState.RUNNING) {
                    this.stateMachine.transitionToRunning();
                }
                if (this.finishedTasks.size() == this.allTasks.size()) {
                    this.stateMachine.transitionToFinished();
                }
            }
        }
        finally {
            this.checkAllTaskFinal();
        }
    }

    private boolean isRecoverable(List<ExecutionFailureInfo> failures) {
        for (ExecutionFailureInfo failure : failures) {
            if (RECOVERABLE_ERROR_CODES.contains(failure.getErrorCode())) continue;
            return false;
        }
        return this.stageTaskRecoveryCallback.isPresent() && (double)this.failedTasks.size() < (double)this.allTasks.size() * this.maxFailedTaskPercentage;
    }

    private synchronized void updateFinalTaskInfo(TaskInfo finalTaskInfo) {
        this.tasksWithFinalInfo.add(finalTaskInfo.getTaskId());
        this.checkAllTaskFinal();
    }

    private synchronized void checkAllTaskFinal() {
        if (this.stateMachine.getState().isDone() && this.tasksWithFinalInfo.containsAll(this.allTasks)) {
            if (this.getFragment().getStageExecutionDescriptor().isStageGroupedExecution()) {
                Preconditions.checkState((this.finishedLifespans.size() <= this.totalLifespans ? 1 : 0) != 0, (Object)String.format("Number of finished lifespans (%s) exceeds number of total lifespans (%s)", this.finishedLifespans.size(), this.totalLifespans));
            } else {
                Preconditions.checkState((boolean)this.finishedLifespans.isEmpty());
            }
            List finalTaskInfos = (List)this.getAllTasks().stream().map(RemoteTask::getTaskInfo).collect(ImmutableList.toImmutableList());
            this.stateMachine.setAllTasksFinal(finalTaskInfos, this.totalLifespans);
        }
    }

    private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo) {
        if (executionFailureInfo.getRemoteHost() == null || this.failureDetector.getState(executionFailureInfo.getRemoteHost()) != FailureDetector.State.GONE) {
            return executionFailureInfo;
        }
        return new ExecutionFailureInfo(executionFailureInfo.getType(), executionFailureInfo.getMessage(), executionFailureInfo.getCause(), executionFailureInfo.getSuppressed(), executionFailureInfo.getStack(), executionFailureInfo.getErrorLocation(), StandardErrorCode.REMOTE_HOST_GONE.toErrorCode(), executionFailureInfo.getRemoteHost());
    }

    public String toString() {
        return this.stateMachine.toString();
    }

    private static class ListenerManager<T> {
        private final List<Consumer<T>> listeners = new ArrayList<Consumer<T>>();
        private boolean frozen;

        private ListenerManager() {
        }

        public synchronized void addListener(Consumer<T> listener) {
            Preconditions.checkState((!this.frozen ? 1 : 0) != 0, (Object)"Listeners have been invoked");
            this.listeners.add(listener);
        }

        public synchronized void invoke(T payload, Executor executor) {
            this.frozen = true;
            for (Consumer listener : this.listeners) {
                executor.execute(() -> listener.accept(payload));
            }
        }
    }

    @FunctionalInterface
    public static interface StageTaskRecoveryCallback {
        public void recover(TaskId var1);
    }

    private class StageTaskListener
    implements StateMachine.StateChangeListener<TaskStatus> {
        private long previousUserMemory;
        private long previousSystemMemory;
        private final Set<Lifespan> completedDriverGroups = new HashSet<Lifespan>();
        private final TaskId taskId;

        public StageTaskListener(TaskId taskId) {
            this.taskId = Objects.requireNonNull(taskId, "taskId is null");
        }

        @Override
        public void stateChanged(TaskStatus taskStatus) {
            try {
                this.updateMemoryUsage(taskStatus);
                this.updateCompletedDriverGroups(taskStatus);
            }
            finally {
                SqlStageExecution.this.updateTaskStatus(this.taskId, taskStatus);
            }
        }

        private synchronized void updateMemoryUsage(TaskStatus taskStatus) {
            long currentUserMemory = taskStatus.getMemoryReservationInBytes();
            long currentSystemMemory = taskStatus.getSystemMemoryReservationInBytes();
            long deltaUserMemoryInBytes = currentUserMemory - this.previousUserMemory;
            long deltaTotalMemoryInBytes = currentUserMemory + currentSystemMemory - (this.previousUserMemory + this.previousSystemMemory);
            this.previousUserMemory = currentUserMemory;
            this.previousSystemMemory = currentSystemMemory;
            SqlStageExecution.this.stateMachine.updateMemoryUsage(deltaUserMemoryInBytes, deltaTotalMemoryInBytes);
        }

        private synchronized void updateCompletedDriverGroups(TaskStatus taskStatus) {
            ImmutableSet newlyCompletedDriverGroups = ImmutableSet.copyOf((Collection)Sets.difference(taskStatus.getCompletedDriverGroups(), this.completedDriverGroups));
            if (newlyCompletedDriverGroups.isEmpty()) {
                return;
            }
            SqlStageExecution.this.completedLifespansChangeListeners.invoke(newlyCompletedDriverGroups, SqlStageExecution.this.executor);
            this.completedDriverGroups.addAll((Collection<Lifespan>)newlyCompletedDriverGroups);
        }
    }
}

