/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.planner;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spark.PrestoSparkSessionProperties;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkPartitioner;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleSerializer;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskProcessor;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskRdd;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskSourceRdd;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.split.CloseableSplitSourceProvider;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.split.SplitSourceProvider;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SplitSourceFactory;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.ShuffledRDD;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.util.CollectionAccumulator;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class PrestoSparkRddFactory {
    private final SplitManager splitManager;
    private final PartitioningProviderManager partitioningProviderManager;
    private final JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec;
    private final JsonCodec<TaskSource> taskSourceJsonCodec;

    /**
     * Creates a factory wired with the collaborators used while building RDDs.
     *
     * @param splitManager used to obtain splits for the table scans of a fragment
     * @param partitioningProviderManager used to resolve connector partitioning providers
     * @param taskDescriptorJsonCodec codec used to serialize the task descriptor
     * @param taskSourceJsonCodec codec used to serialize individual task sources
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public PrestoSparkRddFactory(
            SplitManager splitManager,
            PartitioningProviderManager partitioningProviderManager,
            JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec,
            JsonCodec<TaskSource> taskSourceJsonCodec) {
        this.splitManager = Objects.requireNonNull(splitManager, "splitManager is null");
        this.partitioningProviderManager = Objects.requireNonNull(partitioningProviderManager, "partitioningProviderManager is null");
        this.taskDescriptorJsonCodec = Objects.requireNonNull(taskDescriptorJsonCodec, "taskDescriptorJsonCodec is null");
        this.taskSourceJsonCodec = Objects.requireNonNull(taskSourceJsonCodec, "taskSourceJsonCodec is null");
    }

    /**
     * Builds the Spark RDD that executes the given plan fragment.
     *
     * <p>The fragment partitioning is validated first, the output partitioning is configured,
     * every RDD input is re-partitioned with a partitioner derived from the fragment
     * partitioning, and the actual RDD is then assembled by {@code createRdd}.
     *
     * @param sparkContext Spark context used to create the RDDs
     * @param session session the fragment is executed in
     * @param fragment plan fragment to execute; must not use grouped execution
     * @param rddInputs shuffle inputs keyed by the id of the fragment producing them
     * @param broadcastInputs broadcast inputs keyed by the id of the fragment producing them
     * @param executorFactoryProvider handed to the task processor
     * @param taskInfoCollector accumulator handed to the task processor
     * @param shuffleStatsCollector accumulator handed to the task processor
     * @param tableWriteInfo table write information embedded into the task descriptor
     * @param outputType class of the task output produced by the resulting RDD
     * @return pair RDD keyed by partition id
     * @throws PrestoException with {@code NOT_SUPPORTED} for scaled-writer partitioning or an
     *         order-sensitive remote source
     * @throws IllegalArgumentException if the fragment partitioning is not one that can be executed here
     */
    public <T extends PrestoSparkTaskOutput> JavaPairRDD<MutablePartitionId, T> createSparkRdd(
            JavaSparkContext sparkContext,
            Session session,
            PlanFragment fragment,
            Map<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> rddInputs,
            Map<PlanFragmentId, Broadcast<List<PrestoSparkSerializedPage>>> broadcastInputs,
            PrestoSparkTaskExecutorFactoryProvider executorFactoryProvider,
            CollectionAccumulator<SerializedTaskInfo> taskInfoCollector,
            CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector,
            TableWriteInfo tableWriteInfo,
            Class<T> outputType) {
        Preconditions.checkArgument(!fragment.getStageExecutionDescriptor().isStageGroupedExecution(), "unexpected grouped execution fragment: %s", fragment.getId());

        PartitioningHandle partitioning = fragment.getPartitioning();
        if (partitioning.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Automatic writers scaling is not supported by Presto on Spark");
        }
        Preconditions.checkArgument(!partitioning.equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION), "COORDINATOR_DISTRIBUTION fragment must be run on the driver");
        Preconditions.checkArgument(!partitioning.equals(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION), "FIXED_BROADCAST_DISTRIBUTION can only be set as an output partitioning scheme, and not as a fragment distribution");
        Preconditions.checkArgument(!partitioning.equals(SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION), "FIXED_PASSTHROUGH_DISTRIBUTION can only be set as local exchange partitioning");
        Preconditions.checkArgument(!partitioning.equals(SystemPartitioningHandle.ARBITRARY_DISTRIBUTION), "ARBITRARY_DISTRIBUTION is not expected to be set as a fragment distribution");

        PlanFragment configuredFragment = configureOutputPartitioning(session, fragment);

        boolean supportedPartitioning = partitioning.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)
                || partitioning.getConnectorId().isPresent();
        if (!supportedPartitioning) {
            throw new IllegalArgumentException(String.format("Unexpected fragment partitioning %s, fragmentId: %s", partitioning, configuredFragment.getId()));
        }

        for (RemoteSourceNode remoteSource : configuredFragment.getRemoteSourceNodes()) {
            if (remoteSource.isEnsureSourceOrdering() || remoteSource.getOrderingScheme().isPresent()) {
                throw new PrestoException(
                        StandardErrorCode.NOT_SUPPORTED,
                        String.format("Order sensitive exchange is not supported by Presto on Spark. fragmentId: %s, sourceFragmentIds: %s", configuredFragment.getId(), remoteSource.getSourceFragmentIds()));
            }
        }

        // The partitioner is created per input on purpose: createPartitioner rejects
        // SOURCE_DISTRIBUTION, which is only acceptable here when there are no RDD inputs.
        ImmutableMap.Builder<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> partitionedInputs = ImmutableMap.builder();
        for (Map.Entry<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> input : rddInputs.entrySet()) {
            partitionedInputs.put(input.getKey(), partitionBy(input.getValue(), createPartitioner(session, partitioning)));
        }

        return createRdd(
                sparkContext,
                session,
                configuredFragment,
                executorFactoryProvider,
                taskInfoCollector,
                shuffleStatsCollector,
                tableWriteInfo,
                partitionedInputs.build(),
                broadcastInputs,
                outputType);
    }

    /**
     * Returns the fragment with an identity bucket-to-partition mapping ({@code 0..n-1})
     * when its output partitioning has a known partition count.
     *
     * <p>For the fixed hash and fixed arbitrary system distributions {@code n} is the session
     * hash partition count; for connector partitioning it is the connector bucket count.
     * Any other output partitioning leaves the fragment unchanged.
     *
     * @param session session used to look up the partition count
     * @param fragment fragment whose output partitioning is inspected
     * @return the fragment, possibly with the bucket-to-partition mapping set
     */
    private PlanFragment configureOutputPartitioning(Session session, PlanFragment fragment) {
        PartitioningHandle outputHandle = fragment.getPartitioningScheme().getPartitioning().getHandle();

        int outputPartitionCount;
        if (outputHandle.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)
                || outputHandle.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            outputPartitionCount = SystemSessionProperties.getHashPartitionCount(session);
        } else if (outputHandle.getConnectorId().isPresent()) {
            outputPartitionCount = getPartitionCount(session, outputHandle);
        } else {
            return fragment;
        }

        int[] bucketToPartition = IntStream.range(0, outputPartitionCount).toArray();
        return fragment.withBucketToPartition(Optional.of(bucketToPartition));
    }

    /**
     * Re-partitions the RDD with the given partitioner and installs
     * {@link PrestoSparkShuffleSerializer} on the resulting shuffled RDD.
     *
     * @param rdd input RDD to shuffle
     * @param partitioner partitioner deciding the target partition of each row
     * @return the shuffled RDD wrapped back into a {@link JavaPairRDD}
     */
    private static JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow> partitionBy(JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow> rdd, Partitioner partitioner) {
        JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow> partitioned = rdd.partitionBy(partitioner);
        // The cast relies on partitionBy being backed by a ShuffledRDD, exactly as the
        // unchecked cast in the previous raw-typed version did.
        @SuppressWarnings("unchecked")
        ShuffledRDD<MutablePartitionId, PrestoSparkMutableRow, PrestoSparkMutableRow> shuffled =
                (ShuffledRDD<MutablePartitionId, PrestoSparkMutableRow, PrestoSparkMutableRow>) partitioned.rdd();
        shuffled.setSerializer(new PrestoSparkShuffleSerializer());
        return JavaPairRDD.fromRDD(shuffled, classTag(MutablePartitionId.class), classTag(PrestoSparkMutableRow.class));
    }

    /**
     * Creates the Spark partitioner matching the fragment partitioning.
     *
     * <p>A single partition is used for {@code SINGLE_DISTRIBUTION}, the session hash
     * partition count for the fixed hash / fixed arbitrary distributions, and the
     * connector bucket count for connector partitioning.
     *
     * @param session session used to look up the partition count
     * @param partitioning fragment partitioning
     * @return a {@link PrestoSparkPartitioner} with the resolved number of partitions
     * @throws IllegalArgumentException for any other partitioning
     */
    private Partitioner createPartitioner(Session session, PartitioningHandle partitioning) {
        int partitionCount;
        if (partitioning.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION)) {
            partitionCount = 1;
        } else if (partitioning.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            partitionCount = SystemSessionProperties.getHashPartitionCount(session);
        } else if (partitioning.getConnectorId().isPresent()) {
            partitionCount = getPartitionCount(session, partitioning);
        } else {
            throw new IllegalArgumentException(String.format("Unexpected fragment partitioning %s", partitioning));
        }
        return new PrestoSparkPartitioner(partitionCount);
    }

    /**
     * Assembles the task RDD for a fragment from its shuffle inputs, broadcast inputs and
     * table scan splits.
     *
     * <p>Steps performed:
     * <ol>
     * <li>verifies that the supplied inputs match the fragment's remote sources;</li>
     * <li>serializes the task descriptor;</li>
     * <li>collects the underlying RDD of every shuffle input and checks that they all have
     *     the same number of partitions;</li>
     * <li>creates the task source RDD (splits per partition) when the fragment contains
     *     table scans, or a single empty partition when there are neither scans nor
     *     shuffle inputs;</li>
     * <li>combines everything into a {@link PrestoSparkTaskRdd}.</li>
     * </ol>
     *
     * @param rddInputs already partitioned shuffle inputs keyed by source fragment id
     * @param broadcastInputs broadcast inputs keyed by source fragment id
     * @param outputType class of the task output produced by the resulting RDD
     * @return pair RDD keyed by partition id
     * @throws IllegalArgumentException if the inputs do not match the fragment, the shuffle
     *         inputs disagree on the partition count, or a fragment without any input is not
     *         {@code SINGLE_DISTRIBUTION}
     */
    private <T extends PrestoSparkTaskOutput> JavaPairRDD<MutablePartitionId, T> createRdd(JavaSparkContext sparkContext, Session session, PlanFragment fragment, PrestoSparkTaskExecutorFactoryProvider executorFactoryProvider, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, TableWriteInfo tableWriteInfo, Map<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> rddInputs, Map<PlanFragmentId, Broadcast<List<PrestoSparkSerializedPage>>> broadcastInputs, Class<T> outputType) {
        checkInputs(fragment.getRemoteSourceNodes(), rddInputs, broadcastInputs);

        PrestoSparkTaskDescriptor taskDescriptor = new PrestoSparkTaskDescriptor(
                session.toSessionRepresentation(),
                session.getIdentity().getExtraCredentials(),
                fragment,
                tableWriteInfo);
        SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor = new SerializedPrestoSparkTaskDescriptor(taskDescriptorJsonCodec.toJsonBytes(taskDescriptor));

        // All shuffle inputs must agree on the partition count; the first one seen sets it.
        Optional<Integer> numberOfShufflePartitions = Optional.empty();
        Map<String, RDD<scala.Tuple2<MutablePartitionId, PrestoSparkMutableRow>>> shuffleInputRddMap = new HashMap<>();
        for (Map.Entry<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> input : rddInputs.entrySet()) {
            RDD<scala.Tuple2<MutablePartitionId, PrestoSparkMutableRow>> rdd = input.getValue().rdd();
            shuffleInputRddMap.put(input.getKey().toString(), rdd);
            if (!numberOfShufflePartitions.isPresent()) {
                numberOfShufflePartitions = Optional.of(rdd.getNumPartitions());
            } else {
                Preconditions.checkArgument(
                        numberOfShufflePartitions.get() == rdd.getNumPartitions(),
                        "Incompatible number of input partitions: %s != %s",
                        numberOfShufflePartitions.get(),
                        rdd.getNumPartitions());
            }
        }

        PrestoSparkTaskProcessor<T> taskProcessor = new PrestoSparkTaskProcessor<>(
                executorFactoryProvider,
                serializedTaskDescriptor,
                taskInfoCollector,
                shuffleStatsCollector,
                toTaskProcessorBroadcastInputs(broadcastInputs),
                outputType);

        Optional<PrestoSparkTaskSourceRdd> taskSourceRdd;
        List<TableScanNode> tableScans = findTableScanNodes(fragment.getRoot());
        if (!tableScans.isEmpty()) {
            // The provider is closed as soon as the splits have been enumerated.
            try (CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager::getSplits)) {
                SplitSourceFactory splitSourceFactory = new SplitSourceFactory(splitSourceProvider, WarningCollector.NOOP);
                Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(fragment, session, tableWriteInfo);
                taskSourceRdd = Optional.of(createTaskSourcesRdd(sparkContext, session, fragment.getPartitioning(), tableScans, splitSources, numberOfShufflePartitions));
            }
        } else if (rddInputs.isEmpty()) {
            Preconditions.checkArgument(
                    fragment.getPartitioning().equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION),
                    "SINGLE_DISTRIBUTION partitioning is expected: %s",
                    fragment.getPartitioning());
            // No scans and no shuffle inputs: use a task source RDD holding one partition with
            // an empty task source list (presumably so that a task is still scheduled for the
            // fragment - confirm against PrestoSparkTaskRdd).
            taskSourceRdd = Optional.of(new PrestoSparkTaskSourceRdd(sparkContext.sc(), ImmutableList.of(ImmutableList.of())));
        } else {
            taskSourceRdd = Optional.empty();
        }

        return JavaPairRDD.fromRDD(
                PrestoSparkTaskRdd.create(sparkContext.sc(), taskSourceRdd, shuffleInputRddMap, taskProcessor),
                classTag(MutablePartitionId.class),
                classTag(outputType));
    }

    /**
     * Enumerates the splits of every table scan, assigns them to partitions and wraps the
     * serialized per-partition task sources into a {@link PrestoSparkTaskSourceRdd}.
     *
     * <p>The splits of each scan are shuffled randomly before being assigned. When
     * {@code numberOfShufflePartitions} is present the result has exactly that many
     * partitions (a partition without splits gets an empty list); otherwise one partition
     * is emitted per partition id that received at least one task source.
     *
     * @param sparkContext Spark context used to create the RDD
     * @param session session used for split assignment
     * @param partitioning fragment partitioning that drives split assignment
     * @param tableScans table scan nodes of the fragment
     * @param splitSources split sources keyed by table scan node id; each one is closed here
     * @param numberOfShufflePartitions partition count of the shuffle inputs, if there are any
     * @return RDD holding the serialized task sources of every partition
     * @throws NullPointerException if a table scan has no split source
     */
    private PrestoSparkTaskSourceRdd createTaskSourcesRdd(JavaSparkContext sparkContext, Session session, PartitioningHandle partitioning, List<TableScanNode> tableScans, Map<PlanNodeId, SplitSource> splitSources, Optional<Integer> numberOfShufflePartitions) {
        ListMultimap<Integer, TaskSource> taskSourcesMap = ArrayListMultimap.create();
        for (TableScanNode tableScan : tableScans) {
            SplitSource splitSource = Objects.requireNonNull(splitSources.get(tableScan.getId()), "split source is missing for table scan node with id: " + tableScan.getId());
            List<ScheduledSplit> scheduledSplits = getSplitsAndCloseSource(tableScan, splitSource);
            Collections.shuffle(scheduledSplits);
            SetMultimap<Integer, ScheduledSplit> assignedSplits = assignSplitsToTasks(session, partitioning, scheduledSplits);
            Multimaps.asMap(assignedSplits).forEach((partitionId, splits) ->
                    taskSourcesMap.put(partitionId, new TaskSource(tableScan.getId(), splits, true)));
        }

        // Entries are removed from the multimap as soon as they have been serialized.
        List<List<SerializedPrestoSparkTaskSource>> taskSourcesByPartitionId = new ArrayList<>();
        if (numberOfShufflePartitions.isPresent()) {
            // NOTE(review): task sources assigned to a partition id outside
            // [0, numberOfShufflePartitions) are not emitted by this branch.
            int shufflePartitionCount = numberOfShufflePartitions.get();
            for (int partitionId = 0; partitionId < shufflePartitionCount; partitionId++) {
                List<TaskSource> taskSources = Objects.requireNonNull(taskSourcesMap.removeAll(partitionId), "taskSources is null");
                taskSourcesByPartitionId.add(serializeTaskSources(taskSources));
            }
        } else {
            Iterator<Map.Entry<Integer, Collection<TaskSource>>> partitionsIterator = taskSourcesMap.asMap().entrySet().iterator();
            while (partitionsIterator.hasNext()) {
                Map.Entry<Integer, Collection<TaskSource>> entry = partitionsIterator.next();
                taskSourcesByPartitionId.add(serializeTaskSources(entry.getValue()));
                partitionsIterator.remove();
                Verify.verify(taskSourcesMap.get(entry.getKey()).isEmpty());
            }
        }
        return new PrestoSparkTaskSourceRdd(sparkContext.sc(), taskSourcesByPartitionId);
    }

    /**
     * Reads every split from the split source and closes the source afterwards, even when
     * reading fails.
     *
     * <p>Batches of up to 1000 splits are requested until the source reports that it is
     * finished. Each split is wrapped into a {@link ScheduledSplit} with a sequence id
     * that starts at zero and increases by one per split.
     *
     * @param tableScan table scan the splits belong to; its id is recorded on every split
     * @param splitSource source to drain; always closed before this method returns
     * @return a mutable list (callers shuffle and sort it in place) with all splits of the source
     */
    private List<ScheduledSplit> getSplitsAndCloseSource(TableScanNode tableScan, SplitSource splitSource) {
        try {
            List<ScheduledSplit> splits = new ArrayList<>();
            long sequenceId = 0;
            while (!splitSource.isFinished()) {
                List<Split> splitBatch = MoreFutures.getFutureValue(
                        splitSource.getNextBatch(NotPartitionedPartitionHandle.NOT_PARTITIONED, Lifespan.taskWide(), 1000))
                        .getSplits();
                for (Split split : splitBatch) {
                    splits.add(new ScheduledSplit(sequenceId++, tableScan.getId(), split));
                }
            }
            return splits;
        } finally {
            splitSource.close();
        }
    }

    /**
     * Assigns splits to partition ids, dispatching on the fragment partitioning:
     * {@code SOURCE_DISTRIBUTION} uses the source-distribution assignment, everything
     * else is assigned through the connector split bucket function.
     *
     * @param session session used by the assignment strategies
     * @param partitioning fragment partitioning
     * @param splits splits to assign
     * @return splits grouped by partition id
     */
    private SetMultimap<Integer, ScheduledSplit> assignSplitsToTasks(Session session, PartitioningHandle partitioning, List<ScheduledSplit> splits) {
        return partitioning.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)
                ? assignSourceDistributionSplits(session, splits)
                : assignPartitionedSplits(session, partitioning, splits);
    }

    /**
     * Distributes source-distributed splits over Spark partitions.
     *
     * <p>If any split does not report its size, splits are assigned round-robin over
     * {@code initialPartitionCount} partitions. Otherwise {@code splits} is sorted in place
     * by size, largest first, and each split goes either to a new partition or to the
     * partition with the least data assigned so far:
     * <ul>
     * <li>with partition count auto tuning enabled, the least loaded partition is reused
     *     once the minimum partition count has been reached and the split still fits under
     *     the per-partition size limit, or once the maximum partition count has been
     *     reached; otherwise a new partition is opened;</li>
     * <li>with auto tuning disabled, new partitions are opened until
     *     {@code initialPartitionCount} exist, after which the least loaded partition is
     *     always reused.</li>
     * </ul>
     *
     * @param session session providing the Presto-on-Spark partitioning properties
     * @param splits splits to assign; must be a mutable list because it may be sorted in place
     * @return splits grouped by the partition id they were assigned to
     * @throws IllegalArgumentException if a partitioning session property is out of range
     */
    @VisibleForTesting
    public static SetMultimap<Integer, ScheduledSplit> assignSourceDistributionSplits(Session session, List<ScheduledSplit> splits) {
        ImmutableSetMultimap.Builder<Integer, ScheduledSplit> result = ImmutableSetMultimap.builder();

        long maxSplitsSizeInBytesPerPartition = PrestoSparkSessionProperties.getMaxSplitsDataSizePerSparkPartition(session).toBytes();
        Preconditions.checkArgument(maxSplitsSizeInBytesPerPartition > 0, "maxSplitsSizeInBytesPerPartition must be greater than zero: %s", maxSplitsSizeInBytesPerPartition);
        int initialPartitionCount = PrestoSparkSessionProperties.getSparkInitialPartitionCount(session);
        Preconditions.checkArgument(initialPartitionCount > 0, "initialPartitionCount must be greater than zero: %s", initialPartitionCount);

        boolean splitsDataSizeAvailable = splits.stream()
                .allMatch(split -> split.getSplit().getConnectorSplit().getSplitSizeInBytes().isPresent());
        if (!splitsDataSizeAvailable) {
            // Sizes are unknown for at least one split: fall back to round-robin.
            for (int splitIndex = 0; splitIndex < splits.size(); splitIndex++) {
                result.put(splitIndex % initialPartitionCount, splits.get(splitIndex));
            }
            return result.build();
        }

        // Largest splits first, so big splits are placed before the small ones fill the gaps.
        splits.sort((left, right) -> Long.compare(
                right.getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong(),
                left.getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong()));

        // Min-heap on assigned bytes: the head is always the least loaded partition.
        PriorityQueue<SparkPartition> queue = new PriorityQueue<>();
        int partitionCount = 0;

        boolean autoTunePartitionCount = PrestoSparkSessionProperties.isSparkPartitionCountAutoTuneEnabled(session);
        int minPartitionCount = 0;
        int maxPartitionCount = 0;
        if (autoTunePartitionCount) {
            minPartitionCount = PrestoSparkSessionProperties.getMinSparkInputPartitionCountForAutoTune(session);
            maxPartitionCount = PrestoSparkSessionProperties.getMaxSparkInputPartitionCountForAutoTune(session);
            Preconditions.checkArgument(
                    minPartitionCount >= 1 && minPartitionCount <= maxPartitionCount,
                    "Min partition count for auto tune (%s) should be a positive integer and not larger than max partition count (%s)",
                    minPartitionCount,
                    maxPartitionCount);
        }

        for (ScheduledSplit split : splits) {
            long splitSizeInBytes = split.getSplit().getConnectorSplit().getSplitSizeInBytes().getAsLong();

            boolean reuseLeastLoadedPartition;
            if (autoTunePartitionCount) {
                // minPartitionCount >= 1 guarantees the queue is non-empty whenever it is peeked.
                reuseLeastLoadedPartition = (partitionCount >= minPartitionCount
                        && queue.peek().getSplitsInBytes() + splitSizeInBytes <= maxSplitsSizeInBytesPerPartition)
                        || partitionCount == maxPartitionCount;
            } else {
                reuseLeastLoadedPartition = partitionCount >= initialPartitionCount;
            }

            SparkPartition partition = reuseLeastLoadedPartition ? queue.poll() : new SparkPartition(partitionCount++);
            partition.assignSplitWithSize(splitSizeInBytes);
            // Re-insert so that the heap order reflects the updated size.
            queue.add(partition);
            result.put(partition.getPartitionId(), split);
        }
        return result.build();
    }

    /**
     * Serializes each task source to JSON bytes, compresses them and wraps the result
     * into a {@link SerializedPrestoSparkTaskSource}, preserving iteration order.
     *
     * @param taskSources task sources to serialize
     * @return immutable list with one serialized entry per task source
     */
    private List<SerializedPrestoSparkTaskSource> serializeTaskSources(Collection<TaskSource> taskSources) {
        ImmutableList.Builder<SerializedPrestoSparkTaskSource> serialized = ImmutableList.builder();
        for (TaskSource taskSource : taskSources) {
            serialized.add(new SerializedPrestoSparkTaskSource(PrestoSparkUtils.compress(taskSourceJsonCodec.toJsonBytes(taskSource))));
        }
        return serialized.build();
    }

    /**
     * Assigns each split to the partition id returned by the connector's split bucket
     * function for the given partitioning.
     *
     * @param session session used to obtain the bucket function
     * @param partitioning connector partitioning of the fragment
     * @param splits splits to assign
     * @return splits grouped by partition id
     */
    private SetMultimap<Integer, ScheduledSplit> assignPartitionedSplits(Session session, PartitioningHandle partitioning, List<ScheduledSplit> splits) {
        ToIntFunction<ConnectorSplit> bucketOf = getSplitBucketFunction(session, partitioning);
        ImmutableSetMultimap.Builder<Integer, ScheduledSplit> splitsByPartition = ImmutableSetMultimap.builder();
        for (ScheduledSplit split : splits) {
            ConnectorSplit connectorSplit = split.getSplit().getConnectorSplit();
            splitsByPartition.put(bucketOf.applyAsInt(connectorSplit), split);
        }
        return splitsByPartition.build();
    }

    /**
     * Asks the connector partitioning provider for the function that maps a split to its bucket.
     *
     * @param session session converted to a connector session for the call
     * @param partitioning connector partitioning; its transaction handle may be absent,
     *        in which case {@code null} is passed to the provider
     * @return function mapping a connector split to a bucket number
     * @throws IllegalArgumentException if the partitioning has no connector id
     */
    private ToIntFunction<ConnectorSplit> getSplitBucketFunction(Session session, PartitioningHandle partitioning) {
        ConnectorNodePartitioningProvider provider = getPartitioningProvider(partitioning);
        ConnectorTransactionHandle transactionHandle = partitioning.getTransactionHandle().orElse(null);
        return provider.getSplitBucketFunction(transactionHandle, session.toConnectorSession(), partitioning.getConnectorHandle());
    }

    /**
     * Returns the bucket count reported by the connector partitioning provider.
     *
     * @param session session converted to a connector session for the call
     * @param partitioning connector partitioning; its transaction handle may be absent,
     *        in which case {@code null} is passed to the provider
     * @return number of buckets of the partitioning
     * @throws IllegalArgumentException if the partitioning has no connector id
     */
    private int getPartitionCount(Session session, PartitioningHandle partitioning) {
        ConnectorNodePartitioningProvider provider = getPartitioningProvider(partitioning);
        ConnectorTransactionHandle transactionHandle = partitioning.getTransactionHandle().orElse(null);
        return provider.getBucketCount(transactionHandle, session.toConnectorSession(), partitioning.getConnectorHandle());
    }

    /**
     * Looks up the partitioning provider of the connector that owns the partitioning.
     *
     * @param partitioning partitioning that must carry a connector id
     * @return the provider registered for that connector
     * @throws IllegalArgumentException if the partitioning has no connector id
     */
    private ConnectorNodePartitioningProvider getPartitioningProvider(PartitioningHandle partitioning) {
        Optional<ConnectorId> connectorId = partitioning.getConnectorId();
        if (!connectorId.isPresent()) {
            throw new IllegalArgumentException("Unexpected partitioning: " + partitioning);
        }
        return partitioningProviderManager.getPartitioningProvider(connectorId.get());
    }

    /**
     * Collects every {@link TableScanNode} found by searching the plan from the given node.
     *
     * @param node node the search starts from
     * @return all table scan nodes found
     */
    private static List<TableScanNode> findTableScanNodes(PlanNode node) {
        PlanNodeSearcher tableScanSearcher = PlanNodeSearcher.searchFrom(node)
                .where(planNode -> planNode instanceof TableScanNode);
        return tableScanSearcher.findAll();
    }

    /**
     * Re-keys the broadcast inputs by the string form of their fragment id.
     *
     * @param broadcastInputs broadcast inputs keyed by fragment id
     * @return immutable map with the same values keyed by {@code PlanFragmentId.toString()}
     */
    private static Map<String, Broadcast<List<PrestoSparkSerializedPage>>> toTaskProcessorBroadcastInputs(Map<PlanFragmentId, Broadcast<List<PrestoSparkSerializedPage>>> broadcastInputs) {
        ImmutableMap.Builder<String, Broadcast<List<PrestoSparkSerializedPage>>> inputsByFragmentName = ImmutableMap.builder();
        for (Map.Entry<PlanFragmentId, Broadcast<List<PrestoSparkSerializedPage>>> input : broadcastInputs.entrySet()) {
            inputsByFragmentName.put(input.getKey().toString(), input.getValue());
        }
        return inputsByFragmentName.build();
    }

    /**
     * Verifies that the supplied inputs are exactly the source fragments referenced by the
     * fragment's remote source nodes.
     *
     * @param remoteSources remote source nodes of the fragment
     * @param rddInputs shuffle inputs keyed by source fragment id
     * @param broadcastInputs broadcast inputs keyed by source fragment id
     * @throws IllegalArgumentException if an expected input is missing or an unexpected one
     *         is supplied; the message lists the expected, actual, missing and extra ids
     */
    private static void checkInputs(List<RemoteSourceNode> remoteSources, Map<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> rddInputs, Map<PlanFragmentId, Broadcast<List<PrestoSparkSerializedPage>>> broadcastInputs) {
        Set<PlanFragmentId> expectedInputs = remoteSources.stream()
                .map(RemoteSourceNode::getSourceFragmentIds)
                .flatMap(Collection::stream)
                .collect(ImmutableSet.toImmutableSet());
        Set<PlanFragmentId> actualInputs = Sets.union(rddInputs.keySet(), broadcastInputs.keySet());

        Set<PlanFragmentId> missingInputs = Sets.difference(expectedInputs, actualInputs);
        Set<PlanFragmentId> extraInputs = Sets.difference(actualInputs, expectedInputs);
        Preconditions.checkArgument(
                missingInputs.isEmpty() && extraInputs.isEmpty(),
                "rddInputs mismatch discovered. expected inputs: %s, actual rdd inputs: %s, actual broadcast inputs: %s, missing inputs: %s, extra inputs: %s",
                expectedInputs,
                rddInputs.keySet(),
                broadcastInputs.keySet(),
                missingInputs,
                // Report the extra ids here; previously the expected ids were printed twice.
                extraInputs);
    }

    /**
     * Creates the Scala {@link ClassTag} for a Java class, as required by the Spark RDD APIs.
     *
     * @param clazz class to create the tag for
     * @return class tag describing {@code clazz}
     */
    private static <T> ClassTag<T> classTag(Class<T> clazz) {
        ClassTag$ classTags = ClassTag$.MODULE$;
        return classTags.apply(clazz);
    }

    /**
     * Records the splits assigned to one partition as a {@link TaskSource} of the given
     * table scan. The task source is created with its boolean flag set to {@code true}
     * (presumably "no more splits" - confirm against {@code TaskSource}).
     *
     * @param taskSourcesMap multimap from partition id to task sources; receives the new entry
     * @param tableScan table scan whose id is recorded on the task source
     * @param partitionId partition the splits were assigned to
     * @param splits splits assigned to that partition
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static /* synthetic */ void lambda$createTaskSourcesRdd$1(ListMultimap taskSourcesMap, TableScanNode tableScan, Integer partitionId, Set splits) {
        TaskSource taskSource = new TaskSource(tableScan.getId(), splits, true);
        taskSourcesMap.put(partitionId, taskSource);
    }

    /**
     * Running total of the split bytes assigned to one Spark partition.
     *
     * <p>Instances order by the number of bytes assigned so far, smallest first, so a
     * {@link PriorityQueue} of them exposes the least loaded partition at its head.
     * The ordering changes when a split is assigned; an instance must therefore be
     * removed from the queue before it is updated and re-added afterwards.
     */
    private static class SparkPartition
    implements Comparable<SparkPartition> {
        private final int partitionId;
        private long splitsInBytes;

        public SparkPartition(int partitionId) {
            this.partitionId = partitionId;
        }

        /** Orders partitions by assigned bytes, ascending. */
        @Override
        public int compareTo(SparkPartition o) {
            return Long.compare(splitsInBytes, o.splitsInBytes);
        }

        /** Returns the id this partition was created with. */
        public int getPartitionId() {
            return partitionId;
        }

        /** Adds the size of a newly assigned split to the running total. */
        public void assignSplitWithSize(long splitSizeInBytes) {
            splitsInBytes += splitSizeInBytes;
        }

        /** Returns the total number of bytes assigned so far. */
        public long getSplitsInBytes() {
            return splitsInBytes;
        }
    }
}

