/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.Distribution;
import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.client.Column;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.StatementStats;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryPreparer;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateTimer;
import com.facebook.presto.execution.QueryStats;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageExecutionInfo;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget;
import com.facebook.presto.execution.scheduler.StreamingPlanSection;
import com.facebook.presto.execution.scheduler.StreamingSubPlan;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.execution.warnings.WarningCollectorFactory;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.security.AccessControl;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.QuerySessionSupplier;
import com.facebook.presto.server.SessionContext;
import com.facebook.presto.server.SessionPropertyDefaults;
import com.facebook.presto.server.protocol.QueryResourceUtil;
import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.PrestoSparkCredentialsProvider;
import com.facebook.presto.spark.PrestoSparkQueryData;
import com.facebook.presto.spark.PrestoSparkQueryStatusInfo;
import com.facebook.presto.spark.PrestoSparkSessionContext;
import com.facebook.presto.spark.PrestoSparkSettingsRequirements;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.SparkErrorCode;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecution;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecutionFactory;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkConfInitializer;
import com.facebook.presto.spark.classloader_interface.PrestoSparkExecutionException;
import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSession;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.ScalaUtils;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.planner.PrestoSparkPlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkQueryPlanner;
import com.facebook.presto.spark.planner.PrestoSparkRddFactory;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorCapabilities;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.transaction.TransactionId;
import com.facebook.presto.transaction.TransactionInfo;
import com.facebook.presto.transaction.TransactionManager;
import com.facebook.presto.util.Failures;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import org.joda.time.DateTime;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;

