package com.tc.async.impl;

import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.SpecializedEventContext;
import com.tc.async.api.Stage;
import com.tc.exception.PlatformRejoinException;
import com.tc.exception.TCNotRunningException;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLoggerProvider;
import com.tc.properties.TCPropertiesImpl;
import com.tc.text.PrettyPrinter;
import com.tc.util.concurrent.QueueFactory;
import com.tc.util.concurrent.ThreadUtil;

/* loaded from: input_file:L1/terracotta-l1-ee-4.3.10.1.12.jar/com/tc/async/impl/StageImpl.class_terracotta */
public class StageImpl implements Stage {
    private static final long pollTime = 3000;
    private final String name;
    private final EventHandler handler;
    private final StageQueueImpl stageQueue;
    private final WorkerThread[] threads;
    private final ThreadGroup group;
    private final TCLogger logger;
    private final int sleepMs;
    private final boolean pausable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:L1/terracotta-l1-ee-4.3.10.1.12.jar/com/tc/async/impl/StageImpl$WorkerThread.class_terracotta */
    public static class WorkerThread extends Thread {
        private final Source source;
        private final EventHandler handler;
        private volatile boolean shutdownRequested;
        private final TCLogger tcLogger;
        private final int sleepMs;
        private final boolean pausable;
        private final String stageName;

        public WorkerThread(String str, Source source, EventHandler eventHandler, ThreadGroup threadGroup, TCLogger tCLogger, int i, boolean z, String str2) {
            super(threadGroup, str);
            this.shutdownRequested = false;
            this.tcLogger = tCLogger;
            setDaemon(true);
            this.source = source;
            this.handler = eventHandler;
            this.sleepMs = i;
            this.pausable = z;
            this.stageName = str2;
        }

        public void shutdown() {
            this.shutdownRequested = true;
        }

        private boolean shutdownRequested() {
            return this.shutdownRequested;
        }

        private void handleStageDebugPauses() {
            if (this.sleepMs > 0) {
                ThreadUtil.reallySleep(this.sleepMs);
            }
            while (this.pausable && "paused".equalsIgnoreCase(System.getProperty(this.stageName))) {
                this.tcLogger.info("Stage paused, sleeping for 1s");
                ThreadUtil.reallySleep(1000L);
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!shutdownRequested()) {
                EventContext eventContext = null;
                try {
                    eventContext = this.source.poll(StageImpl.pollTime);
                    if (eventContext != null) {
                        handleStageDebugPauses();
                        if (eventContext instanceof SpecializedEventContext) {
                            ((SpecializedEventContext) eventContext).execute();
                        } else {
                            this.handler.handleEvent(eventContext);
                        }
                    }
                } catch (Exception e) {
                    if (StageImpl.isTCNotRunningException(e)) {
                        if (shutdownRequested()) {
                            return;
                        } else {
                            this.tcLogger.info("Ignoring " + TCNotRunningException.class.getSimpleName() + " while handling context: " + eventContext);
                        }
                    } else {
                        if (!StageImpl.isRejoinInProgressException(e)) {
                            throw new TCRuntimeException("Uncaught exception in stage", e);
                        }
                        if (shutdownRequested()) {
                            return;
                        } else {
                            this.tcLogger.info("Ignoring " + PlatformRejoinException.class.getSimpleName() + " while handling context: " + eventContext, e);
                        }
                    }
                } catch (EventHandlerException e2) {
                    if (!shutdownRequested()) {
                        throw new TCRuntimeException(e2);
                    }
                    return;
                } catch (InterruptedException e3) {
                    if (!shutdownRequested()) {
                        throw new TCRuntimeException(e3);
                    }
                    return;
                } finally {
                }
            }
        }
    }

    public StageImpl(TCLoggerProvider tCLoggerProvider, String str, EventHandler eventHandler, int i, int i2, ThreadGroup threadGroup, QueueFactory queueFactory, int i3) {
        this.logger = tCLoggerProvider.getLogger(Stage.class.getName() + ": " + str);
        this.name = str;
        this.handler = eventHandler;
        this.threads = new WorkerThread[i];
        if (i2 > i) {
            this.logger.warn("Thread to Queue Ratio " + i2 + " > Worker Threads " + i);
        }
        this.stageQueue = new StageQueueImpl(i, i2, queueFactory, tCLoggerProvider, str, i3);
        this.group = threadGroup;
        this.sleepMs = TCPropertiesImpl.getProperties().getInt("seda." + str + ".sleepMs", 0);
        if (this.sleepMs > 0) {
            this.logger.warn("Sleep of " + this.sleepMs + "ms enabled for stage " + str);
        }
        this.pausable = TCPropertiesImpl.getProperties().getBoolean("seda." + str + ".pausable", false);
        if (this.pausable) {
            this.logger.warn("Stage pausing is enabled for stage " + str);
        }
    }

    @Override // com.tc.async.api.Stage
    public void destroy() {
        stopThreads();
    }

    @Override // com.tc.async.api.Stage
    public void start(ConfigurationContext configurationContext) {
        this.handler.initializeContext(configurationContext);
        startThreads();
    }

    @Override // com.tc.async.api.Stage
    public Sink getSink() {
        return this.stageQueue;
    }

    private synchronized void startThreads() {
        for (int i = 0; i < this.threads.length; i++) {
            String str = "WorkerThread(" + this.name + ", " + i;
            this.threads[i] = new WorkerThread(this.threads.length > 1 ? str + ", " + this.stageQueue.getSource(i).getSourceName() + ")" : str + ")", this.stageQueue.getSource(i), this.handler, this.group, this.logger, this.sleepMs, this.pausable, this.name);
            this.threads[i].start();
        }
    }

    private void stopThreads() {
        for (WorkerThread workerThread : this.threads) {
            workerThread.shutdown();
            workerThread.interrupt();
        }
        this.handler.destroy();
    }

    @Override // com.tc.async.api.Stage
    public String getName() {
        return this.name;
    }

    public String toString() {
        return "StageImpl(" + this.name + ")";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isTCNotRunningException(Throwable th) {
        Throwable th2 = null;
        while (th != null) {
            th2 = th;
            th = th.getCause();
        }
        return th2 instanceof TCNotRunningException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isRejoinInProgressException(Throwable th) {
        Throwable th2 = null;
        while (th != null) {
            th2 = th;
            th = th.getCause();
        }
        return th2 instanceof PlatformRejoinException;
    }

    @Override // com.tc.text.PrettyPrintable
    public PrettyPrinter prettyPrint(PrettyPrinter prettyPrinter) {
        prettyPrinter.print("Queue depth: " + getSink().size() + " " + this.name).flush();
        return prettyPrinter;
    }
}
