/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.GcMonitor;
import com.facebook.airlift.stats.TestingGcMonitor;
import com.facebook.presto.Session;
import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.BufferState;
import com.facebook.presto.execution.buffer.OutputBufferInfo;
import com.facebook.presto.execution.buffer.OutputBufferMemoryManager;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.memory.MemoryPool;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.memory.QueryContext;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.operator.OutputFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.PrestoSparkBufferedSerializedPage;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkOutputBuffer;
import com.facebook.presto.spark.execution.PrestoSparkPageOutputOperator;
import com.facebook.presto.spark.execution.PrestoSparkRemoteSourceFactory;
import com.facebook.presto.spark.execution.PrestoSparkRowBatch;
import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecution;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.spiller.SpillSpaceTracker;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.RemoteSourceFactory;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.airlift.units.DataSize;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import org.joda.time.DateTime;
import scala.Tuple2;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;

public class PrestoSparkTaskExecutorFactory
implements IPrestoSparkTaskExecutorFactory {
    private static final Logger log = Logger.get(PrestoSparkTaskExecutorFactory.class);
    private final SessionPropertyManager sessionPropertyManager;
    private final BlockEncodingManager blockEncodingManager;
    private final JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec;
    private final JsonCodec<TaskSource> taskSourceJsonCodec;
    private final JsonCodec<TaskInfo> taskInfoJsonCodec;
    private final Executor notificationExecutor;
    private final ScheduledExecutorService yieldExecutor;
    private final LocalExecutionPlanner localExecutionPlanner;
    private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    private final TaskExecutor taskExecutor;
    private final SplitMonitor splitMonitor;
    private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;
    private final DataSize maxUserMemory;
    private final DataSize maxTotalMemory;
    private final DataSize maxSpillMemory;
    private final DataSize sinkMaxBufferSize;
    private final boolean perOperatorCpuTimerEnabled;
    private final boolean cpuTimerEnabled;
    private final boolean perOperatorAllocationTrackingEnabled;
    private final boolean allocationTrackingEnabled;

    @Inject
    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, JsonCodec<TaskSource> taskSourceJsonCodec, JsonCodec<TaskInfo> taskInfoJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory executionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, TaskManagerConfig taskManagerConfig, NodeMemoryConfig nodeMemoryConfig, NodeSpillConfig nodeSpillConfig) {
        this(sessionPropertyManager, blockEncodingManager, taskDescriptorJsonCodec, taskSourceJsonCodec, taskInfoJsonCodec, notificationExecutor, yieldExecutor, localExecutionPlanner, executionExceptionFactory, taskExecutor, splitMonitor, authenticatorProviders, Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryMemoryPerNode(), Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryTotalMemoryPerNode(), Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getMaxSpillPerNode(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").getSinkMaxBufferSize(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorCpuTimerEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskCpuTimerEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorAllocationTrackingEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskAllocationTrackingEnabled());
    }

    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, JsonCodec<TaskSource> taskSourceJsonCodec, JsonCodec<TaskInfo> taskInfoJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory executionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, DataSize maxUserMemory, DataSize maxTotalMemory, DataSize maxSpillMemory, DataSize sinkMaxBufferSize, boolean perOperatorCpuTimerEnabled, boolean cpuTimerEnabled, boolean perOperatorAllocationTrackingEnabled, boolean allocationTrackingEnabled) {
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.blockEncodingManager = Objects.requireNonNull(blockEncodingManager, "blockEncodingManager is null");
        this.taskDescriptorJsonCodec = Objects.requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
        this.taskSourceJsonCodec = Objects.requireNonNull(taskSourceJsonCodec, "taskSourceJsonCodec is null");
        this.taskInfoJsonCodec = Objects.requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
        this.notificationExecutor = Objects.requireNonNull(notificationExecutor, "notificationExecutor is null");
        this.yieldExecutor = Objects.requireNonNull(yieldExecutor, "yieldExecutor is null");
        this.localExecutionPlanner = Objects.requireNonNull(localExecutionPlanner, "localExecutionPlanner is null");
        this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor is null");
        this.splitMonitor = Objects.requireNonNull(splitMonitor, "splitMonitor is null");
        this.authenticatorProviders = ImmutableSet.copyOf((Collection)Objects.requireNonNull(authenticatorProviders, "authenticatorProviders is null"));
        this.maxUserMemory = Objects.requireNonNull(maxUserMemory, "maxUserMemory is null");
        this.maxTotalMemory = Objects.requireNonNull(maxTotalMemory, "maxTotalMemory is null");
        this.maxSpillMemory = Objects.requireNonNull(maxSpillMemory, "maxSpillMemory is null");
        this.sinkMaxBufferSize = Objects.requireNonNull(sinkMaxBufferSize, "sinkMaxBufferSize is null");
        this.perOperatorCpuTimerEnabled = perOperatorCpuTimerEnabled;
        this.cpuTimerEnabled = cpuTimerEnabled;
        this.perOperatorAllocationTrackingEnabled = perOperatorAllocationTrackingEnabled;
        this.allocationTrackingEnabled = allocationTrackingEnabled;
    }

    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> create(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, Class<T> outputType) {
        try {
            return this.doCreate(partitionId, attemptNumber, serializedTaskDescriptor, serializedTaskSources, inputs, taskInfoCollector, outputType);
        }
        catch (RuntimeException e) {
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
        }
    }

    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, Class<T> outputType) {
        PrestoSparkTaskDescriptor taskDescriptor = (PrestoSparkTaskDescriptor)this.taskDescriptorJsonCodec.fromJson(serializedTaskDescriptor.getBytes());
        ImmutableMap.Builder extraAuthenticators = ImmutableMap.builder();
        this.authenticatorProviders.forEach(provider -> extraAuthenticators.putAll(provider.getTokenAuthenticators()));
        Session session = taskDescriptor.getSession().toSession(this.sessionPropertyManager, taskDescriptor.getExtraCredentials(), (Map)extraAuthenticators.build());
        PlanFragment fragment = taskDescriptor.getFragment();
        StageId stageId = new StageId(session.getQueryId(), fragment.getId().getId());
        TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId);
        List<TaskSource> taskSources = this.getTaskSources(serializedTaskSources);
        log.info("Task [%s] received %d splits.", new Object[]{taskId, taskSources.stream().mapToInt(taskSource -> taskSource.getSplits().size()).sum()});
        MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("spark-executor-memory-pool"), this.maxTotalMemory);
        SpillSpaceTracker spillSpaceTracker = new SpillSpaceTracker(this.maxSpillMemory);
        QueryContext queryContext = new QueryContext(session.getQueryId(), this.maxUserMemory, this.maxTotalMemory, this.maxUserMemory, memoryPool, (GcMonitor)new TestingGcMonitor(), this.notificationExecutor, this.yieldExecutor, this.maxSpillMemory, spillSpaceTracker);
        TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, this.notificationExecutor);
        TaskContext taskContext = queryContext.addTaskContext(taskStateMachine, session, this.perOperatorCpuTimerEnabled, this.cpuTimerEnabled, this.perOperatorAllocationTrackingEnabled, this.allocationTrackingEnabled, false);
        ImmutableMap.Builder rowInputs = ImmutableMap.builder();
        ImmutableMap.Builder pageInputs = ImmutableMap.builder();
        for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) {
            ArrayList<Iterator> remoteSourceRowInputs = new ArrayList<Iterator>();
            ArrayList remoteSourcePageInputs = new ArrayList();
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                Iterator shuffleInput = (Iterator)inputs.getShuffleInputs().get(sourceFragmentId.toString());
                Broadcast broadcastInput = (Broadcast)inputs.getBroadcastInputs().get(sourceFragmentId.toString());
                List inMemoryInput = (List)inputs.getInMemoryInputs().get(sourceFragmentId.toString());
                if (shuffleInput != null) {
                    Preconditions.checkArgument((broadcastInput == null ? 1 : 0) != 0, (Object)"single remote source is not expected to accept different kind of inputs");
                    Preconditions.checkArgument((inMemoryInput == null ? 1 : 0) != 0, (Object)"single remote source is not expected to accept different kind of inputs");
                    remoteSourceRowInputs.add(shuffleInput);
                    continue;
                }
                if (broadcastInput != null) {
                    Preconditions.checkArgument((inMemoryInput == null ? 1 : 0) != 0, (Object)"single remote source is not expected to accept different kind of inputs");
                    remoteSourcePageInputs.add(((List)broadcastInput.value()).iterator());
                    continue;
                }
                if (inMemoryInput != null) {
                    remoteSourcePageInputs.add(inMemoryInput.iterator());
                    continue;
                }
                throw new IllegalArgumentException("Input not found for sourceFragmentId: " + sourceFragmentId);
            }
            if (!remoteSourceRowInputs.isEmpty()) {
                rowInputs.put((Object)remoteSource.getId(), remoteSourceRowInputs);
            }
            if (remoteSourcePageInputs.isEmpty()) continue;
            pageInputs.put((Object)remoteSource.getId(), remoteSourcePageInputs);
        }
        OutputBufferMemoryManager memoryManager = new OutputBufferMemoryManager(this.sinkMaxBufferSize.toBytes(), () -> queryContext.getTaskContextByTaskId(taskId).localSystemMemoryContext(), this.notificationExecutor);
        PagesSerde pagesSerde = new PagesSerde((BlockEncodingSerde)this.blockEncodingManager, Optional.empty(), Optional.empty(), Optional.empty());
        Output<T> output = PrestoSparkTaskExecutorFactory.configureOutput(outputType, pagesSerde, memoryManager);
        PrestoSparkOutputBuffer<?> outputBuffer = output.getOutputBuffer();
        LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = this.localExecutionPlanner.plan(taskContext, fragment.getRoot(), fragment.getPartitioningScheme(), fragment.getStageExecutionDescriptor(), fragment.getTableScanSchedulingOrder(), output.getOutputFactory(), (RemoteSourceFactory)new PrestoSparkRemoteSourceFactory(pagesSerde, (Map<PlanNodeId, List<Iterator<Tuple2<MutablePartitionId, PrestoSparkMutableRow>>>>)rowInputs.build(), (Map<PlanNodeId, List<java.util.Iterator<PrestoSparkSerializedPage>>>)pageInputs.build()), taskDescriptor.getTableWriteInfo(), true);
        taskStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                outputBuffer.setNoMoreRows();
            }
        });
        PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, this.taskExecutor, this.splitMonitor, this.notificationExecutor);
        taskExecution.start(taskSources);
        return new PrestoSparkTaskExecutor(taskContext, taskStateMachine, output.getOutputSupplier(), this.taskInfoJsonCodec, taskInfoCollector, this.executionExceptionFactory, output.getOutputBufferType(), outputBuffer);
    }

    private List<TaskSource> getTaskSources(Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources) {
        ImmutableList.Builder result = ImmutableList.builder();
        while (serializedTaskSources.hasNext()) {
            SerializedPrestoSparkTaskSource serializedTaskSource = (SerializedPrestoSparkTaskSource)serializedTaskSources.next();
            result.add(this.taskSourceJsonCodec.fromJson(PrestoSparkUtils.decompress(serializedTaskSource.getBytes())));
        }
        return result.build();
    }

    private static <T extends PrestoSparkTaskOutput> Output<T> configureOutput(Class<T> outputType, PagesSerde pagesSerde, OutputBufferMemoryManager memoryManager) {
        if (outputType.equals(PrestoSparkMutableRow.class)) {
            PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer = new PrestoSparkOutputBuffer<PrestoSparkRowBatch>(memoryManager);
            PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory outputFactory = new PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory(outputBuffer);
            RowOutputSupplier outputSupplier = new RowOutputSupplier(outputBuffer);
            return new Output(OutputBufferType.SPARK_ROW_OUTPUT_BUFFER, outputBuffer, outputFactory, outputSupplier);
        }
        if (outputType.equals(PrestoSparkSerializedPage.class)) {
            PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer = new PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage>(memoryManager);
            PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory outputFactory = new PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory(outputBuffer, pagesSerde);
            PageOutputSupplier outputSupplier = new PageOutputSupplier(outputBuffer);
            return new Output(OutputBufferType.SPARK_PAGE_OUTPUT_BUFFER, outputBuffer, outputFactory, outputSupplier);
        }
        throw new IllegalArgumentException("Unexpected output type: " + outputType.getName());
    }

    private static enum OutputBufferType {
        SPARK_ROW_OUTPUT_BUFFER,
        SPARK_PAGE_OUTPUT_BUFFER;

    }

    private static class PageOutputSupplier
    implements OutputSupplier<PrestoSparkSerializedPage> {
        private static final MutablePartitionId DEFAULT_PARTITION = new MutablePartitionId();
        private final PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer;

        private PageOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer) {
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
        }

        @Override
        public Tuple2<MutablePartitionId, PrestoSparkSerializedPage> getNext() throws InterruptedException {
            PrestoSparkBufferedSerializedPage page = this.outputBuffer.get();
            if (page == null) {
                return null;
            }
            return new Tuple2((Object)DEFAULT_PARTITION, (Object)PrestoSparkUtils.toPrestoSparkSerializedPage(page.getSerializedPage()));
        }
    }

    private static class RowOutputSupplier
    implements OutputSupplier<PrestoSparkMutableRow> {
        private final PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer;
        private PrestoSparkRowBatch.RowTupleSupplier currentRowTupleSupplier;

        private RowOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer) {
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
        }

        @Override
        public Tuple2<MutablePartitionId, PrestoSparkMutableRow> getNext() throws InterruptedException {
            Tuple2<MutablePartitionId, PrestoSparkMutableRow> next = null;
            while (next == null) {
                if (this.currentRowTupleSupplier == null) {
                    PrestoSparkRowBatch rowBatch = this.outputBuffer.get();
                    if (rowBatch == null) {
                        return null;
                    }
                    this.currentRowTupleSupplier = rowBatch.createRowTupleSupplier();
                }
                if ((next = this.currentRowTupleSupplier.getNext()) != null) continue;
                this.currentRowTupleSupplier = null;
            }
            return next;
        }
    }

    private static interface OutputSupplier<T extends PrestoSparkTaskOutput> {
        public Tuple2<MutablePartitionId, T> getNext() throws InterruptedException;
    }

    private static class Output<T extends PrestoSparkTaskOutput> {
        private final OutputBufferType outputBufferType;
        private final PrestoSparkOutputBuffer<?> outputBuffer;
        private final OutputFactory outputFactory;
        private final OutputSupplier<T> outputSupplier;

        private Output(OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> outputBuffer, OutputFactory outputFactory, OutputSupplier<T> outputSupplier) {
            this.outputBufferType = Objects.requireNonNull(outputBufferType, "outputBufferType is null");
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
            this.outputFactory = Objects.requireNonNull(outputFactory, "outputFactory is null");
            this.outputSupplier = Objects.requireNonNull(outputSupplier, "outputSupplier is null");
        }

        public OutputBufferType getOutputBufferType() {
            return this.outputBufferType;
        }

        public PrestoSparkOutputBuffer<?> getOutputBuffer() {
            return this.outputBuffer;
        }

        public OutputFactory getOutputFactory() {
            return this.outputFactory;
        }

        public OutputSupplier<T> getOutputSupplier() {
            return this.outputSupplier;
        }
    }

    private static class PrestoSparkTaskExecutor<T extends PrestoSparkTaskOutput>
    extends AbstractIterator<Tuple2<MutablePartitionId, T>>
    implements IPrestoSparkTaskExecutor<T> {
        private final TaskContext taskContext;
        private final TaskStateMachine taskStateMachine;
        private final OutputSupplier<T> outputSupplier;
        private final JsonCodec<TaskInfo> taskInfoJsonCodec;
        private final CollectionAccumulator<SerializedTaskInfo> taskInfoCollector;
        private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
        private final OutputBufferType outputBufferType;
        private final PrestoSparkOutputBuffer<?> outputBuffer;
        private final UUID taskInstanceId = UUID.randomUUID();
        private Tuple2<MutablePartitionId, T> next;

        private PrestoSparkTaskExecutor(TaskContext taskContext, TaskStateMachine taskStateMachine, OutputSupplier<T> outputSupplier, JsonCodec<TaskInfo> taskInfoJsonCodec, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, PrestoSparkExecutionExceptionFactory executionExceptionFactory, OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> outputBuffer) {
            this.taskContext = Objects.requireNonNull(taskContext, "taskContext is null");
            this.taskStateMachine = Objects.requireNonNull(taskStateMachine, "taskStateMachine is null");
            this.outputSupplier = Objects.requireNonNull(outputSupplier, "outputSupplier is null");
            this.taskInfoJsonCodec = Objects.requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
            this.taskInfoCollector = Objects.requireNonNull(taskInfoCollector, "taskInfoCollector is null");
            this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
            this.outputBufferType = Objects.requireNonNull(outputBufferType, "outputBufferType is null");
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
        }

        public boolean hasNext() {
            if (this.next == null) {
                this.next = this.computeNext();
            }
            return this.next != null;
        }

        public Tuple2<MutablePartitionId, T> next() {
            if (this.next == null) {
                this.next = this.computeNext();
            }
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            Tuple2<MutablePartitionId, T> result = this.next;
            this.next = null;
            return result;
        }

        protected Tuple2<MutablePartitionId, T> computeNext() {
            try {
                return this.doComputeNext();
            }
            catch (RuntimeException e) {
                throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.taskStateMachine.abort();
                throw new RuntimeException(e);
            }
        }

        private Tuple2<MutablePartitionId, T> doComputeNext() throws InterruptedException {
            Tuple2<MutablePartitionId, T> output = this.outputSupplier.getNext();
            if (output != null) {
                return output;
            }
            TaskState taskState = this.taskStateMachine.getState();
            Preconditions.checkState((boolean)taskState.isDone(), (Object)"task is expected to be done");
            TaskInfo taskInfo = PrestoSparkTaskExecutor.createTaskInfo(this.taskContext, this.taskStateMachine, this.taskInstanceId, this.outputBufferType, this.outputBuffer);
            SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(taskInfo.getTaskId().getStageExecutionId().getStageId().getId(), taskInfo.getTaskId().getId(), PrestoSparkUtils.compress(this.taskInfoJsonCodec.toJsonBytes((Object)taskInfo)));
            this.taskInfoCollector.add((Object)serializedTaskInfo);
            LinkedBlockingQueue failures = this.taskStateMachine.getFailureCauses();
            if (failures.isEmpty()) {
                return null;
            }
            Throwable failure = (Throwable)Iterables.getFirst((Iterable)failures, null);
            Throwables.propagateIfPossible((Throwable)failure, Error.class);
            Throwables.propagateIfPossible((Throwable)failure, RuntimeException.class);
            Throwables.propagateIfPossible((Throwable)failure, InterruptedException.class);
            throw new RuntimeException(failure);
        }

        private static TaskInfo createTaskInfo(TaskContext taskContext, TaskStateMachine taskStateMachine, UUID taskInstanceId, OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> outputBuffer) {
            TaskId taskId = taskContext.getTaskId();
            TaskState taskState = taskContext.getState();
            TaskStats taskStats = taskContext.getTaskStats();
            Object failures = ImmutableList.of();
            if (taskState == TaskState.FAILED) {
                failures = Failures.toFailures((Collection)taskStateMachine.getFailureCauses());
            }
            TaskStatus taskStatus = new TaskStatus(taskInstanceId.getLeastSignificantBits(), taskInstanceId.getMostSignificantBits(), 1L, taskState, URI.create("http://fake.invalid/task/" + taskId), taskContext.getCompletedDriverGroups(), (List)failures, taskStats.getQueuedPartitionedDrivers(), taskStats.getRunningPartitionedDrivers(), 0.0, false, taskStats.getPhysicalWrittenDataSize().toBytes(), taskStats.getUserMemoryReservation().toBytes(), taskStats.getSystemMemoryReservation().toBytes(), (long)taskStats.getFullGcCount(), taskStats.getFullGcTime().toMillis());
            OutputBufferInfo outputBufferInfo = new OutputBufferInfo(outputBufferType.name(), BufferState.FINISHED, false, false, 0L, 0L, outputBuffer.getTotalRowsProcessed(), outputBuffer.getTotalPagesProcessed(), (List)ImmutableList.of());
            return new TaskInfo(taskId, taskStatus, DateTime.now(), outputBufferInfo, (Set)ImmutableSet.of(), taskStats, false);
        }
    }
}

