/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.GcMonitor;
import com.facebook.airlift.stats.TestingGcMonitor;
import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.MemoryRevokingSchedulerUtils;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.BufferState;
import com.facebook.presto.execution.buffer.OutputBufferInfo;
import com.facebook.presto.execution.buffer.OutputBufferMemoryManager;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.memory.MemoryPool;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.memory.QueryContext;
import com.facebook.presto.memory.QueryContextVisitor;
import com.facebook.presto.memory.TraversingQueryContextVisitor;
import com.facebook.presto.memory.VoidTraversingQueryContextVisitor;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.MetadataUpdates;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.OutputFactory;
import com.facebook.presto.operator.PartitionFunction;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskMemoryReservationSummary;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.PrestoSparkConfig;
import com.facebook.presto.spark.PrestoSparkSessionProperties;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkStorageHandle;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.PrestoSparkBroadcastTableCacheManager;
import com.facebook.presto.spark.execution.PrestoSparkBufferedSerializedPage;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkOutputBuffer;
import com.facebook.presto.spark.execution.PrestoSparkPageOutputOperator;
import com.facebook.presto.spark.execution.PrestoSparkRemoteSourceFactory;
import com.facebook.presto.spark.execution.PrestoSparkRowBatch;
import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator;
import com.facebook.presto.spark.execution.PrestoSparkShuffleInput;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecution;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.page.PageDataOutput;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempDataSink;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageHandle;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.spiller.SpillSpaceTracker;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.OutputPartitioning;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.RemoteSourceFactory;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
import javax.inject.Inject;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import org.joda.time.DateTime;
import scala.Tuple2;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;

