/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Acquired;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BasicMessageConsumer} specialisation that works against an {@link AMQSession_0_10}.
 *
 * <p>On top of the behaviour inherited from the base consumer this class:
 * <ul>
 *   <li>issues explicit message credit ({@code messageFlow}) when the consumer capacity is zero;</li>
 *   <li>evaluates the message selector on the client when the connection does not report the
 *       {@code qpid.jms-selector} server feature, acknowledging or flushing rejected messages;</li>
 *   <li>explicitly acquires messages that were not pre-acquired;</li>
 *   <li>performs acknowledge-mode specific work before and after delivery.</li>
 * </ul>
 */
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> {
    private final Logger _logger = LoggerFactory.getLogger(this.getClass());

    /** The owning session, narrowed once in the constructor to its 0-10 type. */
    private final AMQSession_0_10 _0_10session;

    /** Outcome of {@link #evaluatePreAcquire}; when {@code false} messages are acquired one by one. */
    private final boolean _preAcquire;

    /**
     * {@code true} while {@link #getMessageFromQueue(long)} is executing on a zero-capacity consumer.
     * Read by {@link #failedOverPost()} to decide whether credit must be re-issued.
     */
    private final AtomicBoolean _syncReceive = new AtomicBoolean(false);

    /** String form of the consumer tag, kept in step with {@link #setConsumerTag(int)}. */
    private String _consumerTagString;

    /** Outcome of {@link #evaluateCapacity}; zero means credit is granted one message at a time. */
    private final long _capacity;

    /** Whether the connection reports the {@code qpid.jms-selector} server feature. */
    private final boolean _serverJmsSelectorSupport;

    /**
     * Creates the consumer. All arguments are forwarded unchanged to the superclass constructor;
     * {@code session} must additionally be an {@link AMQSession_0_10}.
     *
     * @param connection  used here to query support for the {@code qpid.jms-selector} feature
     * @param destination used here to derive pre-acquire mode and capacity
     * @param session     owning session, cast to {@link AMQSession_0_10}
     * @param browseOnly  when {@code true} messages are never pre-acquired
     * @throws JMSException if the superclass constructor fails
     */
    protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession<?, ?> session, Map<String, Object> rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException {
        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
        this._0_10session = (AMQSession_0_10) session;
        this._serverJmsSelectorSupport = connection.isSupportedServerFeature("qpid.jms-selector");
        this._preAcquire = evaluatePreAcquire(browseOnly, destination, this._serverJmsSelectorSupport);
        this._capacity = evaluateCapacity(destination);

        // NOTE(review): address type 2 is presumably AMQDestination.TOPIC_TYPE - confirm.
        if (this._0_10session.isResolved(destination) && 2 == destination.getAddressType()) {
            boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null;
            if (!namedQueue) {
                // Work on a private copy so the caller's destination keeps its queue name.
                setDestination(destination.copyDestination());
                getDestination().setQueueName(null);
            }
        }
    }

    /**
     * Stores the consumer tag in the superclass and caches its decimal string form.
     *
     * @param consumerTag the numeric consumer tag
     */
    @Override
    public void setConsumerTag(int consumerTag) {
        super.setConsumerTag(consumerTag);
        this._consumerTagString = String.valueOf(consumerTag);
    }

    /**
     * @return the consumer tag as a string, or {@code null} if {@link #setConsumerTag(int)} has not
     *         been called yet
     */
    public String getConsumerTagString() {
        return this._consumerTagString;
    }

    /**
     * Runs the pre-delivery checks on the message and, if it passes, hands it to the superclass.
     * On a zero-capacity consumer one more message of credit is requested either when a listener
     * is set and the message was accepted, or when the message was rejected.
     * A {@link QpidException} raised by the checks is logged and reported to the connection via
     * {@code closed(e)}.
     *
     * @param jmsMessage the message to deliver
     */
    @Override
    public void notifyMessage(AbstractJMSMessage jmsMessage) {
        try {
            if (checkPreConditions(jmsMessage)) {
                if (isMessageListenerSet() && this._capacity == 0L) {
                    messageFlow();
                }
                this._logger.debug("messageOk, trying to notify");
                super.notifyMessage(jmsMessage);
            } else if (this._capacity == 0L) {
                // The message was dropped, so the credit it consumed has to be replaced.
                messageFlow();
            }
        }
        catch (QpidException e) {
            this._logger.error("Received an Exception when receiving message", (Throwable) e);
            getSession().getAMQConnection().closed(e);
        }
    }

    /**
     * Cancels the subscription, runs {@link #postSubscription()}, then syncs the session and
     * confirms the cancellation. A {@link SessionException} from the sync is recorded on the
     * session; any exception recorded on the session is thrown at the end.
     *
     * @throws QpidException if the session holds a current exception after the cancel
     */
    @Override
    void sendCancel() throws QpidException {
        this._0_10session.getQpidSession().messageCancel(getConsumerTagString(), new Option[0]);
        postSubscription();
        try {
            this._0_10session.getQpidSession().sync();
            getSession().confirmConsumerCancelled(getConsumerTag());
        }
        catch (SessionException se) {
            this._0_10session.setCurrentException(se);
        }
        throwSessionExceptionIfAny();
    }

    /** Delegates to the superclass unchanged. */
    @Override
    void notifyMessage(UnprocessedMessage_0_10 messageFrame) {
        super.notifyMessage(messageFrame);
    }

    /**
     * Runs the superclass hook and, for acknowledge mode 257, records the delivery tag as
     * unacknowledged on the session.
     *
     * @param jmsMsg the message about to be delivered
     */
    @Override
    protected void preDeliver(AbstractJMSMessage jmsMsg) {
        super.preDeliver(jmsMsg);
        // NOTE(review): 257 is presumably org.apache.qpid.jms.Session.NO_ACKNOWLEDGE - confirm.
        if (getAcknowledgeMode() == 257) {
            getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
        }
    }

    /**
     * Updates the exchange-type mapping from the transfer's header and builds the JMS message
     * from the message transfer using this consumer's message factory.
     *
     * @param delegateFactory not used by this implementation
     * @param msg             the unprocessed 0-10 message
     * @return the JMS message created by the message factory
     * @throws Exception if mapping or message creation fails
     */
    @Override
    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception {
        AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10) getSession()).getQpidSession());
        return getMessageFactory().createMessage(msg.getMessageTransfer());
    }

    /**
     * Decides whether a message may be delivered.
     *
     * <p>The selector is evaluated locally only when the server does not support selectors and a
     * selector filter exists. A rejected message is acknowledged if it was pre-acquired, otherwise
     * flushed. An accepted message that was not pre-acquired (and is not being browsed) must be
     * acquired successfully to remain accepted.
     *
     * @param message the candidate message
     * @return {@code true} if the message should be delivered
     * @throws QpidException if selector evaluation fails or the session reports an exception
     */
    private boolean checkPreConditions(AbstractJMSMessage message) throws QpidException {
        boolean messageOk = true;
        try {
            if (!this._serverJmsSelectorSupport && getMessageSelectorFilter() != null) {
                messageOk = getMessageSelectorFilter().matches(message);
            }
        }
        catch (Exception e) {
            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", (Throwable) e);
        }

        this._logger.debug("messageOk {}", messageOk);
        this._logger.debug("_preAcquire {}", this._preAcquire);

        if (!messageOk) {
            if (this._preAcquire) {
                this._logger.debug("filterMessage - trying to ack message");
                acknowledgeMessage(message);
            } else {
                this._logger.debug("filterMessage - not ack'ing message as not acquired");
                flushUnwantedMessage(message);
            }
        } else if (!this._preAcquire && !isBrowseOnly()) {
            this._logger.debug("filterMessage - trying to acquire message");
            messageOk = acquireMessage(message);
            this._logger.debug("filterMessage - message acquire status : {}", messageOk);
        }
        return messageOk;
    }

    /**
     * Acknowledges a single message on the session; the boolean passed to the session is
     * {@code true} unless the acknowledge mode is 257.
     *
     * @throws QpidException if the session holds a current exception afterwards
     */
    private void acknowledgeMessage(AbstractJMSMessage message) throws QpidException {
        this._0_10session.messageAcknowledge((RangeSet) Range.newInstance((int) message.getDeliveryTag()), getAcknowledgeMode() != 257);
        throwSessionExceptionIfAny();
    }

    /**
     * Marks a single, not-acquired message as processed on the session without delivering it.
     *
     * @throws QpidException if the session holds a current exception afterwards
     */
    private void flushUnwantedMessage(AbstractJMSMessage message) throws QpidException {
        this._0_10session.flushProcessed((RangeSet) Range.newInstance((int) message.getDeliveryTag()), false);
        throwSessionExceptionIfAny();
    }

    /** Throws the exception currently recorded on the session, if there is one. */
    private void throwSessionExceptionIfAny() throws QpidException {
        QpidException current = this._0_10session.getCurrentException();
        if (current != null) {
            throw current;
        }
    }

    /**
     * Asks the broker to acquire a single message and waits for the result.
     *
     * @return {@code true} if the reply lists at least one acquired transfer
     */
    private boolean acquireMessage(AbstractJMSMessage message) throws QpidException {
        RangeSet wanted = (RangeSet) Range.newInstance((int) message.getDeliveryTag());
        Acquired reply = (Acquired) this._0_10session.getQpidSession().messageAcquire(wanted, new Option[0]).get();
        RangeSet acquired = reply.getTransfers();
        return acquired != null && acquired.size() > 0;
    }

    /** Grants the broker credit for exactly one more message on this consumer. */
    private void messageFlow() {
        this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, new Option[]{Option.UNRELIABLE});
    }

    /**
     * Installs the listener in the superclass. When a non-null listener is installed, one message
     * of credit is requested on a zero-capacity consumer, and every message already sitting in the
     * synchronous queue is removed and rejected through the session with the flag {@code true}.
     *
     * @param messageListener the listener to install, or {@code null}
     * @throws JMSException if the superclass rejects the listener or a transport error occurs
     */
    @Override
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        super.setMessageListener(messageListener);
        if (messageListener == null) {
            return;
        }
        try {
            if (this._capacity == 0L) {
                messageFlow();
            }
            if (!getSynchronousQueue().isEmpty()) {
                Iterator<?> messages = getSynchronousQueue().iterator();
                while (messages.hasNext()) {
                    AbstractJMSMessage message = (AbstractJMSMessage) messages.next();
                    messages.remove();
                    getSession().rejectMessage(message, true);
                }
            }
        }
        catch (TransportException e) {
            throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);
        }
    }

    /**
     * Re-issues one message of credit if the session is started and a zero-capacity synchronous
     * receive is currently in progress.
     */
    @Override
    public void failedOverPost() {
        if (this._0_10session.isStarted() && this._syncReceive.get()) {
            messageFlow();
        }
    }

    /**
     * Fetches a message from the superclass queue. If none arrives and the session is started,
     * the subscription is flushed, credit is re-established, the dispatch queue is synced and
     * the queue is polled once more with a timeout argument of {@code -1}.
     *
     * <p>On a zero-capacity consumer {@code _syncReceive} is raised for the duration of the call
     * and is always lowered again, including when the call exits with an exception.
     *
     * @param l timeout value passed straight to the superclass
     * @return the object obtained from the queue, or {@code null}
     * @throws InterruptedException if the superclass wait is interrupted
     */
    @Override
    public Object getMessageFromQueue(long l) throws InterruptedException {
        final boolean zeroCapacity = this._capacity == 0L;
        if (zeroCapacity) {
            this._syncReceive.set(true);
        }
        try {
            if (this._0_10session.isStarted() && isMessageListenerSet() && zeroCapacity && getSynchronousQueue().isEmpty()) {
                messageFlow();
            }
            Object o = super.getMessageFromQueue(l);
            if (o == null && this._0_10session.isStarted()) {
                this._0_10session.getQpidSession().messageFlush(getConsumerTagString(), new Option[]{Option.UNRELIABLE, Option.SYNC});
                this._0_10session.getQpidSession().sync();
                // NOTE(review): -1L presumably encodes "unlimited" byte credit - confirm against the protocol layer.
                this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.BYTE, -1L, new Option[]{Option.UNRELIABLE});
                if (this._capacity > 0L) {
                    this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, this._capacity, new Option[]{Option.UNRELIABLE});
                }
                this._0_10session.syncDispatchQueue(false);
                o = super.getMessageFromQueue(-1L);
            }
            return o;
        }
        finally {
            // Must also run on the exceptional path, otherwise failedOverPost() would keep
            // granting credit for a receive that is no longer waiting.
            if (zeroCapacity) {
                this._syncReceive.set(false);
            }
        }
    }

    /**
     * Runs the superclass hook, then the acknowledge-mode specific follow-up:
     * mode 0 sends transaction completions if necessary; mode 257 acknowledges the message unless
     * the session is in recovery; mode 1 syncs the session when the connection requests
     * synchronous acks and the session is not in recovery.
     *
     * @param msg the message that has just been delivered
     */
    @Override
    void postDeliver(AbstractJMSMessage msg) {
        super.postDeliver(msg);
        // NOTE(review): 0 and 1 match javax.jms.Session.SESSION_TRANSACTED / AUTO_ACKNOWLEDGE;
        // 257 is presumably the Qpid NO_ACKNOWLEDGE extension - confirm.
        switch (getAcknowledgeMode()) {
            case 0: {
                this._0_10session.sendTxCompletionsIfNecessary();
                break;
            }
            case 257: {
                if (!getSession().isInRecovery()) {
                    getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
                }
                break;
            }
            case 1: {
                if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck()) {
                    ((AMQSession_0_10) getSession()).getQpidSession().sync();
                }
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * @return the result of {@link #receiveNoWait()}
     * @throws JMSException if the receive fails
     */
    @Override
    Message receiveBrowse() throws JMSException {
        return receiveNoWait();
    }

    /**
     * Empties the synchronous queue. Delivery tags of the queued {@link AbstractJMSMessage}s are
     * collected, flushed as processed and released back through the Qpid session; entries of any
     * other type are logged and discarded. Does nothing when the queue is empty.
     */
    @Override
    void releasePendingMessages() {
        if (getSynchronousQueue().size() <= 0) {
            return;
        }
        RangeSet ranges = RangeSetFactory.createRangeSet();
        Iterator<?> pending = getSynchronousQueue().iterator();
        while (pending.hasNext()) {
            Object o = pending.next();
            if (o instanceof AbstractJMSMessage) {
                ranges.add((int) ((AbstractJMSMessage) o).getDeliveryTag());
            } else {
                this._logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
            }
            pending.remove();
        }
        this._0_10session.flushProcessed(ranges, false);
        this._0_10session.getQpidSession().messageRelease(ranges, new Option[0]);
        clearReceiveQueue();
    }

    /**
     * Clean-up for ADDR-syntax destinations: deletes the node when the destination's delete
     * option is {@code ALWAYS} or {@code RECEIVER}, deletes the link, and deletes the subscription
     * queue unless this is a durable subscriber. Does nothing for other destinations.
     *
     * @throws QpidException if one of the session operations fails
     */
    void postSubscription() throws QpidException {
        AMQDestination dest = getDestination();
        if (dest == null || dest.getDestSyntax() != AMQDestination.DestSyntax.ADDR) {
            return;
        }
        if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || dest.getDelete() == AMQDestination.AddressOption.RECEIVER) {
            getSession().handleNodeDelete(dest);
        }
        getSession().handleLinkDelete(dest);
        if (!isDurableSubscriber()) {
            ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
        }
    }

    /** @return the capacity computed for this consumer at construction time */
    long getCapacity() {
        return this._capacity;
    }

    /** @return whether this consumer was configured to pre-acquire messages */
    boolean isPreAcquire() {
        return this._preAcquire;
    }

    /**
     * Computes the pre-acquire flag: never for browsers; otherwise pre-acquire unless the
     * destination is a queue with a selector filter that the server cannot evaluate.
     */
    private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport) {
        if (browseOnly) {
            return false;
        }
        // NOTE(review): address type 1 is presumably AMQDestination.QUEUE_TYPE - confirm.
        boolean isQueue = destination instanceof AMQQueue || getDestination().getAddressType() == 1;
        return serverJmsSelectorSupport || !isQueue || getMessageSelectorFilter() == null;
    }

    /**
     * Computes the consumer capacity: the link's consumer capacity when a link exists and that
     * value is non-negative, otherwise the session prefetch when the session prefetches,
     * otherwise zero.
     */
    private long evaluateCapacity(AMQDestination destination) {
        if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0) {
            return destination.getLink().getConsumerCapacity();
        }
        if (getSession().prefetch()) {
            return getSession().getPrefetch();
        }
        return 0L;
    }

    /**
     * Sends a MESSAGE-unit flow command carrying {@code credit} for this consumer and then syncs
     * the given session.
     */
    private void sendMessageCredit(AMQSession_0_10 session, long credit) throws QpidException {
        session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, credit, new Option[]{Option.UNRELIABLE});
        session.sync();
    }

    /**
     * Receives with the given timeout. On a zero-capacity consumer without a listener, one message
     * of credit is requested first; if nothing arrives, a flow command with credit {@code 0} is
     * sent and the superclass is polled once more without waiting.
     *
     * @param l timeout value passed to the superclass
     * @return the received message, or {@code null}
     * @throws JMSException if the receive fails; a {@link QpidException} is chained as the cause
     */
    @Override
    public Message receive(long l) throws JMSException {
        long capacity = getCapacity();
        try {
            AMQSession_0_10 session = (AMQSession_0_10) getSession();
            if (capacity == 0L && getMessageListener() == null) {
                sendMessageCredit(session, 1L);
            }
            Message message = super.receive(l);
            if (message == null && capacity == 0L && getMessageListener() == null) {
                sendMessageCredit(session, 0L);
                message = super.receiveNoWait();
            }
            return message;
        }
        catch (QpidException e) {
            throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receive failed"), e);
        }
    }

    /**
     * Non-blocking receive using the same credit handling as {@link #receive(long)}.
     *
     * @return the received message, or {@code null}
     * @throws JMSException if the receive fails; a {@link QpidException} is chained as the cause
     */
    @Override
    public Message receiveNoWait() throws JMSException {
        long capacity = getCapacity();
        try {
            AMQSession_0_10 session = (AMQSession_0_10) getSession();
            if (capacity == 0L && getMessageListener() == null) {
                sendMessageCredit(session, 1L);
            }
            Message message = super.receiveNoWait();
            if (message == null && capacity == 0L && getMessageListener() == null) {
                sendMessageCredit(session, 0L);
                message = super.receiveNoWait();
            }
            return message;
        }
        catch (QpidException e) {
            throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receiveNoWait failed."), e);
        }
    }
}

