/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class TaskletExecutionService {
    private static final IdleStrategy IDLER = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    private final CooperativeWorker[] cooperativeWorkers;
    private final Thread[] cooperativeThreadPool;
    private final String hzInstanceName;
    private final ILogger logger;
    private final AtomicInteger cooperativeThreadIndex = new AtomicInteger();
    private volatile boolean isShutdown;

    public TaskletExecutionService(HazelcastInstance hz, int threadCount) {
        this.hzInstanceName = hz.getName();
        this.cooperativeWorkers = new CooperativeWorker[threadCount];
        this.cooperativeThreadPool = new Thread[threadCount];
        this.logger = hz.getLoggingService().getLogger(TaskletExecutionService.class);
    }

    CompletableFuture<Void> beginExecute(@Nonnull List<? extends Tasklet> tasklets, @Nonnull CompletableFuture<Void> cancellationFuture, @Nonnull ClassLoader jobClassLoader) {
        this.ensureStillRunning();
        ExecutionTracker executionTracker = new ExecutionTracker(tasklets.size(), cancellationFuture);
        try {
            Map<Boolean, List<Tasklet>> byCooperation = tasklets.stream().collect(Collectors.partitioningBy(Tasklet::isCooperative));
            this.submitCooperativeTasklets(executionTracker, jobClassLoader, byCooperation.get(true));
            this.submitBlockingTasklets(executionTracker, jobClassLoader, byCooperation.get(false));
        }
        catch (Throwable t) {
            executionTracker.future.internalCompleteExceptionally(t);
        }
        return executionTracker.future;
    }

    public void shutdown() {
        this.isShutdown = true;
        this.blockingTaskletExecutor.shutdownNow();
    }

    private void ensureStillRunning() {
        if (this.isShutdown) {
            throw new IllegalStateException("Execution service was already ordered to shut down");
        }
    }

    private void submitBlockingTasklets(ExecutionTracker executionTracker, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        CountDownLatch startedLatch = new CountDownLatch(tasklets.size());
        executionTracker.blockingFutures = tasklets.stream().map(t -> new BlockingWorker(new TaskletTracker((Tasklet)t, executionTracker, jobClassLoader), startedLatch)).map(this.blockingTaskletExecutor::submit).collect(Collectors.toList());
        Util.uncheckRun(startedLatch::await);
    }

    private void submitCooperativeTasklets(ExecutionTracker executionTracker, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        this.ensureThreadsStarted();
        List[] trackersByThread = new List[this.cooperativeWorkers.length];
        Arrays.setAll(trackersByThread, i -> new ArrayList());
        for (Tasklet t : tasklets) {
            t.init();
            trackersByThread[this.cooperativeThreadIndex.getAndUpdate(i -> (i + 1) % trackersByThread.length)].add(new TaskletTracker(t, executionTracker, jobClassLoader));
        }
        for (int i2 = 0; i2 < trackersByThread.length; ++i2) {
            this.cooperativeWorkers[i2].trackers.addAll(trackersByThread[i2]);
        }
        Arrays.stream(this.cooperativeThreadPool).forEach(LockSupport::unpark);
    }

    private synchronized void ensureThreadsStarted() {
        if (this.cooperativeWorkers[0] != null) {
            return;
        }
        Arrays.setAll(this.cooperativeWorkers, i -> new CooperativeWorker(this.cooperativeWorkers));
        Arrays.setAll(this.cooperativeThreadPool, i -> new Thread((Runnable)this.cooperativeWorkers[i], String.format("hz.%s.jet.cooperative.thread-%d", this.hzInstanceName, i)));
        Arrays.stream(this.cooperativeThreadPool).forEach(Thread::start);
    }

    private String trackersToString() {
        return Arrays.stream(this.cooperativeWorkers).flatMap(w -> ((CooperativeWorker)w).trackers.stream()).map(Object::toString).sorted().collect(Collectors.joining("\n")) + "\n-----------------";
    }

    private final class ExecutionTracker {
        final NonCompletableFuture future = new NonCompletableFuture();
        List<Future> blockingFutures;
        private final AtomicInteger completionLatch;
        private final AtomicReference<Throwable> executionException = new AtomicReference();

        ExecutionTracker(int taskletCount, CompletableFuture<Void> cancellationFuture) {
            this.completionLatch = new AtomicInteger(taskletCount);
            cancellationFuture.whenComplete((BiConsumer)ExceptionUtil.withTryCatch(TaskletExecutionService.this.logger, (r, e) -> {
                if (!(e instanceof CancellationException)) {
                    this.exception(new IllegalStateException("cancellationFuture was completed with something other than CancellationException: " + e, (Throwable)e));
                    return;
                }
                this.exception((Throwable)e);
                this.blockingFutures.forEach(f -> f.cancel(true));
            }));
        }

        void exception(Throwable t) {
            this.executionException.compareAndSet(null, t);
        }

        void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                Throwable ex = this.executionException.get();
                if (ex == null) {
                    this.future.internalComplete();
                } else {
                    this.future.internalCompleteExceptionally(ex);
                }
            }
        }

        boolean executionCompletedExceptionally() {
            return this.executionException.get() != null;
        }
    }

    private final class BlockingTaskThreadFactory
    implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger();

        private BlockingTaskThreadFactory() {
        }

        @Override
        public Thread newThread(@Nonnull Runnable r) {
            return new Thread(r, String.format("hz.%s.jet.blocking.thread-%d", TaskletExecutionService.this.hzInstanceName, this.seq.getAndIncrement()));
        }
    }

    private static final class TaskletTracker {
        final Tasklet tasklet;
        final ExecutionTracker executionTracker;
        final ClassLoader jobClassLoader;
        final AtomicReference<CooperativeWorker> stealingWorker = new AtomicReference();

        TaskletTracker(Tasklet tasklet, ExecutionTracker executionTracker, ClassLoader jobClassLoader) {
            this.tasklet = tasklet;
            this.executionTracker = executionTracker;
            this.jobClassLoader = jobClassLoader;
        }

        public String toString() {
            return "Tracking " + this.tasklet;
        }
    }

    private final class CooperativeWorker
    implements Runnable {
        private static final int COOPERATIVE_LOGGING_THRESHOLD = 5;
        private final List<TaskletTracker> trackers;
        private final CooperativeWorker[] colleagues;

        CooperativeWorker(CooperativeWorker[] colleagues) {
            this.colleagues = colleagues;
            this.trackers = new CopyOnWriteArrayList<TaskletTracker>();
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            ClassLoader clBackup = thread.getContextClassLoader();
            long idleCount = 0L;
            while (!TaskletExecutionService.this.isShutdown) {
                boolean madeProgress = false;
                for (TaskletTracker t2 : this.trackers) {
                    long elapsedMs;
                    CooperativeWorker stealingWorker;
                    long start = 0L;
                    if (TaskletExecutionService.this.logger.isFinestEnabled()) {
                        start = System.nanoTime();
                    }
                    if ((stealingWorker = t2.stealingWorker.get()) != null) {
                        t2.stealingWorker.set(null);
                        this.trackers.remove(t2);
                        stealingWorker.trackers.add(t2);
                        LoggingUtil.logFine(TaskletExecutionService.this.logger, "Tasklet %s was stolen from this worker", t2.tasklet);
                        continue;
                    }
                    try {
                        thread.setContextClassLoader(t2.jobClassLoader);
                        ProgressState result = t2.tasklet.call();
                        if (result.isDone()) {
                            this.dismissTasklet(t2);
                        } else {
                            madeProgress |= result.isMadeProgress();
                        }
                    }
                    catch (Throwable e) {
                        TaskletExecutionService.this.logger.warning("Exception in " + t2.tasklet, e);
                        t2.executionTracker.exception((Throwable)((Object)new JetException("Exception in " + t2.tasklet + ": " + e, e)));
                    }
                    if (t2.executionTracker.executionCompletedExceptionally()) {
                        this.dismissTasklet(t2);
                    }
                    if (!TaskletExecutionService.this.logger.isFinestEnabled() || (elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) <= 5L) continue;
                    TaskletExecutionService.this.logger.finest("Cooperative tasklet call of '" + t2.tasklet + "' took more than " + 5 + " ms: " + elapsedMs + "ms");
                }
                if (madeProgress) {
                    idleCount = 0L;
                    continue;
                }
                thread.setContextClassLoader(clBackup);
                IDLER.idle(++idleCount);
            }
            this.trackers.forEach(t -> t.executionTracker.taskletDone());
            this.trackers.clear();
        }

        private void dismissTasklet(TaskletTracker t) {
            t.executionTracker.taskletDone();
            this.trackers.remove(t);
            this.stealWork();
        }

        private void stealWork() {
            block0: while (true) {
                TaskletTracker t;
                List<TaskletTracker> toStealFrom = this.trackers;
                for (CooperativeWorker w : this.colleagues) {
                    if (w.trackers.size() <= toStealFrom.size()) continue;
                    toStealFrom = w.trackers;
                }
                if (toStealFrom.size() < this.trackers.size() + 2) {
                    return;
                }
                Iterator<TaskletTracker> iterator = toStealFrom.iterator();
                do {
                    if (!iterator.hasNext()) continue block0;
                    t = (TaskletTracker)iterator.next();
                } while (!t.stealingWorker.compareAndSet(null, this));
                break;
            }
        }
    }

    private final class BlockingWorker
    implements Runnable {
        private final TaskletTracker tracker;
        private final CountDownLatch startedLatch;

        private BlockingWorker(TaskletTracker tracker, CountDownLatch startedLatch) {
            this.tracker = tracker;
            this.startedLatch = startedLatch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ClassLoader clBackup = Thread.currentThread().getContextClassLoader();
            Tasklet t = this.tracker.tasklet;
            Thread.currentThread().setContextClassLoader(this.tracker.jobClassLoader);
            try {
                ProgressState result;
                this.startedLatch.countDown();
                t.init();
                long idleCount = 0L;
                do {
                    if ((result = t.call()).isMadeProgress()) {
                        idleCount = 0L;
                        continue;
                    }
                    IDLER.idle(++idleCount);
                } while (!result.isDone() && !this.tracker.executionTracker.executionCompletedExceptionally() && !TaskletExecutionService.this.isShutdown);
            }
            catch (Throwable e) {
                TaskletExecutionService.this.logger.warning("Exception in " + t, e);
                this.tracker.executionTracker.exception((Throwable)((Object)new JetException("Exception in " + t + ": " + e, e)));
            }
            finally {
                Thread.currentThread().setContextClassLoader(clBackup);
                this.tracker.executionTracker.taskletDone();
            }
        }
    }
}

