/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.BasicStageStats;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.BroadcastOutputBufferManager;
import com.facebook.presto.execution.scheduler.BucketNodeMap;
import com.facebook.presto.execution.scheduler.DynamicSplitPlacementPolicy;
import com.facebook.presto.execution.scheduler.ExecutionPolicy;
import com.facebook.presto.execution.scheduler.ExecutionSchedule;
import com.facebook.presto.execution.scheduler.FixedBucketNodeMap;
import com.facebook.presto.execution.scheduler.FixedCountScheduler;
import com.facebook.presto.execution.scheduler.FixedSourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.NodeScheduler;
import com.facebook.presto.execution.scheduler.NodeSelector;
import com.facebook.presto.execution.scheduler.OutputBufferManager;
import com.facebook.presto.execution.scheduler.PartitionedOutputBufferManager;
import com.facebook.presto.execution.scheduler.ScaledOutputBufferManager;
import com.facebook.presto.execution.scheduler.ScaledWriterScheduler;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.execution.scheduler.StageScheduler;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SplitSourceFactory;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.graph.Traverser;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class SqlQueryScheduler {
    // State machine of the query being scheduled; stage state changes are translated into transitions on it.
    private final QueryStateMachine queryStateMachine;
    // Used by schedule() to create one ExecutionSchedule per group of section stages.
    private final ExecutionPolicy executionPolicy;
    // The complete plan; getStageInfo() walks it to rebuild the StageInfo tree.
    private final SubPlan plan;
    // Result of extractStreamingSections(plan); traversed to find sections that are ready to run.
    private final StreamingPlanSection sectionedPlan;
    // Every stage execution created for the query, keyed by stage id.
    private final Map<StageId, SqlStageExecution> stages;
    // The query executor passed to the constructor; schedule() is submitted to it.
    private final ExecutorService executor;
    // Id of the first stage returned by createStages(), treated as the root stage.
    private final StageId rootStageId;
    // Per-stage schedulers, populated while the stages are created.
    private final Map<StageId, StageScheduler> stageSchedulers;
    // Per-stage linkage objects, populated while the stages are created.
    private final Map<StageId, StageLinkage> stageLinkages;
    private final SplitSchedulerStats schedulerStats;
    // Forwarded to SqlStageExecution.createSqlStageExecution for every stage.
    private final boolean summarizeTaskInfo;
    // Flipped once by start() so that scheduling is kicked off only on the first call.
    private final AtomicBoolean started = new AtomicBoolean();
    // True while schedule() is running; startScheduling() does not submit a new run while it is set.
    private final AtomicBoolean scheduling = new AtomicBoolean();
    // Read from the session; getSectionsReadyForExecution() subtracts the running sections from it to limit its result.
    private final int maxConcurrentMaterializations;

    /**
     * Factory for {@link SqlQueryScheduler}: constructs the scheduler and then registers its
     * state-change listeners via {@code initialize()} before returning it to the caller.
     *
     * @return a fully constructed scheduler whose listeners are already attached
     */
    public static SqlQueryScheduler createSqlQueryScheduler(QueryStateMachine queryStateMachine, LocationFactory locationFactory, SubPlan plan, NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session, boolean summarizeTaskInfo, int splitBatchSize, ExecutorService queryExecutor, ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, OutputBuffers rootOutputBuffers, NodeTaskMap nodeTaskMap, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats) {
        SqlQueryScheduler scheduler = new SqlQueryScheduler(
                queryStateMachine,
                locationFactory,
                plan,
                nodePartitioningManager,
                nodeScheduler,
                remoteTaskFactory,
                splitSourceFactory,
                session,
                summarizeTaskInfo,
                splitBatchSize,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                rootOutputBuffers,
                nodeTaskMap,
                executionPolicy,
                schedulerStats);
        // Listeners are attached only after the constructor has finished.
        scheduler.initialize();
        return scheduler;
    }

    /**
     * Creates every stage of the query up front and records the per-stage schedulers and linkages.
     * Listener registration is done separately by {@code initialize()}.
     *
     * @param rootOutputBuffers output buffers of the root stage; must contain exactly one buffer,
     *        whose id is used to build the query's output locations
     * @throws NullPointerException if {@code queryStateMachine}, {@code plan}, {@code executionPolicy}
     *         or {@code schedulerStats} is null
     */
    private SqlQueryScheduler(QueryStateMachine queryStateMachine, LocationFactory locationFactory, SubPlan plan, NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session, boolean summarizeTaskInfo, int splitBatchSize, ExecutorService queryExecutor, ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, OutputBuffers rootOutputBuffers, NodeTaskMap nodeTaskMap, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats) {
        this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        this.plan = Objects.requireNonNull(plan, "plan is null");
        // The message now names the parameter that is actually being checked.
        this.executionPolicy = Objects.requireNonNull(executionPolicy, "executionPolicy is null");
        this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        this.summarizeTaskInfo = summarizeTaskInfo;

        // Typed builders: createStages() fills these in as it creates each stage.
        ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers = ImmutableMap.builder();
        ImmutableMap.Builder<StageId, StageLinkage> stageLinkages = ImmutableMap.builder();

        // getOnlyElement throws unless the root output buffers contain exactly one buffer.
        OutputBuffers.OutputBufferId rootBufferId = (OutputBuffers.OutputBufferId)Iterables.getOnlyElement(rootOutputBuffers.getBuffers().keySet());
        this.sectionedPlan = SqlQueryScheduler.extractStreamingSections(plan);

        // Tasks of the root section are published as the query's output locations.
        List<SqlStageExecution> createdStages = this.createStages(
                (fragmentId, tasks, noMoreExchangeLocations) -> SqlQueryScheduler.updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
                locationFactory,
                this.sectionedPlan,
                Optional.of(new int[1]),
                rootOutputBuffers,
                nodeScheduler,
                remoteTaskFactory,
                splitSourceFactory,
                session,
                splitBatchSize,
                nodePartitioningManager,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                stageSchedulers,
                stageLinkages);

        // createStages() returns the root section's stages first, so element 0 is the root stage.
        this.rootStageId = createdStages.get(0).getStageId();
        this.stages = createdStages.stream().collect(ImmutableMap.toImmutableMap(SqlStageExecution::getStageId, Function.identity()));
        this.stageSchedulers = stageSchedulers.build();
        this.stageLinkages = stageLinkages.build();
        this.executor = queryExecutor;
        this.maxConcurrentMaterializations = SystemSessionProperties.getMaxConcurrentMaterializations(session);
    }

    /**
     * Registers the listeners that connect stage state changes to the query state machine:
     * root-stage completion/cancellation, per-stage failure handling and re-scheduling, and
     * refreshing the query info when the query finishes or a stage publishes its final info.
     */
    private void initialize() {
        // The root stage decides when the query as a whole finishes or is canceled.
        SqlStageExecution rootStage = this.stages.get(this.rootStageId);
        rootStage.addStateChangeListener(rootState -> {
            if (rootState == StageState.FINISHED) {
                this.queryStateMachine.transitionToFinishing();
                return;
            }
            if (rootState == StageState.CANCELED) {
                this.queryStateMachine.transitionToCanceled();
            }
        });

        for (SqlStageExecution stage : this.stages.values()) {
            stage.addStateChangeListener(stageState -> {
                // Nothing to do once the query has reached a terminal state.
                if (this.queryStateMachine.isDone()) {
                    return;
                }
                if (stageState == StageState.FAILED) {
                    this.queryStateMachine.transitionToFailed(stage.getStageInfo().getFailureCause().get().toException());
                    return;
                }
                if (stageState == StageState.ABORTED) {
                    this.queryStateMachine.transitionToFailed((Throwable)new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Query stage was aborted"));
                    return;
                }
                if (stageState == StageState.FINISHED) {
                    // A finished stage triggers another scheduling pass.
                    this.startScheduling();
                    return;
                }
                boolean queryStillStarting = this.queryStateMachine.getQueryState() == QueryState.STARTING;
                if (queryStillStarting && stage.hasTasks()) {
                    this.queryStateMachine.transitionToRunning();
                }
            });
        }

        // Capture a final snapshot of the stage tree when the query completes.
        this.queryStateMachine.addStateChangeListener(newState -> {
            if (!newState.isDone()) {
                return;
            }
            this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo()));
        });

        // Refresh the snapshot again whenever a stage publishes its final info.
        for (SqlStageExecution stage : this.stages.values()) {
            stage.addFinalStageInfoListener(finalInfo -> this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo())));
        }
    }

    /**
     * Publishes the result-buffer locations of the given tasks to the query state machine.
     *
     * @param tasks tasks whose result URIs (for {@code rootBufferId}) are mapped to their task ids
     * @param noMoreExchangeLocations forwarded unchanged to the state machine
     */
    private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBuffers.OutputBufferId rootBufferId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations) {
        Function<RemoteTask, URI> resultsLocationOf = remoteTask -> SqlQueryScheduler.getBufferLocation(remoteTask, rootBufferId);
        Map locationToTaskId = (Map)tasks.stream()
                .collect(ImmutableMap.toImmutableMap(resultsLocationOf, RemoteTask::getTaskId));
        queryStateMachine.updateOutputLocations(locationToTaskId, noMoreExchangeLocations);
    }

    /**
     * Builds the URI of a task's result buffer: the task's own URI followed by the path
     * segments {@code results} and the buffer id.
     */
    private static URI getBufferLocation(RemoteTask remoteTask, OutputBuffers.OutputBufferId rootBufferId) {
        return HttpUriBuilder.uriBuilderFrom(remoteTask.getTaskStatus().getSelf())
                .appendPath("results")
                .appendPath(rootBufferId.toString())
                .build();
    }

    /**
     * Creates the stages of {@code section} and, recursively, of all its child sections.
     *
     * <p>The stages of {@code section} itself come first in the returned list (its first stage
     * receives {@code outputBuffers}); stages of child sections follow. Child sections are created
     * with a discarding location consumer and discarding output buffers.
     *
     * @param locationsConsumer consumer passed to the first stage of this section
     * @param bucketToPartition applied to the section's plan before the stages are created
     * @param stageSchedulers builder that receives the scheduler of every created stage
     * @param stageLinkages builder that receives the linkage of every created stage
     * @return all stages created for this section and its descendants
     */
    private List<SqlStageExecution> createStages(ExchangeLocationsConsumer locationsConsumer, LocationFactory locationFactory, StreamingPlanSection section, Optional<int[]> bucketToPartition, OutputBuffers outputBuffers, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session, int splitBatchSize, NodePartitioningManager nodePartitioningManager, ExecutorService queryExecutor, ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers, ImmutableMap.Builder<StageId, StageLinkage> stageLinkages) {
        ImmutableList.Builder<SqlStageExecution> stages = ImmutableList.builder();

        // One partitioning map per handle within this section; typed so the lookup needs no casts.
        Map<PartitioningHandle, NodePartitionMap> partitioningCache = new HashMap<>();
        Function<PartitioningHandle, NodePartitionMap> cachedPartitioning = partitioningHandle ->
                partitioningCache.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(session, handle));

        List<SqlStageExecution> sectionStages = this.createStreamingLinkedStages(
                locationsConsumer,
                locationFactory,
                section.getPlan().withBucketToPartition(bucketToPartition),
                nodeScheduler,
                remoteTaskFactory,
                splitSourceFactory,
                session,
                splitBatchSize,
                cachedPartitioning,
                nodePartitioningManager,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                stageSchedulers,
                stageLinkages,
                Optional.empty());
        // The first stage of the section is the one that writes the section's output.
        sectionStages.get(0).setOutputBuffers(outputBuffers);
        stages.addAll(sectionStages);

        for (StreamingPlanSection childSection : section.getChildren()) {
            stages.addAll(this.createStages(
                    SqlQueryScheduler.discardingLocationConsumer(),
                    locationFactory,
                    childSection,
                    Optional.empty(),
                    OutputBuffers.createDiscardingOutputBuffers(),
                    nodeScheduler,
                    remoteTaskFactory,
                    splitSourceFactory,
                    session,
                    splitBatchSize,
                    nodePartitioningManager,
                    queryExecutor,
                    schedulerExecutor,
                    failureDetector,
                    nodeTaskMap,
                    stageSchedulers,
                    stageLinkages));
        }
        return stages.build();
    }

    /**
     * Creates the stage for {@code plan}'s fragment, registers a {@link StageScheduler} chosen from
     * the fragment's partitioning, and recursively creates the stages of all child sub-plans.
     *
     * <p>The returned list starts with the stage created for {@code plan}, followed by the stages
     * of each child subtree in order. A {@code StageLinkage} is registered for the created stage.
     *
     * @param parent consumer stored in this stage's {@code StageLinkage}
     * @param partitioningCache lookup from partitioning handle to node partition map
     * @param stageSchedulers builder that receives this stage's scheduler
     * @param stageLinkages builder that receives this stage's linkage
     * @param parentStageExecution the stage that consumes this stage's output, if any; required by
     *        the recovery callback registered for recoverable grouped execution
     */
    private List<SqlStageExecution> createStreamingLinkedStages(ExchangeLocationsConsumer parent, LocationFactory locationFactory, StreamingSubPlan plan, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session, int splitBatchSize, Function<PartitioningHandle, NodePartitionMap> partitioningCache, NodePartitioningManager nodePartitioningManager, ExecutorService queryExecutor, ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers, ImmutableMap.Builder<StageId, StageLinkage> stageLinkages, Optional<SqlStageExecution> parentStageExecution) {
        // NOTE(review): Optional<Object> looks like a decompiler artifact; every branch assigns an Optional of int[] or an empty Optional.
        Optional<Object> bucketToPartition;
        Map<PlanNodeId, SplitSource> splitSources;
        ImmutableList.Builder stages = ImmutableList.builder();
        PlanFragmentId fragmentId = plan.getFragment().getId();
        StageId stageId = this.getStageId(fragmentId);
        SqlStageExecution stage = SqlStageExecution.createSqlStageExecution(stageId, locationFactory.createStageLocation(stageId), plan.getFragment(), remoteTaskFactory, session, this.summarizeTaskInfo, nodeTaskMap, queryExecutor, failureDetector, this.schedulerStats);
        stages.add((Object)stage);
        PartitioningHandle partitioningHandle = plan.getFragment().getPartitioning();
        if (partitioningHandle.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            // Source-distributed fragment: exactly one split source is expected (getOnlyElement throws otherwise).
            splitSources = splitSourceFactory.createSplitSources(plan.getFragment(), session);
            Map.Entry entry = (Map.Entry)Iterables.getOnlyElement(splitSources.entrySet());
            PlanNodeId planNodeId = (PlanNodeId)entry.getKey();
            SplitSource splitSource = (SplitSource)entry.getValue();
            ConnectorId connectorId = splitSource.getConnectorId();
            // Internal system connectors are handed to the node scheduler as null.
            // NOTE(review): presumably null means "no connector-specific node restriction" — confirm against NodeScheduler.
            if (ConnectorId.isInternalSystemConnector((ConnectorId)connectorId)) {
                connectorId = null;
            }
            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
            DynamicSplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);
            // Grouped execution is rejected for source-distributed stages.
            Preconditions.checkArgument((!plan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution() ? 1 : 0) != 0);
            stageSchedulers.put((Object)stageId, (Object)SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
            bucketToPartition = Optional.of(new int[1]);
        } else if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            // The scaled-writer scheduler is registered further down, once the child stages exist.
            bucketToPartition = Optional.of(new int[1]);
        } else {
            splitSources = splitSourceFactory.createSplitSources(plan.getFragment(), session);
            if (!splitSources.isEmpty()) {
                // The fragment reads splits itself: schedule it with a FixedSourcePartitionedScheduler.
                ArrayList<InternalNode> stageNodeList;
                BucketNodeMap bucketNodeMap;
                Object connectorPartitionHandles;
                List<PlanNodeId> schedulingOrder = plan.getFragment().getTableScanSchedulingOrder();
                ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new);
                boolean groupedExecutionForStage = plan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution();
                if (groupedExecutionForStage) {
                    connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                    // Grouped execution requires real partition handles, not the NOT_PARTITIONED placeholder.
                    Preconditions.checkState((!ImmutableList.of((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED).equals(connectorPartitionHandles) ? 1 : 0) != 0);
                } else {
                    connectorPartitionHandles = ImmutableList.of((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED);
                }
                if (plan.getFragment().getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == ExchangeNode.Type.REPLICATE)) {
                    // All remote sources are REPLICATE exchanges (also true when there are none): nodes come from
                    // the bucket-to-node map, or are picked randomly when the map is dynamic.
                    boolean dynamicLifespanSchedule = plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule();
                    bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, dynamicLifespanSchedule);
                    Verify.verify((bucketNodeMap.isDynamic() == dynamicLifespanSchedule ? 1 : 0) != 0);
                    stageNodeList = !bucketNodeMap.isDynamic() ? (ArrayList<InternalNode>)((FixedBucketNodeMap)bucketNodeMap).getBucketToNode().stream().distinct().collect(ImmutableList.toImmutableList()) : new ArrayList<InternalNode>(nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(SystemSessionProperties.getMaxTasksPerStage(session)));
                    bucketToPartition = Optional.empty();
                } else {
                    // At least one non-REPLICATE remote source: use the (cached) node partition map of the fragment.
                    Verify.verify((!plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule() ? 1 : 0) != 0);
                    NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
                    if (groupedExecutionForStage) {
                        Preconditions.checkState((connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length ? 1 : 0) != 0);
                    }
                    stageNodeList = nodePartitionMap.getPartitionToNode();
                    bucketNodeMap = nodePartitionMap.asBucketNodeMap();
                    bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
                }
                FixedSourcePartitionedScheduler stageScheduler = new FixedSourcePartitionedScheduler(stage, splitSources, plan.getFragment().getStageExecutionDescriptor(), schedulingOrder, stageNodeList, bucketNodeMap, splitBatchSize, SystemSessionProperties.getConcurrentLifespansPerNode(session), nodeScheduler.createNodeSelector(connectorId), (List<ConnectorPartitionHandle>)connectorPartitionHandles);
                stageSchedulers.put((Object)stageId, (Object)stageScheduler);
                if (plan.getFragment().getStageExecutionDescriptor().isRecoverableGroupedExecution()) {
                    // Recovery callback: validates the task and parent stage, detaches the task from the
                    // single-task parent stage, then asks the scheduler to recover it.
                    stage.registerStageTaskRecoveryCallback(taskId -> {
                        Preconditions.checkArgument((boolean)taskId.getStageId().equals(stageId), (Object)"The task did not execute this stage");
                        Preconditions.checkArgument((boolean)parentStageExecution.isPresent(), (Object)"Parent stage execution must exist");
                        Preconditions.checkArgument((((SqlStageExecution)parentStageExecution.get()).getAllTasks().size() == 1 ? 1 : 0) != 0, (Object)"Parent stage should only have one task for recoverable grouped execution");
                        ((SqlStageExecution)parentStageExecution.get()).removeRemoteSourceIfSingleTaskStage(taskId);
                        stageScheduler.recover(taskId);
                    });
                }
            } else {
                // No split sources: run one task per partition node with a FixedCountScheduler.
                NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
                List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
                Failures.checkCondition(!partitionToNode.isEmpty(), (ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No worker nodes available", new Object[0]);
                stageSchedulers.put((Object)stageId, (Object)new FixedCountScheduler(stage, partitionToNode));
                bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
            }
        }
        // Recurse into the children; each child reports its exchange locations to this stage.
        ImmutableSet.Builder childStagesBuilder = ImmutableSet.builder();
        for (StreamingSubPlan stagePlan : plan.getChildren()) {
            List<SqlStageExecution> subTree = this.createStreamingLinkedStages(stage::addExchangeLocations, locationFactory, stagePlan.withBucketToPartition(bucketToPartition), nodeScheduler, remoteTaskFactory, splitSourceFactory, session, splitBatchSize, partitioningCache, nodePartitioningManager, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, stageSchedulers, stageLinkages, Optional.of(stage));
            stages.addAll(subTree);
            // The first element of a subtree is the direct child stage.
            SqlStageExecution childStage = subTree.get(0);
            childStagesBuilder.add((Object)childStage);
        }
        ImmutableSet childStages = childStagesBuilder.build();
        // NOTE(review): lambda$createStreamingLinkedStages$10 is a synthetic lambda emitted by the decompiler;
        // its body is not part of this method, so what the listener does with childStages must be checked there.
        stage.addStateChangeListener(arg_0 -> SqlQueryScheduler.lambda$createStreamingLinkedStages$10((Set)childStages, arg_0));
        stageLinkages.put((Object)stageId, (Object)new StageLinkage(fragmentId, parent, (Set<SqlStageExecution>)childStages));
        if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            // NOTE(review): lambda$createStreamingLinkedStages$11 is likewise a synthetic lambda; it supplies the
            // source task statuses derived from childStages — confirm its body.
            Supplier<Collection<TaskStatus>> sourceTasksProvider = () -> SqlQueryScheduler.lambda$createStreamingLinkedStages$11((Set)childStages);
            Supplier<Collection<TaskStatus>> writerTasksProvider = () -> stage.getAllTasks().stream().map(RemoteTask::getTaskStatus).collect(Collectors.toList());
            ScaledWriterScheduler scheduler = new ScaledWriterScheduler(stage, sourceTasksProvider, writerTasksProvider, nodeScheduler.createNodeSelector(null), schedulerExecutor, SystemSessionProperties.getWriterMinSize(session));
            // Finish the writer scheduler once every child stage is done.
            SqlQueryScheduler.whenAllStages((Collection<SqlStageExecution>)childStages, StageState::isDone).addListener(scheduler::finish, MoreExecutors.directExecutor());
            stageSchedulers.put((Object)stageId, (Object)scheduler);
        }
        return stages.build();
    }

    /**
     * Returns the basic stats of all stages combined into a single {@link BasicStageStats}.
     */
    public BasicStageStats getBasicStageStats() {
        ImmutableList.Builder<BasicStageStats> perStageStats = ImmutableList.builder();
        for (SqlStageExecution stage : this.stages.values()) {
            perStageStats.add(stage.getBasicStageStats());
        }
        return BasicStageStats.aggregateBasicStageStats(perStageStats.build());
    }

    /**
     * Snapshots the info of every stage and assembles the snapshots into a tree that mirrors the plan.
     *
     * @return the root stage's info with the infos of its sub-stages nested beneath it
     */
    public StageInfo getStageInfo() {
        ImmutableMap.Builder<StageId, StageInfo> infoByStageId = ImmutableMap.builder();
        for (SqlStageExecution stage : this.stages.values()) {
            StageInfo snapshot = stage.getStageInfo();
            infoByStageId.put(snapshot.getStageId(), snapshot);
        }
        return this.buildStageInfo(this.plan, infoByStageId.build());
    }

    /**
     * Recursively builds the {@link StageInfo} tree for {@code subPlan} from flat per-stage snapshots.
     *
     * @param subPlan the plan node whose stage info (and children) should be assembled
     * @param stageInfos snapshot of every stage's info, keyed by stage id
     * @return the snapshot for {@code subPlan}'s stage; when the sub-plan has children, a copy whose
     *         sub-stage list holds the recursively built child infos
     * @throws IllegalArgumentException if {@code stageInfos} has no entry for the sub-plan's stage
     */
    private StageInfo buildStageInfo(SubPlan subPlan, Map<StageId, StageInfo> stageInfos) {
        StageId stageId = this.getStageId(subPlan.getFragment().getId());
        StageInfo stageInfo = stageInfos.get(stageId);
        // Report the missing stage id; the looked-up value is always null when this check fails.
        Preconditions.checkArgument(stageInfo != null, "No stageInfo for %s", stageId);
        if (subPlan.getChildren().isEmpty()) {
            return stageInfo;
        }
        List<StageInfo> subStages = subPlan.getChildren().stream()
                .map(child -> this.buildStageInfo(child, stageInfos))
                .collect(ImmutableList.toImmutableList());
        return new StageInfo(
                stageInfo.getStageId(),
                stageInfo.getState(),
                stageInfo.getSelf(),
                stageInfo.getPlan(),
                stageInfo.getTypes(),
                stageInfo.getStageStats(),
                stageInfo.getTasks(),
                subStages,
                stageInfo.getFailureCause());
    }

    /**
     * Returns the sum of the user memory reservations reported by all stages.
     */
    public long getUserMemoryReservation() {
        long totalUserMemory = 0L;
        for (SqlStageExecution stage : this.stages.values()) {
            totalUserMemory += stage.getUserMemoryReservation();
        }
        return totalUserMemory;
    }

    /**
     * Returns the sum of the total memory reservations reported by all stages.
     */
    public long getTotalMemoryReservation() {
        long totalMemory = 0L;
        for (SqlStageExecution stage : this.stages.values()) {
            totalMemory += stage.getTotalMemoryReservation();
        }
        return totalMemory;
    }

    /**
     * Returns the CPU time of all stages added together, at millisecond granularity.
     */
    public Duration getTotalCpuTime() {
        long totalCpuMillis = 0L;
        for (SqlStageExecution stage : this.stages.values()) {
            // Each stage's CPU time is converted to whole milliseconds before summing.
            totalCpuMillis += stage.getTotalCpuTime().toMillis();
        }
        return new Duration((double)totalCpuMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Kicks off scheduling. Only the first call has an effect; later calls return immediately.
     */
    public void start() {
        boolean firstCall = this.started.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.startScheduling();
    }

    /**
     * Submits a scheduling run to the executor unless one is currently flagged as in progress.
     */
    private void startScheduling() {
        Objects.requireNonNull(this.stages);
        // Skip the submission while a scheduling run is flagged as active.
        if (!this.scheduling.get()) {
            this.executor.submit(this::schedule);
        }
    }

    /**
     * Main scheduling loop. Repeatedly picks the plan sections that are ready for execution, asks the
     * execution policy which of their stages to schedule, runs each stage's scheduler, and propagates
     * new tasks and state changes through the stage linkages until there is nothing left to schedule.
     *
     * <p>Any failure transitions the query to failed and is rethrown. On exit, the schedulers of
     * all stages that were handed to the loop are closed.
     */
    private void schedule() {
        // Allow only one scheduling loop at a time; a concurrent caller returns immediately.
        if (!this.scheduling.compareAndSet(false, true)) {
            return;
        }
        // Every stage handed to the loop so far; used for completion checks and for closing schedulers.
        ArrayList scheduledStages = new ArrayList();
        try {
            // Renames the current thread for the duration of the loop (closed in the finally block below).
            SetThreadName ignored = new SetThreadName("Query-%s", new Object[]{this.queryStateMachine.getQueryId()});
            Object object = null;
            try {
                // Stages whose terminal state has already been reported to their linkage.
                HashSet<StageId> completedStages = new HashSet<StageId>();
                LinkedList<ExecutionSchedule> sectionExecutionSchedules = new LinkedList<ExecutionSchedule>();
                // Outer loop: each pass drops finished schedules and admits sections that became ready.
                block34: while (!Thread.currentThread().isInterrupted()) {
                    sectionExecutionSchedules.removeIf(ExecutionSchedule::isFinished);
                    List<StreamingPlanSection> sectionsReadyForExecution = this.getSectionsReadyForExecution();
                    // Nothing new to start and nothing in flight: scheduling is complete.
                    if (sectionsReadyForExecution.isEmpty() && sectionExecutionSchedules.isEmpty()) break;
                    List<List<SqlStageExecution>> sectionStageExecutions = this.getStageExecutions(sectionsReadyForExecution);
                    sectionStageExecutions.forEach(scheduledStages::addAll);
                    sectionStageExecutions.stream().map(this.executionPolicy::createExecutionSchedule).forEach(sectionExecutionSchedules::add);
                    // Inner loop: keep scheduling until at least one section schedule reports finished.
                    while (sectionExecutionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) {
                        Object stage222;
                        // Futures of stages that could not make progress in this pass.
                        ArrayList blockedStages = new ArrayList();
                        List stagesToSchedule = (List)sectionExecutionSchedules.stream().flatMap(schedule -> schedule.getStagesToSchedule().stream()).collect(ImmutableList.toImmutableList());
                        block36: for (Object stage222 : stagesToSchedule) {
                            ((SqlStageExecution)stage222).beginScheduling();
                            ScheduleResult scheduleResult = this.stageSchedulers.get(((SqlStageExecution)stage222).getStageId()).schedule();
                            if (scheduleResult.isFinished()) {
                                ((SqlStageExecution)stage222).schedulingComplete();
                            } else if (!scheduleResult.getBlocked().isDone()) {
                                blockedStages.add(scheduleResult.getBlocked());
                            }
                            // Hand the newly created tasks and the stage's current state to its linkage.
                            this.stageLinkages.get(((SqlStageExecution)stage222).getStageId()).processScheduleResults(((SqlStageExecution)stage222).getState(), scheduleResult.getNewTasks());
                            this.schedulerStats.getSplitsScheduledPerIteration().add((long)scheduleResult.getSplitsScheduled());
                            if (!scheduleResult.getBlockedReason().isPresent()) continue;
                            // Record why the stage was blocked; WRITER_SCALING updates no counter.
                            switch (scheduleResult.getBlockedReason().get()) {
                                case WRITER_SCALING: {
                                    continue block36;
                                }
                                case WAITING_FOR_SOURCE: {
                                    this.schedulerStats.getWaitingForSource().update(1L);
                                    continue block36;
                                }
                                case SPLIT_QUEUES_FULL: {
                                    this.schedulerStats.getSplitQueuesFull().update(1L);
                                    continue block36;
                                }
                                case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE: {
                                    this.schedulerStats.getMixedSplitQueuesFullAndWaitingForSource().update(1L);
                                    continue block36;
                                }
                                case NO_ACTIVE_DRIVER_GROUP: {
                                    this.schedulerStats.getNoActiveDriverGroup().update(1L);
                                    continue block36;
                                }
                            }
                            throw new UnsupportedOperationException("Unknown blocked reason: " + (Object)((Object)scheduleResult.getBlockedReason().get()));
                        }
                        // Report each stage that has reached a done state to its linkage exactly once.
                        boolean stageFinishedExecution = false;
                        stage222 = scheduledStages.iterator();
                        while (stage222.hasNext()) {
                            SqlStageExecution sqlStageExecution = (SqlStageExecution)stage222.next();
                            if (completedStages.contains(sqlStageExecution.getStageId()) || !sqlStageExecution.getState().isDone()) continue;
                            this.stageLinkages.get(sqlStageExecution.getStageId()).processScheduleResults(sqlStageExecution.getState(), (Set<RemoteTask>)ImmutableSet.of());
                            completedStages.add(sqlStageExecution.getStageId());
                            stageFinishedExecution = true;
                        }
                        // A stage finished: go back to the outer loop so newly ready sections are picked up.
                        if (stageFinishedExecution) continue block34;
                        if (blockedStages.isEmpty()) continue;
                        Throwable throwable = null;
                        // Wait up to one second for any blocked stage to unblock; the wait is timed as sleep time.
                        try (TimeStat.BlockTimer timer = this.schedulerStats.getSleepTime().time();){
                            MoreFutures.tryGetFutureValue((Future)MoreFutures.whenAnyComplete(blockedStages), (int)1, (TimeUnit)TimeUnit.SECONDS);
                        }
                        catch (Throwable throwable2) {
                            Throwable throwable3 = throwable2;
                            throw throwable2;
                        }
                        // Cancel the blocked futures collected in this pass before the next one.
                        for (ListenableFuture listenableFuture : blockedStages) {
                            listenableFuture.cancel(true);
                        }
                    }
                }
                // Sanity check: after scheduling, every stage must be SCHEDULED, RUNNING or done.
                for (SqlStageExecution stage : scheduledStages) {
                    StageState state = stage.getState();
                    if (state == StageState.SCHEDULED || state == StageState.RUNNING || state.isDone()) continue;
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Scheduling is complete, but stage %s is in state %s", new Object[]{stage.getStageId(), state}));
                }
                // Release the guard, then re-check for ready sections and restart scheduling if any exist.
                this.scheduling.set(false);
                if (!this.getSectionsReadyForExecution().isEmpty()) {
                    this.startScheduling();
                }
            }
            catch (Throwable completedStages) {
                object = completedStages;
                throw completedStages;
            }
            finally {
                // Decompiled try-with-resources: close the thread-name guard, suppressing close errors
                // when another exception is already propagating.
                if (ignored != null) {
                    if (object != null) {
                        try {
                            ignored.close();
                        }
                        catch (Throwable completedStages) {
                            ((Throwable)object).addSuppressed(completedStages);
                        }
                    } else {
                        ignored.close();
                    }
                }
            }
        }
        catch (Throwable t) {
            // Any failure releases the guard and fails the query before being rethrown.
            this.scheduling.set(false);
            this.queryStateMachine.transitionToFailed(t);
            throw t;
        }
        finally {
            // Close the scheduler of every stage that was handed to the loop; close failures fail the
            // query and are collected as suppressed exceptions of a single RuntimeException.
            RuntimeException closeError = new RuntimeException();
            for (SqlStageExecution stage : scheduledStages) {
                try {
                    this.stageSchedulers.get(stage.getStageId()).close();
                }
                catch (Throwable t) {
                    this.queryStateMachine.transitionToFailed(t);
                    if (closeError == t) continue;
                    closeError.addSuppressed(t);
                }
            }
            if (closeError.getSuppressed().length > 0) {
                throw closeError;
            }
        }
    }

    /**
     * Selects the plan sections that may be started now.
     *
     * <p>A section counts as "running" when its root stage is neither done nor still
     * {@code PLANNED}. The number of sections returned is capped at
     * {@code maxConcurrentMaterializations} minus the number of running sections.
     *
     * @return the ready sections, in depth-first pre-order of the section tree
     */
    private List<StreamingPlanSection> getSectionsReadyForExecution() {
        long runningSections = 0;
        for (StreamingPlanSection section : Traverser.forTree(StreamingPlanSection::getChildren).depthFirstPreOrder(this.sectionedPlan)) {
            StageState rootState = this.getStageExecution(section.getPlan().getFragment().getId()).getState();
            if (rootState.isDone() || rootState == StageState.PLANNED) {
                continue;
            }
            runningSections++;
        }

        long availableSlots = (long) this.maxConcurrentMaterializations - runningSections;
        return Streams.stream(Traverser.forTree(StreamingPlanSection::getChildren).depthFirstPreOrder(this.sectionedPlan))
                .filter(this::isReadyForExecution)
                .limit(availableSlots)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Tells whether a section can be started: its own root stage must still be
     * {@code PLANNED} and the root stage of every child section must be {@code FINISHED}.
     *
     * @param section the section to check
     * @return {@code true} when the section is ready to be scheduled
     */
    private boolean isReadyForExecution(StreamingPlanSection section) {
        StageState rootState = this.getStageExecution(section.getPlan().getFragment().getId()).getState();
        if (rootState != StageState.PLANNED) {
            return false;
        }
        // allMatch short-circuits on the first child whose root stage is not FINISHED.
        return section.getChildren().stream()
                .map(child -> this.getStageExecution(child.getPlan().getFragment().getId()).getState())
                .allMatch(childRootState -> childRootState == StageState.FINISHED);
    }

    /**
     * Resolves, for each section, the stage executions of all sub-plans in that section.
     *
     * @param sections the sections to resolve
     * @return one immutable list per section, holding the stage executions of the section's
     *         sub-plans in depth-first pre-order
     */
    private List<List<SqlStageExecution>> getStageExecutions(List<StreamingPlanSection> sections) {
        ImmutableList.Builder<List<SqlStageExecution>> stagesBySection = ImmutableList.builder();
        for (StreamingPlanSection section : sections) {
            ImmutableList.Builder<SqlStageExecution> sectionStages = ImmutableList.builder();
            for (StreamingSubPlan plan : Traverser.forTree(StreamingSubPlan::getChildren).depthFirstPreOrder(section.getPlan())) {
                sectionStages.add(this.getStageExecution(plan.getFragment().getId()));
            }
            stagesBySection.add(sectionStages.build());
        }
        return stagesBySection.build();
    }

    /**
     * Looks up the stage execution registered for a plan fragment.
     *
     * @param planFragmentId id of the fragment
     * @return the entry of {@code stages} for the fragment's stage id (whatever
     *         {@code stages.get} yields when there is no entry)
     */
    private SqlStageExecution getStageExecution(PlanFragmentId planFragmentId) {
        StageId stageId = getStageId(planFragmentId);
        return stages.get(stageId);
    }

    /**
     * Builds the stage id for a fragment of this query.
     *
     * @param fragmentId id of the plan fragment
     * @return a {@link StageId} made of this query's id and the fragment's id
     */
    private StageId getStageId(PlanFragmentId fragmentId) {
        return new StageId(
                queryStateMachine.getQueryId(),
                fragmentId.getId());
    }

    /**
     * Cancels a single stage of this query.
     *
     * @param stageId id of the stage to cancel
     * @throws NullPointerException if no stage with that id is registered
     */
    public void cancelStage(StageId stageId) {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            Objects.requireNonNull(
                    this.stages.get(stageId),
                    () -> String.format("Stage %s does not exist", stageId))
                    .cancel();
        }
    }

    /**
     * Aborts every stage registered in {@code stages}.
     */
    public void abort() {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            for (SqlStageExecution stage : this.stages.values()) {
                stage.abort();
            }
        }
    }

    /**
     * Creates a future that completes once every given stage has reported a state
     * accepted by {@code predicate}.
     *
     * <p>A state-change listener is registered on each stage. The first matching state
     * reported by a stage removes that stage from the pending set, and the future is
     * set (to {@code null}) when the pending set becomes empty.
     *
     * @param stages the stages to watch; must not be empty
     * @param predicate the state condition each stage has to meet
     * @return the future described above
     * @throws IllegalArgumentException if {@code stages} is empty
     */
    private static ListenableFuture<?> whenAllStages(Collection<SqlStageExecution> stages, Predicate<StageState> predicate) {
        Preconditions.checkArgument(!stages.isEmpty(), "stages is empty");

        Set<StageId> pendingStageIds = Sets.newConcurrentHashSet(
                stages.stream()
                        .map(SqlStageExecution::getStageId)
                        .collect(Collectors.toSet()));
        SettableFuture<?> allStagesMatched = SettableFuture.create();

        for (SqlStageExecution stage : stages) {
            stage.addStateChangeListener(state -> {
                if (!predicate.test(state)) {
                    return;
                }
                // remove() succeeds only once per stage, so each stage is counted a single time.
                if (pendingStageIds.remove(stage.getStageId()) && pendingStageIds.isEmpty()) {
                    allStagesMatched.set(null);
                }
            });
        }
        return allStagesMatched;
    }

    /**
     * Splits a sub-plan tree into a tree of streaming sections.
     *
     * <p>The section rooted at {@code subPlan} is extracted first; every sub-plan collected
     * as a materialized-exchange child during that extraction is then turned, recursively,
     * into a child section.
     *
     * @param subPlan root of the sub-plan tree
     * @return the section rooted at {@code subPlan}, with its child sections
     */
    public static StreamingPlanSection extractStreamingSections(SubPlan subPlan) {
        ImmutableList.Builder<SubPlan> materializedExchangeChildren = ImmutableList.builder();
        StreamingSubPlan sectionRoot = extractStreamingSection(subPlan, materializedExchangeChildren);

        ImmutableList.Builder<StreamingPlanSection> childSections = ImmutableList.builder();
        for (SubPlan materializedChild : materializedExchangeChildren.build()) {
            childSections.add(extractStreamingSections(materializedChild));
        }
        return new StreamingPlanSection(sectionRoot, childSections.build());
    }

    /**
     * Builds the streaming part of a sub-plan.
     *
     * <p>A child whose fragment id appears among the source fragment ids of this fragment's
     * remote source nodes is extracted recursively and kept as a streaming child. Any other
     * child is appended to {@code materializedExchangeChildren} and left out of the result.
     *
     * @param subPlan the sub-plan to convert
     * @param materializedExchangeChildren collector for the children that are not streamed
     * @return the streaming sub-plan for {@code subPlan}
     */
    private static StreamingSubPlan extractStreamingSection(SubPlan subPlan, ImmutableList.Builder<SubPlan> materializedExchangeChildren) {
        PlanFragment fragment = subPlan.getFragment();
        Set<PlanFragmentId> streamingFragmentIds = fragment.getRemoteSourceNodes().stream()
                .flatMap(remoteSource -> remoteSource.getSourceFragmentIds().stream())
                .collect(ImmutableSet.toImmutableSet());

        ImmutableList.Builder<StreamingSubPlan> streamingChildren = ImmutableList.builder();
        for (SubPlan child : subPlan.getChildren()) {
            if (streamingFragmentIds.contains(child.getFragment().getId())) {
                streamingChildren.add(extractStreamingSection(child, materializedExchangeChildren));
            } else {
                materializedExchangeChildren.add(child);
            }
        }
        return new StreamingSubPlan(fragment, streamingChildren.build());
    }

    /**
     * Returns an {@link ExchangeLocationsConsumer} that ignores everything passed to it.
     */
    private static ExchangeLocationsConsumer discardingLocationConsumer() {
        ExchangeLocationsConsumer discarding = (fragmentId, tasks, noMoreExchangeLocations) -> {
            // intentionally empty: locations are dropped
        };
        return discarding;
    }

    /**
     * Decompiler-recovered lambda body: collects the task status of every task of every
     * child stage.
     *
     * <p>The parameter is a raw {@code Set}, so it is bound to a typed local first.
     * Unbound method references such as {@code SqlStageExecution::getAllTasks} do not
     * type-check against a stream obtained from a raw collection.
     *
     * @param childStages the child stages; elements are expected to be {@code SqlStageExecution}
     * @return a list with one {@code TaskStatus} per task
     */
    private static /* synthetic */ Collection lambda$createStreamingLinkedStages$11(Set childStages) {
        @SuppressWarnings("unchecked") // the raw signature is kept; elements are SqlStageExecution
        Set<SqlStageExecution> stages = (Set<SqlStageExecution>) childStages;
        return stages.stream()
                .map(SqlStageExecution::getAllTasks)
                .flatMap(Collection::stream)
                .map(RemoteTask::getTaskStatus)
                .collect(Collectors.toList());
    }

    /**
     * Decompiler-recovered lambda body: cancels every child stage once the observed
     * state is done.
     *
     * <p>The parameter is a raw {@code Set}, so it is bound to a typed local and iterated
     * explicitly. {@code forEach(SqlStageExecution::cancel)} does not type-check on a
     * raw collection.
     *
     * @param childStages the child stages; elements are expected to be {@code SqlStageExecution}
     * @param newState the newly reported state
     */
    private static /* synthetic */ void lambda$createStreamingLinkedStages$10(Set childStages, StageState newState) {
        if (!newState.isDone()) {
            return;
        }
        @SuppressWarnings("unchecked") // the raw signature is kept; elements are SqlStageExecution
        Set<SqlStageExecution> stages = (Set<SqlStageExecution>) childStages;
        for (SqlStageExecution childStage : stages) {
            childStage.cancel();
        }
    }

    /**
     * Immutable tree node pairing a plan fragment with its streaming children.
     */
    private static class StreamingSubPlan {
        private final PlanFragment fragment;
        private final List<StreamingSubPlan> children;

        /**
         * @param fragment the fragment of this node; must not be null
         * @param children the child nodes; must not be null, copied defensively
         */
        public StreamingSubPlan(PlanFragment fragment, List<StreamingSubPlan> children) {
            Objects.requireNonNull(fragment, "fragment is null");
            Objects.requireNonNull(children, "children is null");
            this.fragment = fragment;
            this.children = ImmutableList.copyOf(children);
        }

        /** Returns the fragment of this node. */
        public PlanFragment getFragment() {
            return fragment;
        }

        /** Returns the immutable list of child nodes. */
        public List<StreamingSubPlan> getChildren() {
            return children;
        }

        /**
         * Returns a copy of this node whose fragment carries the given bucket-to-partition
         * mapping; the children are reused unchanged.
         */
        public StreamingSubPlan withBucketToPartition(Optional<int[]> bucketToPartition) {
            PlanFragment updatedFragment = fragment.withBucketToPartition(bucketToPartition);
            return new StreamingSubPlan(updatedFragment, children);
        }
    }

    /**
     * Immutable tree node pairing the streaming sub-plan of a section with its child sections.
     */
    private static class StreamingPlanSection {
        private final StreamingSubPlan plan;
        private final List<StreamingPlanSection> children;

        /**
         * @param plan the streaming sub-plan of this section; must not be null
         * @param children the child sections; must not be null, copied defensively
         */
        public StreamingPlanSection(StreamingSubPlan plan, List<StreamingPlanSection> children) {
            Objects.requireNonNull(plan, "plan is null");
            Objects.requireNonNull(children, "children is null");
            this.plan = plan;
            this.children = ImmutableList.copyOf(children);
        }

        /** Returns the streaming sub-plan of this section. */
        public StreamingSubPlan getPlan() {
            return plan;
        }

        /** Returns the immutable list of child sections. */
        public List<StreamingPlanSection> getChildren() {
            return children;
        }
    }

    private static class StageLinkage {
        private final PlanFragmentId currentStageFragmentId;
        private final ExchangeLocationsConsumer parent;
        private final Set<OutputBufferManager> childOutputBufferManagers;
        private final Set<StageId> childStageIds;

        public StageLinkage(PlanFragmentId fragmentId, ExchangeLocationsConsumer parent, Set<SqlStageExecution> children) {
            this.currentStageFragmentId = fragmentId;
            this.parent = parent;
            this.childOutputBufferManagers = (Set)children.stream().map(childStage -> {
                PartitioningHandle partitioningHandle = childStage.getFragment().getPartitioningScheme().getPartitioning().getHandle();
                if (partitioningHandle.equals(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION)) {
                    return new BroadcastOutputBufferManager(childStage::setOutputBuffers);
                }
                if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
                    return new ScaledOutputBufferManager(childStage::setOutputBuffers);
                }
                int partitionCount = Ints.max((int[])childStage.getFragment().getPartitioningScheme().getBucketToPartition().get()) + 1;
                return new PartitionedOutputBufferManager(partitioningHandle, partitionCount, childStage::setOutputBuffers);
            }).collect(ImmutableSet.toImmutableSet());
            this.childStageIds = (Set)children.stream().map(SqlStageExecution::getStageId).collect(ImmutableSet.toImmutableSet());
        }

        public Set<StageId> getChildStageIds() {
            return this.childStageIds;
        }

        public void processScheduleResults(StageState newState, Set<RemoteTask> newTasks) {
            boolean noMoreTasks = false;
            switch (newState) {
                case PLANNED: 
                case SCHEDULING: {
                    break;
                }
                case FINISHED_TASK_SCHEDULING: 
                case SCHEDULING_SPLITS: 
                case SCHEDULED: 
                case RUNNING: 
                case FINISHED: 
                case CANCELED: {
                    noMoreTasks = true;
                }
            }
            this.parent.addExchangeLocations(this.currentStageFragmentId, newTasks, noMoreTasks);
            if (!this.childOutputBufferManagers.isEmpty()) {
                List newOutputBuffers = (List)newTasks.stream().map(task -> new OutputBuffers.OutputBufferId(task.getTaskId().getId())).collect(ImmutableList.toImmutableList());
                for (OutputBufferManager child : this.childOutputBufferManagers) {
                    child.addOutputBuffers(newOutputBuffers, noMoreTasks);
                }
            }
        }
    }

    private static interface ExchangeLocationsConsumer {
        public void addExchangeLocations(PlanFragmentId var1, Set<RemoteTask> var2, boolean var3);
    }
}

