/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.logging.ILogger;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class TaskletExecutionService {
    private static final IdleStrategy IDLER = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    private final CooperativeWorker[] cooperativeWorkers;
    private final Thread[] cooperativeThreadPool;
    private final String hzInstanceName;
    private final ILogger logger;
    private final AtomicInteger cooperativeThreadIndex = new AtomicInteger();
    private volatile boolean isShutdown;

    public TaskletExecutionService(HazelcastInstance hz, int threadCount) {
        this.hzInstanceName = hz.getName();
        this.cooperativeWorkers = new CooperativeWorker[threadCount];
        this.cooperativeThreadPool = new Thread[threadCount];
        this.logger = hz.getLoggingService().getLogger(TaskletExecutionService.class);
    }

    public CompletionStage<Void> execute(@Nonnull List<? extends Tasklet> tasklets, @Nonnull Consumer<CompletionStage<Void>> doneCallback, @Nonnull ClassLoader jobClassLoader) {
        this.ensureStillRunning();
        JobFuture jobFuture = new JobFuture(tasklets.size(), doneCallback);
        try {
            Map<Boolean, List<Tasklet>> byCooperation = tasklets.stream().collect(Collectors.partitioningBy(Tasklet::isCooperative));
            this.submitCooperativeTasklets(jobFuture, jobClassLoader, byCooperation.get(true));
            this.submitBlockingTasklets(jobFuture, jobClassLoader, byCooperation.get(false));
        }
        catch (Throwable t) {
            jobFuture.completeExceptionally(t);
            doneCallback.accept(jobFuture);
        }
        return jobFuture;
    }

    public void shutdown() {
        this.isShutdown = true;
        this.blockingTaskletExecutor.shutdownNow();
    }

    private void ensureStillRunning() {
        if (this.isShutdown) {
            throw new IllegalStateException("Execution service was already ordered to shut down");
        }
    }

    private void submitBlockingTasklets(JobFuture jobFuture, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        jobFuture.blockingFutures = tasklets.stream().map(t -> new BlockingWorker(new TaskletTracker((Tasklet)t, jobFuture, jobClassLoader))).map(this.blockingTaskletExecutor::submit).collect(Collectors.toList());
    }

    private void submitCooperativeTasklets(JobFuture jobFuture, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        this.ensureThreadsStarted();
        List[] trackersByThread = new List[this.cooperativeWorkers.length];
        Arrays.setAll(trackersByThread, i -> new ArrayList());
        for (Tasklet t : tasklets) {
            t.init(jobFuture);
            trackersByThread[this.cooperativeThreadIndex.getAndUpdate(i -> (i + 1) % trackersByThread.length)].add(new TaskletTracker(t, jobFuture, jobClassLoader));
        }
        for (int i2 = 0; i2 < trackersByThread.length; ++i2) {
            this.cooperativeWorkers[i2].trackers.addAll(trackersByThread[i2]);
        }
        Arrays.stream(this.cooperativeThreadPool).forEach(LockSupport::unpark);
    }

    private synchronized void ensureThreadsStarted() {
        if (this.cooperativeWorkers[0] != null) {
            return;
        }
        Arrays.setAll(this.cooperativeWorkers, i -> new CooperativeWorker(this.cooperativeWorkers));
        Arrays.setAll(this.cooperativeThreadPool, i -> new Thread((Runnable)this.cooperativeWorkers[i], String.format("hz.%s.jet.cooperative.thread-%d", this.hzInstanceName, i)));
        Arrays.stream(this.cooperativeThreadPool).forEach(Thread::start);
    }

    private String trackersToString() {
        return Arrays.stream(this.cooperativeWorkers).flatMap(w -> ((CooperativeWorker)w).trackers.stream()).map(Object::toString).sorted().collect(Collectors.joining("\n")) + "\n-----------------";
    }

    private static final class JobFuture
    extends CompletableFuture<Void> {
        private final AtomicInteger completionLatch;
        private final Consumer<CompletionStage<Void>> doneCallback;
        private List<Future> blockingFutures;

        JobFuture(int taskletCount, Consumer<CompletionStage<Void>> doneCallback) {
            this.doneCallback = doneCallback;
            this.completionLatch = new AtomicInteger(taskletCount);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled) {
                this.blockingFutures.forEach(f -> f.cancel(true));
            }
            return cancelled;
        }

        @SuppressFBWarnings(value={"NP_NONNULL_PARAM_VIOLATION"}, justification="CompletableFuture<Void>")
        private void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                this.complete(null);
                if (this.doneCallback != null) {
                    this.doneCallback.accept(this);
                }
            }
        }
    }

    private final class BlockingTaskThreadFactory
    implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger();

        private BlockingTaskThreadFactory() {
        }

        @Override
        public Thread newThread(@Nonnull Runnable r) {
            return new Thread(r, String.format("hz.%s.jet.blocking.thread-%d", TaskletExecutionService.this.hzInstanceName, this.seq.getAndIncrement()));
        }
    }

    private static final class TaskletTracker {
        final Tasklet tasklet;
        final JobFuture jobFuture;
        final ClassLoader jobClassLoader;
        final AtomicReference<CooperativeWorker> stealingWorker = new AtomicReference();

        TaskletTracker(Tasklet tasklet, JobFuture jobFuture, ClassLoader jobClassLoader) {
            this.tasklet = tasklet;
            this.jobFuture = jobFuture;
            this.jobClassLoader = jobClassLoader;
        }

        public String toString() {
            return "Tracking " + this.tasklet;
        }
    }

    private final class CooperativeWorker
    implements Runnable {
        private final List<TaskletTracker> trackers;
        private final CooperativeWorker[] colleagues;

        CooperativeWorker(CooperativeWorker[] colleagues) {
            this.colleagues = colleagues;
            this.trackers = new CopyOnWriteArrayList<TaskletTracker>();
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            ClassLoader clBackup = thread.getContextClassLoader();
            long idleCount = 0L;
            while (!TaskletExecutionService.this.isShutdown) {
                boolean madeProgress = false;
                for (TaskletTracker t : this.trackers) {
                    CooperativeWorker stealingWorker = t.stealingWorker.get();
                    if (stealingWorker != null) {
                        t.stealingWorker.set(null);
                        this.trackers.remove(t);
                        stealingWorker.trackers.add(t);
                        continue;
                    }
                    try {
                        thread.setContextClassLoader(t.jobClassLoader);
                        ProgressState result = t.tasklet.call();
                        if (result.isDone()) {
                            this.dismissTasklet(t);
                        } else {
                            madeProgress |= result.isMadeProgress();
                        }
                    }
                    catch (Throwable e) {
                        TaskletExecutionService.this.logger.warning("Exception in " + t.tasklet, e);
                        t.jobFuture.completeExceptionally(new JetException("Exception in " + t.tasklet + ": " + e, e));
                    }
                    if (!t.jobFuture.isCompletedExceptionally()) continue;
                    this.dismissTasklet(t);
                }
                if (madeProgress) {
                    idleCount = 0L;
                    continue;
                }
                thread.setContextClassLoader(clBackup);
                IDLER.idle(++idleCount);
            }
            this.trackers.clear();
        }

        private void dismissTasklet(TaskletTracker t) {
            t.jobFuture.taskletDone();
            this.trackers.remove(t);
            this.stealWork();
        }

        private void stealWork() {
            block0: while (true) {
                TaskletTracker t;
                List<TaskletTracker> toStealFrom = this.trackers;
                for (CooperativeWorker w : this.colleagues) {
                    if (w.trackers.size() <= toStealFrom.size()) continue;
                    toStealFrom = w.trackers;
                }
                if (toStealFrom.size() < this.trackers.size() + 2) {
                    return;
                }
                Iterator<TaskletTracker> iterator = toStealFrom.iterator();
                do {
                    if (!iterator.hasNext()) continue block0;
                    t = (TaskletTracker)iterator.next();
                } while (!t.stealingWorker.compareAndSet(null, this));
                break;
            }
        }
    }

    private final class BlockingWorker
    implements Runnable {
        private final TaskletTracker tracker;

        private BlockingWorker(TaskletTracker tracker) {
            this.tracker = tracker;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ClassLoader clBackup = Thread.currentThread().getContextClassLoader();
            Tasklet t = this.tracker.tasklet;
            Thread.currentThread().setContextClassLoader(this.tracker.jobClassLoader);
            try {
                ProgressState result;
                t.init(this.tracker.jobFuture);
                long idleCount = 0L;
                while (!((result = t.call()).isDone() || this.tracker.jobFuture.isDone() || TaskletExecutionService.this.isShutdown)) {
                    if (result.isMadeProgress()) {
                        idleCount = 0L;
                        continue;
                    }
                    IDLER.idle(++idleCount);
                }
            }
            catch (Throwable e) {
                TaskletExecutionService.this.logger.warning("Exception in " + t, e);
                this.tracker.jobFuture.completeExceptionally(new JetException("Exception in " + t + ": " + e, e));
            }
            finally {
                Thread.currentThread().setContextClassLoader(clBackup);
                this.tracker.jobFuture.taskletDone();
            }
        }
    }
}

