/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.scheduler.NodeSelector;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.SplitPlacementPolicy;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.execution.scheduler.StageScheduler;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.PipelineExecutionStrategy;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;

public class FixedSourcePartitionedScheduler
implements StageScheduler {
    private static final Logger log = Logger.get(FixedSourcePartitionedScheduler.class);
    private final SqlStageExecution stage;
    private final NodePartitionMap partitioning;
    private final List<SourcePartitionedScheduler> sourcePartitionedSchedulers;
    private final List<ConnectorPartitionHandle> partitionHandles;
    private boolean scheduledTasks;
    private final Optional<LifespanScheduler> groupedLifespanScheduler;

    public FixedSourcePartitionedScheduler(SqlStageExecution stage, Map<PlanNodeId, SplitSource> splitSources, PipelineExecutionStrategy pipelineExecutionStrategy, List<PlanNodeId> schedulingOrder, NodePartitionMap partitioning, int splitBatchSize, OptionalInt concurrentLifespans, NodeSelector nodeSelector, List<ConnectorPartitionHandle> partitionHandles) {
        Objects.requireNonNull(stage, "stage is null");
        Objects.requireNonNull(splitSources, "splitSources is null");
        Objects.requireNonNull(partitioning, "partitioning is null");
        Objects.requireNonNull(partitionHandles, "partitionHandles is null");
        this.stage = stage;
        this.partitioning = partitioning;
        this.partitionHandles = ImmutableList.copyOf(partitionHandles);
        Preconditions.checkArgument((boolean)splitSources.keySet().equals(ImmutableSet.copyOf(schedulingOrder)));
        FixedSplitPlacementPolicy splitPlacementPolicy = new FixedSplitPlacementPolicy(nodeSelector, partitioning, stage::getAllTasks);
        ArrayList<SourcePartitionedScheduler> sourcePartitionedSchedulers = new ArrayList<SourcePartitionedScheduler>();
        Preconditions.checkArgument((partitionHandles.equals(ImmutableList.of((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED)) == (pipelineExecutionStrategy == PipelineExecutionStrategy.UNGROUPED_EXECUTION) ? 1 : 0) != 0, (Object)"PartitionHandles should be [NOT_PARTITIONED] if and only if the execution strategy is UNGROUPED_EXECUTION");
        int effectiveConcurrentLifespans = !concurrentLifespans.isPresent() || concurrentLifespans.getAsInt() > partitionHandles.size() ? partitionHandles.size() : concurrentLifespans.getAsInt();
        boolean firstPlanNode = true;
        Optional<Object> groupedLifespanScheduler = Optional.empty();
        block4: for (PlanNodeId planNodeId : schedulingOrder) {
            SplitSource splitSource = splitSources.get(planNodeId);
            SourcePartitionedScheduler sourcePartitionedScheduler = SourcePartitionedScheduler.managedSourcePartitionedScheduler(stage, planNodeId, splitSource, splitPlacementPolicy, Math.max(splitBatchSize / effectiveConcurrentLifespans, 1));
            sourcePartitionedSchedulers.add(sourcePartitionedScheduler);
            if (!firstPlanNode) continue;
            firstPlanNode = false;
            switch (pipelineExecutionStrategy) {
                case UNGROUPED_EXECUTION: {
                    sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NotPartitionedPartitionHandle.NOT_PARTITIONED);
                    continue block4;
                }
                case GROUPED_EXECUTION: {
                    LifespanScheduler lifespanScheduler = new LifespanScheduler(partitioning, partitionHandles);
                    lifespanScheduler.scheduleInitial(sourcePartitionedScheduler);
                    stage.addCompletedDriverGroupsChangedListener(lifespanScheduler::onLifespanFinished);
                    groupedLifespanScheduler = Optional.of(lifespanScheduler);
                    continue block4;
                }
            }
            throw new IllegalArgumentException("Unknown pipelineExecutionStrategy");
        }
        this.groupedLifespanScheduler = groupedLifespanScheduler;
        this.sourcePartitionedSchedulers = sourcePartitionedSchedulers;
    }

    private ConnectorPartitionHandle partitionHandleFor(Lifespan lifespan) {
        if (lifespan.isTaskWide()) {
            return NotPartitionedPartitionHandle.NOT_PARTITIONED;
        }
        return this.partitionHandles.get(lifespan.getId());
    }

    @Override
    public ScheduleResult schedule() {
        Object newTasks = ImmutableList.of();
        if (!this.scheduledTasks) {
            OptionalInt totalPartitions = OptionalInt.of(this.partitioning.getPartitionToNode().size());
            newTasks = (List)this.partitioning.getPartitionToNode().entrySet().stream().map(entry -> this.stage.scheduleTask((Node)entry.getValue(), (Integer)entry.getKey(), totalPartitions)).collect(ImmutableList.toImmutableList());
            this.scheduledTasks = true;
        }
        boolean allBlocked = true;
        ArrayList<Object> blocked = new ArrayList<Object>();
        ScheduleResult.BlockedReason blockedReason = ScheduleResult.BlockedReason.NO_ACTIVE_DRIVER_GROUP;
        if (this.groupedLifespanScheduler.isPresent()) {
            SettableFuture newDriverGroupReady = this.groupedLifespanScheduler.get().schedule(this.sourcePartitionedSchedulers.get(0));
            blocked.add(newDriverGroupReady);
        }
        int splitsScheduled = 0;
        Iterator<SourcePartitionedScheduler> schedulerIterator = this.sourcePartitionedSchedulers.iterator();
        Object driverGroupsToStart = ImmutableList.of();
        while (schedulerIterator.hasNext()) {
            SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();
            for (Lifespan lifespan : driverGroupsToStart) {
                sourcePartitionedScheduler.startLifespan(lifespan, this.partitionHandleFor(lifespan));
            }
            ScheduleResult schedule = sourcePartitionedScheduler.schedule();
            splitsScheduled += schedule.getSplitsScheduled();
            if (schedule.getBlockedReason().isPresent()) {
                blocked.add(schedule.getBlocked());
                blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
            } else {
                Verify.verify((boolean)schedule.getBlocked().isDone(), (String)"blockedReason not provided when scheduler is blocked", (Object[])new Object[0]);
                allBlocked = false;
            }
            driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();
            if (!schedule.isFinished()) continue;
            schedulerIterator.remove();
            sourcePartitionedScheduler.close();
        }
        if (allBlocked) {
            return new ScheduleResult(this.sourcePartitionedSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newTasks, MoreFutures.whenAnyComplete(blocked), blockedReason, splitsScheduled);
        }
        return new ScheduleResult(this.sourcePartitionedSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newTasks, splitsScheduled);
    }

    @Override
    public void close() {
        for (SourcePartitionedScheduler sourcePartitionedScheduler : this.sourcePartitionedSchedulers) {
            try {
                sourcePartitionedScheduler.close();
            }
            catch (Throwable t) {
                log.warn(t, "Error closing split source");
            }
        }
        this.sourcePartitionedSchedulers.clear();
    }

    private static class LifespanScheduler {
        private final Int2ObjectMap<Node> driverGroupToNodeMap;
        private final Map<Node, IntListIterator> nodeToDriverGroupsMap;
        private final List<ConnectorPartitionHandle> partitionHandles;
        private boolean initialScheduled;
        private SettableFuture<?> newDriverGroupReady = SettableFuture.create();
        @GuardedBy(value="this")
        private final List<Lifespan> recentlyCompletedDriverGroups = new ArrayList<Lifespan>();

        public LifespanScheduler(NodePartitionMap nodePartitionMap, List<ConnectorPartitionHandle> partitionHandles) {
            HashMap<Node, IntList> nodeToDriverGroupMap = new HashMap<Node, IntList>();
            Int2ObjectOpenHashMap driverGroupToNodeMap = new Int2ObjectOpenHashMap();
            int[] bucketToPartition = nodePartitionMap.getBucketToPartition();
            Map<Integer, Node> partitionToNode = nodePartitionMap.getPartitionToNode();
            for (int bucket = 0; bucket < bucketToPartition.length; ++bucket) {
                int partition = bucketToPartition[bucket];
                Node node = partitionToNode.get(partition);
                nodeToDriverGroupMap.computeIfAbsent(node, key -> new IntArrayList()).add(bucket);
                driverGroupToNodeMap.put(bucket, (Object)node);
            }
            this.driverGroupToNodeMap = driverGroupToNodeMap;
            this.nodeToDriverGroupsMap = (Map)nodeToDriverGroupMap.entrySet().stream().collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, entry -> ((IntList)entry.getValue()).iterator()));
            this.partitionHandles = Objects.requireNonNull(partitionHandles, "partitionHandles is null");
        }

        public void scheduleInitial(SourcePartitionedScheduler scheduler) {
            Preconditions.checkState((!this.initialScheduled ? 1 : 0) != 0);
            this.initialScheduled = true;
            for (Map.Entry<Node, IntListIterator> entry : this.nodeToDriverGroupsMap.entrySet()) {
                int driverGroupId = entry.getValue().nextInt();
                scheduler.startLifespan(Lifespan.driverGroup(driverGroupId), this.partitionHandles.get(driverGroupId));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onLifespanFinished(Iterable<Lifespan> newlyCompletedDriverGroups) {
            Preconditions.checkState((boolean)this.initialScheduled);
            LifespanScheduler lifespanScheduler = this;
            synchronized (lifespanScheduler) {
                for (Lifespan newlyCompletedDriverGroup : newlyCompletedDriverGroups) {
                    Preconditions.checkArgument((!newlyCompletedDriverGroup.isTaskWide() ? 1 : 0) != 0);
                    this.recentlyCompletedDriverGroups.add(newlyCompletedDriverGroup);
                }
                this.newDriverGroupReady.set(null);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SettableFuture schedule(SourcePartitionedScheduler scheduler) {
            ImmutableList recentlyCompletedDriverGroups;
            Preconditions.checkState((boolean)this.initialScheduled);
            LifespanScheduler lifespanScheduler = this;
            synchronized (lifespanScheduler) {
                recentlyCompletedDriverGroups = ImmutableList.copyOf(this.recentlyCompletedDriverGroups);
                this.recentlyCompletedDriverGroups.clear();
                this.newDriverGroupReady = SettableFuture.create();
            }
            for (Lifespan driverGroup : recentlyCompletedDriverGroups) {
                IntListIterator driverGroupsIterator = this.nodeToDriverGroupsMap.get(this.driverGroupToNodeMap.get(driverGroup.getId()));
                if (!driverGroupsIterator.hasNext()) continue;
                int driverGroupId = driverGroupsIterator.nextInt();
                scheduler.startLifespan(Lifespan.driverGroup(driverGroupId), this.partitionHandles.get(driverGroupId));
            }
            return this.newDriverGroupReady;
        }
    }

    public static class FixedSplitPlacementPolicy
    implements SplitPlacementPolicy {
        private final NodeSelector nodeSelector;
        private final NodePartitionMap partitioning;
        private final Supplier<? extends List<RemoteTask>> remoteTasks;

        public FixedSplitPlacementPolicy(NodeSelector nodeSelector, NodePartitionMap partitioning, Supplier<? extends List<RemoteTask>> remoteTasks) {
            this.nodeSelector = nodeSelector;
            this.partitioning = partitioning;
            this.remoteTasks = remoteTasks;
        }

        @Override
        public SplitPlacementResult computeAssignments(Set<Split> splits) {
            return this.nodeSelector.computeAssignments(splits, this.remoteTasks.get(), this.partitioning);
        }

        @Override
        public void lockDownNodes() {
        }

        @Override
        public List<Node> allNodes() {
            return ImmutableList.copyOf(this.partitioning.getPartitionToNode().values());
        }

        public Node getNodeForBucket(int bucketId) {
            return this.partitioning.getPartitionToNode().get(this.partitioning.getBucketToPartition()[bucketId]);
        }
    }
}

