/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution.task;

import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.Location;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.metadata.RemoteTransactionHandle;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.spark.PrestoSparkSessionProperties;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.accesscontrol.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkNativeTaskInputs;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleReadDescriptor;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleWriteDescriptor;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.BroadcastFileInfo;
import com.facebook.presto.spark.execution.PrestoSparkBroadcastTableCacheManager;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcess;
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcessFactory;
import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator;
import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleWriteInfo;
import com.facebook.presto.spark.execution.task.NativeExecutionTask;
import com.facebook.presto.spark.execution.task.NativeExecutionTaskFactory;
import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SchedulingOrderVisitor;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.sun.management.OperatingSystemMXBean;
import io.airlift.units.DataSize;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import scala.Tuple2;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;

public class PrestoSparkNativeTaskExecutorFactory
implements IPrestoSparkTaskExecutorFactory {
    private static final Logger log = Logger.get(PrestoSparkNativeTaskExecutorFactory.class);
    private static final TaskId DUMMY_TASK_ID = TaskId.valueOf((String)"remotesourcetaskid.0.0.0.0");
    private final SessionPropertyManager sessionPropertyManager;
    private final JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec;
    private final JsonCodec<BroadcastFileInfo> broadcastFileInfoJsonCodec;
    private final Codec<TaskSource> taskSourceCodec;
    private final Codec<TaskInfo> taskInfoCodec;
    private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;
    private final NativeExecutionProcessFactory nativeExecutionProcessFactory;
    private final NativeExecutionTaskFactory nativeExecutionTaskFactory;
    private final PrestoSparkShuffleInfoTranslator shuffleInfoTranslator;
    private final PagesSerde pagesSerde;
    private NativeExecutionProcess nativeExecutionProcess;

    @Inject
    public PrestoSparkNativeTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, JsonCodec<BroadcastFileInfo> broadcastFileInfoJsonCodec, Codec<TaskSource> taskSourceCodec, Codec<TaskInfo> taskInfoCodec, PrestoSparkExecutionExceptionFactory executionExceptionFactory, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, NativeExecutionProcessFactory nativeExecutionProcessFactory, NativeExecutionTaskFactory nativeExecutionTaskFactory, PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) {
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.taskDescriptorJsonCodec = Objects.requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
        this.taskSourceCodec = Objects.requireNonNull(taskSourceCodec, "taskSourceCodec is null");
        this.taskInfoCodec = Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        this.broadcastFileInfoJsonCodec = Objects.requireNonNull(broadcastFileInfoJsonCodec, "broadcastFileInfoJsonCodec is null");
        this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        this.authenticatorProviders = ImmutableSet.copyOf((Collection)Objects.requireNonNull(authenticatorProviders, "authenticatorProviders is null"));
        this.nativeExecutionProcessFactory = Objects.requireNonNull(nativeExecutionProcessFactory, "processFactory is null");
        this.nativeExecutionTaskFactory = Objects.requireNonNull(nativeExecutionTaskFactory, "taskFactory is null");
        this.shuffleInfoTranslator = Objects.requireNonNull(shuffleInfoTranslator, "shuffleInfoFactory is null");
        this.pagesSerde = PrestoSparkUtils.createPagesSerde(Objects.requireNonNull(blockEncodingManager, "blockEncodingManager is null"));
    }

    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> create(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, Class<T> outputType) {
        try {
            return this.doCreate(partitionId, attemptNumber, serializedTaskDescriptor, serializedTaskSources, inputs, taskInfoCollector, shuffleStatsCollector, outputType);
        }
        catch (RuntimeException e) {
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
        }
    }

    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, Class<T> outputType) {
        CpuTracker cpuTracker = new CpuTracker();
        PrestoSparkTaskDescriptor taskDescriptor = (PrestoSparkTaskDescriptor)this.taskDescriptorJsonCodec.fromJson(serializedTaskDescriptor.getBytes());
        ImmutableMap.Builder extraAuthenticators = ImmutableMap.builder();
        this.authenticatorProviders.forEach(provider -> extraAuthenticators.putAll(provider.getTokenAuthenticators()));
        Session session = taskDescriptor.getSession().toSession(this.sessionPropertyManager, taskDescriptor.getExtraCredentials(), (Map)extraAuthenticators.build());
        PlanFragment fragment = taskDescriptor.getFragment();
        StageId stageId = new StageId(session.getQueryId(), fragment.getId().getId());
        TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId, attemptNumber);
        log.info("Logging plan fragment is not supported for presto-on-spark native execution, yet");
        if (fragment.getPartitioning().isCoordinatorOnly()) {
            throw new UnsupportedOperationException("Coordinator only fragment execution is not supported by native task executor");
        }
        Preconditions.checkArgument((boolean)(inputs instanceof PrestoSparkNativeTaskInputs), (Object)String.format("PrestoSparkNativeTaskInputs is required for native execution, but %s is provided", inputs.getClass().getName()));
        this.createAndStartNativeExecutionProcess(session);
        PrestoSparkNativeTaskInputs nativeInputs = (PrestoSparkNativeTaskInputs)inputs;
        List<TaskSource> taskSources = this.getTaskSources(serializedTaskSources, fragment, session, nativeInputs);
        boolean isFixedBroadcastDistribution = fragment.getPartitioningScheme().getPartitioning().getHandle().equals((Object)SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION);
        Optional<PrestoSparkShuffleWriteInfo> shuffleWriteInfo = nativeInputs.getShuffleWriteDescriptor().map(descriptor -> this.shuffleInfoTranslator.createShuffleWriteInfo(session, (PrestoSparkShuffleWriteDescriptor)descriptor));
        Optional<String> serializedShuffleWriteInfo = shuffleWriteInfo.map(this.shuffleInfoTranslator::createSerializedWriteInfo);
        Optional<String> broadcastDirectory = isFixedBroadcastDistribution ? Optional.of(this.getBroadcastDirectoryPath(session)) : Optional.empty();
        boolean triggerCoredumpWhenUnresponsive = PrestoSparkSessionProperties.isNativeTriggerCoredumpWhenUnresponsiveEnabled(session);
        try {
            log.info("Submitting native execution task ");
            NativeExecutionTask task = this.nativeExecutionTaskFactory.createNativeExecutionTask(session, this.nativeExecutionProcess.getLocation(), taskId, fragment, (List<TaskSource>)ImmutableList.copyOf(taskSources), taskDescriptor.getTableWriteInfo(), serializedShuffleWriteInfo, broadcastDirectory);
            log.info("Creating task and will wait for remote task completion");
            TaskInfo taskInfo = task.start();
            PrestoSparkNativeTaskExecutorFactory.processTaskInfoForErrorsOrCompletion(taskInfo);
            return new PrestoSparkNativeTaskOutputIterator<T>(partitionId, task, outputType, taskInfoCollector, this.taskInfoCodec, this.executionExceptionFactory, cpuTracker, this.nativeExecutionProcess, triggerCoredumpWhenUnresponsive);
        }
        catch (RuntimeException e) {
            if (triggerCoredumpWhenUnresponsive && NativeExecutionTask.isNativeExecutionTaskError(e)) {
                this.nativeExecutionProcess.sendCoreSignal();
            }
            throw e;
        }
    }

    private String getBroadcastDirectoryPath(Session session) {
        return String.format("%s/%s", PrestoSparkSessionProperties.getNativeExecutionBroadcastBasePath(session), session.getQueryId().getId());
    }

    public void close() {
        if (this.nativeExecutionProcess != null) {
            this.nativeExecutionProcess.close();
        }
    }

    private static void completeTask(boolean success, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, NativeExecutionTask task, Codec<TaskInfo> taskInfoCodec, CpuTracker cpuTracker) {
        task.stop(success);
        OptionalLong processCpuTime = cpuTracker.get();
        Optional<TaskInfo> taskInfoOptional = task.getTaskInfo();
        if (!taskInfoOptional.isPresent()) {
            log.error("Missing taskInfo. Statistics might be inaccurate");
            return;
        }
        processCpuTime.ifPresent(cpuTime -> ((TaskInfo)taskInfoOptional.get()).getStats().getRuntimeStats().addMetricValue("javaProcessCpuTime", RuntimeUnit.NANO, cpuTime));
        SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(PrestoSparkUtils.serializeZstdCompressed(taskInfoCodec, taskInfoOptional.get()));
        taskInfoCollector.add((Object)serializedTaskInfo);
        PrestoSparkStatsCollectionUtils.collectMetrics(taskInfoOptional.get());
    }

    private static void processTaskInfoForErrorsOrCompletion(TaskInfo taskInfo) {
        if (!taskInfo.getTaskStatus().getState().isDone()) {
            log.info("processTaskInfoForErrors: task is not done yet.. %s", new Object[]{taskInfo});
            return;
        }
        if (!taskInfo.getTaskStatus().getState().equals((Object)TaskState.FINISHED)) {
            RuntimeException failure = taskInfo.getTaskStatus().getFailures().stream().findFirst().map(ExecutionFailureInfo::toException).orElse((RuntimeException)((Object)new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Native task failed for an unknown reason")));
            throw failure;
        }
        log.info("processTaskInfoForErrors: task completed successfully = %s", new Object[]{taskInfo});
    }

    private void createAndStartNativeExecutionProcess(Session session) {
        Objects.requireNonNull(this.nativeExecutionProcessFactory, "Trying to instantiate native process but factory is null");
        try {
            this.nativeExecutionProcess = this.nativeExecutionProcessFactory.getNativeExecutionProcess(session, NativeExecutionProcessFactory.DEFAULT_URI);
            this.nativeExecutionProcess.start();
        }
        catch (IOException | InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private List<TaskSource> getTaskSources(Iterator<SerializedPrestoSparkTaskSource> serializedTaskSources, PlanFragment fragment, Session session, PrestoSparkNativeTaskInputs nativeTaskInputs) {
        ArrayList<TaskSource> taskSources = new ArrayList<TaskSource>();
        long totalSerializedSizeInBytes = 0L;
        while (serializedTaskSources.hasNext()) {
            SerializedPrestoSparkTaskSource serializedTaskSource = (SerializedPrestoSparkTaskSource)serializedTaskSources.next();
            taskSources.add(PrestoSparkUtils.deserializeZstdCompressed(this.taskSourceCodec, serializedTaskSource.getBytes()));
            totalSerializedSizeInBytes += (long)serializedTaskSource.getBytes().length;
        }
        Set planNodeIdsWithSources = taskSources.stream().map(TaskSource::getPlanNodeId).collect(Collectors.toSet());
        HashSet tableScanIds = Sets.newHashSet((Iterable)SchedulingOrderVisitor.scheduleOrder((PlanNode)fragment.getRoot()));
        tableScanIds.stream().filter(id -> !planNodeIdsWithSources.contains(id)).forEach(id -> taskSources.add(new TaskSource(id, (Set)ImmutableSet.of(), true)));
        log.info("Total serialized size of all table scan task sources: %s", new Object[]{DataSize.succinctBytes((long)totalSerializedSizeInBytes)});
        ImmutableList.Builder shuffleTaskSources = ImmutableList.builder();
        ImmutableList.Builder broadcastTaskSources = ImmutableList.builder();
        AtomicLong nextSplitId = new AtomicLong();
        taskSources.stream().flatMap(source -> source.getSplits().stream()).mapToLong(ScheduledSplit::getSequenceId).max().ifPresent(id -> nextSplitId.set(id + 1L));
        for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) {
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                Broadcast broadcast;
                PrestoSparkShuffleReadDescriptor shuffleReadDescriptor = (PrestoSparkShuffleReadDescriptor)nativeTaskInputs.getShuffleReadDescriptors().get(sourceFragmentId.toString());
                if (shuffleReadDescriptor != null) {
                    ScheduledSplit split = new ScheduledSplit(nextSplitId.getAndIncrement(), remoteSource.getId(), new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, (ConnectorTransactionHandle)new RemoteTransactionHandle(), (ConnectorSplit)new RemoteSplit(new Location(String.format("batch://%s?shuffleInfo=%s", DUMMY_TASK_ID, this.shuffleInfoTranslator.createSerializedReadInfo(this.shuffleInfoTranslator.createShuffleReadInfo(session, shuffleReadDescriptor)))), DUMMY_TASK_ID)));
                    TaskSource source2 = new TaskSource(remoteSource.getId(), (Set)ImmutableSet.of((Object)split), (Set)ImmutableSet.of((Object)Lifespan.taskWide()), true);
                    shuffleTaskSources.add((Object)source2);
                }
                if ((broadcast = (Broadcast)nativeTaskInputs.getBroadcastInputs().get(sourceFragmentId.toString())) == null) continue;
                Set splits = (Set)((List)broadcast.value()).stream().map(PrestoSparkSerializedPage.class::cast).map(prestoSparkSerializedPage -> PrestoSparkUtils.toSerializedPage(prestoSparkSerializedPage)).map(serializedPage -> this.pagesSerde.deserialize(serializedPage)).flatMap(page -> IntStream.range(0, page.getPositionCount()).mapToObj(position -> VarcharType.VARCHAR.getObjectValue(null, page.getBlock(0), position))).map(String.class::cast).map(filePath -> new BroadcastFileInfo((String)filePath)).map(broadcastFileInfo -> new ScheduledSplit(nextSplitId.getAndIncrement(), remoteSource.getId(), new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, (ConnectorTransactionHandle)new RemoteTransactionHandle(), (ConnectorSplit)new RemoteSplit(new Location(String.format("batch://%s?broadcastInfo=%s", DUMMY_TASK_ID, this.broadcastFileInfoJsonCodec.toJson(broadcastFileInfo))), DUMMY_TASK_ID)))).collect(ImmutableSet.toImmutableSet());
                TaskSource source3 = new TaskSource(remoteSource.getId(), splits, (Set)ImmutableSet.of((Object)Lifespan.taskWide()), true);
                broadcastTaskSources.add((Object)source3);
            }
        }
        taskSources.addAll((Collection<TaskSource>)shuffleTaskSources.build());
        taskSources.addAll((Collection<TaskSource>)broadcastTaskSources.build());
        return taskSources;
    }

    private Optional<TableWriterNode> findTableWriteNode(PlanNode node) {
        return PlanNodeSearcher.searchFrom((PlanNode)node).where(TableWriterNode.class::isInstance).findFirst();
    }

    private static class PrestoSparkNativeTaskOutputIterator<T extends PrestoSparkTaskOutput>
    extends AbstractIterator<Tuple2<MutablePartitionId, T>>
    implements IPrestoSparkTaskExecutor<T> {
        private final int partitionId;
        private final NativeExecutionTask nativeExecutionTask;
        private Optional<SerializedPage> next = Optional.empty();
        private final CollectionAccumulator<SerializedTaskInfo> taskInfoCollectionAccumulator;
        private final Codec<TaskInfo> taskInfoCodec;
        private final Class<T> outputType;
        private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
        private final CpuTracker cpuTracker;
        private final NativeExecutionProcess nativeExecutionProcess;
        private final boolean triggerCoredumpWhenUnresponsive;

        public PrestoSparkNativeTaskOutputIterator(int partitionId, NativeExecutionTask nativeExecutionTask, Class<T> outputType, CollectionAccumulator<SerializedTaskInfo> taskInfoCollectionAccumulator, Codec<TaskInfo> taskInfoCodec, PrestoSparkExecutionExceptionFactory executionExceptionFactory, CpuTracker cpuTracker, NativeExecutionProcess nativeExecutionProcess, boolean triggerCoredumpWhenUnresponsive) {
            this.partitionId = partitionId;
            this.nativeExecutionTask = nativeExecutionTask;
            this.taskInfoCollectionAccumulator = taskInfoCollectionAccumulator;
            this.taskInfoCodec = taskInfoCodec;
            this.outputType = outputType;
            this.executionExceptionFactory = executionExceptionFactory;
            this.cpuTracker = cpuTracker;
            this.nativeExecutionProcess = Objects.requireNonNull(nativeExecutionProcess, "nativeExecutionProcess is null");
            this.triggerCoredumpWhenUnresponsive = triggerCoredumpWhenUnresponsive;
        }

        public boolean hasNext() {
            this.next = this.computeNext();
            return this.next.isPresent();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Optional<SerializedPage> computeNext() {
            try {
                Object taskFinishedOrHasResult;
                Object object = taskFinishedOrHasResult = this.nativeExecutionTask.getTaskFinishedOrHasResult();
                synchronized (object) {
                    while (!this.nativeExecutionTask.isTaskDone() && !this.nativeExecutionTask.hasResult()) {
                        taskFinishedOrHasResult.wait();
                    }
                }
                Optional<SerializedPage> pageOptional = this.nativeExecutionTask.pollResult();
                if (pageOptional.isPresent()) {
                    return pageOptional;
                }
                Object object2 = taskFinishedOrHasResult;
                synchronized (object2) {
                    while (!this.nativeExecutionTask.isTaskDone()) {
                        taskFinishedOrHasResult.wait();
                    }
                }
                Optional<TaskInfo> taskInfo = this.nativeExecutionTask.getTaskInfo();
                PrestoSparkNativeTaskExecutorFactory.processTaskInfoForErrorsOrCompletion(taskInfo.get());
            }
            catch (RuntimeException ex) {
                if (this.triggerCoredumpWhenUnresponsive && NativeExecutionTask.isNativeExecutionTaskError(ex)) {
                    this.nativeExecutionProcess.sendCoreSignal();
                }
                PrestoSparkNativeTaskExecutorFactory.completeTask(false, (CollectionAccumulator<SerializedTaskInfo>)this.taskInfoCollectionAccumulator, this.nativeExecutionTask, (Codec<TaskInfo>)this.taskInfoCodec, this.cpuTracker);
                throw this.executionExceptionFactory.toPrestoSparkExecutionException(ex);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            PrestoSparkNativeTaskExecutorFactory.completeTask(true, (CollectionAccumulator<SerializedTaskInfo>)this.taskInfoCollectionAccumulator, this.nativeExecutionTask, (Codec<TaskInfo>)this.taskInfoCodec, this.cpuTracker);
            return Optional.empty();
        }

        public Tuple2<MutablePartitionId, T> next() {
            Preconditions.checkArgument((this.outputType == PrestoSparkSerializedPage.class ? 1 : 0) != 0, (Object)String.format("PrestoSparkNativeTaskExecutorFactory only outputType=PrestoSparkSerializedPageBut tried to extract outputType=%s", this.outputType));
            MutablePartitionId mutablePartitionId = new MutablePartitionId();
            mutablePartitionId.setPartition(this.partitionId);
            return new Tuple2((Object)mutablePartitionId, (Object)PrestoSparkUtils.toPrestoSparkSerializedPage(this.next.get()));
        }
    }

    private static class CpuTracker {
        private OperatingSystemMXBean operatingSystemMXBean;
        private OptionalLong startCpuTime;

        public CpuTracker() {
            if (ManagementFactory.getOperatingSystemMXBean() instanceof OperatingSystemMXBean) {
                this.operatingSystemMXBean = (OperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean();
                this.startCpuTime = OptionalLong.of(this.operatingSystemMXBean.getProcessCpuTime());
            } else {
                this.startCpuTime = OptionalLong.empty();
            }
        }

        OptionalLong get() {
            if (this.operatingSystemMXBean != null) {
                long endCpuTime = this.operatingSystemMXBean.getProcessCpuTime();
                return OptionalLong.of(endCpuTime - this.startCpuTime.getAsLong());
            }
            return OptionalLong.empty();
        }
    }
}

