/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.execution.SplitConcurrencyController;
import com.facebook.presto.execution.SplitRunner;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.spi.PrestoException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.stats.CpuTimer;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.DoubleSupplier;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@ThreadSafe
public class TaskExecutor {
    private static final Logger log = Logger.get(TaskExecutor.class);
    // NOTE(review): not referenced by name in this file; scheduleTaskIfNecessary compares against a
    // literal 3, presumably this constant after compile-time inlining - confirm.
    private static final int GUARANTEED_SPLITS_PER_TASK = 3;
    // Time slice passed to SplitRunner.processFor on every PrioritizedSplitRunner.process() call.
    private static final Duration SPLIT_RUN_QUANTA = new Duration(1.0, TimeUnit.SECONDS);
    // Source of the ids used in the "SplitRunner-%s" thread names.
    private static final AtomicLong NEXT_RUNNER_ID = new AtomicLong();
    // Source of PrioritizedSplitRunner.workerId, the final tie-breaker in compareTo.
    private static final AtomicLong NEXT_WORKER_ID = new AtomicLong();

    private final ExecutorService executor;
    private final ThreadPoolExecutorMBean executorMBean;
    private final int runnerThreads;
    private final int minimumNumberOfDrivers;
    private final Ticker ticker;

    // Registered tasks; pollNextSplitWorker rotates a task to the tail after taking a split from it.
    @GuardedBy("this")
    private final List<TaskHandle> tasks;
    // Every split handed to startSplit that has not yet finished or been removed with its task.
    @GuardedBy("this")
    private final Set<PrioritizedSplitRunner> allSplits = new HashSet<>();
    // Splits waiting for a runner thread, ordered by PrioritizedSplitRunner.compareTo.
    private final PriorityBlockingQueue<PrioritizedSplitRunner> pendingSplits;
    // Splits currently inside process() on some runner thread.
    private final Set<PrioritizedSplitRunner> runningSplits = Sets.newConcurrentHashSet();
    // Splits whose last process() call returned a future that was not yet done.
    private final Map<PrioritizedSplitRunner, Future<?>> blockedSplits = new ConcurrentHashMap<>();
    // One counter per priority level (0-4), incremented in removeTask.
    private final AtomicLongArray completedTasksPerLevel = new AtomicLongArray(5);
    private final TimeStat queuedTime = new TimeStat(TimeUnit.NANOSECONDS);
    private final TimeStat wallTime = new TimeStat(TimeUnit.NANOSECONDS);
    // Set by stop(); runner threads check it before looping and before replacing themselves.
    private volatile boolean closed;

    @Inject
    public TaskExecutor(TaskManagerConfig config) {
        this(Objects.requireNonNull(config, "config is null").getMaxWorkerThreads(), config.getMinDrivers());
    }

    public TaskExecutor(int runnerThreads, int minDrivers) {
        this(runnerThreads, minDrivers, Ticker.systemTicker());
    }

