/*
 * Decompiled with CFR 0.152.
 */
package org.mule.processor;

import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import org.mule.DefaultMuleEvent;
import org.mule.OptimizedRequestContext;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.NameableObject;
import org.mule.api.config.ThreadingProfile;
import org.mule.api.execution.ExecutionCallback;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.lifecycle.Lifecycle;
import org.mule.api.lifecycle.LifecycleCallback;
import org.mule.api.lifecycle.LifecycleException;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.service.FailedToQueueEventException;
import org.mule.config.QueueProfile;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
import org.mule.execution.TransactionalErrorHandlingExecutionTemplate;
import org.mule.lifecycle.EmptyLifecycleCallback;
import org.mule.management.stats.QueueStatistics;
import org.mule.processor.AsyncInterceptingMessageProcessor;
import org.mule.processor.AsyncWorkListener;
import org.mule.processor.SedaStageLifecycleManager;
import org.mule.service.Pausable;
import org.mule.service.Resumable;
import org.mule.util.concurrent.WaitableBoolean;
import org.mule.util.queue.Queue;
import org.mule.util.queue.QueueConfiguration;
import org.mule.util.queue.QueueSession;

public class SedaStageInterceptingMessageProcessor
extends AsyncInterceptingMessageProcessor
implements Work,
Lifecycle,
Pausable,
Resumable {
    protected static final String QUEUE_NAME_PREFIX = "seda.queue";
    public static int DEFAULT_QUEUE_SIZE_MAX_THREADS_FACTOR = 4;
    protected QueueProfile queueProfile;
    protected int queueTimeout;
    protected QueueStatistics queueStatistics;
    protected String queueName;
    protected Queue queue;
    protected QueueConfiguration queueConfiguration;
    private WaitableBoolean running = new WaitableBoolean(false);
    protected SedaStageLifecycleManager lifecycleManager;

    public SedaStageInterceptingMessageProcessor(String threadName, String queueName, QueueProfile queueProfile, int queueTimeout, ThreadingProfile threadingProfile, QueueStatistics queueStatistics, MuleContext muleContext) {
        super(threadingProfile, threadName, muleContext.getConfiguration().getShutdownTimeout());
        this.queueName = queueName;
        this.queueProfile = queueProfile;
        this.queueTimeout = queueTimeout;
        this.queueStatistics = queueStatistics;
        this.muleContext = muleContext;
        this.lifecycleManager = new SedaStageLifecycleManager(queueName, this);
        this.configureDefaultQueueSize(queueProfile, threadingProfile);
    }

    protected void configureDefaultQueueSize(QueueProfile queueProfile, ThreadingProfile threadingProfile) {
        if (queueProfile != null && queueProfile.getMaxOutstandingMessages() == 0) {
            queueProfile.setMaxOutstandingMessages(threadingProfile.getMaxThreadsActive() * DEFAULT_QUEUE_SIZE_MAX_THREADS_FACTOR);
        }
    }

    @Override
    protected void processNextAsync(MuleEvent event) throws MuleException {
        try {
            if (this.isStatsEnabled()) {
                this.queueStatistics.incQueuedEvent();
            }
            this.enqueue(event);
        }
        catch (Exception e) {
            throw new FailedToQueueEventException(CoreMessages.interruptedQueuingEventFor(this.getStageDescription()), event, (Throwable)e);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)("MuleEvent added to queue for: " + this.getStageDescription()));
        }
    }

    protected boolean isStatsEnabled() {
        return this.queueStatistics != null && this.queueStatistics.isEnabled();
    }

    protected void enqueue(MuleEvent event) throws Exception {
        boolean queued;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)MessageFormat.format("{1}: Putting event on queue {2}", this.queue.getName(), this.getStageDescription(), event));
        }
        if (!(queued = this.queue.offer(DefaultMuleEvent.copy(event), this.threadTimeout))) {
            String message = String.format("The queue for '%1s' did not accept new event within %2d %3s", new Object[]{this.getStageDescription(), this.threadTimeout, TimeUnit.MILLISECONDS});
            throw new FailedToQueueEventException(CoreMessages.createStaticMessage(message), event);
        }
        this.fireAsyncScheduledNotification(event);
    }

    protected MuleEvent dequeue() throws Exception {
        MuleEvent event;
        if (this.queue == null) {
            return null;
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)MessageFormat.format("{0}: Polling queue {1}, timeout = {2}", this.getStageName(), this.getStageDescription(), this.queueTimeout));
        }
        if ((event = (MuleEvent)this.queue.poll(this.queueTimeout)) != null && this.lifecycleManager.isPhaseComplete("pause")) {
            this.queue.untake(event);
            return null;
        }
        return event;
    }

    protected void rollbackDequeue(MuleEvent event) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)MessageFormat.format("{1}: Putting event back on queue {2}", this.queue.getName(), this.getStageDescription(), event));
        }
        try {
            this.queue.untake(event);
        }
        catch (Exception e) {
            this.logger.error((Object)e);
        }
    }

    public void run() {
        DefaultMuleEvent event = null;
        QueueSession queueSession = this.muleContext.getQueueManager().getQueueSession();
        this.running.set(true);
        while (!this.lifecycleManager.getState().isStopped()) {
            try {
                if (this.lifecycleManager.isPhaseComplete("pause")) {
                    this.waitIfPaused();
                    if (this.lifecycleManager.getState().isStopping()) {
                        if (this.isQueuePersistent() || queueSession == null || this.getQueueSize() <= 0) break;
                        this.logger.warn((Object)CoreMessages.stopPausedSedaStageNonPeristentQueueMessageLoss(this.getQueueSize(), this.getQueueName()));
                        break;
                    }
                }
                if (this.lifecycleManager.getState().isStopping() && (this.isQueuePersistent() || queueSession == null || this.getQueueSize() <= 0)) break;
                event = (DefaultMuleEvent)this.dequeue();
            }
            catch (InterruptedException ie) {
                break;
            }
            catch (Exception e) {
                this.muleContext.getExceptionListener().handleException(e);
            }
            if (event == null) continue;
            final DefaultMuleEvent eventToProcess = event;
            TransactionalErrorHandlingExecutionTemplate executionTemplate = TransactionalErrorHandlingExecutionTemplate.createMainExecutionTemplate(this.muleContext, event.getFlowConstruct().getExceptionListener());
            ExecutionCallback<MuleEvent> processingCallback = new ExecutionCallback<MuleEvent>(){

                @Override
                public MuleEvent process() throws Exception {
                    if (SedaStageInterceptingMessageProcessor.this.isStatsEnabled()) {
                        SedaStageInterceptingMessageProcessor.this.queueStatistics.decQueuedEvent();
                    }
                    if (SedaStageInterceptingMessageProcessor.this.logger.isDebugEnabled()) {
                        SedaStageInterceptingMessageProcessor.this.logger.debug((Object)MessageFormat.format("{0}: Dequeued event from {1}", SedaStageInterceptingMessageProcessor.this.getStageDescription(), SedaStageInterceptingMessageProcessor.this.getQueueName()));
                    }
                    AsyncInterceptingMessageProcessor.AsyncMessageProcessorWorker work = new AsyncInterceptingMessageProcessor.AsyncMessageProcessorWorker(SedaStageInterceptingMessageProcessor.this, eventToProcess, false);
                    try {
                        SedaStageInterceptingMessageProcessor.this.workManagerSource.getWorkManager().scheduleWork(work, Long.MAX_VALUE, null, new AsyncWorkListener(SedaStageInterceptingMessageProcessor.this.next));
                    }
                    catch (Exception e) {
                        OptimizedRequestContext.unsafeSetEvent(work.getEvent());
                        throw new MessagingException(work.getEvent(), e, (MessageProcessor)SedaStageInterceptingMessageProcessor.this);
                    }
                    return null;
                }
            };
            try {
                executionTemplate.execute((ExecutionCallback)processingCallback);
            }
            catch (MessagingException e) {
            }
            catch (Exception e) {
                this.muleContext.getExceptionListener().handleException(e);
            }
        }
        this.running.set(false);
    }

    protected boolean isQueuePersistent() {
        return this.queueConfiguration == null ? false : this.queueConfiguration.isPersistent();
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    protected String getQueueName() {
        return String.format("%s(%s)", QUEUE_NAME_PREFIX, this.getStageName());
    }

    protected String getStageName() {
        if (this.queueName != null) {
            return this.queueName;
        }
        if (this.next instanceof NameableObject) {
            return ((NameableObject)((Object)this.next)).getName();
        }
        return String.format("%s.%s", this.next.getClass().getName(), this.next.hashCode());
    }

    protected String getStageDescription() {
        return "SEDA Stage " + this.getStageName();
    }

    protected void waitIfPaused() throws InterruptedException {
        if (this.logger.isDebugEnabled() && this.lifecycleManager.isPhaseComplete("pause")) {
            this.logger.debug((Object)(this.getStageDescription() + " is paused. Polling halted until resumed is called"));
        }
        while (this.lifecycleManager.isPhaseComplete("pause") && !this.lifecycleManager.getState().isStopping()) {
            Thread.sleep(50L);
        }
    }

    public void release() {
        this.running.set(false);
    }

    @Override
    public void initialise() throws InitialisationException {
        this.lifecycleManager.fireInitialisePhase(new LifecycleCallback<SedaStageInterceptingMessageProcessor>(){

            @Override
            public void onTransition(String phaseName, SedaStageInterceptingMessageProcessor object) throws MuleException {
                if (SedaStageInterceptingMessageProcessor.this.next == null) {
                    throw new IllegalStateException("Next message processor cannot be null with this InterceptingMessageProcessor");
                }
                SedaStageInterceptingMessageProcessor.this.queueConfiguration = SedaStageInterceptingMessageProcessor.this.queueProfile.configureQueue(SedaStageInterceptingMessageProcessor.this.getMuleContext(), SedaStageInterceptingMessageProcessor.this.getQueueName(), SedaStageInterceptingMessageProcessor.this.muleContext.getQueueManager());
                SedaStageInterceptingMessageProcessor.this.queue = SedaStageInterceptingMessageProcessor.this.muleContext.getQueueManager().getQueueSession().getQueue(SedaStageInterceptingMessageProcessor.this.getQueueName());
                if (SedaStageInterceptingMessageProcessor.this.queue == null) {
                    throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for " + SedaStageInterceptingMessageProcessor.this.getStageDescription()), (Initialisable)SedaStageInterceptingMessageProcessor.this);
                }
            }
        });
    }

    @Override
    public void start() throws MuleException {
        this.lifecycleManager.fireStartPhase(new LifecycleCallback<SedaStageInterceptingMessageProcessor>(){

            @Override
            public void onTransition(String phaseName, SedaStageInterceptingMessageProcessor object) throws MuleException {
                if (SedaStageInterceptingMessageProcessor.this.queue == null) {
                    throw new IllegalStateException("Not initialised");
                }
                SedaStageInterceptingMessageProcessor.super.start();
                try {
                    SedaStageInterceptingMessageProcessor.this.workManagerSource.getWorkManager().scheduleWork(SedaStageInterceptingMessageProcessor.this, Long.MAX_VALUE, null, new AsyncWorkListener(SedaStageInterceptingMessageProcessor.this.next));
                }
                catch (WorkException e) {
                    throw new LifecycleException(CoreMessages.failedToStart(SedaStageInterceptingMessageProcessor.this.getStageDescription()), e, this);
                }
            }
        });
    }

    @Override
    public void stop() throws MuleException {
        this.lifecycleManager.fireStopPhase(new LifecycleCallback<SedaStageInterceptingMessageProcessor>(){

            @Override
            public void onTransition(String phaseName, SedaStageInterceptingMessageProcessor object) throws MuleException {
                try {
                    SedaStageInterceptingMessageProcessor.this.running.whenFalse(null);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                SedaStageInterceptingMessageProcessor.super.stop();
            }
        });
    }

    @Override
    public void dispose() {
        this.lifecycleManager.fireDisposePhase(new LifecycleCallback<SedaStageInterceptingMessageProcessor>(){

            @Override
            public void onTransition(String phaseName, SedaStageInterceptingMessageProcessor object) throws MuleException {
                SedaStageInterceptingMessageProcessor.this.queue = null;
            }
        });
    }

    @Override
    public void pause() throws MuleException {
        this.lifecycleManager.firePausePhase(new EmptyLifecycleCallback<SedaStageInterceptingMessageProcessor>());
    }

    @Override
    public void resume() throws MuleException {
        this.lifecycleManager.fireResumePhase(new EmptyLifecycleCallback<SedaStageInterceptingMessageProcessor>());
    }
}

