/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.scheduler.BroadcastOutputBufferManager;
import com.facebook.presto.execution.scheduler.DynamicSplitPlacementPolicy;
import com.facebook.presto.execution.scheduler.ExecutionPolicy;
import com.facebook.presto.execution.scheduler.ExecutionSchedule;
import com.facebook.presto.execution.scheduler.FixedCountScheduler;
import com.facebook.presto.execution.scheduler.FixedSourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.NodeScheduler;
import com.facebook.presto.execution.scheduler.NodeSelector;
import com.facebook.presto.execution.scheduler.OutputBufferManager;
import com.facebook.presto.execution.scheduler.PartitionedOutputBufferManager;
import com.facebook.presto.execution.scheduler.ScaledOutputBufferManager;
import com.facebook.presto.execution.scheduler.ScaledWriterScheduler;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.execution.scheduler.StageScheduler;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.operator.PipelineExecutionStrategy;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.StageExecutionPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class SqlQueryScheduler {
    private final QueryStateMachine queryStateMachine;
    private final ExecutionPolicy executionPolicy;
    private final Map<StageId, SqlStageExecution> stages;
    private final ExecutorService executor;
    private final FailureDetector failureDetector;
    private final StageId rootStageId;
    private final Map<StageId, StageScheduler> stageSchedulers;
    private final Map<StageId, StageLinkage> stageLinkages;
    private final SplitSchedulerStats schedulerStats;
    private final boolean summarizeTaskInfo;
    private final AtomicBoolean started = new AtomicBoolean();
    private final SettableFuture<QueryExecution.QueryOutputInfo> rootStageOutputBufferLocations = SettableFuture.create();

    public SqlQueryScheduler(QueryStateMachine queryStateMachine, LocationFactory locationFactory, StageExecutionPlan plan, NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, Session session, boolean summarizeTaskInfo, int splitBatchSize, ExecutorService queryExecutor, ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, OutputBuffers rootOutputBuffers, NodeTaskMap nodeTaskMap, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats) {
        this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        this.executionPolicy = Objects.requireNonNull(executionPolicy, "schedulerPolicyFactory is null");
        this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        this.summarizeTaskInfo = summarizeTaskInfo;
        ImmutableMap.Builder stageSchedulers = ImmutableMap.builder();
        ImmutableMap.Builder stageLinkages = ImmutableMap.builder();
        HashMap partitioningCache = new HashMap();
        OutputBuffers.OutputBufferId rootBufferId = (OutputBuffers.OutputBufferId)Iterables.getOnlyElement(rootOutputBuffers.getBuffers().keySet());
        List<SqlStageExecution> stages = this.createStages((fragmentId, tasks, noMoreExchangeLocations) -> SqlQueryScheduler.updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations), new AtomicInteger(), locationFactory, plan.withBucketToPartition(Optional.of(new int[1])), nodeScheduler, remoteTaskFactory, session, splitBatchSize, partitioningHandle -> partitioningCache.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(session, (PartitioningHandle)handle)), nodePartitioningManager, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, (ImmutableMap.Builder<StageId, StageScheduler>)stageSchedulers, (ImmutableMap.Builder<StageId, StageLinkage>)stageLinkages);
        SqlStageExecution rootStage = stages.get(0);
        rootStage.setOutputBuffers(rootOutputBuffers);
        this.rootStageId = rootStage.getStageId();
        this.stages = (Map)stages.stream().collect(ImmutableMap.toImmutableMap(SqlStageExecution::getStageId, Function.identity()));
        this.stageSchedulers = stageSchedulers.build();
        this.stageLinkages = stageLinkages.build();
        this.executor = queryExecutor;
        this.failureDetector = failureDetector;
        rootStage.addStateChangeListener(state -> {
            if (state == StageState.FINISHED) {
                queryStateMachine.transitionToFinishing();
            } else if (state == StageState.CANCELED) {
                queryStateMachine.transitionToCanceled();
            }
        });
        for (SqlStageExecution stage : stages) {
            stage.addStateChangeListener(state -> {
                if (queryStateMachine.isDone()) {
                    return;
                }
                if (state == StageState.FAILED) {
                    queryStateMachine.transitionToFailed(stage.getStageInfo().getFailureCause().toException());
                } else if (state == StageState.ABORTED) {
                    queryStateMachine.transitionToFailed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Query stage was aborted"));
                } else if (queryStateMachine.getQueryState() == QueryState.STARTING && stage.hasTasks()) {
                    queryStateMachine.transitionToRunning();
                }
            });
        }
    }

    private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBuffers.OutputBufferId rootBufferId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations) {
        Set bufferLocations = (Set)tasks.stream().map(task -> task.getTaskStatus().getSelf()).map(location -> HttpUriBuilder.uriBuilderFrom((URI)location).appendPath("results").appendPath(rootBufferId.toString()).build()).collect(ImmutableSet.toImmutableSet());
        queryStateMachine.updateOutputLocations(bufferLocations, noMoreExchangeLocations);
    }

    private List<SqlStageExecution> createStages(ExchangeLocationsConsumer parent, AtomicInteger nextStageId, LocationFactory locationFactory, StageExecutionPlan plan, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, Session session, int splitBatchSize, Function<PartitioningHandle, NodePartitionMap> partitioningCache, NodePartitioningManager nodePartitioningManager, ExecutorService queryExecutor, ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers, ImmutableMap.Builder<StageId, StageLinkage> stageLinkages) {
        Optional<int[]> bucketToPartition;
        ImmutableList.Builder stages = ImmutableList.builder();
        StageId stageId = new StageId(this.queryStateMachine.getQueryId(), nextStageId.getAndIncrement());
        SqlStageExecution stage = new SqlStageExecution(stageId, locationFactory.createStageLocation(stageId), plan.getFragment(), remoteTaskFactory, session, this.summarizeTaskInfo, nodeTaskMap, queryExecutor, failureDetector, this.schedulerStats);
        stages.add((Object)stage);
        PartitioningHandle partitioningHandle = plan.getFragment().getPartitioning();
        if (partitioningHandle.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            Map.Entry entry = (Map.Entry)Iterables.getOnlyElement(plan.getSplitSources().entrySet());
            PlanNodeId planNodeId = (PlanNodeId)entry.getKey();
            SplitSource splitSource = (SplitSource)entry.getValue();
            ConnectorId connectorId = splitSource.getConnectorId();
            if (ConnectorId.isInternalSystemConnector(connectorId)) {
                connectorId = null;
            }
            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
            DynamicSplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);
            Preconditions.checkArgument((plan.getFragment().getPipelineExecutionStrategy() == PipelineExecutionStrategy.UNGROUPED_EXECUTION ? 1 : 0) != 0);
            stageSchedulers.put((Object)stageId, (Object)SourcePartitionedScheduler.simpleSourcePartitionedScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
            bucketToPartition = Optional.of(new int[1]);
        } else if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            bucketToPartition = Optional.of(new int[1]);
        } else {
            NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
            long nodeCount = nodePartitionMap.getPartitionToNode().values().stream().distinct().count();
            OptionalInt concurrentLifespansPerTask = SystemSessionProperties.getConcurrentLifespansPerNode(session);
            Map<PlanNodeId, SplitSource> splitSources = plan.getSplitSources();
            if (!splitSources.isEmpty()) {
                Object connectorPartitionHandles;
                List<PlanNodeId> schedulingOrder = plan.getFragment().getPartitionedSources();
                switch (plan.getFragment().getPipelineExecutionStrategy()) {
                    case GROUPED_EXECUTION: {
                        connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                        Preconditions.checkState((!ImmutableList.of((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED).equals(connectorPartitionHandles) ? 1 : 0) != 0);
                        break;
                    }
                    case UNGROUPED_EXECUTION: {
                        connectorPartitionHandles = ImmutableList.of((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED);
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException();
                    }
                }
                stageSchedulers.put((Object)stageId, (Object)new FixedSourcePartitionedScheduler(stage, splitSources, plan.getFragment().getPipelineExecutionStrategy(), schedulingOrder, nodePartitionMap, splitBatchSize, concurrentLifespansPerTask.isPresent() ? OptionalInt.of(Math.toIntExact((long)concurrentLifespansPerTask.getAsInt() * nodeCount)) : OptionalInt.empty(), nodeScheduler.createNodeSelector(null), (List<ConnectorPartitionHandle>)connectorPartitionHandles));
                bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
            } else {
                Map<Integer, Node> partitionToNode = nodePartitionMap.getPartitionToNode();
                Failures.checkCondition(!partitionToNode.isEmpty(), (ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No worker nodes available", new Object[0]);
                stageSchedulers.put((Object)stageId, (Object)new FixedCountScheduler(stage, partitionToNode));
                bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
            }
        }
        ImmutableSet.Builder childStagesBuilder = ImmutableSet.builder();
        for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
            List<SqlStageExecution> subTree = this.createStages(stage::addExchangeLocations, nextStageId, locationFactory, subStagePlan.withBucketToPartition(bucketToPartition), nodeScheduler, remoteTaskFactory, session, splitBatchSize, partitioningCache, nodePartitioningManager, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, stageSchedulers, stageLinkages);
            stages.addAll(subTree);
            SqlStageExecution childStage = subTree.get(0);
            childStagesBuilder.add((Object)childStage);
        }
        ImmutableSet childStages = childStagesBuilder.build();
        stage.addStateChangeListener(arg_0 -> SqlQueryScheduler.lambda$createStages$7((Set)childStages, arg_0));
        stageLinkages.put((Object)stageId, (Object)new StageLinkage(plan.getFragment().getId(), parent, (Set<SqlStageExecution>)childStages));
        if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            Supplier<Collection<TaskStatus>> sourceTasksProvider = () -> SqlQueryScheduler.lambda$createStages$8((Set)childStages);
            Supplier<Collection<TaskStatus>> writerTasksProvider = () -> stage.getAllTasks().stream().map(RemoteTask::getTaskStatus).collect(Collectors.toList());
            ScaledWriterScheduler scheduler = new ScaledWriterScheduler(stage, sourceTasksProvider, writerTasksProvider, nodeScheduler.createNodeSelector(null), schedulerExecutor, SystemSessionProperties.getWriterMinSize(session));
            SqlQueryScheduler.whenAllStages((Collection<SqlStageExecution>)childStages, StageState::isDone).addListener(scheduler::finish, MoreExecutors.directExecutor());
            stageSchedulers.put((Object)stageId, (Object)scheduler);
        }
        return stages.build();
    }

    public ListenableFuture<QueryExecution.QueryOutputInfo> getRootStageOutputBufferLocations() {
        return Futures.nonCancellationPropagating(this.rootStageOutputBufferLocations);
    }

    public StageInfo getStageInfo() {
        Map stageInfos = (Map)this.stages.values().stream().map(SqlStageExecution::getStageInfo).collect(ImmutableMap.toImmutableMap(StageInfo::getStageId, Function.identity()));
        return this.buildStageInfo(this.rootStageId, stageInfos);
    }

    private StageInfo buildStageInfo(StageId stageId, Map<StageId, StageInfo> stageInfos) {
        StageInfo parent = stageInfos.get(stageId);
        Preconditions.checkArgument((parent != null ? 1 : 0) != 0, (String)"No stageInfo for %s", (Object)parent);
        List childStages = (List)this.stageLinkages.get(stageId).getChildStageIds().stream().map(childStageId -> this.buildStageInfo((StageId)childStageId, stageInfos)).collect(ImmutableList.toImmutableList());
        if (childStages.isEmpty()) {
            return parent;
        }
        return new StageInfo(parent.getStageId(), parent.getState(), parent.getSelf(), parent.getPlan(), parent.getTypes(), parent.getStageStats(), parent.getTasks(), childStages, parent.getFailureCause());
    }

    public long getTotalMemoryReservation() {
        return this.stages.values().stream().mapToLong(SqlStageExecution::getMemoryReservation).sum();
    }

    public Duration getTotalCpuTime() {
        long millis = this.stages.values().stream().mapToLong(stage -> stage.getTotalCpuTime().toMillis()).sum();
        return new Duration((double)millis, TimeUnit.MILLISECONDS);
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.executor.submit(this::schedule);
        }
    }

    private void schedule() {
        RuntimeException closeError;
        try {
            SetThreadName ignored = new SetThreadName("Query-%s", new Object[]{this.queryStateMachine.getQueryId()});
            Object object = null;
            try {
                HashSet<StageId> completedStages = new HashSet<StageId>();
                ExecutionSchedule executionSchedule = this.executionPolicy.createExecutionSchedule(this.stages.values());
                while (!executionSchedule.isFinished()) {
                    ArrayList blockedStages = new ArrayList();
                    block33: for (SqlStageExecution sqlStageExecution : executionSchedule.getStagesToSchedule()) {
                        sqlStageExecution.beginScheduling();
                        ScheduleResult result = this.stageSchedulers.get(sqlStageExecution.getStageId()).schedule();
                        if (result.isFinished()) {
                            sqlStageExecution.schedulingComplete();
                        } else if (!result.getBlocked().isDone()) {
                            blockedStages.add(result.getBlocked());
                        }
                        this.stageLinkages.get(sqlStageExecution.getStageId()).processScheduleResults(sqlStageExecution.getState(), result.getNewTasks());
                        this.schedulerStats.getSplitsScheduledPerIteration().add((long)result.getSplitsScheduled());
                        if (!result.getBlockedReason().isPresent()) continue;
                        switch (result.getBlockedReason().get()) {
                            case WRITER_SCALING: {
                                continue block33;
                            }
                            case WAITING_FOR_SOURCE: {
                                this.schedulerStats.getWaitingForSource().update(1L);
                                continue block33;
                            }
                            case SPLIT_QUEUES_FULL: {
                                this.schedulerStats.getSplitQueuesFull().update(1L);
                                continue block33;
                            }
                            case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE: 
                            case NO_ACTIVE_DRIVER_GROUP: {
                                continue block33;
                            }
                        }
                        throw new UnsupportedOperationException("Unknown blocked reason: " + (Object)((Object)result.getBlockedReason().get()));
                    }
                    for (SqlStageExecution sqlStageExecution : this.stages.values()) {
                        if (completedStages.contains(sqlStageExecution.getStageId()) || !sqlStageExecution.getState().isDone()) continue;
                        this.stageLinkages.get(sqlStageExecution.getStageId()).processScheduleResults(sqlStageExecution.getState(), (Set<RemoteTask>)ImmutableSet.of());
                        completedStages.add(sqlStageExecution.getStageId());
                    }
                    if (blockedStages.isEmpty()) continue;
                    Throwable throwable = null;
                    try (TimeStat.BlockTimer timer = this.schedulerStats.getSleepTime().time();){
                        MoreFutures.tryGetFutureValue((Future)MoreFutures.whenAnyComplete(blockedStages), (int)1, (TimeUnit)TimeUnit.SECONDS);
                    }
                    catch (Throwable throwable2) {
                        Throwable throwable3 = throwable2;
                        throw throwable2;
                    }
                    for (ListenableFuture listenableFuture : blockedStages) {
                        listenableFuture.cancel(true);
                    }
                }
                for (SqlStageExecution stage : this.stages.values()) {
                    StageState stageState = stage.getState();
                    if (stageState == StageState.SCHEDULED || stageState == StageState.RUNNING || stageState.isDone()) continue;
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Scheduling is complete, but stage %s is in state %s", new Object[]{stage.getStageId(), stageState}));
                }
            }
            catch (Throwable completedStages) {
                object = completedStages;
                throw completedStages;
            }
            finally {
                if (ignored != null) {
                    if (object != null) {
                        try {
                            ignored.close();
                        }
                        catch (Throwable completedStages) {
                            ((Throwable)object).addSuppressed(completedStages);
                        }
                    } else {
                        ignored.close();
                    }
                }
            }
            closeError = new RuntimeException();
        }
        catch (Throwable t) {
            try {
                this.queryStateMachine.transitionToFailed(t);
                throw Throwables.propagate((Throwable)t);
            }
            catch (Throwable throwable) {
                RuntimeException closeError2 = new RuntimeException();
                for (StageScheduler scheduler : this.stageSchedulers.values()) {
                    try {
                        scheduler.close();
                    }
                    catch (Throwable t2) {
                        this.queryStateMachine.transitionToFailed(t2);
                        if (closeError2 == t2) continue;
                        closeError2.addSuppressed(t2);
                    }
                }
                if (closeError2.getSuppressed().length > 0) {
                    throw closeError2;
                }
                throw throwable;
            }
        }
        for (StageScheduler scheduler : this.stageSchedulers.values()) {
            try {
                scheduler.close();
            }
            catch (Throwable t) {
                this.queryStateMachine.transitionToFailed(t);
                if (closeError == t) continue;
                closeError.addSuppressed(t);
            }
        }
        if (closeError.getSuppressed().length > 0) {
            throw closeError;
        }
    }

    public void cancelStage(StageId stageId) {
        try (SetThreadName ignored = new SetThreadName("Query-%s", new Object[]{this.queryStateMachine.getQueryId()});){
            SqlStageExecution sqlStageExecution = this.stages.get(stageId);
            SqlStageExecution stage = Objects.requireNonNull(sqlStageExecution, () -> String.format("Stage %s does not exist", stageId));
            stage.cancel();
        }
    }

    public void abort() {
        try (SetThreadName ignored = new SetThreadName("Query-%s", new Object[]{this.queryStateMachine.getQueryId()});){
            this.stages.values().stream().forEach(SqlStageExecution::abort);
        }
    }

    private static ListenableFuture<?> whenAllStages(Collection<SqlStageExecution> stages, Predicate<StageState> predicate) {
        Preconditions.checkArgument((!stages.isEmpty() ? 1 : 0) != 0, (Object)"stages is empty");
        Set stageIds = Sets.newConcurrentHashSet((Iterable)stages.stream().map(SqlStageExecution::getStageId).collect(Collectors.toSet()));
        SettableFuture future = SettableFuture.create();
        for (SqlStageExecution stage : stages) {
            stage.addStateChangeListener(state -> {
                if (predicate.test((StageState)((Object)state)) && stageIds.remove(stage.getStageId()) && stageIds.isEmpty()) {
                    future.set(null);
                }
            });
        }
        return future;
    }

    private static /* synthetic */ Collection lambda$createStages$8(Set childStages) {
        return childStages.stream().map(SqlStageExecution::getAllTasks).flatMap(Collection::stream).map(RemoteTask::getTaskStatus).collect(Collectors.toList());
    }

    private static /* synthetic */ void lambda$createStages$7(Set childStages, StageState newState) {
        if (newState.isDone()) {
            childStages.forEach(SqlStageExecution::cancel);
        }
    }

    private static class StageLinkage {
        private final PlanFragmentId currentStageFragmentId;
        private final ExchangeLocationsConsumer parent;
        private final Set<OutputBufferManager> childOutputBufferManagers;
        private final Set<StageId> childStageIds;

        public StageLinkage(PlanFragmentId fragmentId, ExchangeLocationsConsumer parent, Set<SqlStageExecution> children) {
            this.currentStageFragmentId = fragmentId;
            this.parent = parent;
            this.childOutputBufferManagers = (Set)children.stream().map(childStage -> {
                PartitioningHandle partitioningHandle = childStage.getFragment().getPartitioningScheme().getPartitioning().getHandle();
                if (partitioningHandle.equals(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION)) {
                    return new BroadcastOutputBufferManager(childStage::setOutputBuffers);
                }
                if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
                    return new ScaledOutputBufferManager(childStage::setOutputBuffers);
                }
                int partitionCount = Ints.max((int[])childStage.getFragment().getPartitioningScheme().getBucketToPartition().get()) + 1;
                return new PartitionedOutputBufferManager(partitioningHandle, partitionCount, childStage::setOutputBuffers);
            }).collect(ImmutableSet.toImmutableSet());
            this.childStageIds = (Set)children.stream().map(SqlStageExecution::getStageId).collect(ImmutableSet.toImmutableSet());
        }

        public Set<StageId> getChildStageIds() {
            return this.childStageIds;
        }

        public void processScheduleResults(StageState newState, Set<RemoteTask> newTasks) {
            boolean noMoreTasks = false;
            switch (newState) {
                case PLANNED: 
                case SCHEDULING: {
                    break;
                }
                case SCHEDULING_SPLITS: 
                case SCHEDULED: 
                case RUNNING: 
                case FINISHED: 
                case CANCELED: {
                    noMoreTasks = true;
                }
            }
            this.parent.addExchangeLocations(this.currentStageFragmentId, newTasks, noMoreTasks);
            if (!this.childOutputBufferManagers.isEmpty()) {
                List newOutputBuffers = (List)newTasks.stream().map(task -> new OutputBuffers.OutputBufferId(task.getTaskId().getId())).collect(ImmutableList.toImmutableList());
                for (OutputBufferManager child : this.childOutputBufferManagers) {
                    child.addOutputBuffers(newOutputBuffers, noMoreTasks);
                }
            }
        }
    }

    private static interface ExchangeLocationsConsumer {
        public void addExchangeLocations(PlanFragmentId var1, Set<RemoteTask> var2, boolean var3);
    }
}

