/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker;
import com.facebook.presto.cost.PlanNodeStatsEstimate;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateTimer;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget;
import com.facebook.presto.execution.scheduler.StreamingPlanSection;
import com.facebook.presto.execution.scheduler.StreamingSubPlan;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spark.ErrorClassifier;
import com.facebook.presto.spark.PrestoSparkBroadcastDependency;
import com.facebook.presto.spark.PrestoSparkMemoryBasedBroadcastDependency;
import com.facebook.presto.spark.PrestoSparkMetadataStorage;
import com.facebook.presto.spark.PrestoSparkQueryData;
import com.facebook.presto.spark.PrestoSparkQueryExecutionFactory;
import com.facebook.presto.spark.PrestoSparkQueryStatusInfo;
import com.facebook.presto.spark.PrestoSparkServiceWaitTimeMetrics;
import com.facebook.presto.spark.PrestoSparkSessionProperties;
import com.facebook.presto.spark.PrestoSparkStorageBasedBroadcastDependency;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.RddAndMore;
import com.facebook.presto.spark.SparkErrorCode;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecution;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkExecutionException;
import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkPartitioner;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleSerializer;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkStorageHandle;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.FragmentExecutionResult;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.planner.PrestoSparkPlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkQueryPlanner;
import com.facebook.presto.spark.planner.PrestoSparkRddFactory;
import com.facebook.presto.spark.util.PrestoSparkFailureUtils;
import com.facebook.presto.spark.util.PrestoSparkTransactionUtils;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorCapabilities;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.statistics.RuntimeSourceInfo;
import com.facebook.presto.spi.statistics.SourceInfo;
import com.facebook.presto.spi.storage.StorageCapabilities;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.PartitioningScheme;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.PlanFragmenterUtils;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.transaction.TransactionManager;
import com.facebook.presto.util.Failures;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.MapOutputStatistics;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.ShuffledRDD;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.Utils;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import scala.Tuple2;

