/*
 * Decompiled with CFR 0.152.
 */
package org.mule.processor;

import java.text.MessageFormat;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import org.mule.DefaultMuleEvent;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.NamedObject;
import org.mule.api.context.WorkManagerSource;
import org.mule.api.exception.MessagingExceptionHandler;
import org.mule.api.exception.SystemExceptionHandler;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.lifecycle.Lifecycle;
import org.mule.api.lifecycle.LifecycleException;
import org.mule.api.lifecycle.LifecycleState;
import org.mule.api.service.FailedToQueueEventException;
import org.mule.config.QueueProfile;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
import org.mule.management.stats.QueueStatistics;
import org.mule.processor.AsyncInterceptingMessageProcessor;
import org.mule.util.concurrent.WaitableBoolean;
import org.mule.util.queue.Queue;
import org.mule.util.queue.QueueSession;
import org.mule.work.AbstractMuleEventWork;

public class SedaStageInterceptingMessageProcessor
extends AsyncInterceptingMessageProcessor
implements WorkListener,
Work,
Lifecycle {
    protected static final String QUEUE_NAME_PREFIX = "seda.queue";
    protected QueueProfile queueProfile;
    protected int queueTimeout;
    protected LifecycleState lifecycleState;
    protected QueueStatistics queueStatistics;
    protected MuleContext muleContext;
    protected String name;
    protected Queue queue;
    private WaitableBoolean queueDraining = new WaitableBoolean(false);

    public SedaStageInterceptingMessageProcessor(String name, QueueProfile queueProfile, int queueTimeout, WorkManagerSource workManagerSource, boolean doThreading, LifecycleState lifecycleState, QueueStatistics queueStatistics, MuleContext muleContext) {
        super(workManagerSource, doThreading);
        this.name = name;
        this.queueProfile = queueProfile;
        this.queueTimeout = queueTimeout;
        this.lifecycleState = lifecycleState;
        this.queueStatistics = queueStatistics;
        this.muleContext = muleContext;
    }

    public MuleEvent process(MuleEvent event) throws MuleException {
        if (this.next == null) {
            return event;
        }
        if (event.getEndpoint().getExchangePattern().hasResponse() || event.getEndpoint().getTransactionConfig().isTransacted()) {
            return this.processNext(event);
        }
        this.processAsync(event);
        return null;
    }

    protected void processAsync(MuleEvent event) throws MuleException {
        try {
            if (this.isStatsEnabled()) {
                this.queueStatistics.incQueuedEvent();
            }
            this.enqueue(event);
        }
        catch (Exception e) {
            throw new FailedToQueueEventException(CoreMessages.interruptedQueuingEventFor(this.getStageDescription()), event, (Throwable)e);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)("MuleEvent added to queue for: " + this.getStageDescription()));
        }
    }

    protected boolean isStatsEnabled() {
        return this.queueStatistics != null && this.queueStatistics.isEnabled();
    }

    protected void enqueue(MuleEvent event) throws Exception {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)MessageFormat.format("{1}: Putting event on queue {2}", this.queue.getName(), this.getStageDescription(), event));
        }
        this.queue.put(event);
    }

    protected MuleEvent dequeue() throws Exception {
        MuleEvent event;
        if (this.queue == null) {
            return null;
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)MessageFormat.format("{0}: Polling queue {1}, timeout = {2}", this.getStageName(), this.getStageDescription(), this.queueTimeout));
        }
        if ((event = (MuleEvent)this.queue.poll(this.queueTimeout)) != null && this.lifecycleState.isPhaseComplete("pause")) {
            this.queue.untake(event);
            return null;
        }
        return event;
    }

    public void run() {
        DefaultMuleEvent event = null;
        QueueSession queueSession = this.muleContext.getQueueManager().getQueueSession();
        while (!this.lifecycleState.isStopped()) {
            try {
                if (this.lifecycleState.isPhaseComplete("pause")) {
                    this.waitIfPaused();
                    if (this.lifecycleState.isStopping()) {
                        this.queueDraining.set(true);
                        if (!this.isQueuePersistent() && queueSession != null && this.getQueueSize() > 0) {
                            this.logger.warn((Object)CoreMessages.stopPausedSedaStageNonPeristentQueueMessageLoss(this.getQueueSize(), this.getQueueName()));
                        }
                        this.queueDraining.set(false);
                        break;
                    }
                }
                if (this.lifecycleState.isStopping() && (this.isQueuePersistent() || queueSession == null || this.getQueueSize() <= 0)) {
                    this.queueDraining.set(false);
                    break;
                }
                event = (DefaultMuleEvent)this.dequeue();
            }
            catch (InterruptedException ie) {
                this.queueDraining.set(false);
                break;
            }
            catch (Exception e) {
                SystemExceptionHandler exceptionListener = this.muleContext.getExceptionListener();
                if (e instanceof MuleException) {
                    exceptionListener.handleException(e);
                }
                exceptionListener.handleException(new MessagingException(CoreMessages.eventProcessingFailedFor(this.getStageDescription()), event, (Throwable)e));
            }
            if (event == null) continue;
            try {
                if (this.isStatsEnabled()) {
                    this.queueStatistics.decQueuedEvent();
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)MessageFormat.format("{0}: Dequeued event from {1}", this.getStageDescription(), this.getQueueName()));
                }
                SedaStageWorker work = new SedaStageWorker(event);
                if (this.doThreading) {
                    this.workManagerSource.getWorkManager().scheduleWork(work, Long.MAX_VALUE, null, this);
                    continue;
                }
                work.run();
            }
            catch (Exception e) {
                event.getFlowConstruct().getExceptionListener().handleException(e, event);
            }
        }
    }

    protected boolean isQueuePersistent() {
        return this.queueProfile.isPersistent();
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    protected String getQueueName() {
        return String.format("%s(%s)", QUEUE_NAME_PREFIX, this.getStageName());
    }

    protected String getStageName() {
        if (this.name != null) {
            return this.name;
        }
        if (this.next instanceof NamedObject) {
            return ((NamedObject)((Object)this.next)).getName();
        }
        return String.format("%s.%s", this.next.getClass().getName(), this.next.hashCode());
    }

    protected String getStageDescription() {
        return "SEDA Stage " + this.getStageName();
    }

    protected void waitIfPaused() throws InterruptedException {
        if (this.logger.isDebugEnabled() && this.lifecycleState.isPhaseComplete("pause")) {
            this.logger.debug((Object)(this.getStageDescription() + " is paused. Polling halted until resumed is called"));
        }
        while (this.lifecycleState.isPhaseComplete("pause") && !this.lifecycleState.isStopping()) {
            Thread.sleep(500L);
        }
    }

    public void release() {
        this.queueDraining.set(false);
    }

    public void initialise() throws InitialisationException {
        if (this.next == null) {
            throw new IllegalStateException("Next message processor cannot be null with this InterceptingMessageProcessor");
        }
        this.queueProfile.configureQueue(this.getQueueName(), this.muleContext.getQueueManager());
        this.queue = this.muleContext.getQueueManager().getQueueSession().getQueue(this.getQueueName());
        if (this.queue == null) {
            throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for " + this.getStageDescription()), (Initialisable)this);
        }
    }

    public void start() throws MuleException {
        if (this.queue == null) {
            throw new IllegalStateException("Not initialised");
        }
        try {
            this.workManagerSource.getWorkManager().scheduleWork(this, Long.MAX_VALUE, null, this);
        }
        catch (WorkException e) {
            throw new LifecycleException(CoreMessages.failedToStart(this.getStageDescription()), e, this);
        }
    }

    public void stop() throws MuleException {
        if (this.queue != null && this.queue.size() > 0) {
            try {
                this.queueDraining.whenFalse(null);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public void dispose() {
        this.queue = null;
    }

    private class SedaStageWorker
    extends AbstractMuleEventWork {
        public SedaStageWorker(MuleEvent event) {
            super(event);
        }

        protected void doRun() {
            try {
                SedaStageInterceptingMessageProcessor.this.processNext(this.event);
            }
            catch (Exception e) {
                this.event.getSession().setValid(false);
                MessagingExceptionHandler exceptionListener = this.event.getFlowConstruct().getExceptionListener();
                if (e instanceof MessagingException) {
                    exceptionListener.handleException(e, this.event);
                }
                exceptionListener.handleException(new MessagingException(CoreMessages.eventProcessingFailedFor(SedaStageInterceptingMessageProcessor.this.getStageDescription()), this.event, (Throwable)e), this.event);
            }
        }
    }
}

