/*
 * Decompiled with CFR 0.152.
 */
package org.mule.routing.requestreply;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.mule.OptimizedRequestContext;
import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessageCollection;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.construct.FlowConstructAware;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.RequestReplyRequesterMessageProcessor;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.source.MessageSource;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.processor.AbstractInterceptingMessageProcessor;
import org.mule.util.ObjectUtils;
import org.mule.util.concurrent.Latch;

public abstract class AbstractAsyncRequestReplyRequester
extends AbstractInterceptingMessageProcessor
implements RequestReplyRequesterMessageProcessor,
FlowConstructAware {
    public static final int MAX_PROCESSED_GROUPS = 50000;
    protected volatile long timeout = -1L;
    protected volatile boolean failOnTimeout = true;
    protected MessageSource replyMessageSource;
    protected FlowConstruct flowConstruct;
    private final MessageProcessor internalAsyncReplyMessageProcessor = new InternalAsyncReplyMessageProcessor();
    protected final Map<String, Latch> locks = new ConcurrentHashMap<String, Latch>();
    protected final ConcurrentMap responseEvents = new ConcurrentHashMap();
    protected final Object processedLock = new Object();
    protected final BoundedFifoBuffer processed = new BoundedFifoBuffer(50000);

    @Override
    public MuleEvent process(MuleEvent event) throws MuleException {
        if (this.replyMessageSource == null) {
            return this.processNext(event);
        }
        this.locks.put(this.getAsyncReplyCorrelationId(event), new Latch());
        this.sendAsyncRequest(event);
        return this.receiveAsyncReply(event);
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public void setFailOnTimeout(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    @Override
    public void setReplySource(MessageSource messageSource) {
        this.verifyReplyMessageSource(messageSource);
        this.replyMessageSource = messageSource;
        messageSource.setListener(this.internalAsyncReplyMessageProcessor);
    }

    protected void verifyReplyMessageSource(MessageSource messageSource) {
    }

    protected String getAsyncReplyCorrelationId(MuleEvent event) {
        if (event.getMessage() instanceof MuleMessageCollection) {
            return event.getMessage().getCorrelationId();
        }
        return event.getFlowConstruct().getMessageInfoMapping().getCorrelationId(event.getMessage());
    }

    protected void sendAsyncRequest(MuleEvent event) throws MuleException {
        this.processNext(event);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected MuleEvent receiveAsyncReply(MuleEvent event) throws MessagingException {
        String asyncReplyCorrelationId = this.getAsyncReplyCorrelationId(event);
        Latch asyncReplyLatch = this.locks.get(asyncReplyCorrelationId);
        boolean interruptedWhileWaiting = false;
        boolean resultAvailable = false;
        MuleEvent result = null;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Waiting for async reply message with id: " + asyncReplyCorrelationId));
            }
            if (this.timeout <= 0L) {
                asyncReplyLatch.await();
                resultAvailable = true;
            } else {
                resultAvailable = asyncReplyLatch.await(this.timeout, TimeUnit.MILLISECONDS);
            }
            if (!resultAvailable) {
                this.postLatchAwait(asyncReplyCorrelationId);
                resultAvailable = asyncReplyLatch.getCount() == 0L;
            }
        }
        catch (InterruptedException e) {
            interruptedWhileWaiting = true;
        }
        finally {
            this.locks.remove(asyncReplyCorrelationId);
            result = (MuleEvent)this.responseEvents.remove(asyncReplyCorrelationId);
            if (interruptedWhileWaiting) {
                Thread.currentThread().interrupt();
            }
        }
        if (interruptedWhileWaiting) {
            Thread.currentThread().interrupt();
        }
        if (resultAvailable) {
            if (result == null) {
                throw new IllegalStateException("Response MuleEvent is null");
            }
            return OptimizedRequestContext.criticalSetEvent(result);
        }
        this.addProcessed(asyncReplyCorrelationId);
        if (this.failOnTimeout) {
            event.getMuleContext().fireNotification(new RoutingNotification(event.getMessage(), null, 1302));
            throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId((int)this.timeout, asyncReplyCorrelationId), event, null);
        }
        return null;
    }

    protected void postLatchAwait(String asyncReplyCorrelationId) throws MessagingException {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addProcessed(Object id) {
        Object object = this.processedLock;
        synchronized (object) {
            if (this.processed.isFull()) {
                this.processed.remove();
            }
            this.processed.add(id);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean isAlreadyProcessed(Object id) {
        Object object = this.processedLock;
        synchronized (object) {
            return this.processed.contains(id);
        }
    }

    @Override
    public String toString() {
        return ObjectUtils.toString(this);
    }

    @Override
    public void setFlowConstruct(FlowConstruct flowConstruct) {
        this.flowConstruct = flowConstruct;
    }

    class InternalAsyncReplyMessageProcessor
    implements MessageProcessor {
        InternalAsyncReplyMessageProcessor() {
        }

        @Override
        public MuleEvent process(MuleEvent event) throws MuleException {
            String messageId = AbstractAsyncRequestReplyRequester.this.getAsyncReplyCorrelationId(event);
            if (AbstractAsyncRequestReplyRequester.this.isAlreadyProcessed(messageId)) {
                if (AbstractAsyncRequestReplyRequester.this.logger.isDebugEnabled()) {
                    AbstractAsyncRequestReplyRequester.this.logger.debug((Object)("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. Correlation Id is: " + messageId + ". Dropping event"));
                }
                event.getMuleContext().fireNotification(new RoutingNotification(event.getMessage(), event.getEndpoint().getEndpointURI().toString(), 1301));
                return null;
            }
            AbstractAsyncRequestReplyRequester.this.addProcessed(messageId);
            MuleEvent previousResult = AbstractAsyncRequestReplyRequester.this.responseEvents.putIfAbsent(messageId, event);
            if (previousResult != null) {
                throw new IllegalStateException("Detected duplicate result message with id: " + messageId);
            }
            Latch l = AbstractAsyncRequestReplyRequester.this.locks.get(messageId);
            if (l != null) {
                l.countDown();
            } else {
                AbstractAsyncRequestReplyRequester.this.logger.warn((Object)("Unexpected  message with id " + messageId + " received.   This message will be discarded."));
            }
            return null;
        }
    }
}