public abstract class AbstractPrestoSparkQueryExecution
implements IPrestoSparkQueryExecution {
    private static final Logger log = Logger.get(AbstractPrestoSparkQueryExecution.class);
    protected final Session session;
    protected final QueryMonitor queryMonitor;
    protected final CollectionAccumulator<SerializedTaskInfo> taskInfoCollector;
    protected final CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector;
    protected final PrestoSparkTaskExecutorFactory taskExecutorFactory;
    protected final PrestoSparkTaskExecutorFactoryProvider taskExecutorFactoryProvider;
    protected final QueryStateTimer queryStateTimer;
    protected final WarningCollector warningCollector;
    protected final String query;
    protected final PrestoSparkQueryPlanner.PlanAndMore planAndMore;
    protected final Optional<String> sparkQueueName;
    protected final Codec<TaskInfo> taskInfoCodec;
    protected final JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec;
    protected final JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec;
    protected final JsonCodec<PrestoSparkQueryData> queryDataJsonCodec;
    protected final PrestoSparkRddFactory rddFactory;
    protected final TransactionManager transactionManager;
    protected final PagesSerde pagesSerde;
    protected final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    protected final Duration queryTimeout;
    protected final Metadata metadata;
    protected final PrestoSparkMetadataStorage metadataStorage;
    protected final Optional<String> queryStatusInfoOutputLocation;
    protected final Optional<String> queryDataOutputLocation;
    protected final long queryCompletionDeadline;
    protected final TempStorage tempStorage;
    protected final NodeMemoryConfig nodeMemoryConfig;
    protected final Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics;
    protected final Optional<ErrorClassifier> errorClassifier;
    protected final JavaSparkContext sparkContext;
    protected final PrestoSparkPlanFragmenter planFragmenter;
    protected final PartitioningProviderManager partitioningProviderManager;
    protected final HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker;
    private AtomicReference<SubPlan> finalFragmentedPlan = new AtomicReference();
    @GuardedBy(value="this")
    private final Map<PlanFragmentId, RddAndMore> fragmentIdToRdd = new HashMap<PlanFragmentId, RddAndMore>();

    public AbstractPrestoSparkQueryExecution(JavaSparkContext sparkContext, Session session, QueryMonitor queryMonitor, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, PrestoSparkTaskExecutorFactory taskExecutorFactory, PrestoSparkTaskExecutorFactoryProvider taskExecutorFactoryProvider, QueryStateTimer queryStateTimer, WarningCollector warningCollector, String query, PrestoSparkQueryPlanner.PlanAndMore planAndMore, Optional<String> sparkQueueName, Codec<TaskInfo> taskInfoCodec, JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec, JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec, JsonCodec<PrestoSparkQueryData> queryDataJsonCodec, PrestoSparkRddFactory rddFactory, TransactionManager transactionManager, PagesSerde pagesSerde, PrestoSparkExecutionExceptionFactory executionExceptionFactory, Duration queryTimeout, long queryCompletionDeadline, PrestoSparkMetadataStorage metadataStorage, Optional<String> queryStatusInfoOutputLocation, Optional<String> queryDataOutputLocation, TempStorage tempStorage, NodeMemoryConfig nodeMemoryConfig, Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics, Optional<ErrorClassifier> errorClassifier, PrestoSparkPlanFragmenter planFragmenter, Metadata metadata, PartitioningProviderManager partitioningProviderManager, HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker) {
        this.sparkContext = Objects.requireNonNull(sparkContext, "sparkContext is null");
        this.session = Objects.requireNonNull(session, "session is null");
        this.queryMonitor = Objects.requireNonNull(queryMonitor, "queryMonitor is null");
        this.taskInfoCollector = Objects.requireNonNull(taskInfoCollector, "taskInfoCollector is null");
        this.shuffleStatsCollector = Objects.requireNonNull(shuffleStatsCollector, "shuffleStatsCollector is null");
        this.taskExecutorFactory = Objects.requireNonNull(taskExecutorFactory, "taskExecutorFactory is null");
        this.taskExecutorFactoryProvider = Objects.requireNonNull(taskExecutorFactoryProvider, "taskExecutorFactoryProvider is null");
        this.queryStateTimer = Objects.requireNonNull(queryStateTimer, "queryStateTimer is null");
        this.warningCollector = Objects.requireNonNull(warningCollector, "warningCollector is null");
        this.query = Objects.requireNonNull(query, "query is null");
        this.planAndMore = Objects.requireNonNull(planAndMore, "planAndMore is null");
        this.sparkQueueName = Objects.requireNonNull(sparkQueueName, "sparkQueueName is null");
        this.taskInfoCodec = Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        this.sparkTaskDescriptorJsonCodec = Objects.requireNonNull(sparkTaskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
        this.queryStatusInfoJsonCodec = Objects.requireNonNull(queryStatusInfoJsonCodec, "queryStatusInfoJsonCodec is null");
        this.queryDataJsonCodec = Objects.requireNonNull(queryDataJsonCodec, "queryDataJsonCodec is null");
        this.rddFactory = Objects.requireNonNull(rddFactory, "rddFactory is null");
        this.transactionManager = Objects.requireNonNull(transactionManager, "transactionManager is null");
        this.pagesSerde = Objects.requireNonNull(pagesSerde, "pagesSerde is null");
        this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        this.queryTimeout = Objects.requireNonNull(queryTimeout, "queryTimeout is null");
        this.queryCompletionDeadline = queryCompletionDeadline;
        this.metadataStorage = Objects.requireNonNull(metadataStorage, "metadataStorage is null");
        this.queryStatusInfoOutputLocation = Objects.requireNonNull(queryStatusInfoOutputLocation, "queryStatusInfoOutputLocation is null");
        this.queryDataOutputLocation = Objects.requireNonNull(queryDataOutputLocation, "queryDataOutputLocation is null");
        this.tempStorage = Objects.requireNonNull(tempStorage, "tempStorage is null");
        this.nodeMemoryConfig = Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null");
        this.waitTimeMetrics = Objects.requireNonNull(waitTimeMetrics, "waitTimeMetrics is null");
        this.errorClassifier = Objects.requireNonNull(errorClassifier, "errorClassifier is null");
        this.planFragmenter = Objects.requireNonNull(planFragmenter, "planFragmenter is null");
        this.metadata = Objects.requireNonNull(metadata, "metadata is null");
        this.partitioningProviderManager = Objects.requireNonNull(partitioningProviderManager, "partitioningProviderManager is null");
        this.historyBasedPlanStatisticsTracker = Objects.requireNonNull(historyBasedPlanStatisticsTracker, "historyBasedPlanStatisticsTracker is null");
    }

    protected static JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow> partitionBy(int planFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow> rdd, PartitioningScheme partitioningScheme) {
        Partitioner partitioner = AbstractPrestoSparkQueryExecution.createPartitioner(partitioningScheme);
        JavaPairRDD javaPairRdd = rdd.partitionBy(partitioner);
        ShuffledRDD shuffledRdd = (ShuffledRDD)javaPairRdd.rdd();
        shuffledRdd.setSerializer((Serializer)new PrestoSparkShuffleSerializer());
        shuffledRdd.setName(PrestoSparkRddFactory.getRDDName(planFragmentId));
        return JavaPairRDD.fromRDD((RDD)shuffledRdd, PrestoSparkUtils.classTag(MutablePartitionId.class), PrestoSparkUtils.classTag(PrestoSparkMutableRow.class));
    }

    protected static Partitioner createPartitioner(PartitioningScheme partitioningScheme) {
        PartitioningHandle partitioning = partitioningScheme.getPartitioning().getHandle();
        if (partitioning.equals((Object)SystemPartitioningHandle.SINGLE_DISTRIBUTION)) {
            return new PrestoSparkPartitioner(1);
        }
        if (partitioning.equals((Object)SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION) || partitioning.equals((Object)SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION) || partitioning.getConnectorId().isPresent()) {
            int[] bucketToPartition = (int[])partitioningScheme.getBucketToPartition().orElseThrow(() -> new IllegalArgumentException("bucketToPartition is expected to be assigned at this point"));
            Preconditions.checkArgument((bucketToPartition.length > 0 ? 1 : 0) != 0, (Object)"bucketToPartition is expected to be non empty");
            int numberOfPartitions = IntStream.of(bucketToPartition).max().getAsInt() + 1;
            return new PrestoSparkPartitioner(numberOfPartitions);
        }
        throw new IllegalArgumentException("Unexpected partitioning: " + partitioning);
    }

    public List<List<Object>> execute() {
        List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> rddResults;
        try {
            rddResults = this.doExecute();
            this.queryStateTimer.beginFinishing();
            PrestoSparkTransactionUtils.commit(this.session, this.transactionManager);
            this.queryStateTimer.endQuery();
        }
        catch (Throwable executionException) {
            this.queryStateTimer.beginFinishing();
            try {
                PrestoSparkTransactionUtils.rollback(this.session, this.transactionManager);
            }
            catch (RuntimeException rollbackFailure) {
                log.error((Throwable)rollbackFailure, "Encountered error when performing rollback");
            }
            Optional<ExecutionFailureInfo> failureInfo = Optional.empty();
            if (executionException instanceof SparkException) {
                SparkException sparkException = (SparkException)executionException;
                failureInfo = this.executionExceptionFactory.extractExecutionFailureInfo(sparkException);
                if (!failureInfo.isPresent()) {
                    PrestoException wrappedPrestoException = sparkException.getMessage().contains("most recent failure: JVM_OOM") ? new PrestoException((ErrorCodeSupplier)SparkErrorCode.SPARK_EXECUTOR_OOM, executionException) : (sparkException.getMessage().matches(".*Total size of serialized results .* is bigger than allowed maxResultSize.*") ? new PrestoException((ErrorCodeSupplier)SparkErrorCode.EXCEEDED_SPARK_DRIVER_MAX_RESULT_SIZE, executionException) : (sparkException.getMessage().contains("Executor heartbeat timed out") || sparkException.getMessage().contains("Unable to talk to the executor") ? new PrestoException((ErrorCodeSupplier)SparkErrorCode.SPARK_EXECUTOR_LOST, executionException) : (this.errorClassifier.isPresent() ? this.errorClassifier.get().classify(executionException) : new PrestoException((ErrorCodeSupplier)SparkErrorCode.GENERIC_SPARK_ERROR, executionException))));
                    failureInfo = Optional.of(Failures.toFailure((Throwable)wrappedPrestoException));
                }
            } else if (executionException instanceof PrestoSparkExecutionException) {
                failureInfo = this.executionExceptionFactory.extractExecutionFailureInfo((PrestoSparkExecutionException)executionException);
            } else if (executionException instanceof TimeoutException) {
                failureInfo = Optional.of(Failures.toFailure((Throwable)new PrestoException((ErrorCodeSupplier)StandardErrorCode.EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + this.queryTimeout, executionException)));
            }
            if (!failureInfo.isPresent()) {
                failureInfo = Optional.of(Failures.toFailure((Throwable)executionException));
            }
            this.queryStateTimer.endQuery();
            try {
                this.queryCompletedEvent(failureInfo, OptionalLong.empty());
            }
            catch (RuntimeException eventFailure) {
                log.error((Throwable)eventFailure, "Error publishing query completed event");
            }
            throw PrestoSparkFailureUtils.toPrestoSparkFailure(this.session, failureInfo.get());
        }
        this.processShuffleStats();
        ConnectorSession connectorSession = this.session.toConnectorSession();
        List<Type> types = this.getOutputTypes();
        ImmutableList.Builder result = ImmutableList.builder();
        for (Tuple2<MutablePartitionId, PrestoSparkSerializedPage> tuple : rddResults) {
            Page page = this.pagesSerde.deserialize(PrestoSparkUtils.toSerializedPage((PrestoSparkSerializedPage)tuple._2));
            Preconditions.checkArgument((page.getChannelCount() == types.size() ? 1 : 0) != 0, (String)"expected %s channels, got %s", (int)types.size(), (int)page.getChannelCount());
            for (int position = 0; position < page.getPositionCount(); ++position) {
                ArrayList<Object> columns = new ArrayList<Object>();
                for (int channel = 0; channel < page.getChannelCount(); ++channel) {
                    columns.add(types.get(channel).getObjectValue(connectorSession.getSqlFunctionProperties(), page.getBlock(channel), position));
                }
                result.add(Collections.unmodifiableList(columns));
            }
        }
        ImmutableList results = result.build();
        OptionalLong updateCount = OptionalLong.empty();
        if (this.planAndMore.getUpdateType().isPresent() && types.size() == 1 && types.get(0).equals(BigintType.BIGINT) && results.size() == 1 && ((List)results.get(0)).size() == 1) {
            updateCount = OptionalLong.of(((Number)((List)results.get(0)).get(0)).longValue());
        }
        try {
            this.queryCompletedEvent(Optional.empty(), updateCount);
        }
        catch (RuntimeException eventFailure) {
            log.error((Throwable)eventFailure, "Error publishing query completed event");
        }
        if (this.queryDataOutputLocation.isPresent()) {
            this.metadataStorage.write(this.queryDataOutputLocation.get(), this.queryDataJsonCodec.toJsonBytes((Object)new PrestoSparkQueryData(PrestoSparkQueryExecutionFactory.getOutputColumns(this.planAndMore), (List)results)));
        }
        return results;
    }

    public List<Type> getOutputTypes() {
        return this.getFinalFragmentedPlan().getFragment().getTypes();
    }

    public Optional<String> getUpdateType() {
        return this.planAndMore.getUpdateType();
    }

    protected abstract List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> doExecute() throws SparkException, TimeoutException;

    @VisibleForTesting
    public <T extends PrestoSparkTaskOutput> RddAndMore<T> createRdd(SubPlan subPlan, Class<T> outputType, TableWriteInfo tableWriteInfo) throws SparkException, TimeoutException {
        ImmutableMap.Builder rddInputs = ImmutableMap.builder();
        ImmutableMap.Builder broadcastInputs = ImmutableMap.builder();
        ImmutableList.Builder broadcastDependencies = ImmutableList.builder();
        for (SubPlan child : subPlan.getChildren()) {
            RddAndMore<Object> childRdd;
            PlanFragment childFragment = child.getFragment();
            if (childFragment.getPartitioningScheme().getPartitioning().getHandle().equals((Object)SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION)) {
                childRdd = PrestoSparkSessionProperties.isStorageBasedBroadcastJoinEnabled(this.session) ? this.createRdd(child, PrestoSparkStorageHandle.class, tableWriteInfo) : this.createRdd(child, PrestoSparkSerializedPage.class, tableWriteInfo);
                PrestoSparkBroadcastDependency<?> broadcastDependency = this.createBroadcastDependency(childRdd);
                broadcastInputs.put((Object)childFragment.getId(), broadcastDependency.executeBroadcast(this.sparkContext));
                broadcastDependencies.add(broadcastDependency);
                continue;
            }
            childRdd = this.createRdd(child, PrestoSparkMutableRow.class, tableWriteInfo);
            rddInputs.put((Object)childFragment.getId(), AbstractPrestoSparkQueryExecution.partitionBy(childFragment.getId().getId(), childRdd.getRdd(), child.getFragment().getPartitioningScheme()));
            broadcastDependencies.addAll(childRdd.getBroadcastDependencies());
        }
        JavaPairRDD<MutablePartitionId, T> rdd = this.rddFactory.createSparkRdd(this.sparkContext, this.session, subPlan.getFragment(), (Map<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>>)rddInputs.build(), (Map<PlanFragmentId, Broadcast<?>>)broadcastInputs.build(), this.taskExecutorFactoryProvider, this.taskInfoCollector, this.shuffleStatsCollector, tableWriteInfo, outputType);
        return new RddAndMore<T>(rdd, (List<PrestoSparkBroadcastDependency<?>>)broadcastDependencies.build());
    }

    protected void validateStorageCapabilities(TempStorage tempStorage) {
        boolean isLocalMode = Utils.isLocalMaster((SparkConf)this.sparkContext.getConf());
        List storageCapabilities = tempStorage.getStorageCapabilities();
        if (!isLocalMode && !storageCapabilities.contains(StorageCapabilities.REMOTELY_ACCESSIBLE)) {
            throw new PrestoException((ErrorCodeSupplier)SparkErrorCode.UNSUPPORTED_STORAGE_TYPE, "Configured TempStorage does not support remote access required for distributing broadcast tables.");
        }
    }

    protected void queryCompletedEvent(Optional<ExecutionFailureInfo> failureInfo, OptionalLong updateCount) {
        List serializedTaskInfos = this.taskInfoCollector.value();
        ImmutableList.Builder taskInfos = ImmutableList.builder();
        long totalSerializedTaskInfoSizeInBytes = 0L;
        for (SerializedTaskInfo serializedTaskInfo : serializedTaskInfos) {
            byte[] bytes = serializedTaskInfo.getBytesAndClear();
            totalSerializedTaskInfoSizeInBytes += (long)bytes.length;
            TaskInfo taskInfo = PrestoSparkUtils.deserializeZstdCompressed(this.taskInfoCodec, bytes);
            taskInfos.add((Object)taskInfo);
        }
        this.taskInfoCollector.reset();
        log.info("Total serialized task info size: %s", new Object[]{DataSize.succinctBytes((long)totalSerializedTaskInfoSizeInBytes)});
        StageInfo stageInfo = PrestoSparkQueryExecutionFactory.createStageInfo(this.session.getQueryId(), this.getFinalFragmentedPlan(), (List<TaskInfo>)taskInfos.build());
        QueryState queryState = failureInfo.isPresent() ? QueryState.FAILED : QueryState.FINISHED;
        QueryInfo queryInfo = PrestoSparkQueryExecutionFactory.createQueryInfo(this.session, this.query, queryState, Optional.of(this.planAndMore), this.sparkQueueName, failureInfo, this.queryStateTimer, Optional.of(stageInfo), this.warningCollector);
        this.queryMonitor.queryCompletedEvent(queryInfo);
        this.historyBasedPlanStatisticsTracker.updateStatistics(queryInfo);
        if (this.queryStatusInfoOutputLocation.isPresent()) {
            PrestoSparkQueryStatusInfo prestoSparkQueryStatusInfo = PrestoSparkQueryExecutionFactory.createPrestoSparkQueryInfo(queryInfo, Optional.of(this.planAndMore), this.warningCollector, updateCount);
            this.metadataStorage.write(this.queryStatusInfoOutputLocation.get(), this.queryStatusInfoJsonCodec.toJsonBytes((Object)prestoSparkQueryStatusInfo));
        }
    }

    protected final void setFinalFragmentedPlan(SubPlan subPlan) {
        Verify.verify((subPlan != null ? 1 : 0) != 0, (String)"subPlan is null", (Object[])new Object[0]);
        boolean updated = this.finalFragmentedPlan.compareAndSet(null, subPlan);
        Verify.verify((boolean)updated, (String)"finalFragmentedPlan is already non-null", (Object[])new Object[0]);
    }

    public final SubPlan getFinalFragmentedPlan() {
        SubPlan subPlan = this.finalFragmentedPlan.get();
        Verify.verify((subPlan != null ? 1 : 0) != 0, (String)"finalFragmentedPlan is null", (Object[])new Object[0]);
        return subPlan;
    }

    protected void processShuffleStats() {
        List statsList = this.shuffleStatsCollector.value();
        TreeMap<ShuffleStatsKey, List> statsMap = new TreeMap<ShuffleStatsKey, List>();
        for (PrestoSparkShuffleStats prestoSparkShuffleStats : statsList) {
            ShuffleStatsKey key = new ShuffleStatsKey(prestoSparkShuffleStats.getFragmentId(), prestoSparkShuffleStats.getOperation());
            statsMap.computeIfAbsent(key, ignored -> new ArrayList()).add(prestoSparkShuffleStats);
        }
        log.info("Shuffle statistics summary:");
        for (Map.Entry entry : statsMap.entrySet()) {
            this.logShuffleStatsSummary((ShuffleStatsKey)entry.getKey(), (List)entry.getValue());
        }
        this.shuffleStatsCollector.reset();
    }

    protected void logShuffleStatsSummary(ShuffleStatsKey key, List<PrestoSparkShuffleStats> statsList) {
        long totalProcessedRows = 0L;
        long totalProcessedRowBatches = 0L;
        long totalProcessedBytes = 0L;
        long totalElapsedWallTimeMills = 0L;
        for (PrestoSparkShuffleStats stats : statsList) {
            totalProcessedRows += stats.getProcessedRows();
            totalProcessedRowBatches += stats.getProcessedRowBatches();
            totalProcessedBytes += stats.getProcessedBytes();
            totalElapsedWallTimeMills += stats.getElapsedWallTimeMills();
        }
        long totalElapsedWallTimeSeconds = totalElapsedWallTimeMills / 1000L;
        long rowsPerSecond = totalProcessedRows;
        long rowBatchesPerSecond = totalProcessedRowBatches;
        long bytesPerSecond = totalProcessedBytes;
        if (totalElapsedWallTimeSeconds > 0L) {
            rowsPerSecond = totalProcessedRows / totalElapsedWallTimeSeconds;
            rowBatchesPerSecond = totalProcessedRowBatches / totalElapsedWallTimeSeconds;
            bytesPerSecond = totalProcessedBytes / totalElapsedWallTimeSeconds;
        }
        long averageRowSize = 0L;
        if (totalProcessedRows > 0L) {
            averageRowSize = totalProcessedBytes / totalProcessedRows;
        }
        long averageRowBatchSize = 0L;
        if (totalProcessedRowBatches > 0L) {
            averageRowBatchSize = totalProcessedBytes / totalProcessedRowBatches;
        }
        log.info("Fragment: %s, Operation: %s, Rows: %s, Row Batches: %s, Size: %s, Avg Row Size: %s, Avg Row Batch Size: %s, Time: %s, %s rows/s, %s batches/s, %s/s", new Object[]{key.getFragmentId(), key.getOperation(), totalProcessedRows, totalProcessedRowBatches, DataSize.succinctBytes((long)totalProcessedBytes), DataSize.succinctBytes((long)averageRowSize), DataSize.succinctBytes((long)averageRowBatchSize), Duration.succinctDuration((double)totalElapsedWallTimeMills, (TimeUnit)TimeUnit.MILLISECONDS), rowsPerSecond, rowBatchesPerSecond, DataSize.succinctBytes((long)bytesPerSecond)});
    }

    protected Optional<int[]> getBucketToPartition(Session session, PartitioningHandle partitioningHandle, int hashPartitionCount) {
        if (partitioningHandle.equals((Object)SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)) {
            return Optional.of(IntStream.range(0, hashPartitionCount).toArray());
        }
        if (partitioningHandle.equals((Object)SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            return Optional.of(IntStream.range(0, hashPartitionCount).toArray());
        }
        if (partitioningHandle.getConnectorId().isPresent()) {
            int connectorPartitionCount = this.getPartitionCount(session, partitioningHandle);
            return Optional.of(IntStream.range(0, connectorPartitionCount).toArray());
        }
        return Optional.empty();
    }

    protected int getPartitionCount(Session session, PartitioningHandle partitioning) {
        ConnectorNodePartitioningProvider partitioningProvider = this.getPartitioningProvider(partitioning);
        return partitioningProvider.getBucketCount((ConnectorTransactionHandle)partitioning.getTransactionHandle().orElse(null), session.toConnectorSession(), partitioning.getConnectorHandle());
    }

    protected ConnectorNodePartitioningProvider getPartitioningProvider(PartitioningHandle partitioning) {
        ConnectorId connectorId = (ConnectorId)partitioning.getConnectorId().orElseThrow(() -> new IllegalArgumentException("Unexpected partitioning: " + partitioning));
        return this.partitioningProviderManager.getPartitioningProvider(connectorId);
    }

    protected SubPlan configureOutputPartitioning(Session session, SubPlan subPlan, int hashPartitionCount) {
        PartitioningHandle partitioningHandle;
        Optional<int[]> bucketToPartition;
        PlanFragment fragment = subPlan.getFragment();
        if (!fragment.getPartitioningScheme().getBucketToPartition().isPresent() && (bucketToPartition = this.getBucketToPartition(session, partitioningHandle = fragment.getPartitioningScheme().getPartitioning().getHandle(), hashPartitionCount)).isPresent()) {
            fragment = fragment.withBucketToPartition(bucketToPartition);
        }
        return new SubPlan(fragment, (List)subPlan.getChildren().stream().map(child -> this.configureOutputPartitioning(session, (SubPlan)child, hashPartitionCount)).collect(ImmutableList.toImmutableList()));
    }

    @VisibleForTesting
    public TableWriteInfo getTableWriteInfo(Session session, SubPlan plan) {
        StreamingPlanSection streamingPlanSection = StreamingPlanSection.extractStreamingSections((SubPlan)plan);
        StreamingSubPlan streamingSubPlan = streamingPlanSection.getPlan();
        TableWriteInfo tableWriteInfo = TableWriteInfo.createTableWriteInfo((StreamingSubPlan)streamingSubPlan, (Metadata)this.metadata, (Session)session);
        if (tableWriteInfo.getWriterTarget().isPresent()) {
            this.checkPageSinkCommitIsSupported(session, (ExecutionWriterTarget)tableWriteInfo.getWriterTarget().get());
        }
        return tableWriteInfo;
    }

    private void checkPageSinkCommitIsSupported(Session session, ExecutionWriterTarget writerTarget) {
        ConnectorId connectorId;
        if (writerTarget instanceof ExecutionWriterTarget.DeleteHandle) {
            throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.NOT_SUPPORTED, "delete queries are not supported by presto on spark");
        }
        if (writerTarget instanceof ExecutionWriterTarget.CreateHandle) {
            connectorId = ((ExecutionWriterTarget.CreateHandle)writerTarget).getHandle().getConnectorId();
        } else if (writerTarget instanceof ExecutionWriterTarget.InsertHandle) {
            connectorId = ((ExecutionWriterTarget.InsertHandle)writerTarget).getHandle().getConnectorId();
        } else if (writerTarget instanceof ExecutionWriterTarget.RefreshMaterializedViewHandle) {
            connectorId = ((ExecutionWriterTarget.RefreshMaterializedViewHandle)writerTarget).getHandle().getConnectorId();
        } else {
            throw new IllegalArgumentException("unexpected writer target type: " + writerTarget.getClass());
        }
        Verify.verify((connectorId != null ? 1 : 0) != 0, (String)"connectorId is null", (Object[])new Object[0]);
        Set connectorCapabilities = this.metadata.getConnectorCapabilities(session, connectorId);
        if (!connectorCapabilities.contains(ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT)) {
            throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.NOT_SUPPORTED, "catalog does not support page sink commit: " + connectorId);
        }
    }

    protected synchronized <T extends PrestoSparkTaskOutput> RddAndMore<T> createRddForSubPlan(SubPlan subPlan, TableWriteInfo tableWriteInfo) throws SparkException, TimeoutException {
        if (this.fragmentIdToRdd.containsKey(subPlan.getFragment().getId())) {
            return this.fragmentIdToRdd.get(subPlan.getFragment().getId());
        }
        ImmutableMap.Builder rddInputs = ImmutableMap.builder();
        ImmutableMap.Builder broadcastInputs = ImmutableMap.builder();
        ImmutableList.Builder broadcastDependencies = ImmutableList.builder();
        for (SubPlan child : subPlan.getChildren()) {
            RddAndMore<T> childRdd = this.createRddForSubPlan(child, tableWriteInfo);
            if (childRdd.isBroadcastDistribution()) {
                PrestoSparkBroadcastDependency<?> broadcastDependency = this.createBroadcastDependency(childRdd);
                broadcastInputs.put((Object)child.getFragment().getId(), broadcastDependency.executeBroadcast(this.sparkContext));
                broadcastDependencies.add(broadcastDependency);
                continue;
            }
            rddInputs.put((Object)child.getFragment().getId(), childRdd.getRdd());
            broadcastDependencies.addAll(childRdd.getBroadcastDependencies());
        }
        Object rdd = this.rddFactory.createSparkRdd(this.sparkContext, this.session, subPlan.getFragment(), (Map<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>>)rddInputs.build(), (Map<PlanFragmentId, Broadcast<?>>)broadcastInputs.build(), this.taskExecutorFactoryProvider, this.taskInfoCollector, this.shuffleStatsCollector, tableWriteInfo, this.getOutputType(subPlan));
        if (!PlanFragmenterUtils.isRootFragment((PlanFragment)subPlan.getFragment()) && !this.isBroadcastDistribution(subPlan)) {
            rdd = AbstractPrestoSparkQueryExecution.partitionBy(subPlan.getFragment().getId().getId(), rdd, subPlan.getFragment().getPartitioningScheme());
        }
        RddAndMore rddAndMore = new RddAndMore(rdd, (List<PrestoSparkBroadcastDependency<?>>)broadcastDependencies.build(), Optional.ofNullable(subPlan.getFragment().getPartitioningScheme().getPartitioning().getHandle()));
        this.fragmentIdToRdd.put(subPlan.getFragment().getId(), rddAndMore);
        return rddAndMore;
    }

    private Class getOutputType(SubPlan subPlan) {
        if (PlanFragmenterUtils.isRootFragment((PlanFragment)subPlan.getFragment())) {
            return PrestoSparkSerializedPage.class;
        }
        if (this.isBroadcastDistribution(subPlan)) {
            return this.getOutputTypeForBroadcastNode();
        }
        return PrestoSparkMutableRow.class;
    }

    private Class getOutputTypeForBroadcastNode() {
        if (PrestoSparkSessionProperties.isStorageBasedBroadcastJoinEnabled(this.session)) {
            return PrestoSparkStorageHandle.class;
        }
        return PrestoSparkSerializedPage.class;
    }

    private boolean isBroadcastDistribution(SubPlan subPlan) {
        return subPlan.getFragment().getPartitioningScheme().getPartitioning().getHandle().equals((Object)SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION);
    }

    private PrestoSparkBroadcastDependency<?> createBroadcastDependency(RddAndMore<?> childRdd) {
        PrestoSparkBroadcastDependency<PrestoSparkSerializedPage> broadcastDependency;
        DataSize maxBroadcastMemory = PrestoSparkSessionProperties.getSparkBroadcastJoinMaxMemoryOverride(this.session);
        if (maxBroadcastMemory == null) {
            maxBroadcastMemory = new DataSize((double)Math.min(this.nodeMemoryConfig.getMaxQueryBroadcastMemory().toBytes(), SystemSessionProperties.getQueryMaxBroadcastMemory((Session)this.session).toBytes()), DataSize.Unit.BYTE);
        }
        if (PrestoSparkSessionProperties.isStorageBasedBroadcastJoinEnabled(this.session)) {
            this.validateStorageCapabilities(this.tempStorage);
            TempDataOperationContext tempDataOperationContext = new TempDataOperationContext(this.session.getSource(), this.session.getQueryId().getId(), this.session.getClientInfo(), Optional.of(this.session.getClientTags()), this.session.getIdentity());
            broadcastDependency = new PrestoSparkStorageBasedBroadcastDependency(childRdd, maxBroadcastMemory, SystemSessionProperties.getQueryMaxTotalMemoryPerNode((Session)this.session), this.queryCompletionDeadline, this.tempStorage, tempDataOperationContext, this.waitTimeMetrics);
        } else {
            broadcastDependency = new PrestoSparkMemoryBasedBroadcastDependency(childRdd, maxBroadcastMemory, this.queryCompletionDeadline, this.waitTimeMetrics);
        }
        return broadcastDependency;
    }

    @VisibleForTesting
    public FragmentExecutionResult executeFragment(SubPlan plan, TableWriteInfo tableWriteInfo) throws SparkException, TimeoutException {
        RddAndMore rddAndMore = this.createRddForSubPlan(plan, tableWriteInfo);
        List<ShuffleDependency> shuffleDependencies = rddAndMore.getShuffleDependencies();
        SimpleFutureAction mapOutputStatisticsFutureAction = null;
        Verify.verify((shuffleDependencies.size() <= 1 ? 1 : 0) != 0, (String)"More than 1 shuffle dependency found", (Object[])new Object[0]);
        if (!shuffleDependencies.isEmpty()) {
            ShuffleDependency shuffleDependency = shuffleDependencies.get(0);
            mapOutputStatisticsFutureAction = this.sparkContext.sc().submitMapStage(shuffleDependency);
        }
        return new FragmentExecutionResult(rddAndMore, Optional.ofNullable(mapOutputStatisticsFutureAction));
    }

    public Optional<PlanNodeStatsEstimate> createRuntimeStats(Optional<MapOutputStatistics> mapOutputStatisticsOptional) {
        return mapOutputStatisticsOptional.map(mapOutputStatistics -> {
            double totalSize = Arrays.stream(mapOutputStatistics.bytesByPartitionId()).sum();
            return new PlanNodeStatsEstimate(Double.NaN, totalSize, (PMap)HashTreePMap.empty(), (SourceInfo)new RuntimeSourceInfo());
        });
    }

    private static class ShuffleStatsKey
    implements Comparable<ShuffleStatsKey> {
        private final int fragmentId;
        private final PrestoSparkShuffleStats.Operation operation;

        public ShuffleStatsKey(int fragmentId, PrestoSparkShuffleStats.Operation operation) {
            this.fragmentId = fragmentId;
            this.operation = Objects.requireNonNull(operation, "operation is null");
        }

        public int getFragmentId() {
            return this.fragmentId;
        }

        public PrestoSparkShuffleStats.Operation getOperation() {
            return this.operation;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ShuffleStatsKey that = (ShuffleStatsKey)o;
            return this.fragmentId == that.fragmentId && this.operation == that.operation;
        }

        public int hashCode() {
            return Objects.hash(this.fragmentId, this.operation);
        }

        @Override
        public int compareTo(ShuffleStatsKey that) {
            return ComparisonChain.start().compare(this.fragmentId, that.fragmentId).compare((Comparable)this.operation, (Comparable)that.operation).result();
        }
    }
}

