/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.internal.util.executor.ExecutorType;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.JetProperties;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.metrics.MetricsImpl;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class TaskletExecutionService {
    /** Name under which the executor that runs {@code Tasklet.init()} for cooperative tasklets is registered. */
    public static final String TASKLET_INIT_CLOSE_EXECUTOR_NAME = "jet:tasklet_initClose";
    // Cached thread pool that runs one BlockingWorker per non-cooperative tasklet.
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    // Hazelcast execution service; hosts the executor registered under TASKLET_INIT_CLOSE_EXECUTOR_NAME.
    private final ExecutionService hzExecutionService;
    // One worker per cooperative thread; cooperativeWorkers[i] is run by cooperativeThreadPool[i].
    private final CooperativeWorker[] cooperativeWorkers;
    private final Thread[] cooperativeThreadPool;
    // Instance name embedded in the names of the cooperative and blocking threads.
    private final String hzInstanceName;
    private final ILogger logger;
    // Index of the cooperative worker that receives the next tasklet; read and advanced only while holding `lock`.
    private int cooperativeThreadIndex;
    // Incremented when a BlockingWorker starts running and decremented when it finishes.
    @Probe(name="blockingWorkerCount")
    private final Counter blockingWorkerCount = MwCounter.newMwCounter();
    // Set by shutdown(); polled by the cooperative and blocking worker loops as their exit condition.
    private volatile boolean isShutdown;
    // Serializes the assignment of cooperative tasklets to workers (see submitCooperativeTasklets).
    private final Object lock = new Object();
    // Idle strategy used by cooperative workers after an iteration without progress.
    private volatile IdleStrategy idlerCooperative;
    // Idle strategy used by blocking workers after a call without progress.
    private volatile IdleStrategy idlerNonCooperative;

    /**
     * Creates the service: registers the tasklet-init executor, builds the idle
     * strategies, creates and starts {@code threadCount} cooperative worker
     * threads and registers the metrics of the service and of each worker.
     *
     * @param nodeEngine  source of the execution service, instance name, logger and metrics registry
     * @param threadCount number of cooperative worker threads to create and start
     * @param properties  properties from which the idle-strategy bounds are read
     */
    public TaskletExecutionService(NodeEngineImpl nodeEngine, int threadCount, HazelcastProperties properties) {
        hzExecutionService = nodeEngine.getExecutionService();
        hzExecutionService.register(
                TASKLET_INIT_CLOSE_EXECUTOR_NAME,
                Runtime.getRuntime().availableProcessors(),
                Integer.MAX_VALUE,
                ExecutorType.CACHED);
        hzInstanceName = nodeEngine.getHazelcastInstance().getName();
        cooperativeWorkers = new CooperativeWorker[threadCount];
        cooperativeThreadPool = new Thread[threadCount];
        // The logger must be assigned before createIdler(), which logs through it.
        logger = nodeEngine.getLoggingService().getLogger(TaskletExecutionService.class);
        idlerCooperative = createIdler(
                properties,
                JetProperties.JET_IDLE_COOPERATIVE_MIN_MICROSECONDS,
                JetProperties.JET_IDLE_COOPERATIVE_MAX_MICROSECONDS);
        idlerNonCooperative = createIdler(
                properties,
                JetProperties.JET_IDLE_NONCOOPERATIVE_MIN_MICROSECONDS,
                JetProperties.JET_IDLE_NONCOOPERATIVE_MAX_MICROSECONDS);

        for (int w = 0; w < cooperativeWorkers.length; w++) {
            cooperativeWorkers[w] = new CooperativeWorker();
        }
        for (int w = 0; w < cooperativeThreadPool.length; w++) {
            String threadName = String.format("hz.%s.jet.cooperative.thread-%d", hzInstanceName, w);
            cooperativeThreadPool[w] = new Thread(cooperativeWorkers[w], threadName);
        }
        for (Thread workerThread : cooperativeThreadPool) {
            workerThread.start();
        }

        MetricsRegistry metrics = nodeEngine.getMetricsRegistry();
        MetricDescriptor jetDescriptor = metrics.newMetricDescriptor().withTag("module", "jet");
        metrics.registerStaticMetrics(jetDescriptor, this);
        int workerIndex = 0;
        for (CooperativeWorker worker : cooperativeWorkers) {
            MetricDescriptor workerDescriptor =
                    jetDescriptor.withDiscriminator("cooperativeWorker", String.valueOf(workerIndex));
            metrics.registerStaticMetrics(workerDescriptor, worker);
            workerIndex++;
        }
    }

    /**
     * Submits the given tasklets for execution, cooperative ones to the
     * cooperative workers and the rest to the blocking executor.
     *
     * @param tasklets           tasklets to run
     * @param cancellationFuture when this future completes, the execution is marked as failed
     *                           and the blocking tasklet futures are cancelled
     * @param jobClassLoader     context class loader to use while the tasklets run
     * @return a future that completes once every tasklet has been accounted for, or
     *         exceptionally if submission itself fails
     */
    CompletableFuture<Void> beginExecute(@Nonnull List<? extends Tasklet> tasklets, @Nonnull CompletableFuture<Void> cancellationFuture, @Nonnull ClassLoader jobClassLoader) {
        final ExecutionTracker tracker = new ExecutionTracker(tasklets.size(), cancellationFuture);
        try {
            List<Tasklet> cooperative = new ArrayList<>();
            List<Tasklet> blocking = new ArrayList<>();
            for (Tasklet tasklet : tasklets) {
                if (tasklet.isCooperative()) {
                    cooperative.add(tasklet);
                } else {
                    blocking.add(tasklet);
                }
            }
            submitCooperativeTasklets(tracker, jobClassLoader, cooperative);
            submitBlockingTasklets(tracker, jobClassLoader, blocking);
        } catch (Throwable submissionFailure) {
            // Any failure during submission fails the whole execution.
            tracker.future.internalCompleteExceptionally(submissionFailure);
        }
        return tracker.future;
    }

    /**
     * Initiates shutdown: raises the shutdown flag polled by the worker loops,
     * calls {@code shutdownNow()} on the blocking tasklet executor and shuts
     * down the tasklet-init executor. Does not wait for workers to finish; see
     * {@link #awaitWorkerTermination()}.
     */
    public void shutdown() {
        // Raise the flag first so that workers observe it on their next loop check.
        isShutdown = true;
        blockingTaskletExecutor.shutdownNow();
        hzExecutionService.shutdownExecutor(TASKLET_INIT_CLOSE_EXECUTOR_NAME);
    }

    /**
     * Submits one {@code BlockingWorker} per tasklet to the blocking executor,
     * publishes the resulting futures on the execution tracker and then waits
     * until every worker has counted down the shared start latch.
     *
     * @param executionTracker tracker of the execution the tasklets belong to
     * @param jobClassLoader   context class loader for the workers
     * @param tasklets         non-cooperative tasklets to run
     */
    private void submitBlockingTasklets(ExecutionTracker executionTracker, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        final CountDownLatch allStarted = new CountDownLatch(tasklets.size());
        final List<Future> submitted = new ArrayList<>();
        for (Tasklet tasklet : tasklets) {
            TaskletTracker tracker = new TaskletTracker(tasklet, executionTracker, jobClassLoader);
            BlockingWorker worker = new BlockingWorker(tracker, allStarted);
            submitted.add(blockingTaskletExecutor.submit(worker));
        }
        // Publish the futures only after all workers were submitted.
        executionTracker.blockingFutures = submitted;
        Util.uncheckRun(allStarted::await);
    }

    /**
     * Initializes the given tasklets on the tasklet-init executor, waits for
     * all initializations to finish, then distributes the tasklets over the
     * cooperative workers and unparks the worker threads.
     *
     * @param executionTracker tracker of the execution the tasklets belong to
     * @param jobClassLoader   class loader under which {@code init()} runs and which is
     *                         recorded in each tasklet's tracker
     * @param tasklets         cooperative tasklets to run
     */
    private void submitCooperativeTasklets(ExecutionTracker executionTracker, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        final int workerCount = cooperativeWorkers.length;
        @SuppressWarnings("unchecked")
        final List<TaskletTracker>[] perWorker = new List[workerCount];
        for (int w = 0; w < workerCount; w++) {
            perWorker[w] = new ArrayList<>();
        }

        final List<Future<?>> initFutures = new ArrayList<>();
        for (Tasklet tasklet : tasklets) {
            initFutures.add(hzExecutionService.submit(
                    TASKLET_INIT_CLOSE_EXECUTOR_NAME,
                    () -> Util.doWithClassLoader(jobClassLoader, tasklet::init)));
        }
        awaitAll(initFutures);

        // The shared index is advanced under the lock so that concurrent
        // submissions do not interleave their worker assignments.
        synchronized (lock) {
            for (Tasklet tasklet : tasklets) {
                perWorker[cooperativeThreadIndex].add(new TaskletTracker(tasklet, executionTracker, jobClassLoader));
                cooperativeThreadIndex = (cooperativeThreadIndex + 1) % workerCount;
            }
        }
        for (int w = 0; w < workerCount; w++) {
            cooperativeWorkers[w].trackers.addAll(perWorker[w]);
        }
        for (Thread workerThread : cooperativeThreadPool) {
            LockSupport.unpark(workerThread);
        }
    }

    /**
     * Waits for every future in the list, logging each failure, and throws a
     * single {@code JetException} afterwards if at least one of them failed.
     *
     * @param futures futures to wait for
     * @throws JetException if any future failed or the wait was interrupted; the first
     *                      failure encountered is attached as the cause
     */
    private void awaitAll(List<? extends Future<?>> futures) {
        Throwable primaryFailure = null;
        int failed = 0;
        for (Future<?> pending : futures) {
            try {
                pending.get();
            } catch (InterruptedException | ExecutionException e) {
                Throwable cause = ExceptionUtil.peel(e);
                logger.severe("Tasklet initialization failed", cause);
                if (primaryFailure == null) {
                    // Remember only the first failure; later ones are just counted.
                    primaryFailure = cause;
                }
                failed++;
            }
        }
        if (primaryFailure == null) {
            return;
        }
        String summary = String.format("%,d of %,d tasklets failed to initialize. One of the failures is attached as the cause and its summary is %s", failed, futures.size(), primaryFailure);
        throw new JetException(summary, primaryFailure);
    }

    /**
     * Blocks until the blocking tasklet executor has terminated and all
     * cooperative threads have exited. Must be called after {@link #shutdown()}.
     * An {@code InterruptedException} raised while waiting is rethrown via
     * {@code ExceptionUtil.sneakyThrow}.
     */
    public void awaitWorkerTermination() {
        assert isShutdown : "Not shut down";
        try {
            // Keep waiting in one-minute slices, warning after each slice that elapses.
            boolean terminated = blockingTaskletExecutor.awaitTermination(1L, TimeUnit.MINUTES);
            while (!terminated) {
                logger.warning("Blocking tasklet executor did not terminate in 1 minute");
                terminated = blockingTaskletExecutor.awaitTermination(1L, TimeUnit.MINUTES);
            }
            for (Thread cooperativeThread : cooperativeThreadPool) {
                cooperativeThread.join();
            }
        } catch (InterruptedException interrupted) {
            ExceptionUtil.sneakyThrow(interrupted);
        }
    }

    /**
     * Builds a backoff idle strategy whose minimum and maximum park periods are
     * read from the two given properties.
     *
     * <p>If the configured minimum is greater than the maximum, a warning is
     * logged and the minimum is used for both bounds. Equal values are valid
     * and accepted silently.
     *
     * @param props   property source
     * @param minProp property holding the minimum idle period, in the property's own time unit
     * @param maxProp property holding the maximum idle period, in the property's own time unit
     * @return the configured idle strategy
     */
    private BackoffIdleStrategy createIdler(HazelcastProperties props, HazelcastProperty minProp, HazelcastProperty maxProp) {
        int min = props.getInteger(minProp);
        int max = props.getInteger(maxProp);
        String minName = minProp.getName();
        String maxName = maxProp.getName();
        // Only a minimum strictly above the maximum is a misconfiguration;
        // min == max satisfies the documented "less than or equal to" contract.
        if (min > max) {
            this.logger.warning(String.format("The property %s must be set less than or equal to %s but current values are: %s=%d, %s=%d. Using minimum value as maximum instead.", minName, maxName, minName, min, maxName, max));
            max = min;
        }
        this.logger.info(String.format("Creating idler with %s=%d\u00b5s,%s=%d\u00b5s", minName, min, maxName, max));
        // The two leading zeros are passed through unchanged; see the
        // BackoffIdleStrategy constructor for their meaning.
        return new BackoffIdleStrategy(0L, 0L, minProp.getTimeUnit().toNanos(min), maxProp.getTimeUnit().toNanos(max));
    }

    /**
     * Tracks one execution (one call to {@code beginExecute}): counts down as
     * its tasklets finish, remembers the first recorded exception and completes
     * {@link #future} when the count reaches zero.
     */
    private final class ExecutionTracker {
        // Completed by taskletDone() when the last tasklet is accounted for.
        final NonCompletableFuture future = new NonCompletableFuture();
        // Futures of the submitted blocking workers; replaced by submitBlockingTasklets().
        volatile List<Future> blockingFutures = Collections.emptyList();
        // Number of tasklets that have not yet reported completion.
        private final AtomicInteger completionLatch;
        // First exception recorded for this execution, or null if none.
        private final AtomicReference<Throwable> executionException = new AtomicReference();

        /**
         * @param taskletCount       number of tasklets that must report completion
         * @param cancellationFuture future whose completion marks the execution as failed
         */
        ExecutionTracker(int taskletCount, CompletableFuture<Void> cancellationFuture) {
            this.completionLatch = new AtomicInteger(taskletCount);
            cancellationFuture.whenComplete((BiConsumer)ExceptionUtil.withTryCatch(TaskletExecutionService.this.logger, (r, e) -> {
                // A normal completion of the cancellation future is treated as an error too.
                if (e == null) {
                    e = new IllegalStateException("cancellationFuture should be completed exceptionally");
                }
                this.exception((Throwable)e);
                // NOTE(review): cancel(true) asks the future to interrupt a running worker thread;
                // confirm that interrupting blocking tasklets is intended here.
                this.blockingFutures.forEach(f -> f.cancel(true));
            }));
        }

        /** Records {@code t} as the execution's failure unless one was already recorded. */
        void exception(Throwable t) {
            this.executionException.compareAndSet(null, t);
        }

        /**
         * Reports that one tasklet is finished. The call that brings the count
         * to zero completes {@link #future}, exceptionally if a failure was
         * recorded.
         */
        void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                Throwable ex = this.executionException.get();
                if (ex == null) {
                    this.future.internalComplete();
                } else {
                    this.future.internalCompleteExceptionally(ex);
                }
            }
        }

        /**
         * Returns whether a failure has been recorded. This becomes true as
         * soon as {@link #exception} stores a throwable, which may be before
         * {@link #future} itself completes.
         */
        boolean executionCompletedExceptionally() {
            return this.executionException.get() != null;
        }
    }

    /**
     * Thread factory for the blocking tasklet executor. Names each thread
     * after the Hazelcast instance plus a sequence number starting at zero.
     */
    private final class BlockingTaskThreadFactory
    implements ThreadFactory {
        private final AtomicInteger nextThreadNumber = new AtomicInteger();

        private BlockingTaskThreadFactory() {
        }

        @Override
        public Thread newThread(@Nonnull Runnable r) {
            int threadNumber = nextThreadNumber.getAndIncrement();
            String threadName = String.format("hz.%s.jet.blocking.thread-%d", hzInstanceName, threadNumber);
            return new Thread(r, threadName);
        }
    }

    /**
     * Immutable association of a tasklet with the tracker of the execution it
     * belongs to and the class loader it must run under.
     */
    private static final class TaskletTracker {
        final Tasklet tasklet;
        final ExecutionTracker executionTracker;
        final ClassLoader jobClassLoader;

        TaskletTracker(Tasklet trackedTasklet, ExecutionTracker owner, ClassLoader classLoader) {
            tasklet = trackedTasklet;
            executionTracker = owner;
            jobClassLoader = classLoader;
        }

        @Override
        public String toString() {
            return "Tracking " + tasklet;
        }
    }

    private final class CooperativeWorker
    implements Runnable {
        /** Duration in milliseconds; matches the 5 ms limit above which runTasklet logs a slow call. */
        private static final int COOPERATIVE_LOGGING_THRESHOLD = 5;

        /** Tasklets currently assigned to this worker; added to by submitCooperativeTasklets. */
        @Probe(name="taskletCount")
        private final CopyOnWriteArrayList<TaskletTracker> trackers = new CopyOnWriteArrayList<>();

        /** Number of completed passes over {@link #trackers}. */
        @Probe(name="iterationCount")
        private final Counter iterationCount = SwCounter.newSwCounter();

        /** Accumulates the progress made by the tasklets during one pass. */
        private final ProgressTracker progressTracker = new ProgressTracker();

        /** Method reference created once so that each pass does not allocate a new one. */
        private final Consumer<TaskletTracker> runTasklet = this::runTasklet;

        /** Whether FINEST logging is enabled, sampled at the start of each pass. */
        private boolean finestLogEnabled;

        /** Thread executing this worker; set at the start of run(). */
        private Thread myThread;

        /** Container into which the metrics context of the tasklet being called is set. */
        private MetricsImpl.Container userMetricsContextContainer;

        CooperativeWorker() {
        }

        /**
         * Worker loop: until the service is shut down, repeatedly calls every
         * assigned tasklet once and idles with increasing backoff whenever a
         * full pass made no progress. On shutdown, reports every remaining
         * tasklet as done and clears the list.
         */
        @Override
        public void run() {
            myThread = Thread.currentThread();
            userMetricsContextContainer = MetricsImpl.container();
            final IdleStrategy idler = idlerCooperative;
            long idleRounds = 0L;
            while (!isShutdown) {
                finestLogEnabled = logger.isFinestEnabled();
                progressTracker.reset();
                trackers.forEach(runTasklet);
                iterationCount.inc();
                if (progressTracker.isMadeProgress()) {
                    // Progress resets the backoff.
                    idleRounds = 0L;
                } else {
                    idler.idle(++idleRounds);
                }
            }
            // Shutdown: release the completion latches of tasklets that never finished.
            for (TaskletTracker remaining : trackers) {
                remaining.executionTracker.taskletDone();
            }
            trackers.clear();
        }

        /**
         * Calls one tasklet once under its job class loader and metrics
         * context, merges its progress into the pass-wide tracker and
         * dismisses it when it is done or when its execution has failed.
         *
         * <p>An exception thrown by the tasklet is logged and recorded on the
         * execution tracker. Note that when the call reports done and the
         * execution has also failed, {@code dismissTasklet} is invoked twice
         * for the same tracker.
         *
         * @param t tracker of the tasklet to call
         */
        private void runTasklet(TaskletTracker t) {
            // The clock is read only when the timing will actually be logged.
            final long startNanos = finestLogEnabled ? System.nanoTime() : 0L;
            try {
                myThread.setContextClassLoader(t.jobClassLoader);
                userMetricsContextContainer.setContext(t.tasklet.getMetricsContext());
                ProgressState state = t.tasklet.call();
                if (state.isDone()) {
                    dismissTasklet(t);
                }
                progressTracker.mergeWith(state);
            } catch (Throwable failure) {
                logger.warning("Exception in " + t.tasklet, failure);
                t.executionTracker.exception(new JetException("Exception in " + t.tasklet + ": " + failure, failure));
            } finally {
                userMetricsContextContainer.setContext(null);
            }
            // A failure anywhere in the execution (including the one just recorded)
            // removes this tasklet from the worker.
            if (t.executionTracker.executionCompletedExceptionally()) {
                dismissTasklet(t);
            }
            if (!finestLogEnabled) {
                return;
            }
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            if (elapsedMs > COOPERATIVE_LOGGING_THRESHOLD) {
                logger.finest("Cooperative tasklet call of '" + t.tasklet + "' took more than " + COOPERATIVE_LOGGING_THRESHOLD + " ms: " + elapsedMs + "ms");
            }
        }

        /**
         * Removes the tasklet from this worker and reports it as done to its
         * execution tracker.
         *
         * <p>The method is idempotent: {@code runTasklet} may call it twice
         * for the same tracker (once because the tasklet returned done and
         * once because its execution has failed). Only the call that actually
         * removes the tracker counts down the completion latch; a second
         * count-down would let the execution future complete while another
         * tasklet of the same execution is still running.
         *
         * @param t tracker of the tasklet to dismiss
         */
        private void dismissTasklet(TaskletTracker t) {
            // remove() returns false when the tracker was already dismissed.
            if (!this.trackers.remove(t)) {
                return;
            }
            LoggingUtil.logFinest(TaskletExecutionService.this.logger, "Tasklet %s is done", t.tasklet);
            t.executionTracker.taskletDone();
        }
    }

    /**
     * Runs a single non-cooperative tasklet to completion on a thread of the
     * blocking tasklet executor.
     */
    private final class BlockingWorker
    implements Runnable {
        private final TaskletTracker tracker;
        private final CountDownLatch startedLatch;

        private BlockingWorker(TaskletTracker tracker, CountDownLatch startedLatch) {
            this.tracker = tracker;
            this.startedLatch = startedLatch;
        }

        /**
         * Signals the start latch, initializes the tasklet and then calls it
         * repeatedly until it reports done, its execution has failed or the
         * service is shut down, idling with backoff after calls that made no
         * progress. A thrown exception is logged and recorded on the execution
         * tracker. In every case the worker count, metrics context and context
         * class loader are restored and the tasklet is reported as done.
         */
        @Override
        public void run() {
            final Thread self = Thread.currentThread();
            final ClassLoader originalClassLoader = self.getContextClassLoader();
            final Tasklet tasklet = tracker.tasklet;
            self.setContextClassLoader(tracker.jobClassLoader);
            final IdleStrategy idler = idlerNonCooperative;
            final MetricsImpl.Container metricsContainer = MetricsImpl.container();
            try {
                blockingWorkerCount.inc();
                metricsContainer.setContext(tasklet.getMetricsContext());
                // Count down before init() so the submitter is released even if init() is slow or fails.
                startedLatch.countDown();
                tasklet.init();
                long idleRounds = 0L;
                ProgressState state;
                do {
                    state = tasklet.call();
                    if (state.isMadeProgress()) {
                        idleRounds = 0L;
                    } else {
                        idler.idle(++idleRounds);
                    }
                } while (!(state.isDone()
                        || tracker.executionTracker.executionCompletedExceptionally()
                        || isShutdown));
            } catch (Throwable failure) {
                logger.warning("Exception in " + tasklet, failure);
                tracker.executionTracker.exception(new JetException("Exception in " + tasklet + ": " + failure, failure));
            } finally {
                blockingWorkerCount.inc(-1L);
                metricsContainer.setContext(null);
                self.setContextClassLoader(originalClassLoader);
                tracker.executionTracker.taskletDone();
            }
        }
    }
}

