/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateTimer;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spark.ErrorClassifier;
import com.facebook.presto.spark.PrestoSparkMetadataStorage;
import com.facebook.presto.spark.PrestoSparkQueryData;
import com.facebook.presto.spark.PrestoSparkQueryExecutionFactory;
import com.facebook.presto.spark.PrestoSparkQueryStatusInfo;
import com.facebook.presto.spark.PrestoSparkServiceWaitTimeMetrics;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.RddAndMore;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.ScalaUtils;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.planner.PrestoSparkPlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkQueryPlanner;
import com.facebook.presto.spark.planner.PrestoSparkRddFactory;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.CollectionAccumulator;
import scala.Tuple2;
import scala.collection.Iterator;

public class PrestoSparkStaticQueryExecution
extends AbstractPrestoSparkQueryExecution {
    private static final Logger log = Logger.get(PrestoSparkStaticQueryExecution.class);

    public PrestoSparkStaticQueryExecution(JavaSparkContext sparkContext, Session session, QueryMonitor queryMonitor, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, PrestoSparkTaskExecutorFactory taskExecutorFactory, PrestoSparkTaskExecutorFactoryProvider taskExecutorFactoryProvider, QueryStateTimer queryStateTimer, WarningCollector warningCollector, String query, PrestoSparkQueryPlanner.PlanAndMore planAndMore, Optional<String> sparkQueueName, Codec<TaskInfo> taskInfoCodec, JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec, JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec, JsonCodec<PrestoSparkQueryData> queryDataJsonCodec, PrestoSparkRddFactory rddFactory, TransactionManager transactionManager, PagesSerde pagesSerde, PrestoSparkExecutionExceptionFactory executionExceptionFactory, Duration queryTimeout, long queryCompletionDeadline, PrestoSparkMetadataStorage metadataStorage, Optional<String> queryStatusInfoOutputLocation, Optional<String> queryDataOutputLocation, TempStorage tempStorage, NodeMemoryConfig nodeMemoryConfig, Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics, Optional<ErrorClassifier> errorClassifier, PrestoSparkPlanFragmenter planFragmenter, Metadata metadata, PartitioningProviderManager partitioningProviderManager, HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker) {
        super(sparkContext, session, queryMonitor, taskInfoCollector, shuffleStatsCollector, taskExecutorFactory, taskExecutorFactoryProvider, queryStateTimer, warningCollector, query, planAndMore, sparkQueueName, taskInfoCodec, sparkTaskDescriptorJsonCodec, queryStatusInfoJsonCodec, queryDataJsonCodec, rddFactory, transactionManager, pagesSerde, executionExceptionFactory, queryTimeout, queryCompletionDeadline, metadataStorage, queryStatusInfoOutputLocation, queryDataOutputLocation, tempStorage, nodeMemoryConfig, waitTimeMetrics, errorClassifier, planFragmenter, metadata, partitioningProviderManager, historyBasedPlanStatisticsTracker);
    }

    @Override
    protected List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> doExecute() throws SparkException, TimeoutException {
        SubPlan rootFragmentedPlan = this.createFragmentedPlan();
        if (this.planAndMore.getPhysicalResourceSettings().getMaxExecutorCount().isPresent()) {
            this.sparkContext.sc().conf().set("spark.dynamicAllocation.maxExecutors", Integer.toString(this.planAndMore.getPhysicalResourceSettings().getMaxExecutorCount().getAsInt()));
        }
        this.setFinalFragmentedPlan(rootFragmentedPlan);
        TableWriteInfo tableWriteInfo = this.getTableWriteInfo(this.session, rootFragmentedPlan);
        PlanFragment rootFragment = rootFragmentedPlan.getFragment();
        this.queryStateTimer.beginRunning();
        if (rootFragment.getPartitioning().equals((Object)SystemPartitioningHandle.COORDINATOR_DISTRIBUTION)) {
            PrestoSparkTaskDescriptor taskDescriptor = new PrestoSparkTaskDescriptor(this.session.toSessionRepresentation(), this.session.getIdentity().getExtraCredentials(), rootFragment, tableWriteInfo);
            SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor = new SerializedPrestoSparkTaskDescriptor(this.sparkTaskDescriptorJsonCodec.toJsonBytes((Object)taskDescriptor));
            HashMap<PlanFragmentId, RddAndMore<PrestoSparkSerializedPage>> inputRdds = new HashMap<PlanFragmentId, RddAndMore<PrestoSparkSerializedPage>>();
            for (SubPlan child : rootFragmentedPlan.getChildren()) {
                inputRdds.put(child.getFragment().getId(), this.createRdd(child, PrestoSparkSerializedPage.class, tableWriteInfo));
            }
            Map inputFutures = (Map)inputRdds.entrySet().stream().collect(ImmutableMap.toImmutableMap(entry -> ((PlanFragmentId)entry.getKey()).toString(), entry -> ((RddAndMore)entry.getValue()).getRdd().collectAsync()));
            PrestoSparkQueryExecutionFactory.waitForActionsCompletionWithTimeout(inputFutures.values(), PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline), TimeUnit.MILLISECONDS, this.waitTimeMetrics);
            inputRdds = null;
            ImmutableMap.Builder inputs = ImmutableMap.builder();
            long totalNumberOfPagesReceived = 0L;
            long totalCompressedSizeInBytes = 0L;
            long totalUncompressedSizeInBytes = 0L;
            for (Map.Entry inputFuture : inputFutures.entrySet()) {
                ArrayList<PrestoSparkSerializedPage> pages = new ArrayList<PrestoSparkSerializedPage>();
                List tuples = (List)Futures.getUnchecked((Future)((Future)inputFuture.getValue()));
                long currentFragmentOutputCompressedSizeInBytes = 0L;
                long currentFragmentOutputUncompressedSizeInBytes = 0L;
                for (Tuple2 tuple : tuples) {
                    PrestoSparkSerializedPage page = (PrestoSparkSerializedPage)tuple._2;
                    currentFragmentOutputCompressedSizeInBytes += page.getSize();
                    currentFragmentOutputUncompressedSizeInBytes += (long)page.getUncompressedSizeInBytes();
                    pages.add(page);
                }
                log.info("Received %s pages from fragment %s. Compressed size: %s. Uncompressed size: %s.", new Object[]{pages.size(), inputFuture.getKey(), DataSize.succinctBytes((long)currentFragmentOutputCompressedSizeInBytes), DataSize.succinctBytes((long)currentFragmentOutputUncompressedSizeInBytes)});
                totalNumberOfPagesReceived += (long)pages.size();
                totalCompressedSizeInBytes += currentFragmentOutputCompressedSizeInBytes;
                totalUncompressedSizeInBytes += currentFragmentOutputUncompressedSizeInBytes;
                inputs.put(inputFuture.getKey(), pages);
            }
            log.info("Received %s pages in total. Compressed size: %s. Uncompressed size: %s.", new Object[]{totalNumberOfPagesReceived, DataSize.succinctBytes((long)totalCompressedSizeInBytes), DataSize.succinctBytes((long)totalUncompressedSizeInBytes)});
            IPrestoSparkTaskExecutor<PrestoSparkSerializedPage> prestoSparkTaskExecutor = this.taskExecutorFactory.create(0, 0, serializedTaskDescriptor, (Iterator<SerializedPrestoSparkTaskSource>)ScalaUtils.emptyScalaIterator(), new PrestoSparkTaskInputs((Map)ImmutableMap.of(), (Map)ImmutableMap.of(), (Map)inputs.build()), (CollectionAccumulator<SerializedTaskInfo>)this.taskInfoCollector, (CollectionAccumulator<PrestoSparkShuffleStats>)this.shuffleStatsCollector, PrestoSparkSerializedPage.class);
            return ScalaUtils.collectScalaIterator(prestoSparkTaskExecutor);
        }
        RddAndMore<PrestoSparkSerializedPage> rootRdd = this.createRdd(rootFragmentedPlan, PrestoSparkSerializedPage.class, tableWriteInfo);
        return rootRdd.collectAndDestroyDependenciesWithTimeout(PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline), TimeUnit.MILLISECONDS, this.waitTimeMetrics);
    }

    @VisibleForTesting
    public SubPlan createFragmentedPlan() {
        SubPlan rootFragmentedPlan = this.planFragmenter.fragmentQueryPlan(this.session, this.planAndMore.getPlan(), this.warningCollector);
        this.queryMonitor.queryUpdatedEvent(PrestoSparkQueryExecutionFactory.createQueryInfo(this.session, this.query, QueryState.PLANNING, Optional.of(this.planAndMore), this.sparkQueueName, Optional.empty(), this.queryStateTimer, Optional.of(PrestoSparkQueryExecutionFactory.createStageInfo(this.session.getQueryId(), rootFragmentedPlan, (List<TaskInfo>)ImmutableList.of())), this.warningCollector));
        log.info(PlanPrinter.textDistributedPlan((SubPlan)rootFragmentedPlan, (FunctionAndTypeManager)this.metadata.getFunctionAndTypeManager(), (Session)this.session, (boolean)true));
        int hashPartitionCount = this.planAndMore.getPhysicalResourceSettings().getHashPartitionCount();
        rootFragmentedPlan = this.configureOutputPartitioning(this.session, rootFragmentedPlan, hashPartitionCount);
        return rootFragmentedPlan;
    }
}

