/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.runtime.etiao;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.edgent.execution.Job;
import org.apache.edgent.execution.services.RuntimeServices;
import org.apache.edgent.execution.services.ServiceContainer;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.graph.Graph;
import org.apache.edgent.oplet.Oplet;
import org.apache.edgent.runtime.etiao.EtiaoJob;
import org.apache.edgent.runtime.etiao.Invocation;
import org.apache.edgent.runtime.etiao.ThreadFactoryTracker;
import org.apache.edgent.runtime.etiao.TrackingScheduledExecutor;
import org.apache.edgent.runtime.etiao.graph.DirectGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Executable
implements RuntimeServices {
    private EtiaoJob job;
    private final ServiceContainer containerServices;
    private final ThreadFactory controlThreads;
    private final BiConsumer<Object, Throwable> completionHandler;
    private final ThreadFactoryTracker userThreads;
    private final TrackingScheduledExecutor controlScheduler;
    private final TrackingScheduledExecutor userScheduler;
    private Throwable lastError;
    private static final Logger logger = LoggerFactory.getLogger(Executable.class);
    private final ServiceContainer jobServices = new ServiceContainer();
    private List<Invocation<? extends Oplet<?, ?>, ?, ?>> invocations = new ArrayList();
    private Thread completer;
    private boolean completerNotify;

    public Executable(String name, ServiceContainer containerServices) {
        this(name, containerServices, null);
    }

    public Executable(String name, ServiceContainer containerServices, ThreadFactory threads) {
        this.containerServices = containerServices;
        this.controlThreads = threads != null ? threads : Executors.defaultThreadFactory();
        this.completionHandler = new BiConsumer<Object, Throwable>(){
            private static final long serialVersionUID = 1L;

            public void accept(Object source, Throwable t) {
                if (Executable.this.job == null) {
                    throw new IllegalStateException("A job has not been instantiated");
                }
                if (t != null) {
                    Executable.this.setLastError(t);
                    Executable.this.job.updateHealth(t);
                    Executable.this.cleanup();
                } else if (!(Executable.this.job.getCurrentState() != Job.State.RUNNING || source != Executable.this.userScheduler && source != Executable.this.userThreads || Executable.this.hasActiveTasks())) {
                    logger.info("No more active user tasks");
                }
                Executable.this.notifyCompleter();
            }
        };
        this.userThreads = new ThreadFactoryTracker(name, this.controlThreads, this.completionHandler);
        this.controlScheduler = TrackingScheduledExecutor.newScheduler(this.controlThreads, this.completionHandler);
        this.userScheduler = TrackingScheduledExecutor.newScheduler(this.userThreads, this.completionHandler);
    }

    private ThreadFactory getThreads() {
        return this.userThreads;
    }

    public ScheduledExecutorService getScheduler() {
        return this.userScheduler;
    }

    public <T> T getService(Class<T> serviceClass) {
        Object service = this.jobServices.getService(serviceClass);
        if (service != null) {
            return (T)service;
        }
        return (T)this.containerServices.getService(serviceClass);
    }

    public <T extends Oplet<I, O>, I, O> Invocation<T, I, O> addOpletInvocation(T oplet, int inputs, int outputs) {
        Invocation invocation = new Invocation("OP_" + this.invocations.size(), oplet, inputs, outputs);
        this.invocations.add(invocation);
        return invocation;
    }

    public void initialize() {
        this.jobServices.addService(ThreadFactory.class, (Object)this.getThreads());
        this.jobServices.addService(ScheduledExecutorService.class, (Object)this.getScheduler());
        this.invokeAction((Consumer & Serializable)invocation -> invocation.initialize(this.job, this));
    }

    public void start() {
        this.invokeAction((Consumer & Serializable)invocation -> invocation.start());
    }

    public void close() {
        this.getScheduler().shutdownNow();
        this.userThreads.shutdownNow();
        this.invokeAction((Consumer & Serializable)invocation -> {
            try {
                invocation.close();
            }
            catch (Throwable t) {
                logger.debug("Exception caught while closing invocation {}: {}", (Object)invocation.getId(), (Object)t);
            }
            finally {
                this.jobServices.cleanOplet(this.job.getId(), invocation.getId());
                this.job.getContainerServices().cleanOplet(this.job.getId(), invocation.getId());
            }
        });
        this.notifyCompleter();
        List<Runnable> unfinished = this.controlScheduler.shutdownNow();
        if (!unfinished.isEmpty()) {
            logger.warn("Scheduler could not finish {} tasks", (Object)unfinished.size());
        }
    }

    private static long getTimeoutValue(long timeout, TimeUnit units) {
        if (Boolean.getBoolean("edgent.build.ci")) {
            return timeout * 2L;
        }
        return timeout;
    }

    private void invokeAction(Consumer<Invocation<?, ?, ?>> action) {
        ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<Boolean>(this.controlScheduler);
        for (Invocation<? extends Oplet<?, ?>, ?, ?> invocation : this.invocations) {
            completer.submit(() -> {
                action.accept((Object)invocation);
                return true;
            });
        }
        long getFutureTimeout = 10L;
        TimeUnit getFutureTimeoutUnits = TimeUnit.SECONDS;
        getFutureTimeout = Executable.getTimeoutValue(getFutureTimeout, getFutureTimeoutUnits);
        for (int remainingTasks = this.invocations.size(); remainingTasks > 0; --remainingTasks) {
            try {
                Future completed = completer.poll(getFutureTimeout, getFutureTimeoutUnits);
                if (completed == null) {
                    throw new RuntimeException(new TimeoutException(String.format("%d%s timeout", getFutureTimeout, getFutureTimeoutUnits.toString())));
                }
                try {
                    completed.get();
                }
                catch (InterruptedException | CancellationException | ExecutionException e) {
                    logger.error("Exception caught while invoking action: {}", (Throwable)e);
                }
                continue;
            }
            catch (InterruptedException e) {
                logger.error("Exception caught while waiting for future to complete", (Throwable)e);
            }
        }
        this.job.onActionComplete();
    }

    private void cleanup() {
        this.userScheduler.shutdown();
        this.userThreads.shutdown();
    }

    public boolean hasActiveTasks() {
        return this.userScheduler.hasActiveTasks() || this.userThreads.hasActiveNonDaemonThreads();
    }

    public synchronized Throwable getLastError() {
        return this.lastError;
    }

    private synchronized void setLastError(Throwable lastError) {
        this.lastError = lastError;
    }

    public Job createJob(Graph graph, String topologyName, String jobName) {
        this.job = new EtiaoJob((DirectGraph)graph, topologyName, jobName, this.containerServices);
        return this.job;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    final boolean complete(long timeoutMillis) throws InterruptedException, ExecutionException {
        long totalWait = timeoutMillis;
        if (totalWait <= 0L) {
            totalWait = 1000L;
        }
        Executable executable = this;
        synchronized (executable) {
            this.completer = Thread.currentThread();
        }
        long start = System.currentTimeMillis();
        try {
            while (System.currentTimeMillis() - start < totalWait) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                Throwable t = this.getLastError();
                if (t != null) {
                    throw Executable.executionException(t);
                }
                if (!this.hasActiveTasks()) break;
                Thread thread = this.completer;
                synchronized (thread) {
                    block19: {
                        if (!this.completerNotify) {
                            try {
                                this.completer.wait(totalWait);
                            }
                            catch (InterruptedException e) {
                                if (this.completerNotify) break block19;
                                throw e;
                            }
                        }
                    }
                    this.completerNotify = false;
                }
            }
        }
        finally {
            Executable executable2 = this;
            synchronized (executable2) {
                this.completer = null;
            }
        }
        if (System.currentTimeMillis() - start >= totalWait) return false;
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyCompleter() {
        Thread completer;
        Object object = this;
        synchronized (object) {
            completer = this.completer;
        }
        if (completer == null) {
            return;
        }
        object = completer;
        synchronized (object) {
            this.completerNotify = true;
            completer.notifyAll();
        }
    }

    static ExecutionException executionException(Throwable t) {
        return t instanceof ExecutionException ? (ExecutionException)t : new ExecutionException(t);
    }
}

