package com.tc.async.impl;

import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.SpecializedEventContext;
import com.tc.async.api.Stage;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLoggerProvider;
import com.tc.util.State;
import java.util.ArrayList;

/* loaded from: input_file:com/tc/async/impl/StageImpl.class */
public class StageImpl implements Stage {
    private static final long pollTime = 3000;
    private static final EventContext PAUSE_TOKEN = new EventContext() { // from class: com.tc.async.impl.StageImpl.1
    };
    private final String name;
    private final EventHandler handler;
    private final Sink sink;
    private final WorkerThread[] threads;
    private final ThreadGroup group;
    private final TCLogger logger;
    private boolean isPaused = false;
    static Class class$com$tc$async$api$Stage;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/tc/async/impl/StageImpl$WorkerThread.class */
    public static class WorkerThread extends Thread {
        private static final State RUNNING = new State("RUNNING");
        private static final State PAUSED = new State("PAUSED");
        private State state;
        private final Source source;
        private final EventHandler handler;
        private boolean shutdownRequested;

        public WorkerThread(String str, Source source, EventHandler eventHandler, ThreadGroup threadGroup) {
            super(threadGroup, str);
            this.shutdownRequested = false;
            setDaemon(true);
            this.source = source;
            this.handler = eventHandler;
        }

        public synchronized void shutdown() {
            this.shutdownRequested = true;
        }

        private synchronized boolean shutdownRequested() {
            return this.shutdownRequested;
        }

        synchronized void pause() {
            while (this.state != PAUSED) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    throw new TCRuntimeException(e);
                }
            }
        }

        synchronized void unpause() {
            if (this.state != PAUSED) {
                throw new AssertionError(new StringBuffer().append("Attempt to unpause when not paused: ").append(this.state).toString());
            }
            this.state = RUNNING;
            notifyAll();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.state = RUNNING;
            while (!shutdownRequested()) {
                try {
                    EventContext poll = this.source.poll(StageImpl.pollTime);
                    if (poll == StageImpl.PAUSE_TOKEN) {
                        synchronized (this) {
                            this.state = PAUSED;
                            notifyAll();
                            while (this.state == PAUSED) {
                                wait();
                            }
                        }
                    } else if (poll != null) {
                        if (poll instanceof SpecializedEventContext) {
                            ((SpecializedEventContext) poll).execute();
                        } else {
                            this.handler.handleEvent(poll);
                        }
                    }
                } catch (EventHandlerException e) {
                    if (!shutdownRequested()) {
                        throw new TCRuntimeException(e);
                    }
                    return;
                } catch (InterruptedException e2) {
                    if (!shutdownRequested()) {
                        throw new TCRuntimeException(e2);
                    }
                    return;
                } finally {
                }
            }
        }
    }

    public StageImpl(TCLoggerProvider tCLoggerProvider, String str, EventHandler eventHandler, Sink sink, int i, ThreadGroup threadGroup) {
        Class cls;
        StringBuffer stringBuffer = new StringBuffer();
        if (class$com$tc$async$api$Stage == null) {
            cls = class$("com.tc.async.api.Stage");
            class$com$tc$async$api$Stage = cls;
        } else {
            cls = class$com$tc$async$api$Stage;
        }
        this.logger = tCLoggerProvider.getLogger(stringBuffer.append(cls.getName()).append(": ").append(str).toString());
        this.name = str;
        this.handler = eventHandler;
        this.threads = new WorkerThread[i];
        this.sink = sink;
        this.group = threadGroup;
    }

    @Override // com.tc.async.api.Stage
    public void destroy() {
        stopThreads();
    }

    @Override // com.tc.async.api.Stage
    public void start(ConfigurationContext configurationContext) {
        this.handler.initializeContext(configurationContext);
        startThreads();
    }

    @Override // com.tc.async.api.Stage
    public Sink getSink() {
        return this.sink;
    }

    @Override // com.tc.async.api.Stage
    public String getName() {
        return this.name;
    }

    private synchronized void startThreads() {
        for (int i = 0; i < this.threads.length; i++) {
            this.threads[i] = new WorkerThread(new StringBuffer().append("WorkerThread(").append(this.name).append(",").append(i).append(")").toString(), (Source) this.sink, this.handler, this.group);
            this.threads[i].start();
        }
    }

    private void stopThreads() {
        for (int i = 0; i < this.threads.length; i++) {
            this.threads[i].shutdown();
        }
    }

    public String toString() {
        return new StringBuffer().append("StageImpl(").append(this.name).append(")").toString();
    }

    @Override // com.tc.async.api.Stage
    public void pause() {
        if (this.isPaused) {
            throw new AssertionError("Attempt to pause while already paused.");
        }
        log("Pausing...");
        ArrayList arrayList = new ArrayList(this.threads.length);
        for (int i = 0; i < this.threads.length; i++) {
            arrayList.add(PAUSE_TOKEN);
        }
        this.sink.pause(arrayList);
        for (int i2 = 0; i2 < this.threads.length; i2++) {
            this.threads[i2].pause();
        }
        this.isPaused = true;
        log("Paused.");
    }

    @Override // com.tc.async.api.Stage
    public void unpause() {
        if (!this.isPaused) {
            throw new AssertionError("Attempt to unpause while not paused.");
        }
        log("Unpausing...");
        this.sink.unpause();
        for (int i = 0; i < this.threads.length; i++) {
            this.threads[i].unpause();
        }
        this.isPaused = false;
        log("Unpaused.");
    }

    private void log(Object obj) {
        this.logger.info(new StringBuffer().append("Stage ").append(this.name).append(": ").append(obj).toString());
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }
}