public class PrestoSparkTaskExecutorFactory
implements IPrestoSparkTaskExecutorFactory {
    private static final Logger log = Logger.get(PrestoSparkTaskExecutorFactory.class);
    private final SessionPropertyManager sessionPropertyManager;
    private final BlockEncodingManager blockEncodingManager;
    private final FunctionAndTypeManager functionAndTypeManager;
    private final JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec;
    private final Codec<TaskSource> taskSourceCodec;
    private final Codec<TaskInfo> taskInfoCodec;
    private final JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec;
    private final Executor notificationExecutor;
    private final ScheduledExecutorService yieldExecutor;
    private final ScheduledExecutorService memoryUpdateExecutor;
    private final ExecutorService memoryRevocationExecutor;
    private final LocalExecutionPlanner localExecutionPlanner;
    private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    private final TaskExecutor taskExecutor;
    private final SplitMonitor splitMonitor;
    private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;
    private final NodeMemoryConfig nodeMemoryConfig;
    private final DataSize maxRevocableMemory;
    private final DataSize maxQuerySpillPerNode;
    private final DataSize sinkMaxBufferSize;
    private final boolean perOperatorCpuTimerEnabled;
    private final boolean cpuTimerEnabled;
    private final boolean perOperatorAllocationTrackingEnabled;
    private final boolean allocationTrackingEnabled;
    private final TempStorageManager tempStorageManager;
    private final PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager;
    private final String storageBasedBroadcastJoinStorage;
    private final AtomicBoolean memoryRevokePending = new AtomicBoolean();
    private final AtomicBoolean memoryRevokeRequestInProgress = new AtomicBoolean();

    @Inject
    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, FunctionAndTypeManager functionAndTypeManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, Codec<TaskSource> taskSourceCodec, Codec<TaskInfo> taskInfoCodec, JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, ScheduledExecutorService memoryUpdateExecutor, ExecutorService memoryRevocationExecutor, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory executionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, TaskManagerConfig taskManagerConfig, NodeMemoryConfig nodeMemoryConfig, NodeSpillConfig nodeSpillConfig, TempStorageManager tempStorageManager, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, PrestoSparkConfig prestoSparkConfig) {
        this(sessionPropertyManager, blockEncodingManager, functionAndTypeManager, taskDescriptorJsonCodec, taskSourceCodec, taskInfoCodec, memoryReservationSummaryJsonCodec, notificationExecutor, yieldExecutor, memoryUpdateExecutor, memoryRevocationExecutor, localExecutionPlanner, executionExceptionFactory, taskExecutor, splitMonitor, authenticatorProviders, nodeMemoryConfig, Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getMaxRevocableMemoryPerNode(), Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getQueryMaxSpillPerNode(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").getSinkMaxBufferSize(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorCpuTimerEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskCpuTimerEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorAllocationTrackingEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskAllocationTrackingEnabled(), tempStorageManager, Objects.requireNonNull(prestoSparkConfig, "prestoSparkConfig is null").getStorageBasedBroadcastJoinStorage(), prestoSparkBroadcastTableCacheManager);
    }

    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, FunctionAndTypeManager functionAndTypeManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, Codec<TaskSource> taskSourceCodec, Codec<TaskInfo> taskInfoCodec, JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, ScheduledExecutorService memoryUpdateExecutor, ExecutorService memoryRevocationExecutor, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory executionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, NodeMemoryConfig nodeMemoryConfig, DataSize maxRevocableMemory, DataSize maxQuerySpillPerNode, DataSize sinkMaxBufferSize, boolean perOperatorCpuTimerEnabled, boolean cpuTimerEnabled, boolean perOperatorAllocationTrackingEnabled, boolean allocationTrackingEnabled, TempStorageManager tempStorageManager, String storageBasedBroadcastJoinStorage, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager) {
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.blockEncodingManager = Objects.requireNonNull(blockEncodingManager, "blockEncodingManager is null");
        this.functionAndTypeManager = Objects.requireNonNull(functionAndTypeManager, "functionManager is null");
        this.taskDescriptorJsonCodec = Objects.requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
        this.taskSourceCodec = Objects.requireNonNull(taskSourceCodec, "taskSourceCodec is null");
        this.taskInfoCodec = Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        this.memoryReservationSummaryJsonCodec = Objects.requireNonNull(memoryReservationSummaryJsonCodec, "memoryReservationSummaryJsonCodec is null");
        this.notificationExecutor = Objects.requireNonNull(notificationExecutor, "notificationExecutor is null");
        this.yieldExecutor = Objects.requireNonNull(yieldExecutor, "yieldExecutor is null");
        this.memoryUpdateExecutor = Objects.requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null");
        this.memoryRevocationExecutor = Objects.requireNonNull(memoryRevocationExecutor, "memoryRevocationExecutor is null");
        this.localExecutionPlanner = Objects.requireNonNull(localExecutionPlanner, "localExecutionPlanner is null");
        this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor is null");
        this.splitMonitor = Objects.requireNonNull(splitMonitor, "splitMonitor is null");
        this.authenticatorProviders = ImmutableSet.copyOf((Collection)Objects.requireNonNull(authenticatorProviders, "authenticatorProviders is null"));
        this.nodeMemoryConfig = Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null");
        this.maxRevocableMemory = Objects.requireNonNull(maxRevocableMemory, "maxRevocableMemory is null");
        this.maxQuerySpillPerNode = Objects.requireNonNull(maxQuerySpillPerNode, "maxQuerySpillPerNode is null");
        this.sinkMaxBufferSize = Objects.requireNonNull(sinkMaxBufferSize, "sinkMaxBufferSize is null");
        this.perOperatorCpuTimerEnabled = perOperatorCpuTimerEnabled;
        this.cpuTimerEnabled = cpuTimerEnabled;
        this.perOperatorAllocationTrackingEnabled = perOperatorAllocationTrackingEnabled;
        this.allocationTrackingEnabled = allocationTrackingEnabled;
        this.tempStorageManager = Objects.requireNonNull(tempStorageManager, "tempStorageManager is null");
        this.storageBasedBroadcastJoinStorage = Objects.requireNonNull(storageBasedBroadcastJoinStorage, "storageBasedBroadcastJoinStorage is null");
        this.prestoSparkBroadcastTableCacheManager = Objects.requireNonNull(prestoSparkBroadcastTableCacheManager, "prestoSparkBroadcastTableCacheManager is null");
    }

    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> create(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, Class<T> outputType) {
        try {
            return this.doCreate(partitionId, attemptNumber, serializedTaskDescriptor, serializedTaskSources, inputs, taskInfoCollector, shuffleStatsCollector, outputType);
        }
        catch (RuntimeException e) {
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
        }
    }

    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, Class<T> outputType) {
        PrestoSparkTaskDescriptor taskDescriptor = (PrestoSparkTaskDescriptor)this.taskDescriptorJsonCodec.fromJson(serializedTaskDescriptor.getBytes());
        ImmutableMap.Builder extraAuthenticators = ImmutableMap.builder();
        this.authenticatorProviders.forEach(provider -> extraAuthenticators.putAll(provider.getTokenAuthenticators()));
        Session session = taskDescriptor.getSession().toSession(this.sessionPropertyManager, taskDescriptor.getExtraCredentials(), (Map)extraAuthenticators.build());
        PlanFragment fragment = taskDescriptor.getFragment();
        StageId stageId = new StageId(session.getQueryId(), fragment.getId().getId());
        this.prestoSparkBroadcastTableCacheManager.removeCachedTablesForStagesOtherThan(stageId);
        TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId);
        List<TaskSource> taskSources = this.getTaskSources(serializedTaskSources);
        log.info("Task [%s] received %d splits.", new Object[]{taskId, taskSources.stream().mapToInt(taskSource -> taskSource.getSplits().size()).sum()});
        OptionalLong totalSplitSize = PrestoSparkTaskExecutorFactory.computeAllSplitsSize(taskSources);
        if (totalSplitSize.isPresent()) {
            log.info("Total split size: %s bytes.", new Object[]{totalSplitSize.getAsLong()});
        }
        log.info(PlanPrinter.textPlanFragment((PlanFragment)fragment, (FunctionAndTypeManager)this.functionAndTypeManager, (Session)session, (boolean)true));
        DataSize maxUserMemory = new DataSize((double)Math.min(this.nodeMemoryConfig.getMaxQueryMemoryPerNode().toBytes(), SystemSessionProperties.getQueryMaxMemoryPerNode((Session)session).toBytes()), DataSize.Unit.BYTE);
        DataSize maxTotalMemory = new DataSize((double)Math.min(this.nodeMemoryConfig.getMaxQueryTotalMemoryPerNode().toBytes(), SystemSessionProperties.getQueryMaxTotalMemoryPerNode((Session)session).toBytes()), DataSize.Unit.BYTE);
        DataSize maxBroadcastMemory = PrestoSparkSessionProperties.getSparkBroadcastJoinMaxMemoryOverride(session);
        if (maxBroadcastMemory == null) {
            maxBroadcastMemory = new DataSize((double)Math.min(this.nodeMemoryConfig.getMaxQueryBroadcastMemory().toBytes(), SystemSessionProperties.getQueryMaxBroadcastMemory((Session)session).toBytes()), DataSize.Unit.BYTE);
        }
        MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("spark-executor-memory-pool"), maxTotalMemory);
        SpillSpaceTracker spillSpaceTracker = new SpillSpaceTracker(this.maxQuerySpillPerNode);
        QueryContext queryContext = new QueryContext(session.getQueryId(), maxUserMemory, maxTotalMemory, maxBroadcastMemory, this.maxRevocableMemory, memoryPool, (GcMonitor)new TestingGcMonitor(), this.notificationExecutor, this.yieldExecutor, this.maxQuerySpillPerNode, spillSpaceTracker, this.memoryReservationSummaryJsonCodec);
        queryContext.setVerboseExceededMemoryLimitErrorsEnabled(SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled((Session)session));
        queryContext.setHeapDumpOnExceededMemoryLimitEnabled(SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled((Session)session).booleanValue());
        String heapDumpFilePath = Paths.get(SystemSessionProperties.getHeapDumpFileDirectory((Session)session), String.format("%s_%s.hprof", session.getQueryId().getId(), stageId.getId())).toString();
        queryContext.setHeapDumpFilePath(heapDumpFilePath);
        TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, this.notificationExecutor);
        TaskContext taskContext = queryContext.addTaskContext(taskStateMachine, session, SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled((Session)session) ? Optional.of(fragment.getRoot()) : Optional.empty(), this.perOperatorCpuTimerEnabled, this.cpuTimerEnabled, this.perOperatorAllocationTrackingEnabled, this.allocationTrackingEnabled, false);
        double memoryRevokingThreshold = PrestoSparkSessionProperties.getMemoryRevokingThreshold(session);
        double memoryRevokingTarget = PrestoSparkSessionProperties.getMemoryRevokingTarget(session);
        Preconditions.checkArgument((memoryRevokingTarget <= memoryRevokingThreshold ? 1 : 0) != 0, (String)"memoryRevokingTarget should be less than or equal memoryRevokingThreshold, but got %s and %s respectively", (Object)memoryRevokingTarget, (Object)memoryRevokingThreshold);
        if (SystemSessionProperties.isSpillEnabled((Session)session)) {
            memoryPool.addListener((pool, queryId, totalMemoryReservationBytes) -> {
                long totalReservedMemory;
                if (totalMemoryReservationBytes > queryContext.getPeakNodeTotalMemory()) {
                    queryContext.setPeakNodeTotalMemory(totalMemoryReservationBytes);
                }
                if ((double)totalMemoryReservationBytes > (double)pool.getMaxBytes() * memoryRevokingThreshold && this.memoryRevokeRequestInProgress.compareAndSet(false, true)) {
                    this.memoryRevocationExecutor.execute(() -> {
                        try {
                            AtomicLong remainingBytesToRevoke = new AtomicLong(totalMemoryReservationBytes - (long)(memoryRevokingTarget * (double)pool.getMaxBytes()));
                            remainingBytesToRevoke.addAndGet(-MemoryRevokingSchedulerUtils.getMemoryAlreadyBeingRevoked((Collection)ImmutableList.of((Object)taskContext), (long)remainingBytesToRevoke.get()));
                            taskContext.accept((QueryContextVisitor)new VoidTraversingQueryContextVisitor<AtomicLong>(){

                                public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong remainingBytesToRevoke) {
                                    long revokedBytes;
                                    if (remainingBytesToRevoke.get() > 0L && (revokedBytes = operatorContext.requestMemoryRevoking()) > 0L) {
                                        PrestoSparkTaskExecutorFactory.this.memoryRevokePending.set(true);
                                        remainingBytesToRevoke.addAndGet(-revokedBytes);
                                    }
                                    return null;
                                }
                            }, (Object)remainingBytesToRevoke);
                            this.memoryRevokeRequestInProgress.set(false);
                        }
                        catch (Exception e) {
                            log.error((Throwable)e, "Error requesting memory revoking");
                        }
                    });
                }
                if ((totalReservedMemory = pool.getQueryMemoryReservation(queryId) + pool.getQueryRevocableMemoryReservation(queryId)) > maxTotalMemory.toBytes() && !this.memoryRevokeRequestInProgress.get() && !this.isMemoryRevokePending(taskContext)) {
                    throw ExceededMemoryLimitException.exceededLocalTotalMemoryLimit((DataSize)maxTotalMemory, (String)(queryContext.getAdditionalFailureInfo(totalReservedMemory, 0L) + String.format("Total reserved memory: %s, Total revocable memory: %s", DataSize.succinctBytes((long)pool.getQueryMemoryReservation(queryId)), DataSize.succinctBytes((long)pool.getQueryRevocableMemoryReservation(queryId)))), (boolean)SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled((Session)session), Optional.ofNullable(heapDumpFilePath));
                }
            });
        }
        ImmutableMap.Builder shuffleInputs = ImmutableMap.builder();
        ImmutableMap.Builder pageInputs = ImmutableMap.builder();
        ImmutableMap.Builder broadcastInputs = ImmutableMap.builder();
        for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) {
            ArrayList<PrestoSparkShuffleInput> remoteSourceRowInputs = new ArrayList<PrestoSparkShuffleInput>();
            ArrayList remoteSourcePageInputs = new ArrayList();
            ArrayList<List> broadcastInputsList = new ArrayList<List>();
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                Iterator shuffleInput = (Iterator)inputs.getShuffleInputs().get(sourceFragmentId.toString());
                Broadcast broadcastInput = (Broadcast)inputs.getBroadcastInputs().get(sourceFragmentId.toString());
                List inMemoryInput = (List)inputs.getInMemoryInputs().get(sourceFragmentId.toString());
                if (shuffleInput != null) {
                    Preconditions.checkArgument((broadcastInput == null ? 1 : 0) != 0, (Object)"single remote source is not expected to accept different kind of inputs");
                    Preconditions.checkArgument((inMemoryInput == null ? 1 : 0) != 0, (Object)"single remote source is not expected to accept different kind of inputs");
                    remoteSourceRowInputs.add(new PrestoSparkShuffleInput(sourceFragmentId.getId(), (Iterator<Tuple2<MutablePartitionId, PrestoSparkMutableRow>>)shuffleInput));
                    continue;
                }
                if (broadcastInput != null) {
                    Preconditions.checkArgument((inMemoryInput == null ? 1 : 0) != 0, (Object)"single remote source is not expected to accept different kind of inputs");
                    broadcastInputsList.add((List)broadcastInput.value());
                    continue;
                }
                if (inMemoryInput != null) {
                    remoteSourcePageInputs.add(PrestoSparkUtils.getNullifyingIterator(inMemoryInput));
                    continue;
                }
                throw new IllegalArgumentException("Input not found for sourceFragmentId: " + sourceFragmentId);
            }
            if (!remoteSourceRowInputs.isEmpty()) {
                shuffleInputs.put((Object)remoteSource.getId(), remoteSourceRowInputs);
            }
            if (!remoteSourcePageInputs.isEmpty()) {
                pageInputs.put((Object)remoteSource.getId(), remoteSourcePageInputs);
            }
            if (broadcastInputsList.isEmpty()) continue;
            broadcastInputs.put((Object)remoteSource.getId(), broadcastInputsList);
        }
        OutputBufferMemoryManager memoryManager = new OutputBufferMemoryManager(this.sinkMaxBufferSize.toBytes(), () -> queryContext.getTaskContextByTaskId(taskId).localSystemMemoryContext(), this.notificationExecutor);
        Optional<OutputPartitioning> preDeterminedPartition = Optional.empty();
        if (fragment.getPartitioningScheme().getPartitioning().getHandle().equals((Object)SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            int partitionCount = SystemSessionProperties.getHashPartitionCount((Session)session);
            preDeterminedPartition = Optional.of(new OutputPartitioning((PartitionFunction)new PrestoSparkRowOutputOperator.PreDeterminedPartitionFunction(partitionId % partitionCount, partitionCount), (List)ImmutableList.of(), (List)ImmutableList.of(), false, OptionalInt.empty()));
        }
        TempDataOperationContext tempDataOperationContext = new TempDataOperationContext(session.getSource(), session.getQueryId().getId(), session.getClientInfo(), Optional.of(session.getClientTags()), session.getIdentity());
        TempStorage tempStorage = this.tempStorageManager.getTempStorage(this.storageBasedBroadcastJoinStorage);
        Output<T> output = PrestoSparkTaskExecutorFactory.configureOutput(outputType, this.blockEncodingManager, memoryManager, PrestoSparkSessionProperties.getShuffleOutputTargetAverageRowSize(session), preDeterminedPartition, tempStorage, tempDataOperationContext, PrestoSparkSessionProperties.getStorageBasedBroadcastJoinWriteBufferSize(session));
        PrestoSparkOutputBuffer<?> outputBuffer = output.getOutputBuffer();
        LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = this.localExecutionPlanner.plan(taskContext, fragment.getRoot(), fragment.getPartitioningScheme(), fragment.getStageExecutionDescriptor(), fragment.getTableScanSchedulingOrder(), output.getOutputFactory(), (RemoteSourceFactory)new PrestoSparkRemoteSourceFactory(this.blockEncodingManager, (Map<PlanNodeId, List<PrestoSparkShuffleInput>>)shuffleInputs.build(), (Map<PlanNodeId, List<java.util.Iterator<PrestoSparkSerializedPage>>>)pageInputs.build(), (Map<PlanNodeId, List<?>>)broadcastInputs.build(), partitionId, shuffleStatsCollector, tempStorage, tempDataOperationContext, this.prestoSparkBroadcastTableCacheManager, stageId), taskDescriptor.getTableWriteInfo(), true);
        taskStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                outputBuffer.setNoMoreRows();
            }
        });
        PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, this.taskExecutor, this.splitMonitor, this.notificationExecutor, this.memoryUpdateExecutor);
        taskExecution.start(taskSources);
        return new PrestoSparkTaskExecutor(taskContext, taskStateMachine, output.getOutputSupplier(), this.taskInfoCodec, taskInfoCollector, shuffleStatsCollector, this.executionExceptionFactory, output.getOutputBufferType(), outputBuffer, tempStorage, tempDataOperationContext);
    }

    public boolean isMemoryRevokePending(TaskContext taskContext) {
        TraversingQueryContextVisitor<Void, Boolean> visitor = new TraversingQueryContextVisitor<Void, Boolean>(){

            public Boolean visitOperatorContext(OperatorContext operatorContext, Void context) {
                return operatorContext.isMemoryRevokingRequested();
            }

            public Boolean mergeResults(List<Boolean> childrenResults) {
                return childrenResults.contains(true);
            }
        };
        this.memoryRevocationExecutor.execute(() -> this.lambda$isMemoryRevokePending$6(taskContext, (TraversingQueryContextVisitor)visitor));
        return this.memoryRevokePending.get();
    }

    private static OptionalLong computeAllSplitsSize(List<TaskSource> taskSources) {
        long sum = 0L;
        for (TaskSource taskSource : taskSources) {
            for (ScheduledSplit scheduledSplit : taskSource.getSplits()) {
                ConnectorSplit connectorSplit = scheduledSplit.getSplit().getConnectorSplit();
                if (!connectorSplit.getSplitSizeInBytes().isPresent()) {
                    return OptionalLong.empty();
                }
                sum += connectorSplit.getSplitSizeInBytes().getAsLong();
            }
        }
        return OptionalLong.of(sum);
    }

    private List<TaskSource> getTaskSources(Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources) {
        long totalSerializedSizeInBytes = 0L;
        ImmutableList.Builder result = ImmutableList.builder();
        while (serializedTaskSources.hasNext()) {
            SerializedPrestoSparkTaskSource serializedTaskSource = (SerializedPrestoSparkTaskSource)serializedTaskSources.next();
            totalSerializedSizeInBytes += (long)serializedTaskSource.getBytes().length;
            result.add((Object)PrestoSparkUtils.deserializeZstdCompressed(this.taskSourceCodec, serializedTaskSource.getBytes()));
        }
        log.info("Total serialized size of all task sources: %s", new Object[]{DataSize.succinctBytes((long)totalSerializedSizeInBytes)});
        return result.build();
    }

    private static <T extends PrestoSparkTaskOutput> Output<T> configureOutput(Class<T> outputType, BlockEncodingManager blockEncodingManager, OutputBufferMemoryManager memoryManager, DataSize targetAverageRowSize, Optional<OutputPartitioning> preDeterminedPartition, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext, DataSize writeBufferSize) {
        if (outputType.equals(PrestoSparkMutableRow.class)) {
            PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer = new PrestoSparkOutputBuffer<PrestoSparkRowBatch>(memoryManager);
            PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory outputFactory = new PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory(outputBuffer, targetAverageRowSize, preDeterminedPartition);
            RowOutputSupplier outputSupplier = new RowOutputSupplier(outputBuffer);
            return new Output(OutputBufferType.SPARK_ROW_OUTPUT_BUFFER, outputBuffer, outputFactory, outputSupplier);
        }
        if (outputType.equals(PrestoSparkSerializedPage.class)) {
            PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer = new PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage>(memoryManager);
            PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory outputFactory = new PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory(outputBuffer, blockEncodingManager);
            PageOutputSupplier outputSupplier = new PageOutputSupplier(outputBuffer);
            return new Output(OutputBufferType.SPARK_PAGE_OUTPUT_BUFFER, outputBuffer, outputFactory, outputSupplier);
        }
        if (outputType.equals(PrestoSparkStorageHandle.class)) {
            PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer = new PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage>(memoryManager);
            PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory outputFactory = new PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory(outputBuffer, blockEncodingManager);
            DiskPageOutputSupplier outputSupplier = new DiskPageOutputSupplier(outputBuffer, tempStorage, tempDataOperationContext, writeBufferSize);
            return new Output(OutputBufferType.SPARK_DISK_PAGE_OUTPUT_BUFFER, outputBuffer, outputFactory, outputSupplier);
        }
        throw new IllegalArgumentException("Unexpected output type: " + outputType.getName());
    }

    private /* synthetic */ void lambda$isMemoryRevokePending$6(TaskContext taskContext, TraversingQueryContextVisitor visitor) {
        this.memoryRevokePending.set((Boolean)taskContext.accept((QueryContextVisitor)visitor, null));
    }

    private static enum OutputBufferType {
        SPARK_ROW_OUTPUT_BUFFER,
        SPARK_PAGE_OUTPUT_BUFFER,
        SPARK_DISK_PAGE_OUTPUT_BUFFER;

    }

    private static class DiskPageOutputSupplier
    implements OutputSupplier<PrestoSparkStorageHandle> {
        private static final MutablePartitionId DEFAULT_PARTITION = new MutablePartitionId();
        private final PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer;
        private final TempStorage tempStorage;
        private final TempDataOperationContext tempDataOperationContext;
        private final long writeBufferSizeInBytes;
        private TempDataSink tempDataSink;
        private long timeSpentWaitingForOutputInMillis;

        private DiskPageOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext, DataSize writeBufferSize) {
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
            this.tempStorage = Objects.requireNonNull(tempStorage, "tempStorage is null");
            this.tempDataOperationContext = Objects.requireNonNull(tempDataOperationContext, "tempDataOperationContext is null");
            this.writeBufferSizeInBytes = Objects.requireNonNull(writeBufferSize, "writeBufferSize is null").toBytes();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Tuple2<MutablePartitionId, PrestoSparkStorageHandle> getNext() throws InterruptedException {
            long start = System.currentTimeMillis();
            PrestoSparkBufferedSerializedPage page = this.outputBuffer.get();
            if (page == null) {
                return null;
            }
            long compressedBroadcastSizeInBytes = 0L;
            long uncompressedBroadcastSizeInBytes = 0L;
            long deserializedBroadcastRetainedSizeInBytes = 0L;
            int positionCount = 0;
            CRC32 checksum = new CRC32();
            IOException ioException = null;
            try {
                this.tempDataSink = this.tempStorage.create(this.tempDataOperationContext);
                ArrayList<PageDataOutput> bufferedPages = new ArrayList<PageDataOutput>();
                long bufferedBytes = 0L;
                while (page != null) {
                    PageDataOutput pageDataOutput = new PageDataOutput(page.getSerializedPage());
                    long writtenSize = pageDataOutput.size();
                    if (this.writeBufferSizeInBytes - bufferedBytes < writtenSize) {
                        this.tempDataSink.write(bufferedPages);
                        bufferedPages.clear();
                        bufferedBytes = 0L;
                    }
                    bufferedPages.add(pageDataOutput);
                    bufferedBytes += writtenSize;
                    compressedBroadcastSizeInBytes += (long)page.getSerializedPage().getSizeInBytes();
                    uncompressedBroadcastSizeInBytes += (long)page.getSerializedPage().getUncompressedSizeInBytes();
                    deserializedBroadcastRetainedSizeInBytes += page.getDeserializedRetainedSizeInBytes();
                    positionCount += page.getPositionCount();
                    Slice slice = page.getSerializedPage().getSlice();
                    checksum.update(slice.byteArray(), slice.byteArrayOffset(), slice.length());
                    page = this.outputBuffer.get();
                }
                if (!bufferedPages.isEmpty()) {
                    this.tempDataSink.write(bufferedPages);
                    bufferedPages.clear();
                }
                TempStorageHandle tempStorageHandle = this.tempDataSink.commit();
                log.info("Created broadcast spill file: " + tempStorageHandle.toString() + " deserialized size: " + deserializedBroadcastRetainedSizeInBytes);
                PrestoSparkStorageHandle prestoSparkStorageHandle = new PrestoSparkStorageHandle(this.tempStorage.serializeHandle(tempStorageHandle), uncompressedBroadcastSizeInBytes, compressedBroadcastSizeInBytes, deserializedBroadcastRetainedSizeInBytes, checksum.getValue(), positionCount);
                long end = System.currentTimeMillis();
                this.timeSpentWaitingForOutputInMillis += end - start;
                Tuple2 tuple2 = new Tuple2((Object)DEFAULT_PARTITION, (Object)prestoSparkStorageHandle);
                return tuple2;
            }
            catch (IOException e) {
                if (ioException == null) {
                    ioException = e;
                }
                try {
                    if (this.tempDataSink != null) {
                        this.tempDataSink.rollback();
                    }
                }
                catch (IOException exception) {
                    if (ioException != exception) {
                        ioException.addSuppressed(exception);
                    }
                }
            }
            finally {
                try {
                    if (this.tempDataSink != null) {
                        this.tempDataSink.close();
                    }
                }
                catch (IOException e) {
                    if (ioException == null) {
                        ioException = e;
                    } else if (ioException != e) {
                        ioException.addSuppressed(e);
                    }
                    throw new UncheckedIOException("Unable to dump data to disk: ", ioException);
                }
            }
            throw new UncheckedIOException("Unable to dump data to disk: ", ioException);
        }

        @Override
        public long getTimeSpentWaitingForOutputInMillis() {
            return this.timeSpentWaitingForOutputInMillis;
        }
    }

    private static class PageOutputSupplier
    implements OutputSupplier<PrestoSparkSerializedPage> {
        private static final MutablePartitionId DEFAULT_PARTITION = new MutablePartitionId();
        private final PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer;
        private long timeSpentWaitingForOutputInMillis;

        private PageOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer) {
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
        }

        @Override
        public Tuple2<MutablePartitionId, PrestoSparkSerializedPage> getNext() throws InterruptedException {
            long start = System.currentTimeMillis();
            PrestoSparkBufferedSerializedPage page = this.outputBuffer.get();
            long end = System.currentTimeMillis();
            this.timeSpentWaitingForOutputInMillis += end - start;
            if (page == null) {
                return null;
            }
            return new Tuple2((Object)DEFAULT_PARTITION, (Object)PrestoSparkUtils.toPrestoSparkSerializedPage(page.getSerializedPage()));
        }

        @Override
        public long getTimeSpentWaitingForOutputInMillis() {
            return this.timeSpentWaitingForOutputInMillis;
        }
    }

    private static class RowOutputSupplier
    implements OutputSupplier<PrestoSparkMutableRow> {
        private final PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer;
        private PrestoSparkRowBatch.RowTupleSupplier currentRowTupleSupplier;
        private long timeSpentWaitingForOutputInMillis;

        private RowOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer) {
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
        }

        @Override
        public Tuple2<MutablePartitionId, PrestoSparkMutableRow> getNext() throws InterruptedException {
            Tuple2<MutablePartitionId, PrestoSparkMutableRow> next = null;
            while (next == null) {
                if (this.currentRowTupleSupplier == null) {
                    long start = System.currentTimeMillis();
                    PrestoSparkRowBatch rowBatch = this.outputBuffer.get();
                    long end = System.currentTimeMillis();
                    this.timeSpentWaitingForOutputInMillis += end - start;
                    if (rowBatch == null) {
                        return null;
                    }
                    this.currentRowTupleSupplier = rowBatch.createRowTupleSupplier();
                }
                if ((next = this.currentRowTupleSupplier.getNext()) != null) continue;
                this.currentRowTupleSupplier = null;
            }
            return next;
        }

        @Override
        public long getTimeSpentWaitingForOutputInMillis() {
            return this.timeSpentWaitingForOutputInMillis;
        }
    }

    private static interface OutputSupplier<T extends PrestoSparkTaskOutput> {
        public Tuple2<MutablePartitionId, T> getNext() throws InterruptedException;

        public long getTimeSpentWaitingForOutputInMillis();
    }

    private static class Output<T extends PrestoSparkTaskOutput> {
        private final OutputBufferType outputBufferType;
        private final PrestoSparkOutputBuffer<?> outputBuffer;
        private final OutputFactory outputFactory;
        private final OutputSupplier<T> outputSupplier;

        private Output(OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> outputBuffer, OutputFactory outputFactory, OutputSupplier<T> outputSupplier) {
            this.outputBufferType = Objects.requireNonNull(outputBufferType, "outputBufferType is null");
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
            this.outputFactory = Objects.requireNonNull(outputFactory, "outputFactory is null");
            this.outputSupplier = Objects.requireNonNull(outputSupplier, "outputSupplier is null");
        }

        public OutputBufferType getOutputBufferType() {
            return this.outputBufferType;
        }

        public PrestoSparkOutputBuffer<?> getOutputBuffer() {
            return this.outputBuffer;
        }

        public OutputFactory getOutputFactory() {
            return this.outputFactory;
        }

        public OutputSupplier<T> getOutputSupplier() {
            return this.outputSupplier;
        }
    }

    private static class PrestoSparkTaskExecutor<T extends PrestoSparkTaskOutput>
    extends AbstractIterator<Tuple2<MutablePartitionId, T>>
    implements IPrestoSparkTaskExecutor<T> {
        private final TaskContext taskContext;
        private final TaskStateMachine taskStateMachine;
        private final OutputSupplier<T> outputSupplier;
        private final Codec<TaskInfo> taskInfoCodec;
        private final CollectionAccumulator<SerializedTaskInfo> taskInfoCollector;
        private final CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector;
        private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
        private final OutputBufferType outputBufferType;
        private final PrestoSparkOutputBuffer<?> outputBuffer;
        private final TempStorage tempStorage;
        private final TempDataOperationContext tempDataOperationContext;
        private final UUID taskInstanceId = UUID.randomUUID();
        private Tuple2<MutablePartitionId, T> next;
        private Long start;
        private long processedRows;
        private long processedRowBatches;
        private long processedBytes;

        private PrestoSparkTaskExecutor(TaskContext taskContext, TaskStateMachine taskStateMachine, OutputSupplier<T> outputSupplier, Codec<TaskInfo> taskInfoCodec, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, PrestoSparkExecutionExceptionFactory executionExceptionFactory, OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> outputBuffer, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext) {
            this.taskContext = Objects.requireNonNull(taskContext, "taskContext is null");
            this.taskStateMachine = Objects.requireNonNull(taskStateMachine, "taskStateMachine is null");
            this.outputSupplier = Objects.requireNonNull(outputSupplier, "outputSupplier is null");
            this.taskInfoCodec = Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
            this.taskInfoCollector = Objects.requireNonNull(taskInfoCollector, "taskInfoCollector is null");
            this.shuffleStatsCollector = Objects.requireNonNull(shuffleStatsCollector, "shuffleStatsCollector is null");
            this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
            this.outputBufferType = Objects.requireNonNull(outputBufferType, "outputBufferType is null");
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
            this.tempStorage = Objects.requireNonNull(tempStorage, "tempStorage is null");
            this.tempDataOperationContext = Objects.requireNonNull(tempDataOperationContext, "tempDataOperationContext is null");
        }

        public boolean hasNext() {
            if (this.next == null) {
                this.next = this.computeNext();
            }
            return this.next != null;
        }

        public Tuple2<MutablePartitionId, T> next() {
            if (this.next == null) {
                this.next = this.computeNext();
            }
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            Tuple2<MutablePartitionId, T> result = this.next;
            this.next = null;
            return result;
        }

        protected Tuple2<MutablePartitionId, T> computeNext() {
            try {
                return this.doComputeNext();
            }
            catch (RuntimeException e) {
                throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.taskStateMachine.abort();
                throw new RuntimeException(e);
            }
        }

        private Tuple2<MutablePartitionId, T> doComputeNext() throws InterruptedException {
            Throwable failure;
            block6: {
                Tuple2<MutablePartitionId, T> output;
                if (this.start == null) {
                    this.start = System.currentTimeMillis();
                }
                if ((output = this.outputSupplier.getNext()) != null) {
                    this.processedRows += ((PrestoSparkTaskOutput)output._2).getPositionCount();
                    ++this.processedRowBatches;
                    this.processedBytes += ((PrestoSparkTaskOutput)output._2).getSize();
                    return output;
                }
                TaskState taskState = this.taskStateMachine.getState();
                Preconditions.checkState((boolean)taskState.isDone(), (Object)"task is expected to be done");
                long end = System.currentTimeMillis();
                PrestoSparkShuffleStats shuffleStats = new PrestoSparkShuffleStats(this.taskContext.getTaskId().getStageExecutionId().getStageId().getId(), this.taskContext.getTaskId().getId(), PrestoSparkShuffleStats.Operation.WRITE, this.processedRows, this.processedRowBatches, this.processedBytes, end - this.start - this.outputSupplier.getTimeSpentWaitingForOutputInMillis());
                this.shuffleStatsCollector.add((Object)shuffleStats);
                TaskInfo taskInfo = PrestoSparkTaskExecutor.createTaskInfo(this.taskContext, this.taskStateMachine, this.taskInstanceId, this.outputBufferType, this.outputBuffer);
                SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(PrestoSparkUtils.serializeZstdCompressed(this.taskInfoCodec, taskInfo));
                this.taskInfoCollector.add((Object)serializedTaskInfo);
                LinkedBlockingQueue failures = this.taskStateMachine.getFailureCauses();
                if (failures.isEmpty()) {
                    return null;
                }
                failure = (Throwable)Iterables.getFirst((Iterable)failures, null);
                if (this.outputSupplier instanceof DiskPageOutputSupplier && output != null) {
                    PrestoSparkStorageHandle sparkStorageHandle = (PrestoSparkStorageHandle)output._2;
                    TempStorageHandle tempStorageHandle = this.tempStorage.deserialize(sparkStorageHandle.getSerializedStorageHandle());
                    try {
                        this.tempStorage.remove(this.tempDataOperationContext, tempStorageHandle);
                        log.info("Removed broadcast spill file: " + tempStorageHandle.toString());
                    }
                    catch (IOException e) {
                        if (e == failure) break block6;
                        failure.addSuppressed(e);
                    }
                }
            }
            Throwables.propagateIfPossible((Throwable)failure, Error.class);
            Throwables.propagateIfPossible((Throwable)failure, RuntimeException.class);
            Throwables.propagateIfPossible((Throwable)failure, InterruptedException.class);
            throw new RuntimeException(failure);
        }

        private static TaskInfo createTaskInfo(TaskContext taskContext, TaskStateMachine taskStateMachine, UUID taskInstanceId, OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> outputBuffer) {
            TaskId taskId = taskContext.getTaskId();
            TaskState taskState = taskContext.getState();
            TaskStats taskStats = taskContext.getTaskStats().summarizeFinal();
            Object failures = ImmutableList.of();
            if (taskState == TaskState.FAILED) {
                failures = Failures.toFailures((Collection)taskStateMachine.getFailureCauses());
            }
            TaskStatus taskStatus = new TaskStatus(taskInstanceId.getLeastSignificantBits(), taskInstanceId.getMostSignificantBits(), 1L, taskState, URI.create("http://fake.invalid/task/" + taskId), taskContext.getCompletedDriverGroups(), (List)failures, taskStats.getQueuedPartitionedDrivers(), taskStats.getRunningPartitionedDrivers(), 0.0, false, taskStats.getPhysicalWrittenDataSizeInBytes(), taskStats.getUserMemoryReservationInBytes(), taskStats.getSystemMemoryReservationInBytes(), taskStats.getPeakNodeTotalMemoryInBytes(), (long)taskStats.getFullGcCount(), taskStats.getFullGcTimeInMillis(), taskStats.getTotalCpuTimeInNanos(), System.currentTimeMillis() - taskStats.getCreateTime().getMillis(), taskStats.getQueuedPartitionedSplitsWeight(), taskStats.getRunningPartitionedSplitsWeight());
            OutputBufferInfo outputBufferInfo = new OutputBufferInfo(outputBufferType.name(), BufferState.FINISHED, false, false, 0L, 0L, outputBuffer.getTotalRowsProcessed(), outputBuffer.getTotalPagesProcessed(), (List)ImmutableList.of());
            return new TaskInfo(taskId, taskStatus, DateTime.now(), outputBufferInfo, (Set)ImmutableSet.of(), taskStats, false, MetadataUpdates.DEFAULT_METADATA_UPDATES, "");
        }
    }
}

