package org.terracotta.message.pipe;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/terracotta/message/pipe/PipeProcessor.class */
public abstract class PipeProcessor<T> implements Runnable {
    private static final transient Logger logger = LoggerFactory.getLogger(PipeProcessor.class);
    private static final String SHUTDOWN = "normal shutdown using shutdown()";
    private static final String STOP = "shutdown using stop()";
    protected final Pipe<T> pipe;
    private final boolean daemon;
    private volatile String reasonForShutdown = null;
    private transient Thread processThread;

    public PipeProcessor(Pipe<T> pipe, boolean z) {
        this.pipe = pipe;
        this.daemon = z;
    }

    public void start() {
        this.processThread = new Thread(this);
        this.processThread.setDaemon(this.daemon);
        this.processThread.start();
    }

    public void stop() {
        this.reasonForShutdown = STOP;
        if (this.processThread != null) {
            this.processThread.interrupt();
        }
    }

    public void shutdown() {
        this.reasonForShutdown = SHUTDOWN;
        if (this.processThread != null) {
            this.processThread.interrupt();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            logger.info("Running pipe processor...");
            while (this.reasonForShutdown != STOP && (this.reasonForShutdown != SHUTDOWN || this.pipe.peek() != null)) {
                try {
                } catch (InterruptedException e) {
                    logger.debug("Stopping pipe processor after calling stop or shutdown method.");
                } catch (Exception e2) {
                    logger.error("Stopping pipe processor due to exception.", e2);
                }
                if (!event(this.pipe.take())) {
                    logger.debug("Stopping pipe processor due to event break.");
                    this.reasonForShutdown = "normal shutdown due to break";
                    break;
                }
            }
            Thread.interrupted();
            stopped();
            logger.info("Pipe processor stopped.");
        } catch (Throwable th) {
            Thread.interrupted();
            stopped();
            logger.info("Pipe processor stopped.");
            throw th;
        }
    }

    public abstract boolean event(T t) throws Exception;

    protected void stopped() {
    }
}
