/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SplitPlacementPolicy;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.execution.scheduler.StageScheduler;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.split.EmptySplit;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class SourcePartitionedScheduler
implements StageScheduler {
    private final SqlStageExecution stage;
    private final SplitSource splitSource;
    private final SplitPlacementPolicy splitPlacementPolicy;
    private final int splitBatchSize;
    private final PlanNodeId partitionedNode;
    private ListenableFuture<List<Split>> batchFuture;
    private Set<Split> pendingSplits = ImmutableSet.of();
    private State state = State.INITIALIZED;

    public SourcePartitionedScheduler(SqlStageExecution stage, PlanNodeId partitionedNode, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int splitBatchSize) {
        this.stage = Objects.requireNonNull(stage, "stage is null");
        this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
        this.splitPlacementPolicy = Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
        Preconditions.checkArgument((splitBatchSize > 0 ? 1 : 0) != 0, (Object)"splitBatchSize must be at least one");
        this.splitBatchSize = splitBatchSize;
        this.partitionedNode = partitionedNode;
    }

    @Override
    public synchronized ScheduleResult schedule() {
        if (this.pendingSplits.isEmpty()) {
            if (this.batchFuture == null) {
                if (this.splitSource.isFinished()) {
                    return this.handleNoMoreSplits();
                }
                this.batchFuture = this.splitSource.getNextBatch(this.splitBatchSize);
                final long start = System.nanoTime();
                Futures.addCallback(this.batchFuture, (FutureCallback)new FutureCallback<List<Split>>(){

                    public void onSuccess(List<Split> result) {
                        SourcePartitionedScheduler.this.stage.recordGetSplitTime(start);
                    }

                    public void onFailure(Throwable t) {
                    }
                });
            }
            if (!this.batchFuture.isDone()) {
                ListenableFuture blocked = Futures.nonCancellationPropagating(this.batchFuture);
                return new ScheduleResult(false, (Iterable<? extends RemoteTask>)ImmutableSet.of(), blocked, ScheduleResult.BlockedReason.WAITING_FOR_SOURCE, 0);
            }
            this.pendingSplits = ImmutableSet.copyOf((Collection)((Collection)MoreFutures.getFutureValue(this.batchFuture)));
            this.batchFuture = null;
        }
        if (!this.pendingSplits.isEmpty() && this.state == State.INITIALIZED) {
            this.state = State.SPLITS_SCHEDULED;
        }
        SplitPlacementResult splitPlacementResult = this.splitPlacementPolicy.computeAssignments(this.pendingSplits);
        Multimap<Node, Split> splitAssignment = splitPlacementResult.getAssignments();
        ImmutableSet newTasks = this.assignSplits(splitAssignment);
        this.pendingSplits = ImmutableSet.copyOf((Collection)Sets.difference(this.pendingSplits, (Set)ImmutableSet.copyOf((Collection)splitAssignment.values())));
        if (!this.pendingSplits.isEmpty()) {
            newTasks = ImmutableSet.builder().addAll(newTasks).addAll(this.finalizeTaskCreationIfNecessary()).build();
            return new ScheduleResult(false, (Iterable<? extends RemoteTask>)newTasks, splitPlacementResult.getBlocked(), ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL, splitAssignment.values().size());
        }
        boolean finished = this.splitSource.isFinished();
        if (finished) {
            this.splitSource.close();
        }
        return new ScheduleResult(finished, (Iterable<? extends RemoteTask>)newTasks, splitAssignment.values().size());
    }

    private ScheduleResult handleNoMoreSplits() {
        switch (this.state) {
            case INITIALIZED: {
                return this.scheduleEmptySplit();
            }
            case SPLITS_SCHEDULED: {
                this.state = State.FINISHED;
                this.splitSource.close();
                return new ScheduleResult(true, (Iterable<? extends RemoteTask>)ImmutableSet.of(), 0);
            }
        }
        throw new IllegalStateException("SourcePartitionedScheduler expected to be in INITIALIZED or SPLITS_SCHEDULED state but is in " + (Object)((Object)this.state));
    }

    @Override
    public void close() {
        this.splitSource.close();
    }

    private ScheduleResult scheduleEmptySplit() {
        this.state = State.SPLITS_SCHEDULED;
        List<Node> nodes = this.splitPlacementPolicy.allNodes();
        Failures.checkCondition(!nodes.isEmpty(), (ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query", new Object[0]);
        Node node = nodes.iterator().next();
        Split emptySplit = new Split(this.splitSource.getConnectorId(), this.splitSource.getTransactionHandle(), new EmptySplit(this.splitSource.getConnectorId()));
        Set<RemoteTask> emptyTask = this.assignSplits((Multimap<Node, Split>)ImmutableMultimap.of((Object)node, (Object)emptySplit));
        return new ScheduleResult(false, emptyTask, 1);
    }

    private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment) {
        ImmutableSet.Builder newTasks = ImmutableSet.builder();
        for (Map.Entry taskSplits : splitAssignment.asMap().entrySet()) {
            newTasks.addAll(this.stage.scheduleSplits((Node)taskSplits.getKey(), (Multimap<PlanNodeId, Split>)ImmutableMultimap.builder().putAll((Object)this.partitionedNode, (Iterable)taskSplits.getValue()).build()));
        }
        return newTasks.build();
    }

    private Set<RemoteTask> finalizeTaskCreationIfNecessary() {
        if (this.stage.getFragment().isLeaf()) {
            return ImmutableSet.of();
        }
        this.splitPlacementPolicy.lockDownNodes();
        Set<Node> scheduledNodes = this.stage.getScheduledNodes();
        Set newTasks = (Set)this.splitPlacementPolicy.allNodes().stream().filter(node -> !scheduledNodes.contains(node)).flatMap(node -> this.stage.scheduleSplits((Node)node, (Multimap<PlanNodeId, Split>)ImmutableMultimap.of()).stream()).collect(ImmutableSet.toImmutableSet());
        this.stage.transitionToSchedulingSplits();
        return newTasks;
    }

    private static enum State {
        INITIALIZED,
        SPLITS_SCHEDULED,
        FINISHED;

    }
}