    @VisibleForTesting
    public TaskExecutor(int runnerThreads, int minDrivers, Ticker ticker) {
        Preconditions.checkArgument((runnerThreads > 0 ? 1 : 0) != 0, (Object)"runnerThreads must be at least 1");
        this.executor = Executors.newCachedThreadPool(Threads.threadsNamed((String)"task-processor-%s"));
        this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor)this.executor);
        this.runnerThreads = runnerThreads;
        this.ticker = Objects.requireNonNull(ticker, "ticker is null");
        this.minimumNumberOfDrivers = minDrivers;
        this.pendingSplits = new PriorityBlockingQueue(Runtime.getRuntime().availableProcessors() * 10);
        this.tasks = new LinkedList<TaskHandle>();
    }

    /**
     * Submits {@code runnerThreads} runner loops to the executor.
     *
     * @throws IllegalStateException if {@link #stop()} has already been called
     */
    @PostConstruct
    public synchronized void start() {
        Preconditions.checkState(!closed, "TaskExecutor is closed");
        int remaining = runnerThreads;
        while (remaining > 0) {
            addRunnerThread();
            remaining--;
        }
    }

    /**
     * Marks the executor closed and interrupts the worker pool.
     */
    @PreDestroy
    public synchronized void stop() {
        // The flag is raised before the pool is interrupted so that exiting runners
        // see it and do not submit replacements for themselves.
        closed = true;
        executor.shutdownNow();
    }

    /**
     * Summarises the configured thread count and the current size of each split collection.
     */
    public synchronized String toString() {
        return MoreObjects.toStringHelper(this)
                .add("runnerThreads", runnerThreads)
                .add("allSplits", allSplits.size())
                .add("pendingSplits", pendingSplits.size())
                .add("runningSplits", runningSplits.size())
                .add("blockedSplits", blockedSplits.size())
                .toString();
    }

    /**
     * Submits one more {@link Runner} loop to the executor, best effort.
     */
    private synchronized void addRunnerThread() {
        try {
            executor.execute(new Runner());
        }
        catch (RejectedExecutionException ignored) {
            // Deliberately ignored: presumably the executor was shut down by stop(),
            // in which case no further runner is wanted.
        }
    }

    /**
     * Registers a new task and returns the handle used to enqueue its splits and later remove it.
     *
     * @param taskId identifier of the task; must not be null
     * @param utilizationSupplier utilization reading forwarded to the task's concurrency controller; must not be null
     * @param initialSplitConcurrency initial value given to the task's concurrency controller
     * @param splitConcurrencyAdjustFrequency adjustment frequency given to the task's concurrency controller
     * @return the handle of the newly registered task
     * @throws NullPointerException if {@code taskId} or {@code utilizationSupplier} is null
     */
    public synchronized TaskHandle addTask(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(utilizationSupplier, "utilizationSupplier is null");

        TaskHandle handle = new TaskHandle(taskId, utilizationSupplier, initialSplitConcurrency, splitConcurrencyAdjustFrequency);
        tasks.add(handle);
        return handle;
    }

    /**
     * Unregisters a task, stops tracking all of its splits and destroys them.
     *
     * <p>The handle is marked destroyed and its splits are dropped from every tracking
     * collection while holding the executor lock. The splits themselves are destroyed
     * after the lock is released, so closing them does not run under the lock. The
     * task's accumulated thread usage is then recorded in the completed-tasks counter
     * for its priority level, and new splits are admitted to take the freed slots.
     *
     * @param taskHandle handle previously returned by {@code addTask}
     */
    public void removeTask(TaskHandle taskHandle) {
        // Typed list: iterating a raw List as PrioritizedSplitRunner does not compile.
        List<PrioritizedSplitRunner> splits;
        synchronized (this) {
            this.tasks.remove(taskHandle);
            splits = taskHandle.destroy();

            // Stop tracking the splits everywhere, including blocked ones whose
            // future may never complete.
            this.allSplits.removeAll(splits);
            this.blockedSplits.keySet().removeAll(splits);
            this.pendingSplits.removeAll(splits);
        }

        // Closing a split calls into the SplitRunner, so do it outside the lock.
        for (PrioritizedSplitRunner split : splits) {
            split.destroy();
        }

        // Record the finished task under the level its total thread usage falls into.
        long threadUsageNanos = taskHandle.getThreadUsageNanos();
        int priorityLevel = TaskExecutor.calculatePriorityLevel(threadUsageNanos);
        this.completedTasksPerLevel.incrementAndGet(priorityLevel);

        // Removing splits may have dropped the tracked count below the configured minimum.
        this.addNewEntrants();
    }

    /**
     * Wraps each given split for the task and either starts it, queues it, or destroys it.
     *
     * <p>Per split, under the executor lock:
     * <ul>
     * <li>if the task handle is already destroyed, the split is set aside and destroyed
     *     after the lock is released;</li>
     * <li>if {@code forceStart} is set, the split goes straight to the pending queue and is
     *     recorded on the handle as a forced running split;</li>
     * <li>otherwise it is queued on the handle, after which the task and then the executor
     *     as a whole get a chance to start more splits.</li>
     * </ul>
     *
     * @param taskHandle handle of the owning task
     * @param forceStart whether to bypass the task's own queue
     * @param taskSplits splits to add
     * @return one "finished" future per input split, in input order
     */
    public List<ListenableFuture<?>> enqueueSplits(TaskHandle taskHandle, boolean forceStart, List<? extends SplitRunner> taskSplits) {
        List<PrioritizedSplitRunner> rejected = new ArrayList<>();
        List<ListenableFuture<?>> finishedFutures = new ArrayList<>(taskSplits.size());

        synchronized (this) {
            for (SplitRunner taskSplit : taskSplits) {
                PrioritizedSplitRunner wrapped = new PrioritizedSplitRunner(taskHandle, taskSplit, ticker);

                if (taskHandle.isDestroyed()) {
                    // Task is already gone: never schedule, just close the split below.
                    rejected.add(wrapped);
                }
                else if (forceStart) {
                    startSplit(wrapped);
                    // Recorded on the handle so removeTask can still find and destroy it.
                    taskHandle.recordForcedRunningSplit(wrapped);
                }
                else {
                    taskHandle.enqueueSplit(wrapped);
                    scheduleTaskIfNecessary(taskHandle);
                    addNewEntrants();
                }

                finishedFutures.add(wrapped.getFinishedFuture());
            }
        }

        // Destroy outside the lock.
        for (PrioritizedSplitRunner split : rejected) {
            split.destroy();
        }
        return finishedFutures;
    }

    /**
     * Handles a split that has finished or failed: stops tracking it, records its wall time,
     * lets its task and the executor start replacement splits, and finally destroys it.
     *
     * @param split the split that is done
     */
    private void splitFinished(PrioritizedSplitRunner split) {
        synchronized (this) {
            allSplits.remove(split);

            TaskHandle owner = split.getTaskHandle();
            owner.splitComplete(split);

            wallTime.add(Duration.nanosSince(split.createdNanos));

            scheduleTaskIfNecessary(owner);
            addNewEntrants();
        }
        // Destroy outside the lock.
        split.destroy();
    }

    /**
     * Starts one queued split of the given task if the task currently has fewer than
     * three running splits and has a split available.
     *
     * @param taskHandle the task to top up
     */
    private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle) {
        // NOTE(review): 3 presumably is GUARANTEED_SPLITS_PER_TASK inlined at compile time - confirm.
        if (taskHandle.getRunningSplits() >= 3) {
            return;
        }
        PrioritizedSplitRunner next = taskHandle.pollNextSplit();
        if (next == null) {
            return;
        }
        startSplit(next);
        queuedTime.add(Duration.nanosSince(next.createdNanos));
    }

    /**
     * Starts queued splits, taken from the tasks in rotation, until the number of tracked
     * splits observed on entry would reach {@code minimumNumberOfDrivers} or no task has
     * a split to offer.
     */
    private synchronized void addNewEntrants() {
        // The shortfall is computed once, from the tracked-split count on entry.
        int shortfall = minimumNumberOfDrivers - allSplits.size();
        for (int started = 0; started < shortfall; started++) {
            PrioritizedSplitRunner next = pollNextSplitWorker();
            if (next == null) {
                break;
            }
            queuedTime.add(Duration.nanosSince(next.createdNanos));
            startSplit(next);
        }
    }

    /**
     * Begins tracking the split and places it on the pending queue for the runner threads.
     *
     * @param split the split to start
     */
    private synchronized void startSplit(PrioritizedSplitRunner split) {
        allSplits.add(split);
        pendingSplits.put(split);
    }

    /**
     * Takes the next split from the first task, in list order, that is willing to supply one.
     * That task is then moved to the end of the task list.
     *
     * @return the split to start, or {@code null} if no task supplied one
     */
    private synchronized PrioritizedSplitRunner pollNextSplitWorker() {
        for (Iterator<TaskHandle> cursor = tasks.iterator(); cursor.hasNext(); ) {
            TaskHandle candidate = cursor.next();
            PrioritizedSplitRunner split = candidate.pollNextSplit();
            if (split != null) {
                // Rotate the task to the tail; returning immediately means the
                // iterator is never used after the list is appended to.
                cursor.remove();
                tasks.add(candidate);
                return split;
            }
        }
        return null;
    }

    /**
     * Maps accumulated thread usage to a priority level between 0 and 4.
     *
     * <p>The usage is truncated to whole milliseconds and bucketed: below 1 s is level 0,
     * below 10 s is level 1, below 60 s is level 2, below 300 s is level 3, anything else
     * is level 4.
     *
     * @param threadUsageNanos accumulated thread usage in nanoseconds
     * @return the priority level, 0 through 4
     */
    private static int calculatePriorityLevel(long threadUsageNanos) {
        long usageMillis = TimeUnit.NANOSECONDS.toMillis(threadUsageNanos);
        if (usageMillis < 1000L) {
            return 0;
        }
        if (usageMillis < 10000L) {
            return 1;
        }
        if (usageMillis < 60000L) {
            return 2;
        }
        if (usageMillis < 300000L) {
            return 3;
        }
        return 4;
    }

    /** Returns the number of currently registered tasks. */
    @Managed
    public synchronized int getTasks() {
        return tasks.size();
    }

    /** Returns the configured number of runner threads. */
    @Managed
    public int getRunnerThreads() {
        return runnerThreads;
    }

    /** Returns the configured minimum number of drivers. */
    @Managed
    public int getMinimumNumberOfDrivers() {
        return minimumNumberOfDrivers;
    }

    /** Returns the number of splits currently tracked by the executor. */
    @Managed
    public synchronized int getTotalSplits() {
        return allSplits.size();
    }

    /** Returns the number of splits waiting in the pending queue. */
    @Managed
    public int getPendingSplits() {
        return pendingSplits.size();
    }

    /** Returns the number of splits currently being processed by a runner thread. */
    @Managed
    public int getRunningSplits() {
        return runningSplits.size();
    }

    /** Returns the number of splits parked on a not-yet-done future. */
    @Managed
    public int getBlockedSplits() {
        return blockedSplits.size();
    }

    /** Returns the number of removed tasks whose final priority level was 0. */
    @Managed
    public long getCompletedTasksLevel0() {
        return completedTasksPerLevel.get(0);
    }

    /** Returns the number of removed tasks whose final priority level was 1. */
    @Managed
    public long getCompletedTasksLevel1() {
        return completedTasksPerLevel.get(1);
    }

    /** Returns the number of removed tasks whose final priority level was 2. */
    @Managed
    public long getCompletedTasksLevel2() {
        return completedTasksPerLevel.get(2);
    }

    /** Returns the number of removed tasks whose final priority level was 3. */
    @Managed
    public long getCompletedTasksLevel3() {
        return completedTasksPerLevel.get(3);
    }

    /** Returns the number of removed tasks whose final priority level was 4. */
    @Managed
    public long getCompletedTasksLevel4() {
        return completedTasksPerLevel.get(4);
    }

    /** Returns the number of registered tasks currently at priority level 0. */
    @Managed
    public long getRunningTasksLevel0() {
        return calculateRunningTasksForLevel(0);
    }

    /** Returns the number of registered tasks currently at priority level 1. */
    @Managed
    public long getRunningTasksLevel1() {
        return calculateRunningTasksForLevel(1);
    }

    /** Returns the number of registered tasks currently at priority level 2. */
    @Managed
    public long getRunningTasksLevel2() {
        return calculateRunningTasksForLevel(2);
    }

    /** Returns the number of registered tasks currently at priority level 3. */
    @Managed
    public long getRunningTasksLevel3() {
        return calculateRunningTasksForLevel(3);
    }

    /** Returns the number of registered tasks currently at priority level 4. */
    @Managed
    public long getRunningTasksLevel4() {
        return calculateRunningTasksForLevel(4);
    }

    /** Returns the statistic of the time between a split's creation and its start from a task queue. */
    @Managed
    @Nested
    public TimeStat getQueuedTime() {
        return queuedTime;
    }

    /** Returns the statistic of the time between a split's creation and its completion. */
    @Managed
    @Nested
    public TimeStat getWallTime() {
        return wallTime;
    }

    /**
     * Counts the registered tasks whose accumulated thread usage maps to the given priority level.
     *
     * @param level priority level to match
     * @return number of registered tasks at that level
     */
    private synchronized int calculateRunningTasksForLevel(int level) {
        int matching = 0;
        for (TaskHandle handle : tasks) {
            if (calculatePriorityLevel(handle.getThreadUsageNanos()) == level) {
                matching++;
            }
        }
        return matching;
    }

    /** Returns the management bean wrapping the thread pool that runs the runner loops. */
    @Managed(description = "Task processor executor")
    @Nested
    public ThreadPoolExecutorMBean getProcessorExecutor() {
        return executorMBean;
    }

    // Compiler-generated accessor recovered by the decompiler: gives the nested Runner class
    // access to the private static NEXT_RUNNER_ID counter. Runner's field initializer calls it.
    static /* synthetic */ AtomicLong access$1900() {
        return NEXT_RUNNER_ID;
    }

    private class Runner
    implements Runnable {
        private final long runnerId = TaskExecutor.access$1900().getAndIncrement();

        private Runner() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", new Object[]{this.runnerId});){
                while (!TaskExecutor.this.closed) {
                    PrioritizedSplitRunner split;
                    if (Thread.currentThread().isInterrupted()) return;
                    try {
                        split = (PrioritizedSplitRunner)TaskExecutor.this.pendingSplits.take();
                        if (split.updatePriorityLevel()) {
                            TaskExecutor.this.pendingSplits.put(split);
                            continue;
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        if (runnerName != null) {
                            if (var2_2 != null) {
                                try {
                                    runnerName.close();
                                }
                                catch (Throwable throwable) {
                                    var2_2.addSuppressed(throwable);
                                }
                            } else {
                                runnerName.close();
                            }
                        }
                        if (TaskExecutor.this.closed) return;
                        TaskExecutor.this.addRunnerThread();
                        return;
                    }
                    try {
                        SetThreadName splitName = new SetThreadName(split.getTaskHandle().getTaskId() + "-" + split.getSplitId(), new Object[0]);
                        Throwable throwable = null;
                        try {
                            boolean finished;
                            ListenableFuture<?> blocked;
                            TaskExecutor.this.runningSplits.add(split);
                            try {
                                blocked = split.process();
                                finished = split.isFinished();
                            }
                            finally {
                                TaskExecutor.this.runningSplits.remove(split);
                            }
                            if (finished) {
                                log.debug("%s is finished", new Object[]{split.getInfo()});
                                TaskExecutor.this.splitFinished(split);
                                continue;
                            }
                            if (blocked.isDone()) {
                                TaskExecutor.this.pendingSplits.put(split);
                                continue;
                            }
                            TaskExecutor.this.blockedSplits.put(split, blocked);
                            blocked.addListener(new Runnable(){

                                @Override
                                public void run() {
                                    TaskExecutor.this.blockedSplits.remove(split);
                                    split.updatePriorityLevel();
                                    TaskExecutor.this.pendingSplits.put(split);
                                }
                            }, (Executor)TaskExecutor.this.executor);
                        }
                        catch (Throwable throwable2) {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        finally {
                            if (splitName == null) continue;
                            if (throwable != null) {
                                try {
                                    splitName.close();
                                }
                                catch (Throwable throwable3) {
                                    throwable.addSuppressed(throwable3);
                                }
                                continue;
                            }
                            splitName.close();
                        }
                    }
                    catch (Throwable t) {
                        if (!split.isDestroyed()) {
                            if (t instanceof PrestoException) {
                                PrestoException e = (PrestoException)t;
                                log.error("Error processing %s: %s: %s", new Object[]{split.getInfo(), e.getErrorCode().getName(), e.getMessage()});
                            } else {
                                log.error(t, "Error processing %s", new Object[]{split.getInfo()});
                            }
                        }
                        TaskExecutor.this.splitFinished(split);
                    }
                }
                return;
            }
            finally {
                if (!TaskExecutor.this.closed) {
                    TaskExecutor.this.addRunnerThread();
                }
            }
        }
    }

    private static class PrioritizedSplitRunner
    implements Comparable<PrioritizedSplitRunner> {
        private final long createdNanos = System.nanoTime();
        private final TaskHandle taskHandle;
        private final int splitId;
        private final long workerId;
        private final SplitRunner split;
        private final Ticker ticker;
        private final SettableFuture<?> finishedFuture = SettableFuture.create();
        private final AtomicBoolean destroyed = new AtomicBoolean();
        private final AtomicInteger priorityLevel = new AtomicInteger();
        private final AtomicLong threadUsageNanos = new AtomicLong();
        private final AtomicLong splitThreadUsageNanos = new AtomicLong();
        private final AtomicLong lastRun = new AtomicLong();
        private final AtomicLong start = new AtomicLong();
        private final AtomicLong cpuTime = new AtomicLong();
        private final AtomicLong processCalls = new AtomicLong();

        private PrioritizedSplitRunner(TaskHandle taskHandle, SplitRunner split, Ticker ticker) {
            this.taskHandle = taskHandle;
            this.splitId = taskHandle.getNextSplitId();
            this.split = split;
            this.ticker = ticker;
            this.workerId = NEXT_WORKER_ID.getAndIncrement();
        }

        private TaskHandle getTaskHandle() {
            return this.taskHandle;
        }

        private ListenableFuture<?> getFinishedFuture() {
            return this.finishedFuture;
        }

        public boolean isDestroyed() {
            return this.destroyed.get();
        }

        public void destroy() {
            this.destroyed.set(true);
            try {
                this.split.close();
            }
            catch (RuntimeException e) {
                log.error((Throwable)e, "Error closing split for task %s", new Object[]{this.taskHandle.getTaskId()});
            }
        }

        public boolean isFinished() {
            boolean finished = this.split.isFinished();
            if (finished) {
                this.finishedFuture.set(null);
            }
            return finished || this.destroyed.get() || this.taskHandle.isDestroyed();
        }

        public long getSplitThreadUsageNanos() {
            return this.splitThreadUsageNanos.get();
        }

        public ListenableFuture<?> process() throws Exception {
            try {
                this.start.compareAndSet(0L, System.currentTimeMillis());
                this.processCalls.incrementAndGet();
                CpuTimer timer = new CpuTimer();
                ListenableFuture<?> blocked = this.split.processFor(SPLIT_RUN_QUANTA);
                CpuTimer.CpuDuration elapsed = timer.elapsedTime();
                long durationNanos = elapsed.getWall().roundTo(TimeUnit.NANOSECONDS);
                this.splitThreadUsageNanos.addAndGet(durationNanos);
                long threadUsageNanos = this.taskHandle.addThreadUsageNanos(durationNanos);
                this.threadUsageNanos.set(threadUsageNanos);
                this.priorityLevel.set(TaskExecutor.calculatePriorityLevel(threadUsageNanos));
                this.lastRun.set(this.ticker.read());
                this.cpuTime.addAndGet(elapsed.getCpu().roundTo(TimeUnit.NANOSECONDS));
                return blocked;
            }
            catch (Throwable e) {
                this.finishedFuture.setException(e);
                throw e;
            }
        }

        public boolean updatePriorityLevel() {
            int newPriority = TaskExecutor.calculatePriorityLevel(this.taskHandle.getThreadUsageNanos());
            if (newPriority == this.priorityLevel.getAndSet(newPriority)) {
                return false;
            }
            this.threadUsageNanos.set(this.taskHandle.getThreadUsageNanos());
            return true;
        }

        @Override
        public int compareTo(PrioritizedSplitRunner o) {
            int level = this.priorityLevel.get();
            int result = Integer.compare(level, o.priorityLevel.get());
            if (result != 0) {
                return result;
            }
            result = level < 4 ? Long.compare(this.threadUsageNanos.get(), o.threadUsageNanos.get()) : Long.compare(this.lastRun.get(), o.lastRun.get());
            if (result != 0) {
                return result;
            }
            return Long.compare(this.workerId, o.workerId);
        }

        public int getSplitId() {
            return this.splitId;
        }

        public String getInfo() {
            return String.format("Split %-15s-%d %s (start = %s, wall = %s ms, cpu = %s ms, calls = %s)", this.taskHandle.getTaskId(), this.splitId, this.split.getInfo(), this.start.get(), System.currentTimeMillis() - this.start.get(), (int)((double)this.cpuTime.get() / 1000000.0), this.processCalls.get());
        }

        public String toString() {
            return String.format("Split %-15s-%d", this.taskHandle.getTaskId(), this.splitId);
        }
    }

    /**
     * Per-task state held by the executor: the task's queued, running and force-started
     * splits, its accumulated thread usage, and the controller that limits how many of
     * its splits run at once. All mutable state is guarded by the handle's own monitor.
     */
    @ThreadSafe
    public static class TaskHandle {
        private final TaskId taskId;
        private final DoubleSupplier utilizationSupplier;
        @GuardedBy("this")
        private final Queue<PrioritizedSplitRunner> queuedSplits = new ArrayDeque<>(10);
        @GuardedBy("this")
        private final List<PrioritizedSplitRunner> runningSplits = new ArrayList<>(10);
        @GuardedBy("this")
        private final List<PrioritizedSplitRunner> forcedRunningSplits = new ArrayList<>(10);
        @GuardedBy("this")
        private long taskThreadUsageNanos;
        @GuardedBy("this")
        private boolean destroyed;
        @GuardedBy("this")
        private final SplitConcurrencyController concurrencyController;
        private final AtomicInteger nextSplitId = new AtomicInteger();

        private TaskHandle(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency) {
            this.taskId = taskId;
            this.utilizationSupplier = utilizationSupplier;
            this.concurrencyController = new SplitConcurrencyController(initialSplitConcurrency, splitConcurrencyAdjustFrequency);
        }

        /**
         * Reports the usage to the concurrency controller and adds it to the task total.
         *
         * @return the new task total in nanoseconds
         */
        private synchronized long addThreadUsageNanos(long durationNanos) {
            double utilization = utilizationSupplier.getAsDouble();
            concurrencyController.update(durationNanos, utilization, runningSplits.size());
            taskThreadUsageNanos += durationNanos;
            return taskThreadUsageNanos;
        }

        private TaskId getTaskId() {
            return taskId;
        }

        /** Returns whether this handle has been destroyed. */
        public synchronized boolean isDestroyed() {
            return destroyed;
        }

        /**
         * Marks the handle destroyed and empties its split collections.
         *
         * @return the splits that were held, in the order forced-running, running, queued
         */
        private synchronized List<PrioritizedSplitRunner> destroy() {
            destroyed = true;

            // The builder copies the elements, so clearing afterwards leaves the snapshot intact.
            ImmutableList<PrioritizedSplitRunner> held = ImmutableList.<PrioritizedSplitRunner>builder()
                    .addAll(forcedRunningSplits)
                    .addAll(runningSplits)
                    .addAll(queuedSplits)
                    .build();

            forcedRunningSplits.clear();
            runningSplits.clear();
            queuedSplits.clear();
            return held;
        }

        private synchronized void enqueueSplit(PrioritizedSplitRunner split) {
            Preconditions.checkState(!destroyed, "Can not add split to destroyed task handle");
            queuedSplits.add(split);
        }

        private synchronized void recordForcedRunningSplit(PrioritizedSplitRunner split) {
            Preconditions.checkState(!destroyed, "Can not add split to destroyed task handle");
            forcedRunningSplits.add(split);
        }

        /** Returns the number of splits started from the queue; force-started splits are not counted. */
        @VisibleForTesting
        synchronized int getRunningSplits() {
            return runningSplits.size();
        }

        private synchronized long getThreadUsageNanos() {
            return taskThreadUsageNanos;
        }

        /**
         * Moves the next queued split to the running list.
         *
         * @return the split, or {@code null} if the handle is destroyed, the running count
         *         has reached the controller's target concurrency, or the queue is empty
         */
        private synchronized PrioritizedSplitRunner pollNextSplit() {
            if (destroyed || runningSplits.size() >= concurrencyController.getTargetConcurrency()) {
                return null;
            }
            PrioritizedSplitRunner next = queuedSplits.poll();
            if (next == null) {
                return null;
            }
            runningSplits.add(next);
            return next;
        }

        /**
         * Notifies the concurrency controller and drops the split from the running lists.
         */
        private synchronized void splitComplete(PrioritizedSplitRunner split) {
            // The controller is given the running count from before the split is removed.
            long splitUsageNanos = split.getSplitThreadUsageNanos();
            double utilization = utilizationSupplier.getAsDouble();
            concurrencyController.splitFinished(splitUsageNanos, utilization, runningSplits.size());

            forcedRunningSplits.remove(split);
            runningSplits.remove(split);
        }

        private int getNextSplitId() {
            return nextSplitId.getAndIncrement();
        }

        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("taskId", taskId)
                    .toString();
        }
    }
}

