/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.SplitRunner;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.execution.executor.TaskHandle;
import com.facebook.presto.operator.Driver;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.DriverFactory;
import com.facebook.presto.operator.DriverStats;
import com.facebook.presto.operator.PipelineContext;
import com.facebook.presto.operator.PipelineExecutionStrategy;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

public class PrestoSparkTaskExecution {
    // Driver limit for a task that runs natively: this is the value (1) handed to the
    // task executor as the per-task driver cap when isNativeTask is true.
    private static final int MAX_JAVA_DRIVERS_FOR_NATIVE_TASK = 1;
    // Identifier taken from the task state machine; used for thread names and split events.
    private final TaskId taskId;
    // Tracks the task state; drivers failing or all drivers completing transition it.
    private final TaskStateMachine taskStateMachine;
    // Source of the session and of the pipeline contexts created for each driver factory.
    private final TaskContext taskContext;
    // Handle returned by taskExecutor.addTask; required to enqueue splits for this task.
    private final TaskHandle taskHandle;
    private final TaskExecutor taskExecutor;
    // Executor on which driver completion/failure callbacks are run.
    private final Executor notificationExecutor;
    // Receives a completed/failed event for every driver that ends.
    private final SplitMonitor splitMonitor;
    // Table scan source order reported by the local execution plan.
    private final List<PlanNodeId> schedulingOrder;
    // Factories for pipelines whose source is one of the table scan sources, keyed by source id.
    private final Map<PlanNodeId, DriverSplitRunnerFactory> driverRunnerFactoriesWithSplitLifeCycle;
    // Factories for all remaining pipelines; their drivers are created once when the task starts.
    private final List<DriverSplitRunnerFactory> driverRunnerFactoriesWithTaskLifeCycle;
    // Number of enqueued drivers whose completion future has not fired yet.
    private final AtomicInteger remainingDrivers = new AtomicInteger();
    // Flipped by start(); guarantees the task is started at most once.
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Builds the execution for one task: indexes the plan's driver factories, registers the
     * task with the task executor and schedules a peak-memory update.
     *
     * <p>Driver factories whose source id is one of the plan's table scan sources are driven
     * per split; every other factory must use ungrouped execution and is driven once for the
     * whole task.
     *
     * @param taskStateMachine state machine of the task; also supplies the task id
     * @param taskContext context used for the session and for creating pipeline contexts
     * @param localExecutionPlan plan supplying the driver factories and table scan source order
     * @param taskExecutor executor the task and its drivers are registered with
     * @param splitMonitor receiver of split completed/failed events
     * @param notificationExecutor executor running driver completion callbacks
     * @param memoryUpdateExecutor executor used to schedule the peak-memory update
     * @param isNativeTask whether the task runs natively (limits the task to one Java driver)
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if a non table-scan pipeline is not ungrouped, or if a
     *         table scan source has no matching driver factory
     */
    public PrestoSparkTaskExecution(TaskStateMachine taskStateMachine, TaskContext taskContext, LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Executor notificationExecutor, ScheduledExecutorService memoryUpdateExecutor, boolean isNativeTask) {
        // Validate every collaborator up front: createTaskHandle() below registers the task
        // with the executor, so failing a null check afterwards would leave it registered.
        this.taskStateMachine = Objects.requireNonNull(taskStateMachine, "taskStateMachine is null");
        this.taskId = taskStateMachine.getTaskId();
        this.taskContext = Objects.requireNonNull(taskContext, "taskContext is null");
        Objects.requireNonNull(localExecutionPlan, "localExecutionPlan is null");
        this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor is null");
        this.notificationExecutor = Objects.requireNonNull(notificationExecutor, "notificationExecutor is null");
        this.splitMonitor = Objects.requireNonNull(splitMonitor, "splitMonitor is null");
        Objects.requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null");

        // Index driver factories by life cycle.
        this.schedulingOrder = localExecutionPlan.getTableScanSourceOrder();
        Set<PlanNodeId> tableScanSources = ImmutableSet.copyOf(this.schedulingOrder);
        ImmutableMap.Builder<PlanNodeId, DriverSplitRunnerFactory> splitLifeCycleFactories = ImmutableMap.builder();
        ImmutableList.Builder<DriverSplitRunnerFactory> taskLifeCycleFactories = ImmutableList.builder();
        for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
            Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
            if (sourceId.isPresent() && tableScanSources.contains(sourceId.get())) {
                splitLifeCycleFactories.put(sourceId.get(), new DriverSplitRunnerFactory(driverFactory, true));
                continue;
            }
            Preconditions.checkArgument(driverFactory.getPipelineExecutionStrategy() == PipelineExecutionStrategy.UNGROUPED_EXECUTION, "unexpected pipeline execution strategy: %s", driverFactory.getPipelineExecutionStrategy());
            taskLifeCycleFactories.add(new DriverSplitRunnerFactory(driverFactory, false));
        }
        this.driverRunnerFactoriesWithSplitLifeCycle = splitLifeCycleFactories.build();
        this.driverRunnerFactoriesWithTaskLifeCycle = taskLifeCycleFactories.build();
        Preconditions.checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(tableScanSources), "Fragment is partitioned, but not all partitioned drivers were found");

        this.taskHandle = PrestoSparkTaskExecution.createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, isNativeTask);

        // NOTE(review): this is a one-shot schedule (runs once after one second), not a
        // periodic one - confirm that a single peak-memory update is what is intended.
        memoryUpdateExecutor.schedule(() -> taskContext.updatePeakMemory(), 1L, TimeUnit.SECONDS);
    }

    /**
     * Registers the task with the task executor and arranges for its removal once the task
     * reaches a done state.
     *
     * <p>Native tasks are limited to {@link #MAX_JAVA_DRIVERS_FOR_NATIVE_TASK} drivers; all
     * other tasks use the session's max-drivers-per-task setting.
     *
     * @return the handle under which splits for this task must be enqueued
     */
    private static TaskHandle createTaskHandle(TaskStateMachine taskStateMachine, TaskContext taskContext, LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan, TaskExecutor taskExecutor, boolean isNativeTask) {
        Session session = taskContext.getSession();
        OptionalInt maxDriversPerTask = isNativeTask
                ? OptionalInt.of(MAX_JAVA_DRIVERS_FOR_NATIVE_TASK)
                : SystemSessionProperties.getMaxDriversPerTask(session);
        TaskHandle taskHandle = taskExecutor.addTask(
                taskStateMachine.getTaskId(),
                // constant utilization supplier: always reports 0.0
                () -> 0.0,
                SystemSessionProperties.getInitialSplitsPerNode(session),
                SystemSessionProperties.getSplitConcurrencyAdjustmentInterval(session),
                maxDriversPerTask);
        taskStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                // Unregister the task first, then tell every factory no more drivers will be created.
                taskExecutor.removeTask(taskHandle);
                for (DriverFactory factory : localExecutionPlan.getDriverFactories()) {
                    factory.noMoreDrivers();
                }
            }
        });
        return taskHandle;
    }

    /**
     * Starts the task: schedules the task-wide drivers, then the per-split drivers for the
     * given sources, and finally checks whether the task is already complete.
     *
     * @param sources the task sources carrying the splits to run
     * @throws NullPointerException if {@code sources} is null
     * @throws IllegalStateException if this execution was already started
     */
    public void start(List<TaskSource> sources) {
        Objects.requireNonNull(sources, "sources is null");
        boolean firstStart = started.compareAndSet(false, true);
        Preconditions.checkState(firstStart, "already started");

        scheduleDriversForTaskLifeCycle();
        scheduleDriversForSplitLifeCycle(sources);
        // Covers the case where nothing is left running after scheduling.
        checkTaskCompletion();
    }

    /**
     * Creates and enqueues the drivers of every task life cycle factory, then seals those
     * factories so no further driver runners can be created from them.
     *
     * <p>Each factory contributes as many runners as it reports driver instances, defaulting
     * to one when the factory reports none.
     */
    private void scheduleDriversForTaskLifeCycle() {
        List<DriverSplitRunner> taskWideRunners = new ArrayList<>();
        for (DriverSplitRunnerFactory factory : driverRunnerFactoriesWithTaskLifeCycle) {
            int created = 0;
            while (created < factory.getDriverInstances().orElse(1)) {
                // Task-wide drivers carry no scheduled splits.
                taskWideRunners.add(factory.createDriverRunner(null));
                created++;
            }
        }

        // Task life cycle drivers are enqueued with forceRunSplit = true.
        enqueueDriverSplitRunner(true, taskWideRunners);

        driverRunnerFactoriesWithTaskLifeCycle.forEach(factory -> {
            factory.noMoreDriverRunner();
            Verify.verify(factory.isNoMoreDriverRunner());
        });
    }

    /**
     * Groups the splits of all sources by plan node and schedules them, one table scan source
     * at a time, in {@code schedulingOrder}.
     *
     * <p>Splits whose plan node id is not part of {@code schedulingOrder} are not scheduled.
     *
     * @param sources task sources; every one of them must already be final (no more splits)
     * @throws IllegalArgumentException if any source may still receive splits
     */
    private synchronized void scheduleDriversForSplitLifeCycle(List<TaskSource> sources) {
        Preconditions.checkArgument(sources.stream().allMatch(TaskSource::isNoMoreSplits), "All task sources are expected to be final");

        ArrayListMultimap<PlanNodeId, ScheduledSplit> splits = ArrayListMultimap.create();
        for (TaskSource taskSource : sources) {
            splits.putAll(taskSource.getPlanNodeId(), taskSource.getSplits());
        }

        for (PlanNodeId planNodeId : this.schedulingOrder) {
            DriverSplitRunnerFactory driverSplitRunnerFactory = this.driverRunnerFactoriesWithSplitLifeCycle.get(planNodeId);
            // The multimap yields an empty list for a plan node without splits.
            List<ScheduledSplit> planNodeSplits = splits.get(planNodeId);
            this.scheduleTableScanSource(driverSplitRunnerFactory, planNodeSplits);
        }
    }

    /**
     * Enqueues the driver runners for one table scan source and then seals its factory.
     *
     * <p>With native execution enabled a single runner receives all splits; otherwise one
     * runner is created per split.
     *
     * @param factory the split life cycle factory of the table scan pipeline
     * @param splits the splits assigned to that table scan source (may be empty)
     */
    private synchronized void scheduleTableScanSource(DriverSplitRunnerFactory factory, List<ScheduledSplit> splits) {
        // Report split count and summed raw split weight to the pipeline before scheduling.
        factory.splitsAdded(splits.size(), SplitWeight.rawValueSum(splits, scheduledSplit -> scheduledSplit.getSplit().getSplitWeight()));

        ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
        if (SystemSessionProperties.isNativeExecutionEnabled(this.taskContext.getSession())) {
            runners.add(factory.createDriverRunner(splits));
        } else {
            for (ScheduledSplit scheduledSplit : splits) {
                runners.add(factory.createDriverRunner(ImmutableList.of(scheduledSplit)));
            }
        }

        this.enqueueDriverSplitRunner(false, runners.build());
        factory.noMoreDriverRunner();
    }

    /**
     * Hands the runners to the task executor and attaches a completion callback to each
     * returned future.
     *
     * <p>Every enqueued runner increments {@code remainingDrivers}; its callback decrements the
     * counter again. A successful driver triggers a task completion check and a split completed
     * event; a failed driver fails the task and emits a split failed event. Callbacks run on
     * {@code notificationExecutor}.
     *
     * @param forceRunSplit passed through to {@code TaskExecutor.enqueueSplits}
     * @param runners the driver runners to enqueue
     * @throws IllegalStateException if the executor does not return one future per runner
     */
    private synchronized void enqueueDriverSplitRunner(boolean forceRunSplit, List<DriverSplitRunner> runners) {
        List<ListenableFuture<?>> finishedFutures = this.taskExecutor.enqueueSplits(this.taskHandle, forceRunSplit, runners);
        Preconditions.checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());

        for (int i = 0; i < finishedFutures.size(); ++i) {
            ListenableFuture<?> finishedFuture = finishedFutures.get(i);
            final DriverSplitRunner splitRunner = runners.get(i);

            // Record the new driver before its callback can observe the counter.
            this.remainingDrivers.incrementAndGet();

            Futures.addCallback(finishedFuture, new FutureCallback<Object>() {
                @Override
                public void onSuccess(Object result) {
                    try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
                        // Record that the driver is finished, then see whether the task is done.
                        remainingDrivers.decrementAndGet();
                        checkTaskCompletion();
                        splitMonitor.splitCompletedEvent(taskId, getDriverStats());
                    }
                }

                @Override
                public void onFailure(Throwable cause) {
                    try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
                        // Fail the task before the driver is counted as finished.
                        taskStateMachine.failed(cause);
                        remainingDrivers.decrementAndGet();
                        splitMonitor.splitFailedEvent(taskId, getDriverStats(), cause);
                    }
                }

                // Falls back to empty stats when the runner never created its driver.
                private DriverStats getDriverStats() {
                    DriverContext driverContext = splitRunner.getDriverContext();
                    return driverContext != null ? driverContext.getDriverStats() : new DriverStats();
                }
            }, this.notificationExecutor);
        }
    }

    /**
     * Marks the task finished once nothing is left to run.
     *
     * <p>The task is finished only when it is not already done, every split life cycle factory
     * has been sealed, and no enqueued driver is still outstanding.
     */
    private synchronized void checkTaskCompletion() {
        boolean alreadyDone = taskStateMachine.getState().isDone();
        if (alreadyDone) {
            return;
        }

        // Any unsealed split life cycle factory may still produce drivers.
        boolean allSplitFactoriesSealed = driverRunnerFactoriesWithSplitLifeCycle.values().stream()
                .allMatch(DriverSplitRunnerFactory::isNoMoreDriverRunner);
        if (!allSplitFactoriesSealed) {
            return;
        }

        if (remainingDrivers.get() == 0) {
            taskStateMachine.finished();
        }
    }

    /** Returns a debug representation containing the task id and the outstanding driver count. */
    @Override
    public String toString() {
        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
        helper.add("taskId", taskId);
        helper.add("remainingDrivers", remainingDrivers.get());
        return helper.toString();
    }

    private static class DriverSplitRunner
    implements SplitRunner {
        private final DriverSplitRunnerFactory driverSplitRunnerFactory;
        private final DriverContext driverContext;
        @GuardedBy(value="this")
        private boolean closed;
        @Nullable
        private List<ScheduledSplit> scheduledSplits;
        @GuardedBy(value="this")
        private Driver driver;

        private DriverSplitRunner(DriverSplitRunnerFactory driverSplitRunnerFactory, DriverContext driverContext, @Nullable List<ScheduledSplit> scheduledSplits) {
            this.driverSplitRunnerFactory = Objects.requireNonNull(driverSplitRunnerFactory, "driverFactory is null");
            this.driverContext = Objects.requireNonNull(driverContext, "driverContext is null");
            this.scheduledSplits = scheduledSplits;
        }

        public synchronized DriverContext getDriverContext() {
            if (this.driver == null) {
                return null;
            }
            return this.driver.getDriverContext();
        }

        public synchronized boolean isFinished() {
            if (this.closed) {
                return true;
            }
            return this.driver != null && this.driver.isFinished();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ListenableFuture<?> processFor(Duration duration) {
            Driver driver;
            DriverSplitRunner driverSplitRunner = this;
            synchronized (driverSplitRunner) {
                if (this.closed) {
                    return Futures.immediateFuture(null);
                }
                if (this.driver == null) {
                    this.driver = this.driverSplitRunnerFactory.createDriver(this.driverContext, this.scheduledSplits);
                }
                driver = this.driver;
            }
            return driver.processFor(duration);
        }

        public String getInfo() {
            return this.scheduledSplits == null || this.scheduledSplits.isEmpty() ? "" : String.format("DriverRunner splitCount=%d [%s]", this.scheduledSplits.size(), Joiner.on((String)",").join((Iterable)this.scheduledSplits.stream().map(split -> split.getSplit().getConnectorSplit()).collect(Collectors.toList())));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            Driver driver;
            DriverSplitRunner driverSplitRunner = this;
            synchronized (driverSplitRunner) {
                this.closed = true;
                driver = this.driver;
            }
            if (driver != null) {
                driver.close();
            }
        }
    }

    /**
     * Creates {@link DriverSplitRunner}s and their drivers for one pipeline, and closes the
     * underlying {@link DriverFactory} once the factory is sealed and every runner handed out
     * has created its driver.
     */
    private class DriverSplitRunnerFactory {
        private final DriverFactory driverFactory;
        private final PipelineContext pipelineContext;
        // Runners handed out whose driver has not been created yet.
        private final AtomicInteger pendingCreation = new AtomicInteger();
        private final AtomicBoolean noMoreDriverRunner = new AtomicBoolean();
        private final AtomicBoolean closed = new AtomicBoolean();

        private DriverSplitRunnerFactory(DriverFactory driverFactory, boolean partitioned) {
            this.driverFactory = Objects.requireNonNull(driverFactory, "driverFactory is null");
            this.pipelineContext = PrestoSparkTaskExecution.this.taskContext.addPipelineContext(driverFactory.getPipelineId(), driverFactory.isInputDriver(), driverFactory.isOutputDriver(), partitioned);
        }

        /**
         * Creates a runner bound to a fresh driver context of this pipeline.
         *
         * @param scheduledSplits splits the runner's driver should process, or null for none
         * @throws IllegalStateException if this factory has already been sealed
         */
        public DriverSplitRunner createDriverRunner(@Nullable List<ScheduledSplit> scheduledSplits) {
            Preconditions.checkState(!this.noMoreDriverRunner.get(), "Cannot create driver for pipeline: %s", this.pipelineContext.getPipelineId());
            this.pendingCreation.incrementAndGet();
            DriverContext driverContext = this.pipelineContext.addDriverContext(0L, Lifespan.taskWide(), this.driverFactory.getFragmentResultCacheContext());
            return new DriverSplitRunner(this, driverContext, scheduledSplits);
        }

        /**
         * Creates the driver for a runner and feeds it the runner's splits as a final source.
         *
         * <p>If validating or supplying the splits fails, the freshly created driver is closed
         * before the failure is rethrown so it is not left open.
         *
         * @throws IllegalArgumentException if native execution is disabled and more than one
         *         split was supplied
         */
        public Driver createDriver(DriverContext driverContext, @Nullable List<ScheduledSplit> scheduledSplits) {
            Driver driver = this.driverFactory.createDriver(driverContext);
            try {
                if (scheduledSplits != null && !scheduledSplits.isEmpty()) {
                    boolean isNativeExecutionEnabled = SystemSessionProperties.isNativeExecutionEnabled(driver.getDriverContext().getSession());
                    if (!isNativeExecutionEnabled && scheduledSplits.size() != 1) {
                        throw new IllegalArgumentException(String.format("non-native (java) execution requires only one scheduledSplits but [%d] were found [%s]", scheduledSplits.size(), Joiner.on(",").join(scheduledSplits.stream().map(ScheduledSplit::toString).collect(Collectors.toList()))));
                    }
                    PlanNodeId sourceNodeId = isNativeExecutionEnabled ? driver.getSourceId().get() : Iterables.getOnlyElement(scheduledSplits).getPlanNodeId();
                    driver.updateSource(new TaskSource(sourceNodeId, ImmutableSet.copyOf(scheduledSplits), true));
                }
            } catch (RuntimeException | Error failure) {
                // The caller never sees this driver, so nobody else could close it.
                try {
                    driver.close();
                } catch (RuntimeException | Error closeFailure) {
                    if (closeFailure != failure) {
                        failure.addSuppressed(closeFailure);
                    }
                }
                throw failure;
            }
            Verify.verify(this.pendingCreation.get() > 0, "pendingCreation is expected to be greater than zero");
            this.pendingCreation.decrementAndGet();
            this.closeDriverFactoryIfFullyCreated();
            return driver;
        }

        /** Seals this factory; only the first call has any effect. */
        public void noMoreDriverRunner() {
            // Atomic test-and-set so concurrent callers cannot both take the sealing path.
            if (!this.noMoreDriverRunner.compareAndSet(false, true)) {
                return;
            }
            this.closeDriverFactoryIfFullyCreated();
        }

        public boolean isNoMoreDriverRunner() {
            return this.noMoreDriverRunner.get();
        }

        /**
         * Closes the underlying driver factory, at most once, when this factory is sealed and
         * no handed-out runner is still waiting to create its driver.
         */
        public void closeDriverFactoryIfFullyCreated() {
            if (this.closed.get()) {
                return;
            }
            if (this.isNoMoreDriverRunner() && this.pendingCreation.get() == 0) {
                // The compare-and-set picks a single winner among racing callers.
                if (!this.closed.compareAndSet(false, true)) {
                    return;
                }
                this.driverFactory.noMoreDrivers(Lifespan.taskWide());
                this.driverFactory.noMoreDrivers();
            }
        }

        public OptionalInt getDriverInstances() {
            return this.driverFactory.getDriverInstances();
        }

        /** Forwards the number and summed weight of newly added splits to the pipeline context. */
        public void splitsAdded(int count, long weightSum) {
            this.pipelineContext.splitsAdded(count, weightSum);
        }
    }
}

