/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.planner;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.presto.Session;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spark.PrestoSparkSessionProperties;
import com.facebook.presto.spark.planner.PrestoSparkSplitAssigner;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.split.SplitSource;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.Future;

public class PrestoSparkSourceDistributionSplitAssigner
implements PrestoSparkSplitAssigner {
    private final PlanNodeId tableScanNodeId;
    private final SplitSource splitSource;
    private final int maxBatchSize;
    private final long maxSplitsSizePerPartitionInBytes;
    private final int initialPartitionCount;
    private final boolean partitionCountAutoTuneEnabled;
    private final int minSparkInputPartitionCountForAutoTune;
    private final int maxSparkInputPartitionCountForAutoTune;
    private final PriorityQueue<Partition> queue = new PriorityQueue();
    private int sequenceId;
    private int partitionCount;

    public static PrestoSparkSourceDistributionSplitAssigner create(Session session, PlanNodeId tableScanNodeId, SplitSource splitSource) {
        return new PrestoSparkSourceDistributionSplitAssigner(tableScanNodeId, splitSource, PrestoSparkSessionProperties.getSplitAssignmentBatchSize(session), PrestoSparkSessionProperties.getMaxSplitsDataSizePerSparkPartition(session).toBytes(), PrestoSparkSessionProperties.getSparkInitialPartitionCount(session), PrestoSparkSessionProperties.isSparkPartitionCountAutoTuneEnabled(session), PrestoSparkSessionProperties.getMinSparkInputPartitionCountForAutoTune(session), PrestoSparkSessionProperties.getMaxSparkInputPartitionCountForAutoTune(session));
    }

    public PrestoSparkSourceDistributionSplitAssigner(PlanNodeId tableScanNodeId, SplitSource splitSource, int maxBatchSize, long maxSplitsSizePerPartitionInBytes, int initialPartitionCount, boolean partitionCountAutoTuneEnabled, int minSparkInputPartitionCountForAutoTune, int maxSparkInputPartitionCountForAutoTune) {
        this.tableScanNodeId = Objects.requireNonNull(tableScanNodeId, "tableScanNodeId is null");
        this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
        this.maxBatchSize = maxBatchSize;
        Preconditions.checkArgument((maxBatchSize > 0 ? 1 : 0) != 0, (Object)"maxBatchSize must be greater than zero");
        this.maxSplitsSizePerPartitionInBytes = maxSplitsSizePerPartitionInBytes;
        Preconditions.checkArgument((maxSplitsSizePerPartitionInBytes > 0L ? 1 : 0) != 0, (String)"maxSplitsSizePerPartitionInBytes must be greater than zero: %s", (long)maxSplitsSizePerPartitionInBytes);
        this.initialPartitionCount = initialPartitionCount;
        Preconditions.checkArgument((initialPartitionCount > 0 ? 1 : 0) != 0, (String)"initialPartitionCount must be greater then zero: %s", (int)initialPartitionCount);
        this.partitionCountAutoTuneEnabled = partitionCountAutoTuneEnabled;
        this.minSparkInputPartitionCountForAutoTune = minSparkInputPartitionCountForAutoTune;
        this.maxSparkInputPartitionCountForAutoTune = maxSparkInputPartitionCountForAutoTune;
        Preconditions.checkArgument((minSparkInputPartitionCountForAutoTune >= 1 && minSparkInputPartitionCountForAutoTune <= maxSparkInputPartitionCountForAutoTune ? 1 : 0) != 0, (String)"Min partition count for auto tune (%s) should be a positive integer and not larger than max partition count (%s)", (int)minSparkInputPartitionCountForAutoTune, (int)maxSparkInputPartitionCountForAutoTune);
    }

    @Override
    public Optional<SetMultimap<Integer, ScheduledSplit>> getNextBatch() {
        int remaining;
        if (this.splitSource.isFinished()) {
            return Optional.empty();
        }
        ArrayList<ScheduledSplit> scheduledSplits = new ArrayList<ScheduledSplit>();
        while ((remaining = this.maxBatchSize - scheduledSplits.size()) > 0) {
            SplitSource.SplitBatch splitBatch = (SplitSource.SplitBatch)MoreFutures.getFutureValue((Future)this.splitSource.getNextBatch(NotPartitionedPartitionHandle.NOT_PARTITIONED, Lifespan.taskWide(), Math.min(remaining, 1000)));
            for (Split split : splitBatch.getSplits()) {
                scheduledSplits.add(new ScheduledSplit((long)this.sequenceId++, this.tableScanNodeId, split));
            }
            if (!splitBatch.isLastBatch() && !this.splitSource.isFinished()) continue;
            break;
        }
        return Optional.of(this.assignSplitsToTasks(scheduledSplits));
    }

    private SetMultimap<Integer, ScheduledSplit> assignSplitsToTasks(List<ScheduledSplit> splits) {
        HashMultimap result = HashMultimap.create();
        boolean splitsDataSizeAvailable = splits.stream().allMatch(split -> split.getSplit().getConnectorSplit().getSplitSizeInBytes().isPresent());
        if (!splitsDataSizeAvailable) {
            for (int splitIndex = 0; splitIndex < splits.size(); ++splitIndex) {
                result.put((Object)(splitIndex % this.initialPartitionCount), (Object)splits.get(splitIndex));
            }
            return result;
        }
        splits.sort((o1, o2) -> {
            long size1 = o1.getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong();
            long size2 = o2.getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong();
            return Long.compare(size2, size1);
        });
        if (this.partitionCountAutoTuneEnabled) {
            for (int splitIndex = 0; splitIndex < splits.size(); ++splitIndex) {
                int partitionId;
                long splitSizeInBytes = splits.get(splitIndex).getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong();
                if (this.partitionCount >= this.minSparkInputPartitionCountForAutoTune && this.queue.peek().getSplitsInBytes() + splitSizeInBytes <= this.maxSplitsSizePerPartitionInBytes || this.partitionCount == this.maxSparkInputPartitionCountForAutoTune) {
                    Partition partition = this.queue.poll();
                    partitionId = partition.getPartitionId();
                    partition.assignSplitWithSize(splitSizeInBytes);
                    this.queue.add(partition);
                } else {
                    ++this.partitionCount;
                    Partition newPartition = new Partition(partitionId);
                    newPartition.assignSplitWithSize(splitSizeInBytes);
                    this.queue.add(newPartition);
                }
                result.put((Object)partitionId, (Object)splits.get(splitIndex));
            }
        } else {
            for (int splitIndex = 0; splitIndex < splits.size(); ++splitIndex) {
                int partitionId;
                long splitSizeInBytes = splits.get(splitIndex).getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong();
                if (this.partitionCount < this.initialPartitionCount) {
                    ++this.partitionCount;
                    Partition newPartition = new Partition(partitionId);
                    newPartition.assignSplitWithSize(splitSizeInBytes);
                    this.queue.add(newPartition);
                } else {
                    Partition partition = this.queue.poll();
                    partitionId = partition.getPartitionId();
                    partition.assignSplitWithSize(splitSizeInBytes);
                    this.queue.add(partition);
                }
                result.put((Object)partitionId, (Object)splits.get(splitIndex));
            }
        }
        return result;
    }

    @Override
    public void close() {
        this.splitSource.close();
    }

    private static class Partition
    implements Comparable<Partition> {
        private final int partitionId;
        private long splitsInBytes;

        public Partition(int partitionId) {
            this.partitionId = partitionId;
        }

        public int getPartitionId() {
            return this.partitionId;
        }

        public void assignSplitWithSize(long splitSizeInBytes) {
            this.splitsInBytes += splitSizeInBytes;
        }

        public long getSplitsInBytes() {
            return this.splitsInBytes;
        }

        @Override
        public int compareTo(Partition o) {
            return ComparisonChain.start().compare(this.splitsInBytes, o.splitsInBytes).compare(this.partitionId, o.partitionId).result();
        }
    }
}

