/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.scheduler.NodeSelector;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.SplitPlacementPolicy;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.execution.scheduler.StageScheduler;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.PipelineExecutionStrategy;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class FixedSourcePartitionedScheduler
implements StageScheduler {
    private static final Logger log = Logger.get(FixedSourcePartitionedScheduler.class);
    private final SqlStageExecution stage;
    private final NodePartitionMap partitioning;
    private final List<SourcePartitionedScheduler> sourcePartitionedSchedulers;
    private final List<ConnectorPartitionHandle> partitionHandles;
    private boolean scheduledTasks;

    public FixedSourcePartitionedScheduler(SqlStageExecution stage, Map<PlanNodeId, SplitSource> splitSources, PipelineExecutionStrategy pipelineExecutionStrategy, List<PlanNodeId> schedulingOrder, NodePartitionMap partitioning, int splitBatchSize, OptionalInt concurrentLifespans, NodeSelector nodeSelector, List<ConnectorPartitionHandle> partitionHandles) {
        Objects.requireNonNull(stage, "stage is null");
        Objects.requireNonNull(splitSources, "splitSources is null");
        Objects.requireNonNull(partitioning, "partitioning is null");
        Objects.requireNonNull(partitionHandles, "partitionHandles is null");
        this.stage = stage;
        this.partitioning = partitioning;
        this.partitionHandles = ImmutableList.copyOf(partitionHandles);
        Preconditions.checkArgument((boolean)splitSources.keySet().equals(ImmutableSet.copyOf(schedulingOrder)));
        FixedSplitPlacementPolicy splitPlacementPolicy = new FixedSplitPlacementPolicy(nodeSelector, partitioning, stage::getAllTasks);
        ArrayList<SourcePartitionedScheduler> sourcePartitionedSchedulers = new ArrayList<SourcePartitionedScheduler>();
        Preconditions.checkArgument((partitionHandles.equals(ImmutableList.of((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED)) == (pipelineExecutionStrategy == PipelineExecutionStrategy.UNGROUPED_EXECUTION) ? 1 : 0) != 0, (Object)"PartitionHandles should be [NOT_PARTITIONED] if and only if the execution strategy is UNGROUPED_EXECUTION");
        int effectiveConcurrentLifespans = !concurrentLifespans.isPresent() || concurrentLifespans.getAsInt() > partitionHandles.size() ? partitionHandles.size() : concurrentLifespans.getAsInt();
        boolean firstPlanNode = true;
        block4: for (PlanNodeId planNodeId : schedulingOrder) {
            SplitSource splitSource = splitSources.get(planNodeId);
            SourcePartitionedScheduler sourcePartitionedScheduler = SourcePartitionedScheduler.managedSourcePartitionedScheduler(stage, planNodeId, splitSource, splitPlacementPolicy, Math.max(splitBatchSize / effectiveConcurrentLifespans, 1));
            sourcePartitionedSchedulers.add(sourcePartitionedScheduler);
            if (!firstPlanNode) continue;
            firstPlanNode = false;
            switch (pipelineExecutionStrategy) {
                case UNGROUPED_EXECUTION: {
                    sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NotPartitionedPartitionHandle.NOT_PARTITIONED);
                    break;
                }
                case GROUPED_EXECUTION: {
                    AtomicInteger nextDriverGroupIndex = new AtomicInteger();
                    stage.addCompletedDriverGroupsChangedListener(newlyCompletedDriverGroups -> {
                        for (Lifespan ignored : newlyCompletedDriverGroups) {
                            this.scheduleNextDriverGroup(sourcePartitionedScheduler, nextDriverGroupIndex);
                        }
                    });
                    for (int i = 0; i < effectiveConcurrentLifespans; ++i) {
                        this.scheduleNextDriverGroup(sourcePartitionedScheduler, nextDriverGroupIndex);
                    }
                    continue block4;
                }
                default: {
                    throw new IllegalArgumentException("Unknown pipelineExecutionStrategy");
                }
            }
        }
        this.sourcePartitionedSchedulers = sourcePartitionedSchedulers;
    }

    private void scheduleNextDriverGroup(SourcePartitionedScheduler scheduler, AtomicInteger nextDriverGroupIndex) {
        int driverGroupIndex = nextDriverGroupIndex.getAndIncrement();
        if (driverGroupIndex >= this.partitionHandles.size()) {
            return;
        }
        Lifespan lifespan = Lifespan.driverGroup(driverGroupIndex);
        scheduler.startLifespan(lifespan, this.partitionHandleFor(lifespan));
    }

    private ConnectorPartitionHandle partitionHandleFor(Lifespan lifespan) {
        if (lifespan.isTaskWide()) {
            return NotPartitionedPartitionHandle.NOT_PARTITIONED;
        }
        return this.partitionHandles.get(lifespan.getId());
    }

    @Override
    public ScheduleResult schedule() {
        Object newTasks = ImmutableList.of();
        if (!this.scheduledTasks) {
            newTasks = (List)this.partitioning.getPartitionToNode().entrySet().stream().map(entry -> this.stage.scheduleTask((Node)entry.getValue(), (Integer)entry.getKey())).collect(ImmutableList.toImmutableList());
            this.scheduledTasks = true;
        }
        boolean allBlocked = true;
        ArrayList blocked = new ArrayList();
        ScheduleResult.BlockedReason blockedReason = ScheduleResult.BlockedReason.NO_ACTIVE_DRIVER_GROUP;
        int splitsScheduled = 0;
        Iterator<SourcePartitionedScheduler> schedulerIterator = this.sourcePartitionedSchedulers.iterator();
        Object driverGroupsToStart = ImmutableList.of();
        while (schedulerIterator.hasNext()) {
            SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();
            for (Lifespan lifespan : driverGroupsToStart) {
                sourcePartitionedScheduler.startLifespan(lifespan, this.partitionHandleFor(lifespan));
            }
            ScheduleResult schedule = sourcePartitionedScheduler.schedule();
            splitsScheduled += schedule.getSplitsScheduled();
            if (schedule.getBlockedReason().isPresent()) {
                blocked.add(schedule.getBlocked());
                blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
            } else {
                Verify.verify((boolean)schedule.getBlocked().isDone(), (String)"blockedReason not provided when scheduler is blocked", (Object[])new Object[0]);
                allBlocked = false;
            }
            driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();
            if (!schedule.isFinished()) continue;
            schedulerIterator.remove();
            sourcePartitionedScheduler.close();
        }
        if (allBlocked) {
            return new ScheduleResult(this.sourcePartitionedSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newTasks, MoreFutures.whenAnyComplete(blocked), blockedReason, splitsScheduled);
        }
        return new ScheduleResult(this.sourcePartitionedSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newTasks, splitsScheduled);
    }

    @Override
    public void close() {
        for (SourcePartitionedScheduler sourcePartitionedScheduler : this.sourcePartitionedSchedulers) {
            try {
                sourcePartitionedScheduler.close();
            }
            catch (Throwable t) {
                log.warn(t, "Error closing split source");
            }
        }
        this.sourcePartitionedSchedulers.clear();
    }

    public static class FixedSplitPlacementPolicy
    implements SplitPlacementPolicy {
        private final NodeSelector nodeSelector;
        private final NodePartitionMap partitioning;
        private final Supplier<? extends List<RemoteTask>> remoteTasks;

        public FixedSplitPlacementPolicy(NodeSelector nodeSelector, NodePartitionMap partitioning, Supplier<? extends List<RemoteTask>> remoteTasks) {
            this.nodeSelector = nodeSelector;
            this.partitioning = partitioning;
            this.remoteTasks = remoteTasks;
        }

        @Override
        public SplitPlacementResult computeAssignments(Set<Split> splits) {
            return this.nodeSelector.computeAssignments(splits, this.remoteTasks.get(), this.partitioning);
        }

        @Override
        public void lockDownNodes() {
        }

        @Override
        public List<Node> allNodes() {
            return ImmutableList.copyOf(this.partitioning.getPartitionToNode().values());
        }

        public Node getNodeForBucket(int bucketId) {
            return this.partitioning.getPartitionToNode().get(this.partitioning.getBucketToPartition()[bucketId]);
        }
    }
}

