/*
 * Decompiled with CFR 0.152.
 */
package com.tc.async.impl;

import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.MultiThreadedEventContext;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.Stage;
import com.tc.async.api.StageListener;
import com.tc.async.impl.DirectEventCreator;
import com.tc.async.impl.Event;
import com.tc.async.impl.EventCreator;
import com.tc.async.impl.StageQueue;
import com.tc.exception.TCNotRunningException;
import com.tc.exception.TCRuntimeException;
import com.tc.exception.TCServerRestartException;
import com.tc.exception.TCShutdownServerException;
import com.tc.logging.TCLoggerProvider;
import com.tc.properties.TCPropertiesImpl;
import com.tc.util.concurrent.QueueFactory;
import com.tc.util.concurrent.ThreadUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.terracotta.tripwire.StageMonitor;
import org.terracotta.tripwire.TripwireFactory;

public class StageImpl<EC>
implements Stage<EC> {
    private static final long pollTime = 3000L;
    private final String name;
    private final EventHandler<EC> handler;
    private final StageQueue<EC> stageQueue;
    private final WorkerThread[] threads;
    private final ThreadGroup group;
    private final StageListener listener;
    private final Logger logger;
    private final int sleepMs;
    private final boolean pausable;
    private volatile boolean paused;
    private volatile boolean shutdown = true;
    private final LongAdder inflight = new LongAdder();
    private final long warnStallTime = TCPropertiesImpl.getProperties().getLong("l2.seda.stage.stall.warning", 500L);
    private volatile long lastWarnTime = 0L;
    private int spinning = 0;
    private StageMonitor event;

    public StageImpl(TCLoggerProvider loggerProvider, String name, Class<EC> type, EventHandler<EC> handler, int queueCount, ThreadGroup group, QueueFactory queueFactory, StageListener listener, int queueSize, boolean canBeDirect, boolean stallLogging) {
        this.logger = loggerProvider.getLogger(Stage.class.getName() + ": " + name);
        this.name = name;
        if (queueCount > 1 && !MultiThreadedEventContext.class.isAssignableFrom(type)) {
            throw new IllegalArgumentException("the requested queue count is greater than one but the event type is not multi-threaded for stage:" + this.name);
        }
        this.threads = new WorkerThread[queueCount];
        this.handler = handler;
        this.stageQueue = StageQueue.StageQueueFactory.factory(queueCount, queueFactory, type, this.eventCreator(canBeDirect), loggerProvider, name, queueSize);
        this.group = group;
        this.listener = listener;
        this.sleepMs = TCPropertiesImpl.getProperties().getInt("seda." + name + ".sleepMs", 0);
        if (this.sleepMs > 0) {
            this.logger.warn("Sleep of " + this.sleepMs + "ms enabled for stage " + name);
        }
        this.pausable = TCPropertiesImpl.getProperties().getBoolean("seda." + name + ".pausable", false);
        if (this.pausable) {
            this.logger.warn("Stage pausing is enabled for stage " + name);
        }
        this.event = TripwireFactory.createStageMonitor(name, queueCount);
        if (!stallLogging) {
            this.lastWarnTime = Long.MAX_VALUE;
        }
    }

    private EventCreator<EC> eventCreator(boolean direct) {
        return direct ? new DirectEventCreator<EC>(this.baseCreator(), () -> this.isEmpty()) : this.baseCreator();
    }

    private EventCreator<EC> baseCreator() {
        return event -> {
            long start = System.nanoTime();
            this.inflight.increment();
            return () -> {
                long exec = System.nanoTime();
                if (exec - start > TimeUnit.MILLISECONDS.toNanos(this.warnStallTime)) {
                    this.warnIfWarranted("queue", event, TimeUnit.NANOSECONDS.toMillis(exec - start));
                }
                try {
                    this.handler.handleEvent(event);
                    long end = System.nanoTime();
                    if (end - exec > TimeUnit.MILLISECONDS.toNanos(this.warnStallTime)) {
                        this.warnIfWarranted("executed", event, TimeUnit.NANOSECONDS.toMillis(end - exec));
                    }
                }
                finally {
                    this.inflight.decrement();
                }
            };
        };
    }

    private void warnIfWarranted(String type, Object event, long time) {
        long now = System.currentTimeMillis();
        if (now - this.lastWarnTime > 1000L) {
            this.lastWarnTime = now;
            this.logger.warn("Stage: {} has {} event {} for {}ms", new Object[]{this.name, type, event, time});
            if (this.listener != null) {
                this.listener.stageStalled(this.name, time, this.inflight.intValue());
            }
        }
    }

    @Override
    public boolean isEmpty() {
        return this.inflight.sum() == 0L;
    }

    @Override
    public int size() {
        return this.inflight.intValue();
    }

    @Override
    public void setSpinningCount(int spin) {
        this.spinning = spin;
    }

    public void trackExtraStatistics(boolean enable) {
        this.stageQueue.enableAdditionalStatistics(enable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        StageImpl stageImpl = this;
        synchronized (stageImpl) {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
        }
        this.stageQueue.close();
        this.event.unregister();
        this.stopThreads();
    }

    @Override
    public void destroy() {
        if (!this.shutdown) {
            this.stop();
            this.handler.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(ConfigurationContext context) {
        StageImpl stageImpl = this;
        synchronized (stageImpl) {
            if (!this.shutdown) {
                return;
            }
            this.shutdown = false;
        }
        this.handler.initializeContext(context);
        this.startThreads(context.getIdentifier());
        this.event.register();
    }

    @Override
    public Sink<EC> getSink() {
        return this.stageQueue;
    }

    @Override
    public int pause() {
        this.paused = true;
        return this.inflight.intValue();
    }

    @Override
    public void unpause() {
        this.paused = false;
    }

    private synchronized void startThreads(String contextId) {
        for (int i = 0; i < this.threads.length; ++i) {
            String threadName = contextId != null ? contextId + " - " : "";
            threadName = threadName + "WorkerThread(" + this.name + ", " + i;
            threadName = this.threads.length > 1 ? threadName + ", " + this.stageQueue.getSource(i).getSourceName() + ")" : threadName + ")";
            this.threads[i] = new WorkerThread(threadName, this.stageQueue.getSource(i));
            this.threads[i].start();
        }
    }

    private synchronized void stopThreads() {
        for (WorkerThread thread : this.threads) {
            try {
                thread.join();
            }
            catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        }
    }

    @Override
    public String getName() {
        return this.name;
    }

    public String toString() {
        return "StageImpl(" + this.name + ")";
    }

    void waitForIdle() {
        Arrays.stream(this.threads).forEach(t -> {
            while (!t.isIdle()) {
                ThreadUtil.reallySleep(500L);
            }
        });
    }

    @Override
    public Map<String, ?> getState() {
        LinkedHashMap<String, Object> data = new LinkedHashMap<String, Object>();
        ArrayList tl = new ArrayList(this.threads.length);
        Arrays.stream(this.threads).forEach(t -> {
            if (t != null) {
                tl.add(((WorkerThread)t).getStats());
            }
        });
        data.put("name", this.name);
        data.put("threadCount", this.threads.length);
        data.put("backlog", this.inflight.sum());
        data.put("sink", this.stageQueue.getState());
        data.put("threads", tl);
        return data;
    }

    private static boolean isTCNotRunningException(Throwable e) {
        while (e != null) {
            if (e instanceof TCNotRunningException) {
                return true;
            }
            e = e.getCause();
        }
        return false;
    }

    @Override
    public boolean isStarted() {
        return !this.shutdown;
    }

    private class WorkerThread<EC>
    extends Thread {
        private final Source source;
        private volatile boolean idle;
        private long idleTime;
        private long runTime;
        private long count;

        public WorkerThread(String name, Source source) {
            super(StageImpl.this.group, name);
            this.idle = false;
            this.idleTime = 0L;
            this.runTime = 0L;
            this.count = 0L;
            this.setDaemon(true);
            this.source = source;
        }

        private void handleStageDebugPauses() {
            if (StageImpl.this.sleepMs > 0) {
                ThreadUtil.reallySleep(StageImpl.this.sleepMs);
            }
            while (StageImpl.this.paused || StageImpl.this.pausable && "paused".equalsIgnoreCase(System.getProperty(StageImpl.this.name))) {
                if (!StageImpl.this.paused) {
                    StageImpl.this.logger.info("Stage paused, sleeping for 1s");
                }
                ThreadUtil.reallySleep(1000L);
            }
        }

        public boolean isIdle() {
            return this.idle && this.source.isEmpty();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            boolean spinner;
            int spinCount = 0;
            boolean bl = spinner = StageImpl.this.spinning > 0;
            while (!StageImpl.this.shutdown || !this.source.isEmpty()) {
                Event ctxt = null;
                try {
                    long stopped = System.nanoTime();
                    this.idle = true;
                    Event event = ctxt = spinner ? this.source.poll(0L) : this.source.poll(3000L);
                    if (ctxt != null) {
                        this.idle = false;
                        long running = System.nanoTime();
                        this.handleStageDebugPauses();
                        this.idleTime += running - stopped;
                        ctxt.call();
                        long finishRun = System.nanoTime();
                        this.runTime += finishRun - running;
                        ++this.count;
                        StageImpl.this.event.eventOccurred(StageImpl.this.size(), finishRun - running);
                        spinCount = 0;
                        spinner = StageImpl.this.spinning > 0;
                        continue;
                    }
                    this.idleTime += System.nanoTime() - stopped;
                    if (spinCount++ < StageImpl.this.spinning) continue;
                    spinner = false;
                }
                catch (InterruptedException ie) {
                    if (StageImpl.this.shutdown) continue;
                    throw new TCRuntimeException(ie);
                }
                catch (EventHandlerException ie) {
                    if (StageImpl.this.shutdown) continue;
                    throw new TCRuntimeException(ie);
                }
                catch (TCServerRestartException restart) {
                    throw restart;
                }
                catch (TCShutdownServerException shutdown) {
                    throw shutdown;
                }
                catch (Exception e) {
                    if (StageImpl.isTCNotRunningException(e)) {
                        if (StageImpl.this.shutdown) continue;
                        StageImpl.this.logger.info("Ignoring " + TCNotRunningException.class.getSimpleName() + " while handling context: " + ctxt);
                        continue;
                    }
                    StageImpl.this.logger.error("Uncaught exception in stage", (Throwable)e);
                    throw new TCRuntimeException("Uncaught exception in stage", e);
                }
                finally {
                    ctxt = null;
                }
            }
        }

        private Map<String, ?> getStats() {
            LinkedHashMap<String, Number> state = new LinkedHashMap<String, Number>();
            state.put("idle", this.idleTime);
            state.put("run", this.runTime);
            state.put("processed", this.count);
            state.put("backlog", this.source.size());
            return state;
        }
    }
}

