/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.logging.ILogger;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class ExecutionService {
    private static final IdleStrategy IDLER = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    private final CooperativeWorker[] workers;
    private final Thread[] threads;
    private final String hzInstanceName;
    private final ILogger logger;

    public ExecutionService(HazelcastInstance hz, int threadCount) {
        this.hzInstanceName = hz.getName();
        this.workers = new CooperativeWorker[threadCount];
        this.threads = new Thread[threadCount];
        this.logger = hz.getLoggingService().getLogger(ExecutionService.class);
    }

    public CompletionStage<Void> execute(List<? extends Tasklet> tasklets, @Nonnull Consumer<CompletionStage<Void>> doneCallback) {
        this.ensureStillRunning();
        JobFuture jobFuture = new JobFuture(tasklets.size(), doneCallback);
        try {
            Map<Boolean, List<Tasklet>> byCooperation = tasklets.stream().collect(Collectors.partitioningBy(Tasklet::isCooperative));
            this.submitCooperativeTasklets(jobFuture, byCooperation.get(true));
            this.submitBlockingTasklets(jobFuture, byCooperation.get(false));
        }
        catch (Throwable t) {
            jobFuture.completeExceptionally(t);
            doneCallback.accept(jobFuture);
        }
        return jobFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        this.blockingTaskletExecutor.shutdown();
        ExecutionService executionService = this;
        synchronized (executionService) {
            for (CooperativeWorker worker : this.workers) {
                if (worker == null) continue;
                worker.isShutdown = true;
            }
        }
    }

    private void ensureStillRunning() {
        if (this.blockingTaskletExecutor.isShutdown()) {
            throw new IllegalStateException("Execution service was ordered to shut down");
        }
    }

    private void submitBlockingTasklets(JobFuture jobFuture, List<Tasklet> tasklets) {
        jobFuture.blockingFutures = tasklets.stream().map(t -> new BlockingWorker(new TaskletTracker((Tasklet)t, jobFuture))).map(this.blockingTaskletExecutor::submit).collect(Collectors.toList());
    }

    private void submitCooperativeTasklets(JobFuture jobFuture, List<Tasklet> tasklets) {
        this.ensureThreadsStarted();
        List[] trackersByThread = new List[this.workers.length];
        Arrays.setAll(trackersByThread, i -> new ArrayList());
        int i2 = 0;
        for (Tasklet t : tasklets) {
            t.init();
            trackersByThread[i2++ % trackersByThread.length].add(new TaskletTracker(t, jobFuture));
        }
        for (i2 = 0; i2 < trackersByThread.length; ++i2) {
            this.workers[i2].trackers.addAll(trackersByThread[i2]);
        }
        Arrays.stream(this.threads).forEach(LockSupport::unpark);
    }

    private synchronized void ensureThreadsStarted() {
        if (this.workers[0] != null) {
            return;
        }
        Arrays.setAll(this.workers, i -> new CooperativeWorker(this.workers));
        Arrays.setAll(this.threads, i -> this.createThread(this.workers[i], "cooperative", i));
        Arrays.stream(this.threads).forEach(Thread::start);
    }

    private Thread createThread(Runnable r, String executorName, int seq) {
        return new Thread(r, this.threadNamePrefix() + executorName + ".thread-" + seq);
    }

    private String threadNamePrefix() {
        return "hz." + this.hzInstanceName + ".jet.";
    }

    private static final class JobFuture
    extends CompletableFuture<Void> {
        private final AtomicInteger completionLatch;
        private final Consumer<CompletionStage<Void>> doneCallback;
        private List<Future> blockingFutures;

        JobFuture(int taskletCount, Consumer<CompletionStage<Void>> doneCallback) {
            this.doneCallback = doneCallback;
            this.completionLatch = new AtomicInteger(taskletCount);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled) {
                this.blockingFutures.forEach(f -> f.cancel(true));
            }
            return cancelled;
        }

        @SuppressFBWarnings(value={"NP_NONNULL_PARAM_VIOLATION"}, justification="CompletableFuture<Void>")
        private void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                this.complete(null);
                if (this.doneCallback != null) {
                    this.doneCallback.accept(this);
                }
            }
        }
    }

    private final class BlockingTaskThreadFactory
    implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger();

        private BlockingTaskThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            return ExecutionService.this.createThread(r, "blocking", this.seq.getAndIncrement());
        }
    }

    private static final class TaskletTracker {
        final Tasklet tasklet;
        final JobFuture jobFuture;
        final AtomicReference<CooperativeWorker> stealingWorker = new AtomicReference();

        TaskletTracker(Tasklet tasklet, JobFuture jobFuture) {
            this.tasklet = tasklet;
            this.jobFuture = jobFuture;
        }
    }

    private class CooperativeWorker
    implements Runnable {
        private final List<TaskletTracker> trackers;
        private final CooperativeWorker[] colleagues;
        private volatile boolean isShutdown;

        CooperativeWorker(CooperativeWorker[] colleagues) {
            this.colleagues = colleagues;
            this.trackers = new CopyOnWriteArrayList<TaskletTracker>();
        }

        @Override
        public void run() {
            long idleCount = 0L;
            while (!this.isShutdown) {
                boolean madeProgress = false;
                for (TaskletTracker t : this.trackers) {
                    CooperativeWorker stealingWorker = t.stealingWorker.get();
                    if (stealingWorker != null) {
                        t.stealingWorker.set(null);
                        this.trackers.remove(t);
                        stealingWorker.trackers.add(t);
                        continue;
                    }
                    try {
                        ProgressState result = t.tasklet.call();
                        if (result.isDone()) {
                            this.dismissTasklet(t);
                        } else {
                            madeProgress |= result.isMadeProgress();
                        }
                    }
                    catch (Throwable e) {
                        ExecutionService.this.logger.warning("Exception in " + t.tasklet, e);
                        t.jobFuture.completeExceptionally((Throwable)((Object)new JetException("Exception in " + t.tasklet + ": " + e, e)));
                    }
                    if (!t.jobFuture.isCompletedExceptionally()) continue;
                    this.dismissTasklet(t);
                }
                if (madeProgress) {
                    idleCount = 0L;
                    continue;
                }
                IDLER.idle(++idleCount);
            }
            this.trackers.clear();
        }

        private void dismissTasklet(TaskletTracker t) {
            t.jobFuture.taskletDone();
            this.trackers.remove(t);
            this.stealWork();
        }

        private void stealWork() {
            block0: while (true) {
                TaskletTracker t;
                List<TaskletTracker> toStealFrom = this.trackers;
                for (CooperativeWorker w : this.colleagues) {
                    if (w.trackers.size() <= toStealFrom.size()) continue;
                    toStealFrom = w.trackers;
                }
                if (toStealFrom.size() <= this.trackers.size() + 1) {
                    return;
                }
                Iterator<TaskletTracker> iterator = toStealFrom.iterator();
                do {
                    if (!iterator.hasNext()) continue block0;
                    t = (TaskletTracker)iterator.next();
                } while (!t.stealingWorker.compareAndSet(null, this));
                break;
            }
        }
    }

    private final class BlockingWorker
    implements Runnable {
        private final TaskletTracker tracker;

        private BlockingWorker(TaskletTracker tracker) {
            this.tracker = tracker;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Tasklet t = this.tracker.tasklet;
            try {
                ProgressState result;
                t.init();
                long idleCount = 0L;
                while (!(result = t.call()).isDone() && !this.tracker.jobFuture.isCompletedExceptionally()) {
                    if (result.isMadeProgress()) {
                        idleCount = 0L;
                        continue;
                    }
                    IDLER.idle(++idleCount);
                }
            }
            catch (Throwable e) {
                ExecutionService.this.logger.warning("Exception in " + t, e);
                this.tracker.jobFuture.completeExceptionally((Throwable)((Object)new JetException("Exception in " + t + ": " + e, e)));
            }
            finally {
                this.tracker.jobFuture.taskletDone();
            }
        }
    }
}