public class PrestoSparkQueryExecutionFactory
implements IPrestoSparkQueryExecutionFactory {
    // Class-wide logger.
    private static final Logger log = Logger.get(PrestoSparkQueryExecutionFactory.class);
    // Collaborators supplied through the @Inject constructor; every one of them is null-checked there.
    private final QueryIdGenerator queryIdGenerator;
    private final QuerySessionSupplier sessionSupplier;
    private final QueryPreparer queryPreparer;
    private final PrestoSparkQueryPlanner queryPlanner;
    private final PrestoSparkPlanFragmenter planFragmenter;
    private final PrestoSparkRddFactory rddFactory;
    private final QueryMonitor queryMonitor;
    // JSON codecs handed over to the query execution; queryStatusInfoJsonCodec is also used by create() to write the failure status file.
    private final JsonCodec<TaskInfo> taskInfoJsonCodec;
    private final JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec;
    private final JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec;
    private final JsonCodec<PrestoSparkQueryData> queryDataJsonCodec;
    private final TransactionManager transactionManager;
    private final AccessControl accessControl;
    private final Metadata metadata;
    private final BlockEncodingManager blockEncodingManager;
    private final PrestoSparkSettingsRequirements settingsRequirements;
    private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    private final PrestoSparkTaskExecutorFactory prestoSparkTaskExecutorFactory;
    private final SessionPropertyDefaults sessionPropertyDefaults;
    private final WarningCollectorFactory warningCollectorFactory;
    // Stored as immutable copies of the sets passed to the constructor.
    private final Set<PrestoSparkCredentialsProvider> credentialsProviders;
    private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;

    /**
     * Creates the factory from its injected collaborators.
     *
     * <p>Every argument must be non-null; the first null argument encountered (in declaration
     * order) causes a {@link NullPointerException} whose message names that argument. The two
     * provider sets are copied into immutable sets.
     */
    @Inject
    public PrestoSparkQueryExecutionFactory(
            QueryIdGenerator queryIdGenerator,
            QuerySessionSupplier sessionSupplier,
            QueryPreparer queryPreparer,
            PrestoSparkQueryPlanner queryPlanner,
            PrestoSparkPlanFragmenter planFragmenter,
            PrestoSparkRddFactory rddFactory,
            QueryMonitor queryMonitor,
            JsonCodec<TaskInfo> taskInfoJsonCodec,
            JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec,
            JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec,
            JsonCodec<PrestoSparkQueryData> queryDataJsonCodec,
            TransactionManager transactionManager,
            AccessControl accessControl,
            Metadata metadata,
            BlockEncodingManager blockEncodingManager,
            PrestoSparkSettingsRequirements settingsRequirements,
            PrestoSparkExecutionExceptionFactory executionExceptionFactory,
            PrestoSparkTaskExecutorFactory prestoSparkTaskExecutorFactory,
            SessionPropertyDefaults sessionPropertyDefaults,
            WarningCollectorFactory warningCollectorFactory,
            Set<PrestoSparkCredentialsProvider> credentialsProviders,
            Set<PrestoSparkAuthenticatorProvider> authenticatorProviders) {
        // Query lifecycle collaborators.
        this.queryIdGenerator = Objects.requireNonNull(queryIdGenerator, "queryIdGenerator is null");
        this.sessionSupplier = Objects.requireNonNull(sessionSupplier, "sessionSupplier is null");
        this.queryPreparer = Objects.requireNonNull(queryPreparer, "queryPreparer is null");
        this.queryPlanner = Objects.requireNonNull(queryPlanner, "queryPlanner is null");
        this.planFragmenter = Objects.requireNonNull(planFragmenter, "planFragmenter is null");
        this.rddFactory = Objects.requireNonNull(rddFactory, "rddFactory is null");
        this.queryMonitor = Objects.requireNonNull(queryMonitor, "queryMonitor is null");

        // JSON codecs.
        this.taskInfoJsonCodec = Objects.requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
        this.sparkTaskDescriptorJsonCodec = Objects.requireNonNull(sparkTaskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
        this.queryStatusInfoJsonCodec = Objects.requireNonNull(queryStatusInfoJsonCodec, "queryStatusInfoJsonCodec is null");
        this.queryDataJsonCodec = Objects.requireNonNull(queryDataJsonCodec, "queryDataJsonCodec is null");

        // Engine services.
        this.transactionManager = Objects.requireNonNull(transactionManager, "transactionManager is null");
        this.accessControl = Objects.requireNonNull(accessControl, "accessControl is null");
        this.metadata = Objects.requireNonNull(metadata, "metadata is null");
        this.blockEncodingManager = Objects.requireNonNull(blockEncodingManager, "blockEncodingManager is null");
        this.settingsRequirements = Objects.requireNonNull(settingsRequirements, "settingsRequirements is null");
        this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        this.prestoSparkTaskExecutorFactory = Objects.requireNonNull(prestoSparkTaskExecutorFactory, "prestoSparkTaskExecutorFactory is null");
        this.sessionPropertyDefaults = Objects.requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");
        this.warningCollectorFactory = Objects.requireNonNull(warningCollectorFactory, "warningCollectorFactory is null");

        // Provider sets are snapshotted so later changes to the caller's sets are not observed.
        this.credentialsProviders = ImmutableSet.copyOf(Objects.requireNonNull(credentialsProviders, "credentialsProviders is null"));
        this.authenticatorProviders = ImmutableSet.copyOf(Objects.requireNonNull(authenticatorProviders, "authenticatorProviders is null"));
    }

    /**
     * Prepares, plans and fragments {@code sql} and returns an execution object for it.
     *
     * <p>Steps, in order: verify the Spark configuration was initialized, check that the optional
     * output files do not exist yet, create the session and an auto-commit transaction, publish the
     * query-created event, then verify settings, prepare, plan and fragment the query and register
     * the two Spark accumulators.
     *
     * <p>If anything inside the planning section throws, the transaction is rolled back
     * (best effort), a FAILED {@link QueryInfo} is published, the status file is written when
     * {@code queryStatusInfoOutputPath} is present, and the failure is rethrown via
     * {@link ExecutionFailureInfo#toFailure()}.
     *
     * @param sparkContext Spark context the query will run on
     * @param prestoSparkSession session description used to build the Presto session
     * @param sql query text
     * @param sparkQueueName optional queue name, reported as the resource group id
     * @param executorFactoryProvider passed through to the created execution
     * @param queryStatusInfoOutputPath optional file for the query status JSON; must not exist
     * @param queryDataOutputPath optional file for the query data JSON; must not exist
     * @return the query execution
     * @throws IllegalArgumentException if either output file already exists
     */
    public IPrestoSparkQueryExecution create(SparkContext sparkContext, PrestoSparkSession prestoSparkSession, String sql, Optional<String> sparkQueueName, PrestoSparkTaskExecutorFactoryProvider executorFactoryProvider, Optional<Path> queryStatusInfoOutputPath, Optional<Path> queryDataOutputPath) {
        PrestoSparkConfInitializer.checkInitialized(sparkContext);
        queryStatusInfoOutputPath.ifPresent(path -> Preconditions.checkArgument(Files.notExists(path), "File already exist: %s", path));
        queryDataOutputPath.ifPresent(path -> Preconditions.checkArgument(Files.notExists(path), "File already exist: %s", path));

        QueryStateTimer queryStateTimer = new QueryStateTimer(Ticker.systemTicker());
        queryStateTimer.beginPlanning();

        QueryId queryId = this.queryIdGenerator.createNextQueryId();
        log.info("Starting execution for presto query: %s", queryId);

        PrestoSparkSessionContext sessionContext = PrestoSparkSessionContext.createFromSessionInfo(prestoSparkSession, this.credentialsProviders, this.authenticatorProviders);
        Session session = this.sessionSupplier.createSession(queryId, sessionContext);
        session = this.sessionPropertyDefaults.newSessionWithDefaultProperties(session, Optional.empty(), Optional.empty());
        WarningCollector warningCollector = this.warningCollectorFactory.create(SystemSessionProperties.getWarningHandlingLevel(session));

        TransactionId transactionId = this.transactionManager.beginTransaction(true);
        session = session.beginTransactionId(transactionId, this.transactionManager, this.accessControl);

        // The effective timeout is the smaller of the two configured limits.
        Duration queryMaxRunTime = SystemSessionProperties.getQueryMaxRunTime(session);
        Duration queryMaxExecutionTime = SystemSessionProperties.getQueryMaxExecutionTime(session);
        Duration queryTimeout = queryMaxRunTime.compareTo(queryMaxExecutionTime) < 0 ? queryMaxRunTime : queryMaxExecutionTime;
        long queryCompletionDeadline = System.currentTimeMillis() + queryTimeout.toMillis();

        this.queryMonitor.queryCreatedEvent(new BasicQueryInfo(PrestoSparkQueryExecutionFactory.createQueryInfo(session, sql, QueryState.PLANNING, Optional.empty(), sparkQueueName, Optional.empty(), queryStateTimer, Optional.empty(), warningCollector)));

        // Declared outside the try block so the failure path can report whatever plan was produced.
        PrestoSparkQueryPlanner.PlanAndMore planAndMore = null;
        try {
            this.settingsRequirements.verify(sparkContext, session);
            queryStateTimer.beginAnalyzing();

            QueryPreparer.PreparedQuery preparedQuery = this.queryPreparer.prepareQuery(session, sql, warningCollector);
            planAndMore = this.queryPlanner.createQueryPlan(session, preparedQuery, warningCollector);
            SubPlan fragmentedPlan = this.planFragmenter.fragmentQueryPlan(session, planAndMore.getPlan(), warningCollector);
            log.info(PlanPrinter.textDistributedPlan(fragmentedPlan, this.metadata.getFunctionAndTypeManager(), session, true));
            TableWriteInfo tableWriteInfo = this.getTableWriteInfo(session, fragmentedPlan);

            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
            // NOTE(review): the accumulators are left raw because their element types are fixed by the
            // PrestoSparkQueryExecution constructor, which is not visible from here.
            CollectionAccumulator taskInfoCollector = new CollectionAccumulator();
            taskInfoCollector.register(sparkContext, new Some<>("taskInfoCollector"), false);
            CollectionAccumulator shuffleStatsCollector = new CollectionAccumulator();
            shuffleStatsCollector.register(sparkContext, new Some<>("shuffleStatsCollector"), false);

            queryStateTimer.endAnalysis();
            return new PrestoSparkQueryExecution(javaSparkContext, session, this.queryMonitor, taskInfoCollector, shuffleStatsCollector, this.prestoSparkTaskExecutorFactory, executorFactoryProvider, queryStateTimer, warningCollector, sql, planAndMore, fragmentedPlan, sparkQueueName, this.taskInfoJsonCodec, this.sparkTaskDescriptorJsonCodec, this.queryStatusInfoJsonCodec, this.queryDataJsonCodec, this.rddFactory, tableWriteInfo, this.transactionManager, PrestoSparkUtils.createPagesSerde(this.blockEncodingManager), this.executionExceptionFactory, queryTimeout, queryCompletionDeadline, queryStatusInfoOutputPath, queryDataOutputPath);
        }
        catch (Throwable executionFailure) {
            queryStateTimer.beginFinishing();
            try {
                PrestoSparkQueryExecutionFactory.rollback(session, this.transactionManager);
            }
            catch (RuntimeException rollbackFailure) {
                // Rollback is best effort: the original failure is what gets reported.
                log.error(rollbackFailure, "Encountered error when performing rollback");
            }
            queryStateTimer.endQuery();

            // Prefer the failure info carried by a PrestoSparkExecutionException; otherwise derive it.
            Optional<ExecutionFailureInfo> failureInfo = Optional.empty();
            if (executionFailure instanceof PrestoSparkExecutionException) {
                failureInfo = this.executionExceptionFactory.extractExecutionFailureInfo((PrestoSparkExecutionException)executionFailure);
                Verify.verify(failureInfo.isPresent());
            }
            if (!failureInfo.isPresent()) {
                failureInfo = Optional.of(Failures.toFailure(executionFailure));
            }

            try {
                QueryInfo queryInfo = PrestoSparkQueryExecutionFactory.createQueryInfo(session, sql, QueryState.FAILED, Optional.ofNullable(planAndMore), sparkQueueName, failureInfo, queryStateTimer, Optional.empty(), warningCollector);
                this.queryMonitor.queryCompletedEvent(queryInfo);
                if (queryStatusInfoOutputPath.isPresent()) {
                    PrestoSparkQueryStatusInfo prestoSparkQueryStatusInfo = PrestoSparkQueryExecutionFactory.createPrestoSparkQueryInfo(queryInfo, Optional.ofNullable(planAndMore), warningCollector, OptionalLong.empty());
                    PrestoSparkQueryExecutionFactory.writeJsonFile(queryStatusInfoOutputPath.get(), prestoSparkQueryStatusInfo, this.queryStatusInfoJsonCodec);
                }
            }
            catch (RuntimeException eventFailure) {
                // Reporting problems must not mask the query failure that is rethrown below.
                log.error(eventFailure, "Error publishing query immediate failure event");
            }
            throw failureInfo.get().toFailure();
        }
    }

    /**
     * Builds the {@link TableWriteInfo} for the streaming sub-plan extracted from {@code plan}.
     * When the result has a writer target, that target is checked with
     * {@link #checkPageSinkCommitIsSupported} before the info is returned.
     */
    private TableWriteInfo getTableWriteInfo(Session session, SubPlan plan) {
        StreamingSubPlan rootSubPlan = StreamingPlanSection.extractStreamingSections(plan).getPlan();
        TableWriteInfo writeInfo = TableWriteInfo.createTableWriteInfo(rootSubPlan, this.metadata, session);
        writeInfo.getWriterTarget().ifPresent(target -> this.checkPageSinkCommitIsSupported(session, (ExecutionWriterTarget)target));
        return writeInfo;
    }

    /**
     * Rejects writer targets that this class cannot handle.
     *
     * @throws PrestoException with {@code NOT_SUPPORTED} for delete handles, or when the target's
     *         connector does not report {@code SUPPORTS_PAGE_SINK_COMMIT}
     * @throws IllegalArgumentException for any target that is neither a create nor an insert handle
     */
    private void checkPageSinkCommitIsSupported(Session session, ExecutionWriterTarget writerTarget) {
        if (writerTarget instanceof ExecutionWriterTarget.DeleteHandle) {
            throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "delete queries are not supported by presto on spark");
        }

        boolean isCreate = writerTarget instanceof ExecutionWriterTarget.CreateHandle;
        boolean isInsert = !isCreate && writerTarget instanceof ExecutionWriterTarget.InsertHandle;
        if (!isCreate && !isInsert) {
            throw new IllegalArgumentException("unexpected writer target type: " + writerTarget.getClass());
        }

        ConnectorId connectorId = isCreate
                ? ((ExecutionWriterTarget.CreateHandle)writerTarget).getHandle().getConnectorId()
                : ((ExecutionWriterTarget.InsertHandle)writerTarget).getHandle().getConnectorId();
        Verify.verify(connectorId != null, "connectorId is null");

        boolean supportsCommit = this.metadata.getConnectorCapabilities(session, connectorId).contains(ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT);
        if (!supportsCommit) {
            throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "catalog does not support page sink commit: " + connectorId);
        }
    }

    /**
     * Commits the session's transaction and waits for the asynchronous commit to finish.
     */
    private static void commit(Session session, TransactionManager transactionManager) {
        TransactionInfo transactionInfo = PrestoSparkQueryExecutionFactory.getTransactionInfo(session, transactionManager);
        MoreFutures.getFutureValue(transactionManager.asyncCommit(transactionInfo.getTransactionId()));
    }

    /**
     * Aborts the session's transaction and waits for the asynchronous abort to finish.
     */
    private static void rollback(Session session, TransactionManager transactionManager) {
        TransactionInfo transactionInfo = PrestoSparkQueryExecutionFactory.getTransactionInfo(session, transactionManager);
        MoreFutures.getFutureValue(transactionManager.asyncAbort(transactionInfo.getTransactionId()));
    }

    /**
     * Looks up the transaction info for the session's transaction id.
     *
     * @throws IllegalStateException if the session has no transaction id, the manager does not know
     *         the transaction, or the transaction is not in an auto-commit context
     */
    private static TransactionInfo getTransactionInfo(Session session, TransactionManager transactionManager) {
        TransactionInfo transactionInfo = session.getTransactionId()
                .flatMap(id -> transactionManager.getOptionalTransactionInfo(id))
                .orElseThrow(() -> new IllegalStateException("transaction is not present"));
        if (!transactionInfo.isAutoCommitContext()) {
            throw new IllegalStateException("transaction doesn't have auto commit context enabled");
        }
        return transactionInfo;
    }

    /**
     * Assembles a {@link QueryInfo} snapshot for the given state.
     *
     * <p>Memory figures are aggregated over the tasks of the latest attempt of every stage under
     * {@code rootStage}: the task count and the user/total peaks are summed, while the per-task and
     * per-node peaks are the maximum seen. Plan-derived fields fall back to empty values when
     * {@code planAndMore} is absent.
     *
     * @throws IllegalArgumentException if {@code queryState} is FAILED but {@code failureInfo} is empty
     */
    private static QueryInfo createQueryInfo(Session session, String query, QueryState queryState, Optional<PrestoSparkQueryPlanner.PlanAndMore> planAndMore, Optional<String> sparkQueueName, Optional<ExecutionFailureInfo> failureInfo, QueryStateTimer queryStateTimer, Optional<StageInfo> rootStage, WarningCollector warningCollector) {
        boolean failedWithoutInfo = queryState == QueryState.FAILED && !failureInfo.isPresent();
        Preconditions.checkArgument(!failedWithoutInfo, "unexpected query state: %s", queryState);

        int taskCount = 0;
        long summedUserPeak = 0L;
        long summedTotalPeak = 0L;
        long maxTaskUserPeak = 0L;
        long maxTaskTotalPeak = 0L;
        long maxNodeTotalPeak = 0L;
        for (StageInfo stage : StageInfo.getAllStages(rootStage)) {
            for (TaskInfo task : stage.getLatestAttemptExecutionInfo().getTasks()) {
                taskCount++;
                long userPeak = task.getStats().getPeakUserMemoryInBytes();
                long totalPeak = task.getStats().getPeakTotalMemoryInBytes();
                summedUserPeak += userPeak;
                summedTotalPeak += totalPeak;
                if (userPeak > maxTaskUserPeak) {
                    maxTaskUserPeak = userPeak;
                }
                if (totalPeak > maxTaskTotalPeak) {
                    maxTaskTotalPeak = totalPeak;
                }
                long nodeTotalPeak = task.getStats().getPeakNodeTotalMemoryInBytes();
                if (nodeTotalPeak > maxNodeTotalPeak) {
                    maxNodeTotalPeak = nodeTotalPeak;
                }
            }
        }

        QueryStats queryStats = QueryStats.create(
                queryStateTimer,
                rootStage,
                taskCount,
                DataSize.succinctBytes(summedUserPeak),
                DataSize.succinctBytes(summedTotalPeak),
                DataSize.succinctBytes(maxTaskUserPeak),
                DataSize.succinctBytes(maxTaskTotalPeak),
                DataSize.succinctBytes(maxNodeTotalPeak));

        return new QueryInfo(
                session.getQueryId(),
                session.toSessionRepresentation(),
                queryState,
                new MemoryPoolId("spark-memory-pool"),
                queryStats.isScheduled(),
                // There is no real endpoint behind this URI; the host is a placeholder.
                URI.create("http://fake.invalid/query/" + session.getQueryId()),
                planAndMore.map(PrestoSparkQueryPlanner.PlanAndMore::getFieldNames).orElse(ImmutableList.of()),
                query,
                queryStats,
                Optional.empty(),
                Optional.empty(),
                ImmutableMap.of(),
                ImmutableSet.of(),
                ImmutableMap.of(),
                ImmutableMap.of(),
                ImmutableSet.of(),
                Optional.empty(),
                false,
                planAndMore.flatMap(PrestoSparkQueryPlanner.PlanAndMore::getUpdateType).orElse(null),
                rootStage,
                failureInfo.orElse(null),
                failureInfo.map(ExecutionFailureInfo::getErrorCode).orElse(null),
                warningCollector.getWarnings(),
                planAndMore.map(PrestoSparkQueryPlanner.PlanAndMore::getInputs).orElse(ImmutableSet.of()),
                planAndMore.flatMap(PrestoSparkQueryPlanner.PlanAndMore::getOutput),
                true,
                sparkQueueName.map(ResourceGroupId::new),
                planAndMore.flatMap(PrestoSparkQueryPlanner.PlanAndMore::getQueryType),
                Optional.empty(),
                Optional.empty());
    }

    /**
     * Groups {@code taskInfos} by the plan fragment id taken from each task's stage id, then
     * delegates to the multimap-based overload to build the stage tree for {@code plan}.
     */
    private static StageInfo createStageInfo(QueryId queryId, SubPlan plan, List<TaskInfo> taskInfos) {
        ListMultimap<PlanFragmentId, TaskInfo> tasksByFragment = ArrayListMultimap.create();
        taskInfos.forEach(task -> {
            int stageNumber = task.getTaskId().getStageExecutionId().getStageId().getId();
            tasksByFragment.put(new PlanFragmentId(stageNumber), task);
        });
        return PrestoSparkQueryExecutionFactory.createStageInfo(queryId, plan, tasksByFragment);
    }

    /**
     * Recursively builds the {@link StageInfo} tree for {@code plan}.
     *
     * <p>The stage for a fragment contains the tasks registered in {@code taskInfoMap} under that
     * fragment's id. Its peak user memory is the sum of the tasks' peak user memory, and its peak
     * node total memory is the maximum over the tasks. Every stage is reported with state
     * {@code FINISHED}; child stages are created from {@code plan.getChildren()}.
     *
     * @param queryId query the stages belong to
     * @param plan sub-plan whose fragment becomes this stage
     * @param taskInfoMap task infos keyed by plan fragment id
     * @return the stage info for {@code plan}, including its sub-stages
     */
    private static StageInfo createStageInfo(QueryId queryId, SubPlan plan, ListMultimap<PlanFragmentId, TaskInfo> taskInfoMap) {
        PlanFragmentId planFragmentId = plan.getFragment().getId();
        StageId stageId = new StageId(queryId, planFragmentId.getId());
        List<TaskInfo> taskInfos = taskInfoMap.get(planFragmentId);
        long peakUserMemoryReservationInBytes = 0L;
        long peakNodeTotalMemoryReservationInBytes = 0L;
        for (TaskInfo taskInfo : taskInfos) {
            // Use the task's peak, not its current reservation, so the stage figure agrees with
            // the query-level aggregation done in createQueryInfo.
            long taskPeakUserMemoryInBytes = taskInfo.getStats().getPeakUserMemoryInBytes();
            peakUserMemoryReservationInBytes += taskPeakUserMemoryInBytes;
            peakNodeTotalMemoryReservationInBytes = Math.max(taskInfo.getStats().getPeakNodeTotalMemoryInBytes(), peakNodeTotalMemoryReservationInBytes);
        }
        // NOTE(review): the trailing literals (0, 1, 1) are passed through unchanged from the original;
        // their meaning is defined by StageExecutionId / StageExecutionInfo.create — confirm there.
        StageExecutionInfo stageExecutionInfo = StageExecutionInfo.create(
                new StageExecutionId(stageId, 0),
                StageExecutionState.FINISHED,
                Optional.empty(),
                taskInfos,
                DateTime.now(),
                new Distribution().snapshot(),
                DataSize.succinctBytes(peakUserMemoryReservationInBytes),
                DataSize.succinctBytes(peakNodeTotalMemoryReservationInBytes),
                1,
                1);
        List<StageInfo> subStages = plan.getChildren().stream()
                .map(child -> PrestoSparkQueryExecutionFactory.createStageInfo(queryId, child, taskInfoMap))
                .collect(ImmutableList.toImmutableList());
        return new StageInfo(
                stageId,
                // There is no real endpoint behind this URI; the host is a placeholder.
                URI.create("http://fake.invalid/stage/" + stageId),
                Optional.of(plan.getFragment()),
                stageExecutionInfo,
                ImmutableList.of(),
                subStages,
                false);
    }

    /**
     * Converts a {@link QueryInfo} into the status object written to the query status file.
     *
     * <p>The statement stats derived from {@code queryInfo} are copied field by field with the last
     * constructor argument replaced by {@code null} (presumably the root stage stats — confirm
     * against {@code StatementStats}). Output columns and update type are only present when
     * {@code planAndMore} is.
     */
    private static PrestoSparkQueryStatusInfo createPrestoSparkQueryInfo(QueryInfo queryInfo, Optional<PrestoSparkQueryPlanner.PlanAndMore> planAndMore, WarningCollector warningCollector, OptionalLong updateCount) {
        StatementStats source = QueryResourceUtil.toStatementStats(queryInfo);
        StatementStats trimmed = new StatementStats(
                source.getState(),
                source.isQueued(),
                source.isScheduled(),
                source.getNodes(),
                source.getTotalSplits(),
                source.getQueuedSplits(),
                source.getRunningSplits(),
                source.getCompletedSplits(),
                source.getCpuTimeMillis(),
                source.getWallTimeMillis(),
                source.getQueuedTimeMillis(),
                source.getElapsedTimeMillis(),
                source.getProcessedRows(),
                source.getProcessedBytes(),
                source.getPeakMemoryBytes(),
                source.getPeakTotalMemoryBytes(),
                source.getPeakTaskTotalMemoryBytes(),
                source.getSpilledBytes(),
                null);
        return new PrestoSparkQueryStatusInfo(
                queryInfo.getQueryId().getId(),
                planAndMore.map(PrestoSparkQueryExecutionFactory::getOutputColumns),
                trimmed,
                Optional.ofNullable(queryInfo.getFailureInfo()).map(PrestoSparkQueryExecutionFactory::toQueryError),
                warningCollector.getWarnings(),
                planAndMore.flatMap(PrestoSparkQueryPlanner.PlanAndMore::getUpdateType),
                updateCount);
    }

    /**
     * Pairs each field name of the plan with the type of the root node's output variable at the
     * same position.
     *
     * @throws IllegalArgumentException if the number of names and types differ
     */
    private static List<Column> getOutputColumns(PrestoSparkQueryPlanner.PlanAndMore planAndMore) {
        List<String> names = planAndMore.getFieldNames();
        List<Type> types = planAndMore.getPlan().getRoot().getOutputVariables().stream()
                .map(VariableReferenceExpression::getType)
                .collect(ImmutableList.toImmutableList());
        Preconditions.checkArgument(names.size() == types.size(), "Column names and types size mismatch: %s != %s", names.size(), types.size());

        ImmutableList.Builder<Column> columns = ImmutableList.builder();
        int position = 0;
        while (position < names.size()) {
            columns.add(new Column(names.get(position), types.get(position)));
            position++;
        }
        return columns.build();
    }

    /**
     * Converts failure info into a client-facing {@link QueryError}. A missing error code is
     * replaced by {@code GENERIC_INTERNAL_ERROR} and a missing message by "Internal error".
     */
    private static QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) {
        ErrorCode code = executionFailureInfo.getErrorCode();
        if (code == null) {
            code = StandardErrorCode.GENERIC_INTERNAL_ERROR.toErrorCode();
        }
        String message = MoreObjects.firstNonNull(executionFailureInfo.getMessage(), "Internal error");
        return new QueryError(
                message,
                null,
                code.getCode(),
                code.getName(),
                code.getType().toString(),
                executionFailureInfo.getErrorLocation(),
                executionFailureInfo.toFailureInfo());
    }

    /**
     * Serializes {@code object} with {@code codec} and writes the bytes to {@code outputPath}.
     *
     * @throws UncheckedIOException wrapping any {@link IOException} raised by the write
     */
    private static <T> void writeJsonFile(Path outputPath, T object, JsonCodec<T> codec) {
        byte[] json = codec.toJsonBytes(object);
        try {
            Files.write(outputPath, json);
        }
        catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
    }

    /**
     * Waits for every action in turn, sharing one overall deadline of {@code timeout} between them.
     * On exit, whether normal or exceptional, any action that is not done is cancelled.
     *
     * @throws TimeoutException if the deadline passes before all actions have completed
     * @throws SparkException propagated from {@link #getActionResultWithTimeout}
     */
    private static <T> void waitForActionsCompletionWithTimeout(Collection<JavaFutureAction<T>> actions, long timeout, TimeUnit timeUnit) throws SparkException, TimeoutException {
        long deadlineMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        try {
            for (JavaFutureAction<T> pending : actions) {
                long remainingMillis = deadlineMillis - System.currentTimeMillis();
                if (remainingMillis <= 0L) {
                    throw new TimeoutException();
                }
                PrestoSparkQueryExecutionFactory.getActionResultWithTimeout(pending, remainingMillis, TimeUnit.MILLISECONDS);
            }
        }
        finally {
            actions.forEach(candidate -> {
                if (!candidate.isDone()) {
                    candidate.cancel(true);
                }
            });
        }
    }

    private static <T> T getActionResultWithTimeout(JavaFutureAction<T> action, long timeout, TimeUnit timeUnit) throws SparkException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        while (true) {
            Object object;
            long nextTimeoutInMillis = deadline - System.currentTimeMillis();
            if (nextTimeoutInMillis <= 0L) {
                throw new TimeoutException();
            }
            try {
                object = action.get(nextTimeoutInMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                try {
                    if (deadline - System.currentTimeMillis() > 0L) continue;
                    throw e;
                }
                catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e2);
                }
                catch (ExecutionException e3) {
                    Throwables.propagateIfPossible((Throwable)e3.getCause(), SparkException.class);
                    Throwables.propagateIfPossible((Throwable)e3.getCause(), RuntimeException.class);
                    throw new UncheckedExecutionException(e3.getCause());
                }
            }
            return (T)object;
            break;
        }
        finally {
            if (!action.isDone()) {
                action.cancel(true);
            }
        }
    }

    /**
     * Key used to group shuffle statistics: a plan fragment id paired with a shuffle operation.
     * Keys order by fragment id first and by operation second.
     */
    private static class ShuffleStatsKey
            implements Comparable<ShuffleStatsKey> {
        private final int fragmentId;
        private final PrestoSparkShuffleStats.Operation operation;

        private ShuffleStatsKey(int fragmentId, PrestoSparkShuffleStats.Operation operation) {
            this.fragmentId = fragmentId;
            this.operation = Objects.requireNonNull(operation, "operation is null");
        }

        /** Returns the fragment id this key refers to. */
        public int getFragmentId() {
            return fragmentId;
        }

        /** Returns the shuffle operation this key refers to. */
        public PrestoSparkShuffleStats.Operation getOperation() {
            return operation;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o != null && o.getClass() == getClass()) {
                ShuffleStatsKey other = (ShuffleStatsKey) o;
                return fragmentId == other.fragmentId && operation == other.operation;
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(fragmentId, operation);
        }

        @Override
        public int compareTo(ShuffleStatsKey that) {
            return ComparisonChain.start()
                    .compare(fragmentId, that.fragmentId)
                    .compare(operation, that.operation)
                    .result();
        }
    }

    /**
     * Holds a Spark RDD together with the broadcast variables it depends on, so that the broadcasts
     * can be destroyed after the RDD has been collected. Collecting through this holder is allowed
     * at most once.
     */
    private static class RddAndMore<T extends PrestoSparkTaskOutput> {
        private final JavaPairRDD<MutablePartitionId, T> rdd;
        private final List<Broadcast<?>> broadcastDependencies;
        // Set as soon as a collect has been started; guards against collecting twice.
        private boolean collected;

        private RddAndMore(JavaPairRDD<MutablePartitionId, T> rdd, List<Broadcast<?>> broadcastDependencies) {
            this.rdd = Objects.requireNonNull(rdd, "rdd is null");
            Objects.requireNonNull(broadcastDependencies, "broadcastDependencies is null");
            this.broadcastDependencies = ImmutableList.copyOf(broadcastDependencies);
        }

        /**
         * Collects the RDD, waiting at most {@code timeout}, and then destroys all broadcast dependencies.
         * The broadcasts are destroyed only when the collect succeeds.
         *
         * @throws IllegalStateException if this holder has already been collected
         * @throws SparkException propagated from the Spark action
         * @throws TimeoutException if the collect does not finish in time
         */
        public List<Tuple2<MutablePartitionId, T>> collectAndDestroyDependenciesWithTimeout(long timeout, TimeUnit timeUnit) throws SparkException, TimeoutException {
            Preconditions.checkState(!collected, "already collected");
            collected = true;
            List<Tuple2<MutablePartitionId, T>> collectedRows = PrestoSparkQueryExecutionFactory.getActionResultWithTimeout(rdd.collectAsync(), timeout, timeUnit);
            for (Broadcast<?> dependency : broadcastDependencies) {
                dependency.destroy();
            }
            return collectedRows;
        }

        /** Returns the wrapped RDD. */
        public JavaPairRDD<MutablePartitionId, T> getRdd() {
            return rdd;
        }

        /** Returns the immutable list of broadcast variables the RDD depends on. */
        public List<Broadcast<?>> getBroadcastDependencies() {
            return broadcastDependencies;
        }
    }

    public static class PrestoSparkQueryExecution
    implements IPrestoSparkQueryExecution {
        // Spark entry point, query session and event sink.
        private final JavaSparkContext sparkContext;
        private final Session session;
        private final QueryMonitor queryMonitor;

        // Accumulators filled in by the tasks while the query runs.
        private final CollectionAccumulator<SerializedTaskInfo> taskInfoCollector;
        private final CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector;

        private final PrestoSparkTaskExecutorFactory taskExecutorFactory;
        private final PrestoSparkTaskExecutorFactoryProvider taskExecutorFactoryProvider;
        private final QueryStateTimer queryStateTimer;
        private final WarningCollector warningCollector;

        // The query text and its plan.
        private final String query;
        private final PrestoSparkQueryPlanner.PlanAndMore planAndMore;
        private final SubPlan fragmentedPlan;
        private final Optional<String> sparkQueueName;

        // JSON codecs.
        private final JsonCodec<TaskInfo> taskInfoJsonCodec;
        private final JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec;
        private final JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec;
        private final JsonCodec<PrestoSparkQueryData> queryDataJsonCodec;

        private final PrestoSparkRddFactory rddFactory;
        private final TableWriteInfo tableWriteInfo;
        private final TransactionManager transactionManager;
        private final PagesSerde pagesSerde;
        private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;

        // Time limit (used in the timeout error message) and the absolute deadline it is checked against.
        private final Duration queryTimeout;
        private final Optional<Path> queryStatusInfoOutputPath;
        private final Optional<Path> queryDataOutputPath;
        private final long queryCompletionDeadline;

        /**
         * Stores the collaborators needed to run one query. Every reference argument must be non-null;
         * a {@link NullPointerException} naming the offending argument is thrown otherwise.
         */
        private PrestoSparkQueryExecution(
                JavaSparkContext sparkContext,
                Session session,
                QueryMonitor queryMonitor,
                CollectionAccumulator<SerializedTaskInfo> taskInfoCollector,
                CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector,
                PrestoSparkTaskExecutorFactory taskExecutorFactory,
                PrestoSparkTaskExecutorFactoryProvider taskExecutorFactoryProvider,
                QueryStateTimer queryStateTimer,
                WarningCollector warningCollector,
                String query,
                PrestoSparkQueryPlanner.PlanAndMore planAndMore,
                SubPlan fragmentedPlan,
                Optional<String> sparkQueueName,
                JsonCodec<TaskInfo> taskInfoJsonCodec,
                JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec,
                JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec,
                JsonCodec<PrestoSparkQueryData> queryDataJsonCodec,
                PrestoSparkRddFactory rddFactory,
                TableWriteInfo tableWriteInfo,
                TransactionManager transactionManager,
                PagesSerde pagesSerde,
                PrestoSparkExecutionExceptionFactory executionExceptionFactory,
                Duration queryTimeout,
                long queryCompletionDeadline,
                Optional<Path> queryStatusInfoOutputPath,
                Optional<Path> queryDataOutputPath) {
            this.sparkContext = Objects.requireNonNull(sparkContext, "sparkContext is null");
            this.session = Objects.requireNonNull(session, "session is null");
            this.queryMonitor = Objects.requireNonNull(queryMonitor, "queryMonitor is null");
            this.taskInfoCollector = Objects.requireNonNull(taskInfoCollector, "taskInfoCollector is null");
            this.shuffleStatsCollector = Objects.requireNonNull(shuffleStatsCollector, "shuffleStatsCollector is null");
            this.taskExecutorFactory = Objects.requireNonNull(taskExecutorFactory, "taskExecutorFactory is null");
            this.taskExecutorFactoryProvider = Objects.requireNonNull(taskExecutorFactoryProvider, "taskExecutorFactoryProvider is null");
            this.queryStateTimer = Objects.requireNonNull(queryStateTimer, "queryStateTimer is null");
            this.warningCollector = Objects.requireNonNull(warningCollector, "warningCollector is null");
            this.query = Objects.requireNonNull(query, "query is null");
            this.planAndMore = Objects.requireNonNull(planAndMore, "planAndMore is null");
            this.fragmentedPlan = Objects.requireNonNull(fragmentedPlan, "fragmentedPlan is null");
            this.sparkQueueName = Objects.requireNonNull(sparkQueueName, "sparkQueueName is null");
            this.taskInfoJsonCodec = Objects.requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
            this.sparkTaskDescriptorJsonCodec = Objects.requireNonNull(sparkTaskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
            this.queryStatusInfoJsonCodec = Objects.requireNonNull(queryStatusInfoJsonCodec, "queryStatusInfoJsonCodec is null");
            this.queryDataJsonCodec = Objects.requireNonNull(queryDataJsonCodec, "queryDataJsonCodec is null");
            this.rddFactory = Objects.requireNonNull(rddFactory, "rddFactory is null");
            this.tableWriteInfo = Objects.requireNonNull(tableWriteInfo, "tableWriteInfo is null");
            this.transactionManager = Objects.requireNonNull(transactionManager, "transactionManager is null");
            this.pagesSerde = Objects.requireNonNull(pagesSerde, "pagesSerde is null");
            this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
            this.queryTimeout = Objects.requireNonNull(queryTimeout, "queryTimeout is null");
            this.queryCompletionDeadline = queryCompletionDeadline;
            this.queryStatusInfoOutputPath = Objects.requireNonNull(queryStatusInfoOutputPath, "queryStatusInfoOutputPath is null");
            this.queryDataOutputPath = Objects.requireNonNull(queryDataOutputPath, "queryDataOutputPath is null");
        }

        /**
         * Runs the query and returns its result rows.
         * <p>
         * On success the transaction is committed, shuffle statistics are logged, the collected pages
         * are decoded into rows, a query-completed event is published and, when an output path is
         * configured, the result data is written as JSON.
         * <p>
         * On failure the transaction is rolled back (rollback errors are only logged), the failure is
         * converted to an {@link ExecutionFailureInfo}, a query-completed event is published
         * (publishing errors are only logged) and the converted failure is thrown.
         *
         * @return one unmodifiable list of column values per result row
         */
        public List<List<Object>> execute() {
            List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> rddResults;
            queryStateTimer.beginRunning();
            try {
                rddResults = doExecute(fragmentedPlan);
                queryStateTimer.beginFinishing();
                PrestoSparkQueryExecutionFactory.commit(session, transactionManager);
                queryStateTimer.endQuery();
            }
            catch (Throwable executionException) {
                queryStateTimer.beginFinishing();
                try {
                    PrestoSparkQueryExecutionFactory.rollback(session, transactionManager);
                }
                catch (RuntimeException rollbackFailure) {
                    log.error(rollbackFailure, "Encountered error when performing rollback");
                }
                ExecutionFailureInfo failureInfo = toExecutionFailureInfo(executionException);
                queryStateTimer.endQuery();
                try {
                    queryCompletedEvent(Optional.of(failureInfo), OptionalLong.empty());
                }
                catch (RuntimeException eventFailure) {
                    log.error(eventFailure, "Error publishing query completed event");
                }
                throw failureInfo.toFailure();
            }

            processShuffleStats();

            ConnectorSession connectorSession = session.toConnectorSession();
            List<Type> types = fragmentedPlan.getFragment().getTypes();
            ImmutableList.Builder<List<Object>> rows = ImmutableList.builder();
            for (Tuple2<MutablePartitionId, PrestoSparkSerializedPage> tuple : rddResults) {
                Page page = pagesSerde.deserialize(PrestoSparkUtils.toSerializedPage(tuple._2));
                Preconditions.checkArgument(page.getChannelCount() == types.size(), "expected %s channels, got %s", types.size(), page.getChannelCount());
                for (int position = 0; position < page.getPositionCount(); ++position) {
                    List<Object> columns = new ArrayList<>();
                    for (int channel = 0; channel < page.getChannelCount(); ++channel) {
                        columns.add(types.get(channel).getObjectValue(connectorSession.getSqlFunctionProperties(), page.getBlock(channel), position));
                    }
                    rows.add(Collections.unmodifiableList(columns));
                }
            }
            ImmutableList<List<Object>> results = rows.build();

            // An update statement is expected to produce exactly one BIGINT cell holding the update count.
            OptionalLong updateCount = OptionalLong.empty();
            if (planAndMore.getUpdateType().isPresent() && types.size() == 1 && types.get(0).equals(BigintType.BIGINT) && results.size() == 1 && results.get(0).size() == 1) {
                updateCount = OptionalLong.of(((Number) results.get(0).get(0)).longValue());
            }

            try {
                queryCompletedEvent(Optional.empty(), updateCount);
            }
            catch (RuntimeException eventFailure) {
                log.error(eventFailure, "Error publishing query completed event");
            }

            queryDataOutputPath.ifPresent(path -> PrestoSparkQueryExecutionFactory.writeJsonFile(
                    path,
                    new PrestoSparkQueryData(PrestoSparkQueryExecutionFactory.getOutputColumns(planAndMore), results),
                    queryDataJsonCodec));
            return results;
        }

        /**
         * Converts whatever was thrown during execution into an {@link ExecutionFailureInfo}.
         * Spark and Presto-on-Spark exceptions are first offered to the exception factory; timeouts are
         * reported as exceeding the query time limit; anything left over is converted as-is.
         */
        private ExecutionFailureInfo toExecutionFailureInfo(Throwable executionException) {
            Optional<ExecutionFailureInfo> failureInfo = Optional.empty();
            if (executionException instanceof SparkException) {
                SparkException sparkException = (SparkException) executionException;
                failureInfo = executionExceptionFactory.extractExecutionFailureInfo(sparkException);
                if (!failureInfo.isPresent()) {
                    failureInfo = Optional.of(Failures.toFailure(classifySparkException(sparkException)));
                }
            }
            else if (executionException instanceof PrestoSparkExecutionException) {
                failureInfo = executionExceptionFactory.extractExecutionFailureInfo((PrestoSparkExecutionException) executionException);
            }
            else if (executionException instanceof TimeoutException) {
                failureInfo = Optional.of(Failures.toFailure(new PrestoException(StandardErrorCode.EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryTimeout, executionException)));
            }
            if (failureInfo.isPresent()) {
                return failureInfo.get();
            }
            return Failures.toFailure(executionException);
        }

        /**
         * Wraps a Spark failure in a {@link PrestoException} whose error code is chosen from the text of
         * the Spark message. A missing message is treated as empty so that classification can never
         * fail with a {@link NullPointerException}; such failures get the generic Spark error code.
         */
        private static PrestoException classifySparkException(SparkException sparkException) {
            String message = sparkException.getMessage() == null ? "" : sparkException.getMessage();
            if (message.contains("most recent failure: JVM_OOM")) {
                return new PrestoException(SparkErrorCode.SPARK_EXECUTOR_OOM, sparkException);
            }
            if (message.matches(".*Total size of serialized results .* is bigger than allowed maxResultSize.*")) {
                return new PrestoException(SparkErrorCode.EXCEEDED_SPARK_DRIVER_MAX_RESULT_SIZE, sparkException);
            }
            if (message.contains("Executor heartbeat timed out") || message.contains("Unable to talk to the executor")) {
                return new PrestoException(SparkErrorCode.SPARK_EXECUTOR_LOST, sparkException);
            }
            return new PrestoException(SparkErrorCode.GENERIC_SPARK_ERROR, sparkException);
        }

        /** Returns the types of the root plan fragment, i.e. the column types of the query output. */
        public List<Type> getOutputTypes() {
            PlanFragment rootFragment = fragmentedPlan.getFragment();
            return rootFragment.getTypes();
        }

        /** Returns the update type recorded on the query plan, if there is one. */
        public Optional<String> getUpdateType() {
            Optional<String> updateType = planAndMore.getUpdateType();
            return updateType;
        }

        /**
         * Executes the plan rooted at {@code root} and returns the resulting serialized pages.
         * <p>
         * When the root fragment uses coordinator distribution, the child fragments are collected
         * through Spark and the root fragment is then run in this process over those in-memory inputs.
         * Otherwise the whole plan is turned into an RDD and collected.
         *
         * @throws SparkException propagated from the Spark actions
         * @throws TimeoutException if the query completion deadline passes
         */
        private List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> doExecute(SubPlan root) throws SparkException, TimeoutException {
            PlanFragment rootFragment = root.getFragment();
            if (!rootFragment.getPartitioning().equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION)) {
                RddAndMore<PrestoSparkSerializedPage> rootRdd = createRdd(root, PrestoSparkSerializedPage.class);
                return rootRdd.collectAndDestroyDependenciesWithTimeout(computeNextTimeout(), TimeUnit.MILLISECONDS);
            }

            PrestoSparkTaskDescriptor taskDescriptor = new PrestoSparkTaskDescriptor(
                    session.toSessionRepresentation(),
                    session.getIdentity().getExtraCredentials(),
                    rootFragment,
                    tableWriteInfo);
            SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor = new SerializedPrestoSparkTaskDescriptor(sparkTaskDescriptorJsonCodec.toJsonBytes(taskDescriptor));

            Map<PlanFragmentId, RddAndMore<PrestoSparkSerializedPage>> inputRdds = new HashMap<>();
            for (SubPlan child : root.getChildren()) {
                inputRdds.put(child.getFragment().getId(), createRdd(child, PrestoSparkSerializedPage.class));
            }

            // Start collecting all inputs, then wait for them under the shared query deadline.
            Map<String, JavaFutureAction<List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>>>> inputFutures = inputRdds.entrySet().stream()
                    .collect(ImmutableMap.toImmutableMap(
                            entry -> entry.getKey().toString(),
                            entry -> entry.getValue().getRdd().collectAsync()));
            PrestoSparkQueryExecutionFactory.waitForActionsCompletionWithTimeout(inputFutures.values(), computeNextTimeout(), TimeUnit.MILLISECONDS);

            // Keep only the pages, dropping the partition ids.
            Map<String, List<PrestoSparkSerializedPage>> inputs = inputFutures.entrySet().stream()
                    .collect(ImmutableMap.toImmutableMap(
                            Map.Entry::getKey,
                            entry -> Futures.getUnchecked(entry.getValue()).stream().map(Tuple2::_2).collect(ImmutableList.toImmutableList())));

            IPrestoSparkTaskExecutor<PrestoSparkSerializedPage> prestoSparkTaskExecutor = taskExecutorFactory.create(
                    0,
                    0,
                    serializedTaskDescriptor,
                    ScalaUtils.emptyScalaIterator(),
                    new PrestoSparkTaskInputs(ImmutableMap.of(), ImmutableMap.of(), inputs),
                    taskInfoCollector,
                    shuffleStatsCollector,
                    PrestoSparkSerializedPage.class);
            return ScalaUtils.collectScalaIterator(prestoSparkTaskExecutor);
        }

        /**
         * Recursively builds the RDD for {@code subPlan}.
         * <p>
         * A child fragment with fixed broadcast distribution is collected eagerly, checked against the
         * session's broadcast memory limit (both compressed and uncompressed size) and published as a
         * Spark broadcast variable. Every other child is built as an RDD input, and the broadcast
         * dependencies of its subtree are carried over to the returned holder.
         *
         * @param subPlan plan section to build
         * @param outputType the task output class requested from the RDD factory
         * @return the RDD together with every broadcast variable created for its subtree
         * @throws SparkException propagated from collecting a broadcast child
         * @throws TimeoutException if the query completion deadline passes while collecting
         */
        private <T extends PrestoSparkTaskOutput> RddAndMore<T> createRdd(SubPlan subPlan, Class<T> outputType) throws SparkException, TimeoutException {
            ImmutableMap.Builder<PlanFragmentId, JavaPairRDD<MutablePartitionId, PrestoSparkMutableRow>> rddInputs = ImmutableMap.builder();
            ImmutableMap.Builder<PlanFragmentId, Broadcast<List<PrestoSparkSerializedPage>>> broadcastInputs = ImmutableMap.builder();
            ImmutableList.Builder<Broadcast<?>> broadcastDependencies = ImmutableList.builder();

            for (SubPlan child : subPlan.getChildren()) {
                PlanFragment childFragment = child.getFragment();
                if (childFragment.getPartitioningScheme().getPartitioning().getHandle().equals(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION)) {
                    RddAndMore<PrestoSparkSerializedPage> broadcastRdd = createRdd(child, PrestoSparkSerializedPage.class);
                    List<PrestoSparkSerializedPage> broadcastPages = broadcastRdd.collectAndDestroyDependenciesWithTimeout(computeNextTimeout(), TimeUnit.MILLISECONDS).stream()
                            .map(Tuple2::_2)
                            .collect(Collectors.toList());

                    // Sum as long: the combined size of all pages can exceed Integer.MAX_VALUE bytes.
                    long compressedBroadcastSizeInBytes = broadcastPages.stream().mapToLong(page -> page.getBytes().length).sum();
                    long uncompressedBroadcastSizeInBytes = broadcastPages.stream().mapToLong(PrestoSparkSerializedPage::getUncompressedSizeInBytes).sum();

                    DataSize maxBroadcastSize = SystemSessionProperties.getQueryMaxBroadcastMemory(session);
                    long maxBroadcastSizeInBytes = maxBroadcastSize.toBytes();
                    if (compressedBroadcastSizeInBytes > maxBroadcastSizeInBytes) {
                        throw ExceededMemoryLimitException.exceededLocalBroadcastMemoryLimit(maxBroadcastSize, String.format("Compressed broadcast size: %s", DataSize.succinctBytes(compressedBroadcastSizeInBytes)));
                    }
                    if (uncompressedBroadcastSizeInBytes > maxBroadcastSizeInBytes) {
                        // Report the uncompressed size here, since that is the value that broke the limit.
                        throw ExceededMemoryLimitException.exceededLocalBroadcastMemoryLimit(maxBroadcastSize, String.format("Uncompressed broadcast size: %s", DataSize.succinctBytes(uncompressedBroadcastSizeInBytes)));
                    }

                    Broadcast<List<PrestoSparkSerializedPage>> broadcast = sparkContext.broadcast(broadcastPages);
                    broadcastInputs.put(childFragment.getId(), broadcast);
                    broadcastDependencies.add(broadcast);
                }
                else {
                    RddAndMore<PrestoSparkMutableRow> childRdd = createRdd(child, PrestoSparkMutableRow.class);
                    rddInputs.put(childFragment.getId(), childRdd.getRdd());
                    broadcastDependencies.addAll(childRdd.getBroadcastDependencies());
                }
            }

            JavaPairRDD<MutablePartitionId, T> rdd = rddFactory.createSparkRdd(
                    sparkContext,
                    session,
                    subPlan.getFragment(),
                    rddInputs.build(),
                    broadcastInputs.build(),
                    taskExecutorFactoryProvider,
                    taskInfoCollector,
                    shuffleStatsCollector,
                    tableWriteInfo,
                    outputType);
            return new RddAndMore<>(rdd, broadcastDependencies.build());
        }

        /**
         * Builds the final {@link QueryInfo} from the task infos accumulated so far and publishes a
         * query-completed event. When a status output path is configured, the query status is also
         * written there as JSON.
         *
         * @param failureInfo the failure that ended the query; empty when the query finished normally
         * @param updateCount update count to include in the written status info, if any
         */
        private void queryCompletedEvent(Optional<ExecutionFailureInfo> failureInfo, OptionalLong updateCount) {
            List<SerializedTaskInfo> serializedTaskInfos = taskInfoCollector.value();
            List<TaskInfo> taskInfos = serializedTaskInfos.stream()
                    .map(SerializedTaskInfo::getBytes)
                    .map(PrestoSparkUtils::decompress)
                    .map(bytes -> taskInfoJsonCodec.fromJson(bytes))
                    .collect(ImmutableList.toImmutableList());
            StageInfo stageInfo = PrestoSparkQueryExecutionFactory.createStageInfo(session.getQueryId(), fragmentedPlan, taskInfos);
            QueryState queryState = failureInfo.isPresent() ? QueryState.FAILED : QueryState.FINISHED;
            QueryInfo queryInfo = PrestoSparkQueryExecutionFactory.createQueryInfo(
                    session,
                    query,
                    queryState,
                    Optional.of(planAndMore),
                    sparkQueueName,
                    failureInfo,
                    queryStateTimer,
                    Optional.of(stageInfo),
                    warningCollector);
            queryMonitor.queryCompletedEvent(queryInfo);
            if (queryStatusInfoOutputPath.isPresent()) {
                PrestoSparkQueryStatusInfo prestoSparkQueryStatusInfo = PrestoSparkQueryExecutionFactory.createPrestoSparkQueryInfo(queryInfo, Optional.of(planAndMore), warningCollector, updateCount);
                PrestoSparkQueryExecutionFactory.writeJsonFile(queryStatusInfoOutputPath.get(), prestoSparkQueryStatusInfo, queryStatusInfoJsonCodec);
            }
        }

        /**
         * Groups the accumulated shuffle statistics by (fragment id, operation) and logs one summary
         * line per group, in key order.
         */
        private void processShuffleStats() {
            // Work on a copy of the accumulator's current contents.
            List<PrestoSparkShuffleStats> statsList = new ArrayList<>(shuffleStatsCollector.value());
            Map<ShuffleStatsKey, List<PrestoSparkShuffleStats>> statsMap = new TreeMap<>();
            for (PrestoSparkShuffleStats stats : statsList) {
                ShuffleStatsKey key = new ShuffleStatsKey(stats.getFragmentId(), stats.getOperation());
                statsMap.computeIfAbsent(key, ignored -> new ArrayList<>()).add(stats);
            }
            log.info("Shuffle statistics summary:");
            for (Map.Entry<ShuffleStatsKey, List<PrestoSparkShuffleStats>> entry : statsMap.entrySet()) {
                logShuffleStatsSummary(entry.getKey(), entry.getValue());
            }
        }

        /**
         * Logs totals, averages and per-second rates for one group of shuffle statistics.
         * <p>
         * Rates divide by the elapsed wall time truncated to whole seconds; when that is zero the raw
         * totals are reported as the rates. Averages are zero when their divisor is zero.
         *
         * @param key the fragment/operation the statistics belong to
         * @param statsList the statistics to summarize
         */
        private void logShuffleStatsSummary(ShuffleStatsKey key, List<PrestoSparkShuffleStats> statsList) {
            long rows = 0L;
            long rowBatches = 0L;
            long bytes = 0L;
            long wallMillis = 0L;
            for (PrestoSparkShuffleStats entry : statsList) {
                rows += entry.getProcessedRows();
                rowBatches += entry.getProcessedRowBatches();
                bytes += entry.getProcessedBytes();
                wallMillis += entry.getElapsedWallTimeMills();
            }

            long wallSeconds = wallMillis / 1000L;
            boolean atLeastOneSecond = wallSeconds > 0L;
            long rowsPerSecond = atLeastOneSecond ? rows / wallSeconds : rows;
            long rowBatchesPerSecond = atLeastOneSecond ? rowBatches / wallSeconds : rowBatches;
            long bytesPerSecond = atLeastOneSecond ? bytes / wallSeconds : bytes;

            long averageRowSize = rows > 0L ? bytes / rows : 0L;
            long averageRowBatchSize = rowBatches > 0L ? bytes / rowBatches : 0L;

            log.info(
                    "Fragment: %s, Operation: %s, Rows: %s, Row Batches: %s, Size: %s, Avg Row Size: %s, Avg Row Batch Size: %s, Time: %s, %s rows/s, %s batches/s, %s/s",
                    key.getFragmentId(),
                    key.getOperation(),
                    rows,
                    rowBatches,
                    DataSize.succinctBytes(bytes),
                    DataSize.succinctBytes(averageRowSize),
                    DataSize.succinctBytes(averageRowBatchSize),
                    Duration.succinctDuration((double) wallMillis, TimeUnit.MILLISECONDS),
                    rowsPerSecond,
                    rowBatchesPerSecond,
                    DataSize.succinctBytes(bytesPerSecond));
        }

        /**
         * Returns the number of milliseconds left until the query completion deadline.
         *
         * @throws TimeoutException if the deadline has been reached or passed
         */
        private long computeNextTimeout() throws TimeoutException {
            long remainingMillis = queryCompletionDeadline - System.currentTimeMillis();
            if (remainingMillis > 0L) {
                return remainingMillis;
            }
            throw new TimeoutException();
        }

        /**
         * Decompiler-exposed lambda body used by {@code execute()}: writes the output columns and the
         * given result rows to {@code path} as JSON.
         */
        private /* synthetic */ void lambda$execute$0(List results, Path path) {
            PrestoSparkQueryData queryData = new PrestoSparkQueryData(PrestoSparkQueryExecutionFactory.getOutputColumns(this.planAndMore), results);
            PrestoSparkQueryExecutionFactory.writeJsonFile(path, queryData, this.queryDataJsonCodec);
        }
    }
}

