package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.util.ImmutableCollectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/* loaded from: input_file:com/facebook/presto/execution/scheduler/SourcePartitionedScheduler.class */
/**
 * {@link StageScheduler} that pulls splits from a {@link SplitSource} in batches and hands them
 * to the stage on the nodes chosen by a {@link SplitPlacementPolicy}.
 *
 * <p>{@link #schedule()} is {@code synchronized} and never waits for the split source: when the
 * next batch is not available yet it returns a result carrying the future to wait on.
 * {@link #close()} is not synchronized and simply closes the split source.
 */
public class SourcePartitionedScheduler implements StageScheduler {
    private final SqlStageExecution stage;
    private final SplitSource splitSource;
    private final SplitPlacementPolicy splitPlacementPolicy;
    private final int splitBatchSize;
    private final PlanNodeId partitionedNode;

    /** Outstanding request for the next batch of splits; {@code null} when none is in flight. */
    private ListenableFuture<List<Split>> batchFuture;

    /** Splits already fetched from the source but not yet assigned to a node. */
    private Set<Split> pendingSplits = ImmutableSet.of();

    /**
     * Creates a scheduler for the given stage.
     *
     * @param stage stage that receives the splits
     * @param partitionedNode plan node id under which every split is handed to the stage
     * @param splitSource source the splits are fetched from
     * @param splitPlacementPolicy policy that decides which node each split goes to
     * @param splitBatchSize maximum number of splits requested from the source at once
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws IllegalArgumentException if {@code splitBatchSize} is not positive
     */
    public SourcePartitionedScheduler(SqlStageExecution stage, PlanNodeId partitionedNode, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int splitBatchSize) {
        this.stage = Objects.requireNonNull(stage, "stage is null");
        this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
        this.splitPlacementPolicy = Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
        Preconditions.checkArgument(splitBatchSize > 0, "splitBatchSize must be at least one");
        this.splitBatchSize = splitBatchSize;
        // Fail fast here instead of later inside the multimap builder, which rejects null keys.
        this.partitionedNode = Objects.requireNonNull(partitionedNode, "partitionedNode is null");
    }

    /**
     * Performs one scheduling step.
     *
     * <ol>
     *   <li>If no splits are pending, a batch is requested from the split source (or the
     *       already requested batch is picked up once its future is done). While that future
     *       is not done, a result blocked on {@code WAITING_FOR_SOURCE} is returned. If the
     *       source reports it is finished before a batch is requested, the source is closed
     *       and a finished result with no tasks is returned.</li>
     *   <li>The pending splits are passed to the placement policy and every assigned split is
     *       scheduled on the stage.</li>
     *   <li>If some splits could not be assigned, a result blocked on
     *       {@code SPLIT_QUEUES_FULL} is returned, using the placement result's blocked future.
     *       Otherwise the result carries the split source's {@code isFinished()} value, and
     *       the source is closed when that value is {@code true}.</li>
     * </ol>
     *
     * @return the tasks created by this step together with the number of splits assigned in it
     */
    @Override
    public synchronized ScheduleResult schedule() {
        if (pendingSplits.isEmpty()) {
            if (batchFuture == null) {
                if (splitSource.isFinished()) {
                    // Nothing pending, nothing in flight and the source is exhausted.
                    splitSource.close();
                    return new ScheduleResult(true, ImmutableSet.<RemoteTask>of(), 0);
                }
                final long start = System.nanoTime();
                batchFuture = splitSource.getNextBatch(splitBatchSize);
                Futures.addCallback(batchFuture, new FutureCallback<List<Split>>() {
                    @Override
                    public void onSuccess(List<Split> result) {
                        stage.recordGetSplitTime(start);
                    }

                    @Override
                    public void onFailure(Throwable ignored) {
                        // Intentionally empty: only successful fetches are timed. The failed
                        // future stays in batchFuture and is read through
                        // MoreFutures.getFutureValue by schedule(), which is where the failure
                        // is expected to surface.
                    }
                });
            }
            if (!batchFuture.isDone()) {
                // Hand out a wrapper so a caller cancelling its wait cannot cancel the fetch.
                return new ScheduleResult(
                        false,
                        ImmutableSet.<RemoteTask>of(),
                        Futures.nonCancellationPropagating(batchFuture),
                        ScheduleResult.BlockedReason.WAITING_FOR_SOURCE,
                        0);
            }
            pendingSplits = ImmutableSet.copyOf(MoreFutures.getFutureValue(batchFuture));
            batchFuture = null;
        }

        SplitPlacementResult placement = splitPlacementPolicy.computeAssignments(pendingSplits);
        Multimap<Node, Split> assignments = placement.getAssignments();
        Set<RemoteTask> newTasks = assignSplits(assignments);
        int splitsScheduled = assignments.values().size();

        // Keep only the splits the placement policy did not assign in this step.
        pendingSplits = ImmutableSet.copyOf(Sets.difference(pendingSplits, ImmutableSet.copyOf(assignments.values())));

        if (!pendingSplits.isEmpty()) {
            Set<RemoteTask> allNewTasks = ImmutableSet.<RemoteTask>builder()
                    .addAll(newTasks)
                    .addAll(finalizeTaskCreationIfNecessary())
                    .build();
            return new ScheduleResult(
                    false,
                    allNewTasks,
                    placement.getBlocked(),
                    ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL,
                    splitsScheduled);
        }

        boolean finished = splitSource.isFinished();
        if (finished) {
            splitSource.close();
        }
        return new ScheduleResult(finished, newTasks, splitsScheduled);
    }

    /** Closes the underlying split source. */
    @Override
    public void close() {
        splitSource.close();
    }

    /**
     * Schedules the assigned splits on the stage, one {@code scheduleSplits} call per node.
     *
     * @param splitAssignment splits grouped by the node they were assigned to
     * @return every task returned by the stage for these assignments
     */
    private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment) {
        ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
        for (Map.Entry<Node, Collection<Split>> entry : splitAssignment.asMap().entrySet()) {
            // All splits of this scheduler are keyed by the single partitioned plan node.
            ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
                    .putAll(partitionedNode, entry.getValue())
                    .build();
            newTasks.addAll(stage.scheduleSplits(entry.getKey(), splits));
        }
        return newTasks.build();
    }

    /**
     * For a non-leaf fragment, locks down the placement policy's nodes, schedules an empty set
     * of splits on every node from {@code allNodes()} that the stage has not scheduled yet, and
     * then moves the stage to its "scheduling splits" state. Leaf fragments are left untouched.
     *
     * @return the tasks returned by those empty scheduling calls; empty for a leaf fragment
     */
    private Set<RemoteTask> finalizeTaskCreationIfNecessary() {
        if (stage.getFragment().isLeaf()) {
            return ImmutableSet.of();
        }
        splitPlacementPolicy.lockDownNodes();
        Set<Node> scheduledNodes = stage.getScheduledNodes();
        Set<RemoteTask> newTasks = splitPlacementPolicy.allNodes().stream()
                .filter(node -> !scheduledNodes.contains(node))
                .flatMap(node -> stage.scheduleSplits(node, ImmutableMultimap.of()).stream())
                .collect(ImmutableCollectors.toImmutableSet());
        stage.transitionToSchedulingSplits();
        return newTasks;
    }
}
