/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.GcMonitor;
import com.facebook.airlift.stats.TestingGcMonitor;
import com.facebook.presto.Session;
import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.execution.buffer.OutputBufferMemoryManager;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.memory.MemoryPool;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.memory.QueryContext;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.operator.OutputFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.classloader_interface.PrestoSparkRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedTaskStats;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkOutputOperator;
import com.facebook.presto.spark.execution.PrestoSparkRemoteSourceFactory;
import com.facebook.presto.spark.execution.PrestoSparkRowBuffer;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecution;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.spiller.SpillSpaceTracker;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.RemoteSourceFactory;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import io.airlift.units.DataSize;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import scala.Tuple2;

public class PrestoSparkTaskExecutorFactory
implements IPrestoSparkTaskExecutorFactory {
    private static final Logger log = Logger.get(PrestoSparkTaskExecutorFactory.class);
    private final SessionPropertyManager sessionPropertyManager;
    private final BlockEncodingManager blockEncodingManager;
    private final JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec;
    private final JsonCodec<TaskStats> taskStatsJsonCodec;
    private final Executor notificationExecutor;
    private final ScheduledExecutorService yieldExecutor;
    private final LocalExecutionPlanner localExecutionPlanner;
    private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    private final TaskExecutor taskExecutor;
    private final SplitMonitor splitMonitor;
    private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;
    private final DataSize maxUserMemory;
    private final DataSize maxTotalMemory;
    private final DataSize maxSpillMemory;
    private final DataSize sinkMaxBufferSize;
    private final boolean perOperatorCpuTimerEnabled;
    private final boolean cpuTimerEnabled;
    private final boolean perOperatorAllocationTrackingEnabled;
    private final boolean allocationTrackingEnabled;

    @Inject
    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, JsonCodec<TaskStats> taskStatsJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory executionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, TaskManagerConfig taskManagerConfig, NodeMemoryConfig nodeMemoryConfig, NodeSpillConfig nodeSpillConfig) {
        this(sessionPropertyManager, blockEncodingManager, taskDescriptorJsonCodec, taskStatsJsonCodec, notificationExecutor, yieldExecutor, localExecutionPlanner, executionExceptionFactory, taskExecutor, splitMonitor, authenticatorProviders, Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryMemoryPerNode(), Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryTotalMemoryPerNode(), Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getMaxSpillPerNode(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").getSinkMaxBufferSize(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorCpuTimerEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskCpuTimerEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorAllocationTrackingEnabled(), Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskAllocationTrackingEnabled());
    }

    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec, JsonCodec<TaskStats> taskStatsJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory executionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> authenticatorProviders, DataSize maxUserMemory, DataSize maxTotalMemory, DataSize maxSpillMemory, DataSize sinkMaxBufferSize, boolean perOperatorCpuTimerEnabled, boolean cpuTimerEnabled, boolean perOperatorAllocationTrackingEnabled, boolean allocationTrackingEnabled) {
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.blockEncodingManager = Objects.requireNonNull(blockEncodingManager, "blockEncodingManager is null");
        this.taskDescriptorJsonCodec = Objects.requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
        this.taskStatsJsonCodec = Objects.requireNonNull(taskStatsJsonCodec, "taskStatsJsonCodec is null");
        this.notificationExecutor = Objects.requireNonNull(notificationExecutor, "notificationExecutor is null");
        this.yieldExecutor = Objects.requireNonNull(yieldExecutor, "yieldExecutor is null");
        this.localExecutionPlanner = Objects.requireNonNull(localExecutionPlanner, "localExecutionPlanner is null");
        this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor is null");
        this.splitMonitor = Objects.requireNonNull(splitMonitor, "splitMonitor is null");
        this.authenticatorProviders = ImmutableSet.copyOf((Collection)Objects.requireNonNull(authenticatorProviders, "authenticatorProviders is null"));
        this.maxUserMemory = Objects.requireNonNull(maxUserMemory, "maxUserMemory is null");
        this.maxTotalMemory = Objects.requireNonNull(maxTotalMemory, "maxTotalMemory is null");
        this.maxSpillMemory = Objects.requireNonNull(maxSpillMemory, "maxSpillMemory is null");
        this.sinkMaxBufferSize = Objects.requireNonNull(sinkMaxBufferSize, "sinkMaxBufferSize is null");
        this.perOperatorCpuTimerEnabled = perOperatorCpuTimerEnabled;
        this.cpuTimerEnabled = cpuTimerEnabled;
        this.perOperatorAllocationTrackingEnabled = perOperatorAllocationTrackingEnabled;
        this.allocationTrackingEnabled = allocationTrackingEnabled;
    }

    public IPrestoSparkTaskExecutor create(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskStats> taskStatsCollector) {
        try {
            return this.doCreate(partitionId, attemptNumber, serializedTaskDescriptor, inputs, taskStatsCollector);
        }
        catch (RuntimeException e) {
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
        }
    }

    public IPrestoSparkTaskExecutor doCreate(int partitionId, int attemptNumber, SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, PrestoSparkTaskInputs inputs, CollectionAccumulator<SerializedTaskStats> taskStatsCollector) {
        PrestoSparkTaskDescriptor taskDescriptor = (PrestoSparkTaskDescriptor)this.taskDescriptorJsonCodec.fromJson(serializedTaskDescriptor.getBytes());
        ImmutableMap.Builder extraAuthenticators = ImmutableMap.builder();
        this.authenticatorProviders.forEach(provider -> extraAuthenticators.putAll(provider.getTokenAuthenticators()));
        Session session = taskDescriptor.getSession().toSession(this.sessionPropertyManager, taskDescriptor.getExtraCredentials(), (Map)extraAuthenticators.build());
        PlanFragment fragment = taskDescriptor.getFragment();
        StageId stageId = new StageId(session.getQueryId(), fragment.getId().getId());
        TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId);
        log.info("Task [%s] received %d splits.", new Object[]{taskId, taskDescriptor.getSources().stream().mapToInt(taskSource -> taskSource.getSplits().size()).sum()});
        MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("spark-executor-memory-pool"), this.maxTotalMemory);
        SpillSpaceTracker spillSpaceTracker = new SpillSpaceTracker(this.maxSpillMemory);
        QueryContext queryContext = new QueryContext(session.getQueryId(), this.maxUserMemory, this.maxTotalMemory, memoryPool, (GcMonitor)new TestingGcMonitor(), this.notificationExecutor, this.yieldExecutor, this.maxSpillMemory, spillSpaceTracker);
        TaskContext taskContext = queryContext.addTaskContext(new TaskStateMachine(taskId, this.notificationExecutor), session, this.perOperatorCpuTimerEnabled, this.cpuTimerEnabled, this.perOperatorAllocationTrackingEnabled, this.allocationTrackingEnabled, false);
        OutputBufferMemoryManager memoryManager = new OutputBufferMemoryManager(this.sinkMaxBufferSize.toBytes(), () -> queryContext.getTaskContextByTaskId(taskId).localSystemMemoryContext(), this.notificationExecutor);
        PrestoSparkRowBuffer rowBuffer = new PrestoSparkRowBuffer(memoryManager);
        ImmutableMap.Builder shuffleInputs = ImmutableMap.builder();
        ImmutableMap.Builder broadcastInputs = ImmutableMap.builder();
        for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) {
            ArrayList<Iterator> shuffleRemoteSourceInputs = new ArrayList<Iterator>();
            ArrayList broadcastRemoteSourceInputs = new ArrayList();
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                Iterator shuffleInput = (Iterator)inputs.getShuffleInputs().get(sourceFragmentId.toString());
                Broadcast broadcastInput = (Broadcast)inputs.getBroadcastInputs().get(sourceFragmentId.toString());
                Preconditions.checkArgument((shuffleInput != null || broadcastInput != null ? 1 : 0) != 0, (String)"Input not found for sourceFragmentId: %s", (Object)sourceFragmentId);
                Preconditions.checkArgument((shuffleInput == null || broadcastInput == null ? 1 : 0) != 0, (Object)"Single remote source cannot accept both, broadcast and shuffle inputs");
                if (shuffleInput != null) {
                    shuffleRemoteSourceInputs.add(Iterators.transform((Iterator)shuffleInput, tuple -> (PrestoSparkRow)tuple._2));
                }
                if (broadcastInput == null) continue;
                broadcastRemoteSourceInputs.add(((List)broadcastInput.value()).iterator());
            }
            if (!shuffleRemoteSourceInputs.isEmpty()) {
                shuffleInputs.put((Object)remoteSource.getId(), (Object)Iterators.concat(shuffleRemoteSourceInputs.iterator()));
            }
            if (broadcastRemoteSourceInputs.isEmpty()) continue;
            broadcastInputs.put((Object)remoteSource.getId(), (Object)Iterators.concat(broadcastRemoteSourceInputs.iterator()));
        }
        LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = this.localExecutionPlanner.plan(taskContext, fragment.getRoot(), fragment.getPartitioningScheme(), fragment.getStageExecutionDescriptor(), fragment.getTableScanSchedulingOrder(), (OutputFactory)new PrestoSparkOutputOperator.PrestoSparkOutputFactory(rowBuffer), (RemoteSourceFactory)new PrestoSparkRemoteSourceFactory(new PagesSerde((BlockEncodingSerde)this.blockEncodingManager, Optional.empty(), Optional.empty(), Optional.empty()), (Map<PlanNodeId, Iterator<PrestoSparkRow>>)shuffleInputs.build(), (Map<PlanNodeId, Iterator<PrestoSparkSerializedPage>>)broadcastInputs.build()), taskDescriptor.getTableWriteInfo(), true);
        TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, this.notificationExecutor);
        taskStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                rowBuffer.setNoMoreRows();
            }
        });
        PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, this.taskExecutor, this.splitMonitor, this.notificationExecutor);
        taskExecution.start(taskDescriptor.getSources());
        return new PrestoSparkTaskExecutor(taskContext, taskStateMachine, rowBuffer, this.taskStatsJsonCodec, taskStatsCollector, this.executionExceptionFactory);
    }

    private static class PrestoSparkTaskExecutor
    extends AbstractIterator<Tuple2<Integer, PrestoSparkRow>>
    implements IPrestoSparkTaskExecutor {
        private final TaskContext taskContext;
        private final TaskStateMachine taskStateMachine;
        private final PrestoSparkRowBuffer rowBuffer;
        private final JsonCodec<TaskStats> taskStatsJsonCodec;
        private final CollectionAccumulator<SerializedTaskStats> taskStatsCollector;
        private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
        private List<PrestoSparkRow> currentRowBatch;
        private int indexInBatch;

        private PrestoSparkTaskExecutor(TaskContext taskContext, TaskStateMachine taskStateMachine, PrestoSparkRowBuffer rowBuffer, JsonCodec<TaskStats> taskStatsJsonCodec, CollectionAccumulator<SerializedTaskStats> taskStatsCollector, PrestoSparkExecutionExceptionFactory executionExceptionFactory) {
            this.taskContext = Objects.requireNonNull(taskContext, "taskContext is null");
            this.taskStateMachine = Objects.requireNonNull(taskStateMachine, "taskStateMachine is null");
            this.rowBuffer = Objects.requireNonNull(rowBuffer, "rowBuffer is null");
            this.taskStatsJsonCodec = Objects.requireNonNull(taskStatsJsonCodec, "taskStatsJsonCodec is null");
            this.taskStatsCollector = Objects.requireNonNull(taskStatsCollector, "taskStatsCollector is null");
            this.executionExceptionFactory = Objects.requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
        }

        protected Tuple2<Integer, PrestoSparkRow> computeNext() {
            try {
                return this.doComputeNext();
            }
            catch (RuntimeException e) {
                throw this.executionExceptionFactory.toPrestoSparkExecutionException(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.taskStateMachine.abort();
                throw new RuntimeException(e);
            }
        }

        private Tuple2<Integer, PrestoSparkRow> doComputeNext() throws InterruptedException {
            PrestoSparkRow row = this.getNextRow();
            if (row != null) {
                return new Tuple2((Object)row.getPartition(), (Object)row);
            }
            TaskState taskState = this.taskStateMachine.getState();
            Preconditions.checkState((boolean)taskState.isDone(), (Object)"task is expected to be done");
            LinkedBlockingQueue failures = this.taskStateMachine.getFailureCauses();
            if (failures.isEmpty()) {
                return (Tuple2)this.endOfData();
            }
            Throwable failure = (Throwable)Iterables.getFirst((Iterable)failures, null);
            Throwables.propagateIfPossible((Throwable)failure, Error.class);
            Throwables.propagateIfPossible((Throwable)failure, RuntimeException.class);
            Throwables.propagateIfPossible((Throwable)failure, InterruptedException.class);
            throw new RuntimeException(failure);
        }

        private PrestoSparkRow getNextRow() throws InterruptedException {
            if (this.currentRowBatch == null || this.indexInBatch >= this.currentRowBatch.size()) {
                this.currentRowBatch = this.rowBuffer.get();
                this.indexInBatch = 0;
                if (this.currentRowBatch == null || this.currentRowBatch.isEmpty()) {
                    return null;
                }
            }
            PrestoSparkRow row = this.currentRowBatch.get(this.indexInBatch);
            ++this.indexInBatch;
            return row;
        }
    }
}

