/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.scheduler.NodeSelector;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.SplitPlacementPolicy;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.execution.scheduler.StageScheduler;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class FixedSourcePartitionedScheduler
implements StageScheduler {
    private static final Logger log = Logger.get(FixedSourcePartitionedScheduler.class);
    private final SqlStageExecution stage;
    private final NodePartitionMap partitioning;
    private final Queue<SourcePartitionedScheduler> sourcePartitionedSchedulers;
    private boolean scheduledTasks;

    public FixedSourcePartitionedScheduler(SqlStageExecution stage, Map<PlanNodeId, SplitSource> splitSources, List<PlanNodeId> schedulingOrder, NodePartitionMap partitioning, int splitBatchSize, NodeSelector nodeSelector) {
        Objects.requireNonNull(stage, "stage is null");
        Objects.requireNonNull(splitSources, "splitSources is null");
        Objects.requireNonNull(partitioning, "partitioning is null");
        this.stage = stage;
        this.partitioning = partitioning;
        Preconditions.checkArgument((boolean)splitSources.keySet().equals(ImmutableSet.copyOf(schedulingOrder)));
        FixedSplitPlacementPolicy splitPlacementPolicy = new FixedSplitPlacementPolicy(nodeSelector, partitioning, stage::getAllTasks);
        this.sourcePartitionedSchedulers = new ArrayDeque<SourcePartitionedScheduler>(schedulingOrder.stream().map(sourceId -> new SourcePartitionedScheduler(stage, (PlanNodeId)sourceId, (SplitSource)splitSources.get(sourceId), splitPlacementPolicy, splitBatchSize)).collect(Collectors.toList()));
    }

    @Override
    public ScheduleResult schedule() {
        Object newTasks = ImmutableList.of();
        if (!this.scheduledTasks) {
            newTasks = (List)this.partitioning.getPartitionToNode().entrySet().stream().map(entry -> this.stage.scheduleTask((Node)entry.getValue(), (Integer)entry.getKey())).collect(ImmutableList.toImmutableList());
            this.scheduledTasks = true;
        }
        ListenableFuture<?> blocked = Futures.immediateFuture(null);
        ScheduleResult.BlockedReason blockedReason = null;
        int splitsScheduled = 0;
        while (!this.sourcePartitionedSchedulers.isEmpty()) {
            ScheduleResult schedule = this.sourcePartitionedSchedulers.peek().schedule();
            splitsScheduled += schedule.getSplitsScheduled();
            blocked = schedule.getBlocked();
            blockedReason = schedule.getBlockedReason().isPresent() ? schedule.getBlockedReason().get() : null;
            if (!blocked.isDone() || !schedule.isFinished()) break;
            this.sourcePartitionedSchedulers.remove().close();
        }
        if (blockedReason != null) {
            return new ScheduleResult(this.sourcePartitionedSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newTasks, blocked, blockedReason, splitsScheduled);
        }
        Preconditions.checkState((boolean)blocked.isDone(), (Object)"blockedReason not provided when scheduler is blocked");
        return new ScheduleResult(this.sourcePartitionedSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newTasks, splitsScheduled);
    }

    @Override
    public void close() {
        while (!this.sourcePartitionedSchedulers.isEmpty()) {
            try {
                this.sourcePartitionedSchedulers.remove().close();
            }
            catch (Throwable t) {
                log.warn(t, "Error closing split source");
            }
        }
    }

    private static class FixedSplitPlacementPolicy
    implements SplitPlacementPolicy {
        private final NodeSelector nodeSelector;
        private final NodePartitionMap partitioning;
        private final Supplier<? extends List<RemoteTask>> remoteTasks;

        public FixedSplitPlacementPolicy(NodeSelector nodeSelector, NodePartitionMap partitioning, Supplier<? extends List<RemoteTask>> remoteTasks) {
            this.nodeSelector = nodeSelector;
            this.partitioning = partitioning;
            this.remoteTasks = remoteTasks;
        }

        @Override
        public SplitPlacementResult computeAssignments(Set<Split> splits) {
            return this.nodeSelector.computeAssignments(splits, this.remoteTasks.get(), this.partitioning);
        }

        @Override
        public void lockDownNodes() {
        }

        @Override
        public List<Node> allNodes() {
            return ImmutableList.copyOf(this.partitioning.getPartitionToNode().values());
        }
    }
}

