/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.jms.server.endpoint;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.selector.Selector;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.jms.server.destination.ManagedQueue;
import org.jboss.jms.server.destination.ManagedTopic;
import org.jboss.jms.server.endpoint.Ack;
import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.DeliveryRecovery;
import org.jboss.jms.server.endpoint.ServerBrowserEndpoint;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
import org.jboss.jms.server.endpoint.ServerConsumerEndpoint;
import org.jboss.jms.server.endpoint.SessionEndpoint;
import org.jboss.jms.server.endpoint.advised.AdvisedSupport;
import org.jboss.jms.server.endpoint.advised.BrowserAdvised;
import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
import org.jboss.jms.server.messagecounter.MessageCounter;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.util.MessageQueueNameHelper;
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.plugin.IDManager;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.util.id.GUID;

public class ServerSessionEndpoint
implements SessionEndpoint {
    /** Logger shared by all instances of this class. */
    private static final Logger log = Logger.getLogger(ServerSessionEndpoint.class);
    /** Prefix prepended to a temporary queue's name to build its message counter name. */
    static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
    /** String property set on DLQ/expiry copies: the original message's JMS destination as text. */
    public static final String JBOSS_MESSAGING_ORIG_DESTINATION = "JBM_ORIG_DESTINATION";
    /** String property set on DLQ/expiry copies: the original message's JMS message id. */
    public static final String JBOSS_MESSAGING_ORIG_MESSAGE_ID = "JBM_ORIG_MESSAGE_ID";
    /** Long property set on expiry copies only: the wall-clock time at which the copy was made. */
    public static final String JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME = "JBM_ACTUAL_EXPIRY";
    /** Whether trace logging was enabled when this instance was created (sampled once). */
    private boolean trace = log.isTraceEnabled();
    /** Session id supplied to the constructor. */
    private int id;
    /** Set to true as the final step of localClose(). */
    private volatile boolean closed;
    /** Connection endpoint this session was created on. */
    private ServerConnectionEndpoint connectionEndpoint;
    /** Server peer obtained from the connection endpoint; source of the collaborators below. */
    private ServerPeer sp;
    /** Consumer endpoints keyed by Integer consumer id; snapshots and removals synchronize on the map. */
    private Map consumers;
    /** Browser endpoints keyed by Integer browser id; snapshots and removals synchronize on the map. */
    private Map browsers;
    /** Persistence manager obtained from the server peer. */
    private PersistenceManager pm;
    /** Message store obtained from the server peer. */
    private MessageStore ms;
    /** Destination manager obtained from the server peer. */
    private DestinationManager dm;
    /** Channel id manager obtained from the server peer; supplies ids for newly created core queues. */
    private IDManager idm;
    /** Transaction repository obtained from the server peer. */
    private TransactionRepository tr;
    /** Post office obtained from the server peer; used to bind, unbind and look up queues. */
    private PostOffice postOffice;
    /** Server peer id of the owning server peer. */
    private int nodeId;
    /** Server-wide default maximum delivery attempts, compared against a cancel's delivery count. */
    private int maxDeliveryAttempts;
    /** Server-wide default DLQ, used when a destination defines none. */
    private Queue defaultDLQ;
    /** Server-wide default expiry queue, used when a destination defines none. */
    private Queue defaultExpiryQueue;
    /** Outstanding deliveries keyed by Long delivery id; values are DeliveryRecord instances. */
    private Map deliveries;
    /** Source of delivery ids; incremented by addDelivery and replaced by recoverDeliveries. */
    private SynchronizedLong deliveryIdSequence;
    /** Runs the channel.deliver() tasks queued by promptDelivery(Channel); shut down in localClose(). */
    QueuedExecutor executor = new QueuedExecutor();

    /**
     * Creates a session endpoint attached to the given connection endpoint.
     *
     * All collaborators (persistence manager, message store, destination manager, post office,
     * id manager, transaction repository, default DLQ/expiry queue) are looked up once from the
     * connection's server peer. The transaction repository is fetched a single time; the original
     * code assigned it twice from the same getter.
     *
     * @param sessionID id of this session
     * @param connectionEndpoint connection endpoint that owns this session
     * @throws Exception if any of the server peer lookups fails
     */
    ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception {
        this.id = sessionID;
        this.connectionEndpoint = connectionEndpoint;
        this.sp = connectionEndpoint.getServerPeer();

        // Server-side collaborators, all sourced from the owning server peer.
        this.pm = this.sp.getPersistenceManagerInstance();
        this.ms = this.sp.getMessageStore();
        this.dm = this.sp.getDestinationManager();
        this.postOffice = this.sp.getPostOfficeInstance();
        this.idm = this.sp.getChannelIDManager();
        this.nodeId = this.sp.getServerPeerID();
        this.tr = this.sp.getTxRepository();

        // Per-session registries of consumers and browsers.
        this.consumers = new HashMap();
        this.browsers = new HashMap();

        // Server-wide defaults used when a destination does not define its own.
        this.defaultDLQ = this.sp.getDefaultDLQInstance();
        this.defaultExpiryQueue = this.sp.getDefaultExpiryQueueInstance();
        this.maxDeliveryAttempts = this.sp.getDefaultMaxDeliveryAttempts();

        // Delivery bookkeeping.
        this.deliveries = new ConcurrentHashMap();
        this.deliveryIdSequence = new SynchronizedLong(0L);
    }

    /**
     * Creates a consumer on the given destination by delegating to
     * {@code createConsumerDelegateInternal}.
     *
     * @param jmsDestination destination to consume from
     * @param selector selector expression, forwarded unchanged
     * @param noLocal forwarded unchanged
     * @param subscriptionName forwarded unchanged
     * @param isCC not consulted by this method
     * @return the delegate built by the internal factory method
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination, String selector, boolean noLocal, String subscriptionName, boolean isCC) throws JMSException {
        ConsumerDelegate delegate;
        try {
            delegate = createConsumerDelegateInternal(jmsDestination, selector, noLocal, subscriptionName);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " createConsumerDelegate");
        }
        return delegate;
    }

    /**
     * Creates a browser on the given destination by delegating to
     * {@code createBrowserDelegateInternal}.
     *
     * @param jmsDestination destination to browse
     * @param selector selector expression, forwarded unchanged
     * @return the delegate built by the internal factory method
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination, String selector) throws JMSException {
        BrowserDelegate delegate;
        try {
            delegate = createBrowserDelegateInternal(jmsDestination, selector);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " createBrowserDelegate");
        }
        return delegate;
    }

    /**
     * Returns a queue identity for an already registered queue.
     *
     * The name is looked up in the destination manager as a queue; the returned object carries
     * the name reported by the managed destination.
     *
     * @param name name of the queue to look up
     * @return a new {@code JBossQueue} for the managed destination's name
     * @throws JMSException if the session is closed, no such queue is registered, or the lookup fails
     */
    public JBossQueue createQueue(String name) throws JMSException {
        try {
            if (closed) {
                throw new IllegalStateException("Session is closed");
            }
            ManagedDestination managed = dm.getDestination(name, true);
            if (managed != null) {
                return new JBossQueue(managed.getName());
            }
            throw new JMSException("There is no administratively defined queue with name:" + name);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " createQueue");
        }
    }

    /**
     * Returns a topic identity for an already registered topic.
     *
     * The name is looked up in the destination manager as a topic; the returned object carries
     * the name exactly as supplied by the caller.
     *
     * @param name name of the topic to look up
     * @return a new {@code JBossTopic} for {@code name}
     * @throws JMSException if the session is closed, no such topic is registered, or the lookup fails
     */
    public JBossTopic createTopic(String name) throws JMSException {
        try {
            if (closed) {
                throw new IllegalStateException("Session is closed");
            }
            ManagedDestination managed = dm.getDestination(name, false);
            if (managed != null) {
                return new JBossTopic(name);
            }
            throw new JMSException("There is no administratively defined topic with name:" + name);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " createTopic");
        }
    }

    /**
     * Closes this session: runs {@code localClose()} and then removes the session from its
     * connection endpoint.
     *
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public void close() throws JMSException {
        try {
            localClose();
            connectionEndpoint.removeSession(id);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " close");
        }
    }

    /**
     * No-op apart from an optional trace line.
     *
     * @return always {@code -1}
     * @throws JMSException never thrown by this implementation
     */
    public long closing() throws JMSException {
        final long result = -1L;
        if (trace) {
            log.trace(this + " closing (noop)");
        }
        return result;
    }

    /**
     * Sends a message by handing it to the connection endpoint, passing {@code null} as the
     * second argument of {@code sendMessage}.
     *
     * @param message message to send
     * @param checkForDuplicates forwarded unchanged to the connection endpoint
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public void send(JBossMessage message, boolean checkForDuplicates) throws JMSException {
        try {
            connectionEndpoint.sendMessage(message, null, checkForDuplicates);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " send");
        }
    }

    /**
     * Acknowledges a single delivery outside of any transaction.
     *
     * @param ack identifies the delivery to acknowledge
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public void acknowledgeDelivery(Ack ack) throws JMSException {
        try {
            acknowledgeDeliveryInternal(ack);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " acknowledgeDelivery");
        }
    }

    /**
     * Acknowledges each delivery in the list, in list order, outside of any transaction.
     *
     * Processing stops at the first failure; earlier acknowledgements are not undone here.
     *
     * @param acks list of {@code Ack} objects
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public void acknowledgeDeliveries(List acks) throws JMSException {
        if (trace) {
            log.trace(this + " acknowledges deliveries " + acks);
        }
        try {
            for (Object element : acks) {
                acknowledgeDeliveryInternal((Ack)element);
            }
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " acknowledgeDeliveries");
        }
    }

    /**
     * Cancels one delivery and then asks the channel that observed it to deliver again.
     *
     * @param cancel identifies the delivery to cancel
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public void cancelDelivery(Cancel cancel) throws JMSException {
        if (trace) {
            log.trace(this + " cancelDelivery " + cancel);
        }
        try {
            Delivery cancelled = cancelDeliveryInternal(cancel);
            // The delivery's observer is cast to Channel so it can be prompted.
            Channel source = (Channel)cancelled.getObserver();
            promptDelivery(source);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " cancelDelivery");
        }
    }

    /**
     * Cancels every delivery in the list, walking the list from last element to first, then
     * prompts delivery once on the set of distinct observers involved.
     *
     * @param cancels list of {@code Cancel} objects
     * @throws JMSException the result of passing any failure through
     *         {@code ExceptionUtil.handleJMSInvocation}
     */
    public void cancelDeliveries(List cancels) throws JMSException {
        if (trace) {
            log.trace(this + " cancels deliveries " + cancels);
        }
        try {
            HashSet<DeliveryObserver> touched = new HashSet<DeliveryObserver>();
            // Walk backwards: the last cancel in the list is processed first.
            int index = cancels.size();
            while (--index >= 0) {
                Cancel current = (Cancel)cancels.get(index);
                if (trace) {
                    log.trace(this + " cancelling delivery " + current.getDeliveryId());
                }
                Delivery cancelled = cancelDeliveryInternal(current);
                touched.add(cancelled.getObserver());
            }
            promptDelivery(touched);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " cancelDeliveries");
        }
    }

    /**
     * Re-creates delivery records for deliveries described by the given recovery infos.
     *
     * The infos are grouped by queue name. For each queue the bound core queue is asked to
     * recover deliveries for the listed message ids, and each recovered delivery is registered
     * in {@code deliveries} under the delivery id carried by the matching recovery info
     * (recovered deliveries and infos are paired positionally). Finally the delivery id sequence
     * is restarted just above the highest recovered delivery id.
     *
     * Changes from the previous revision: the two passes no longer share a single
     * {@code Iterator<Object>} variable for unrelated element types, and the trace message now
     * has the separating space after the endpoint name.
     *
     * @param deliveryRecoveryInfos list of {@code DeliveryRecovery} objects
     * @throws JMSException if the post office is local, a queue binding or managed destination
     *         cannot be found, or any other failure occurs (all routed through
     *         {@code ExceptionUtil.handleJMSInvocation})
     */
    public void recoverDeliveries(List deliveryRecoveryInfos) throws JMSException {
        if (this.trace) {
            log.trace(this + " recovers deliveries " + deliveryRecoveryInfos);
        }
        try {
            if (this.postOffice.isLocal()) {
                throw new IllegalStateException("Recovering deliveries but post office is not clustered!");
            }
            long maxDeliveryId = 0L;

            // Pass 1: bucket the recovery infos by queue name, keeping arrival order per queue.
            Map<String, List<DeliveryRecovery>> recoveriesByQueue = new HashMap<String, List<DeliveryRecovery>>();
            for (Object element : deliveryRecoveryInfos) {
                DeliveryRecovery recovery = (DeliveryRecovery)element;
                String queueName = recovery.getQueueName();
                List<DeliveryRecovery> forQueue = recoveriesByQueue.get(queueName);
                if (forQueue == null) {
                    forQueue = new ArrayList<DeliveryRecovery>();
                    recoveriesByQueue.put(queueName, forQueue);
                }
                forQueue.add(recovery);
            }

            // Pass 2: recover the deliveries queue by queue.
            for (Map.Entry<String, List<DeliveryRecovery>> entry : recoveriesByQueue.entrySet()) {
                String queueName = entry.getKey();
                Binding binding = this.postOffice.getBindingForQueueName(queueName);
                if (binding == null) {
                    throw new IllegalStateException("Cannot find channel with queue name: " + queueName);
                }
                List<DeliveryRecovery> recoveries = entry.getValue();
                List<Long> messageIds = new ArrayList<Long>(recoveries.size());
                for (DeliveryRecovery recovery : recoveries) {
                    messageIds.add(Long.valueOf(recovery.getMessageID()));
                }
                Queue queue = binding.getQueue();
                JMSCondition cond = (JMSCondition)binding.getCondition();
                ManagedDestination dest = this.sp.getDestinationManager().getDestination(cond.getName(), cond.isQueue());
                if (dest == null) {
                    throw new IllegalStateException("Cannot find managed destination with name " + cond.getName() + " isQueue" + cond.isQueue());
                }
                // Fall back to the server-wide defaults when the destination defines none.
                Queue dlqToUse = dest.getDLQ() == null ? this.defaultDLQ : dest.getDLQ();
                Queue expiryQueueToUse = dest.getExpiryQueue() == null ? this.defaultExpiryQueue : dest.getExpiryQueue();

                List dels = queue.recoverDeliveries(messageIds);
                // Recovered deliveries are matched to recovery infos by position.
                Iterator<DeliveryRecovery> recoveryIter = recoveries.iterator();
                for (Object recovered : dels) {
                    Delivery del = (Delivery)recovered;
                    DeliveryRecovery info = recoveryIter.next();
                    long deliveryId = info.getDeliveryID();
                    maxDeliveryId = Math.max(maxDeliveryId, deliveryId);
                    if (this.trace) {
                        log.trace(this + " Recovered delivery " + deliveryId + ", " + del);
                    }
                    // -1 is passed as the consumer id for recovered deliveries.
                    this.deliveries.put(Long.valueOf(deliveryId), new DeliveryRecord(del, -1, dlqToUse, expiryQueueToUse, dest.getRedeliveryDelay()));
                }
            }
            // New delivery ids must not collide with the recovered ones.
            this.deliveryIdSequence = new SynchronizedLong(maxDeliveryId + 1L);
        }
        catch (Throwable t) {
            throw ExceptionUtil.handleJMSInvocation(t, this + " recoverDeliveries");
        }
    }

    /**
     * Registers a temporary destination for this session's connection.
     *
     * The destination is recorded on the connection endpoint and registered with the destination
     * manager using the connection's default temporary-queue sizing. For a temporary queue a core
     * queue is additionally created, given a message counter named with
     * {@code TEMP_QUEUE_MESSAGECOUNTER_PREFIX}, and bound in the post office.
     *
     * @param dest temporary destination to add
     * @throws JMSException if the session is closed, {@code dest} is not temporary, or any step
     *         fails (all routed through {@code ExceptionUtil.handleJMSInvocation})
     */
    public void addTemporaryDestination(JBossDestination dest) throws JMSException {
        try {
            if (closed) {
                throw new IllegalStateException("Session is closed");
            }
            if (!dest.isTemporary()) {
                throw new InvalidDestinationException("Destination:" + dest + " is not a temporary destination");
            }
            connectionEndpoint.addTemporaryDestination(dest);

            int fullSize = connectionEndpoint.getDefaultTempQueueFullSize();
            int pageSize = connectionEndpoint.getDefaultTempQueuePageSize();
            int downCacheSize = connectionEndpoint.getDefaultTempQueueDownCacheSize();

            ManagedDestination managed;
            if (dest.isTopic()) {
                managed = new ManagedTopic(dest.getName(), fullSize, pageSize, downCacheSize);
            } else {
                managed = new ManagedQueue(dest.getName(), fullSize, pageSize, downCacheSize);
            }
            dm.registerDestination(managed);

            // Only temporary queues get a core queue, a counter and a binding at this point.
            if (!dest.isQueue()) {
                return;
            }
            PagingFilteredQueue coreQueue = new PagingFilteredQueue(dest.getName(), idm.getID(), ms, pm, true, false, -1, null, fullSize, pageSize, downCacheSize);
            String counterName = TEMP_QUEUE_MESSAGECOUNTER_PREFIX + dest.getName();
            int historyDayLimit = sp.getDefaultMessageCounterHistoryDayLimit();
            MessageCounter counter = new MessageCounter(counterName, null, coreQueue, false, false, historyDayLimit);
            sp.getMessageCounterManager().registerMessageCounter(counterName, counter);
            postOffice.bindQueue(new JMSCondition(true, dest.getName()), coreQueue);
        }
        catch (Throwable failure) {
            throw ExceptionUtil.handleJMSInvocation(failure, this + " addTemporaryDestination");
        }
    }

    /**
     * Deletes a temporary destination previously added with {@code addTemporaryDestination}.
     *
     * For a queue: the core queue is unbound, the destination is removed from the connection
     * endpoint and its message counter is unregistered. For a topic: deletion is refused while
     * any binding still exists for the topic; otherwise the destination is removed from the
     * connection endpoint. In both cases the managed destination is finally unregistered.
     *
     * {@code addTemporaryDestination} records both queues and topics on the connection endpoint,
     * so both must be removed here. The previous revision removed only queues, leaving deleted
     * temporary topics recorded on the connection.
     *
     * @param dest temporary destination to delete
     * @throws JMSException if the session is closed, {@code dest} is not temporary or not
     *         registered, a temporary topic still has bindings, the queue's counter is missing,
     *         or any other step fails (all routed through {@code ExceptionUtil.handleJMSInvocation})
     */
    public void deleteTemporaryDestination(JBossDestination dest) throws JMSException {
        try {
            if (this.closed) {
                throw new IllegalStateException("Session is closed");
            }
            if (!dest.isTemporary()) {
                throw new InvalidDestinationException("Destination:" + dest + " is not a temporary destination");
            }
            ManagedDestination mDest = this.dm.getDestination(dest.getName(), dest.isQueue());
            if (mDest == null) {
                throw new InvalidDestinationException("No such destination: " + dest);
            }
            if (dest.isQueue()) {
                this.postOffice.unbindQueue(dest.getName());
                String counterName = TEMP_QUEUE_MESSAGECOUNTER_PREFIX + dest.getName();
                this.connectionEndpoint.removeTemporaryDestination(dest);
                MessageCounter counter = this.sp.getMessageCounterManager().unregisterMessageCounter(counterName);
                if (counter == null) {
                    throw new IllegalStateException("Cannot find counter to unregister " + counterName);
                }
            } else {
                Collection bindings = this.postOffice.getBindingsForCondition(new JMSCondition(false, dest.getName()));
                if (!bindings.isEmpty()) {
                    throw new IllegalStateException("Cannot delete temporary destination, since it has active consumer(s)");
                }
                // Mirror the queue branch: forget the topic on the connection once deletion is allowed.
                this.connectionEndpoint.removeTemporaryDestination(dest);
            }
            this.dm.unregisterDestination(mDest);
        }
        catch (Throwable t) {
            throw ExceptionUtil.handleJMSInvocation(t, this + " deleteTemporaryDestination");
        }
    }

    /**
     * Removes a durable subscription belonging to this connection's client id.
     *
     * The subscription queue name is derived from the client id and the subscription name. The
     * queue is unbound through the clustered post office when the topic is clustered and the post
     * office is not local, otherwise through the plain post office. The subscription's message
     * counter ("Subscription." + queue name) is then unregistered.
     *
     * The managed topic lookup is now checked for {@code null}, as the other destination lookups
     * in this class are; previously a missing topic surfaced as a NullPointerException.
     *
     * @param subscriptionName name of the durable subscription to remove
     * @throws JMSException if the session is closed, the name or client id is null, the
     *         subscription does not exist or still has receivers, the topic or counter cannot be
     *         found, or any other step fails (all routed through
     *         {@code ExceptionUtil.handleJMSInvocation})
     */
    public void unsubscribe(String subscriptionName) throws JMSException {
        log.debug(this + " unsubscribing " + subscriptionName);
        try {
            if (this.closed) {
                throw new IllegalStateException("Session is closed");
            }
            if (subscriptionName == null) {
                throw new InvalidDestinationException("Destination is null");
            }
            String clientID = this.connectionEndpoint.getClientID();
            if (clientID == null) {
                throw new JMSException("null clientID on connection");
            }
            String queueName = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
            Binding binding = this.postOffice.getBindingForQueueName(queueName);
            if (binding == null) {
                throw new InvalidDestinationException("Cannot find durable subscription with name " + subscriptionName + " to unsubscribe");
            }
            Queue sub = binding.getQueue();
            if (sub.getNumberOfReceivers() != 0) {
                throw new IllegalStateException("Cannot unsubscribe durable subscription " + subscriptionName + " since it has active subscribers");
            }
            JMSCondition topicCond = (JMSCondition)binding.getCondition();
            String topicName = topicCond.getName();
            ManagedDestination mDest = this.dm.getDestination(topicName, false);
            if (mDest == null) {
                // Without the managed topic we cannot tell whether the binding is clustered.
                throw new IllegalStateException("Cannot find topic " + topicName + " for durable subscription " + subscriptionName);
            }
            if (mDest.isClustered() && !this.postOffice.isLocal()) {
                ClusteredPostOffice cpo = (ClusteredPostOffice)this.postOffice;
                cpo.unbindClusteredQueue(queueName);
            } else {
                this.postOffice.unbindQueue(queueName);
            }
            String counterName = "Subscription." + sub.getName();
            MessageCounter counter = this.sp.getMessageCounterManager().unregisterMessageCounter(counterName);
            if (counter == null) {
                throw new IllegalStateException("Cannot find counter to remove " + counterName);
            }
        }
        catch (Throwable t) {
            throw ExceptionUtil.handleJMSInvocation(t, this + " unsubscribe");
        }
    }

    /**
     * @return the connection endpoint this session was created with
     */
    public ServerConnectionEndpoint getConnectionEndpoint() {
        return connectionEndpoint;
    }

    /**
     * @return {@code "SessionEndpoint[" + id + "]"}
     */
    public String toString() {
        StringBuilder text = new StringBuilder("SessionEndpoint[");
        text.append(id);
        text.append("]");
        return text.toString();
    }

    /**
     * Disposes of a delivery whose message has expired.
     *
     * With an expiry queue, a copy of the message is moved there and the delivery is acknowledged
     * in the same transaction. Without one, the delivery is acknowledged non-transactionally so
     * the expired message really is removed. The previous revision only logged the removal and
     * left the delivery unacknowledged, unlike the null-queue path of {@code moveInTransaction}.
     *
     * @param del delivery holding the expired message
     * @param expiryQueue queue to receive the expired copy, or {@code null} if none is configured
     * @throws Throwable if copying, moving or acknowledging fails
     */
    void expireDelivery(Delivery del, Queue expiryQueue) throws Throwable {
        if (this.trace) {
            log.trace(this + " detected expired message " + del.getReference());
        }
        if (expiryQueue != null) {
            if (this.trace) {
                log.trace(this + " sending expired message to expiry queue " + expiryQueue);
            }
            JBossMessage copy = this.makeCopyForDLQOrExpiry(true, del);
            this.moveInTransaction(copy, del, expiryQueue);
        } else {
            log.warn("No expiry queue has been configured so removing expired " + del.getReference());
            // Make the log message true: acknowledge outside any transaction, as
            // acknowledgeDeliveryInternal does.
            del.acknowledge(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the browser registered under the given id.
     *
     * @param browserId id of the browser to forget
     * @throws Exception an IllegalStateException if no browser is registered under that id
     */
    void removeBrowser(int browserId) throws Exception {
        Object removed;
        synchronized (this.browsers) {
            removed = this.browsers.remove(Integer.valueOf(browserId));
        }
        if (removed == null) {
            throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the consumer registered under the given id.
     *
     * @param consumerId id of the consumer to forget
     * @throws Exception an IllegalStateException if no consumer is registered under that id
     */
    void removeConsumer(int consumerId) throws Exception {
        Object removed;
        synchronized (this.consumers) {
            removed = this.consumers.remove(Integer.valueOf(consumerId));
        }
        if (removed == null) {
            throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Performs the server-side part of closing this session.
     *
     * Steps, in order: locally close every consumer and browser (working from snapshots so no
     * lock is held while calling into them) and empty both registries; cancel all outstanding
     * deliveries from highest delivery id to lowest; prompt delivery on the affected channels;
     * shut the executor down after its current task; clear the delivery map; deregister this
     * session from the server peer and the dispatcher; mark the session closed.
     *
     * The registries are now cleared while holding their own monitors, matching every other
     * access to those maps in this class. The previous revision cleared them unsynchronized.
     *
     * @throws Throwable an IllegalStateException if the session is already closed, or whatever a
     *         consumer, browser, delivery or deregistration step throws
     */
    void localClose() throws Throwable {
        if (this.closed) {
            throw new IllegalStateException("Session is already closed");
        }
        if (this.trace) {
            log.trace(this + " close()");
        }

        // Consumers: snapshot under the lock, close outside it, then empty the registry.
        Map consumersClone;
        synchronized (this.consumers) {
            consumersClone = new HashMap(this.consumers);
        }
        for (Iterator i = consumersClone.values().iterator(); i.hasNext();) {
            ((ServerConsumerEndpoint)i.next()).localClose();
        }
        synchronized (this.consumers) {
            this.consumers.clear();
        }

        // Browsers: same procedure.
        Map browsersClone;
        synchronized (this.browsers) {
            browsersClone = new HashMap(this.browsers);
        }
        for (Iterator i = browsersClone.values().iterator(); i.hasNext();) {
            ((ServerBrowserEndpoint)i.next()).localClose();
        }
        synchronized (this.browsers) {
            this.browsers.clear();
        }

        // Cancel outstanding deliveries in descending delivery-id order.
        ArrayList entries = new ArrayList(this.deliveries.entrySet());
        Collections.sort(entries, new Comparator(){

            public int compare(Object obj1, Object obj2) {
                Map.Entry entry1 = (Map.Entry)obj1;
                Map.Entry entry2 = (Map.Entry)obj2;
                Long id1 = (Long)entry1.getKey();
                Long id2 = (Long)entry2.getKey();
                return id2.compareTo(id1);
            }
        });
        HashSet<DeliveryObserver> channels = new HashSet<DeliveryObserver>();
        if (this.trace) {
            log.trace(this + " cancelling " + entries.size() + " deliveries");
        }
        for (Iterator iter = entries.iterator(); iter.hasNext();) {
            Map.Entry entry = (Map.Entry)iter.next();
            if (this.trace) {
                log.trace(this + " cancelling delivery with delivery id: " + entry.getKey());
            }
            DeliveryRecord rec = (DeliveryRecord)entry.getValue();
            rec.del.cancel();
            channels.add(rec.del.getObserver());
        }
        this.promptDelivery(channels);

        this.executor.shutdownAfterProcessingCurrentTask();
        this.deliveries.clear();
        this.sp.removeSession(new Integer(this.id));
        Dispatcher.instance.unregisterTarget(this.id, (Object)this);
        this.closed = true;
    }

    /**
     * Removes the delivery record with the given id and cancels its delivery.
     *
     * @param deliveryId id of the delivery to cancel
     * @throws Throwable an IllegalStateException if no such delivery is recorded, or whatever
     *         the delivery's {@code cancel()} throws
     */
    void cancelDelivery(long deliveryId) throws Throwable {
        Object removed = deliveries.remove(Long.valueOf(deliveryId));
        if (removed == null) {
            throw new IllegalStateException("Cannot find delivery to cancel " + deliveryId);
        }
        DeliveryRecord record = (DeliveryRecord)removed;
        record.del.cancel();
    }

    /**
     * Records a new outstanding delivery under a freshly allocated delivery id.
     *
     * @param del the delivery to track
     * @param consumerId id of the consumer the delivery was made to
     * @param dlq DLQ stored with the record
     * @param expiryQueue expiry queue stored with the record
     * @param redeliveryDelay redelivery delay stored with the record
     * @return the delivery id allocated from {@code deliveryIdSequence}
     */
    long addDelivery(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay) {
        long allocatedId = deliveryIdSequence.increment();
        DeliveryRecord record = new DeliveryRecord(del, consumerId, dlq, expiryQueue, redeliveryDelay);
        deliveries.put(Long.valueOf(allocatedId), record);
        if (trace) {
            log.trace(this + " added delivery " + allocatedId + ": " + del);
        }
        return allocatedId;
    }

    /**
     * Acknowledges the given deliveries as part of a transaction.
     *
     * A {@code DeliveryCallback} keyed by this session is looked up on the transaction and
     * created if absent. Each acknowledged delivery id is added to that callback before the
     * delivery is acknowledged against the transaction. Acks whose delivery is unknown are
     * logged at warn level and skipped.
     *
     * @param acks list of {@code Ack} objects
     * @param tx transaction the acknowledgements belong to
     * @throws Throwable whatever the transaction or a delivery throws
     */
    void acknowledgeTransactionally(List acks, Transaction tx) throws Throwable {
        if (trace) {
            log.trace(this + " acknowledging transactionally " + acks.size() + " messages for " + tx);
        }
        DeliveryCallback callback = (DeliveryCallback)tx.getCallback(this);
        if (callback == null) {
            callback = new DeliveryCallback();
            tx.addCallback(callback, this);
        }
        for (Object element : acks) {
            Ack ack = (Ack)element;
            Long deliveryKey = Long.valueOf(ack.getDeliveryID());
            DeliveryRecord record = (DeliveryRecord)deliveries.get(deliveryKey);
            if (record != null) {
                callback.addDeliveryId(deliveryKey);
                record.del.acknowledge(tx);
            } else {
                log.warn("Cannot find delivery to acknowledge " + ack);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts or stops every consumer currently registered on this session.
     *
     * The consumer registry is copied under its lock and the copy is iterated afterwards.
     *
     * @param s {@code true} to start the consumers, {@code false} to stop them
     * @throws Throwable whatever a consumer's {@code start()} or {@code stop()} throws
     */
    void setStarted(boolean s) throws Throwable {
        Map snapshot;
        synchronized (this.consumers) {
            snapshot = new HashMap(this.consumers);
        }
        for (Object element : snapshot.values()) {
            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)element;
            if (!s) {
                consumer.stop();
            } else {
                consumer.start();
            }
        }
    }

    /**
     * Queues a task on this session's executor that calls {@code channel.deliver()}.
     *
     * Failures while submitting the task are logged at error level and not propagated.
     *
     * @param channel channel to prompt
     */
    void promptDelivery(final Channel channel) {
        try {
            Runnable deliverTask = new Runnable(){

                public void run() {
                    channel.deliver();
                }
            };
            executor.execute(deliverTask);
        }
        catch (Throwable failure) {
            log.error("Failed to prompt delivery", failure);
        }
    }

    /**
     * Removes a delivery record and disposes of its delivery according to the cancel request.
     *
     * An expired message (flagged by the cancel or by the message itself) has a copy moved to
     * the record's expiry queue. Otherwise, a message that reached the maximum delivery attempts
     * (flagged by the cancel, or delivery count at or above the session maximum) has a copy moved
     * to the record's DLQ. Otherwise the reference's delivery count is updated, a scheduled
     * delivery time is set when the record has a non-zero redelivery delay, and the delivery is
     * cancelled.
     *
     * @param cancel cancel request identifying the delivery
     * @return the delivery that was held by the removed record
     * @throws Throwable an IllegalStateException if no such delivery is recorded, or whatever
     *         the move or cancel throws
     */
    private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable {
        DeliveryRecord record = (DeliveryRecord)deliveries.remove(Long.valueOf(cancel.getDeliveryId()));
        if (record == null) {
            throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
        }
        Delivery delivery = record.del;
        boolean expired = cancel.isExpired() || delivery.getReference().getMessage().isExpired();
        boolean exhausted = cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= maxDeliveryAttempts;

        if (expired) {
            // Expiry takes precedence over delivery-attempt exhaustion.
            JBossMessage expiredCopy = makeCopyForDLQOrExpiry(true, delivery);
            moveInTransaction(expiredCopy, delivery, record.expiryQueue);
        } else if (exhausted) {
            JBossMessage deadCopy = makeCopyForDLQOrExpiry(false, delivery);
            moveInTransaction(deadCopy, delivery, record.dlq);
        } else {
            // Ordinary cancel: hand the message back for redelivery.
            delivery.getReference().setDeliveryCount(cancel.getDeliveryCount());
            if (record.redeliveryDelay != 0L) {
                delivery.getReference().setScheduledDeliveryTime(System.currentTimeMillis() + record.redeliveryDelay);
            }
            delivery.cancel();
        }
        return delivery;
    }

    /**
     * Builds the copy of a delivery's message that is sent to a DLQ or expiry queue.
     *
     * The copy gets a new message id from the server peer's message id manager and an expiration
     * of zero, and carries the original's JMS message id and destination as string properties.
     * For expiry copies the current time is also recorded as a long property.
     *
     * @param expiry {@code true} when the copy is destined for an expiry queue
     * @param del delivery whose message is copied
     * @return the prepared copy
     * @throws Exception whatever copying, id allocation or property setting throws
     */
    private JBossMessage makeCopyForDLQOrExpiry(boolean expiry, Delivery del) throws Exception {
        JBossMessage original = (JBossMessage)del.getReference().getMessage();
        JBossMessage copy = original.doCopy();

        copy.setMessageId(sp.getMessageIDManager().getID());
        copy.setExpiration(0L);

        // Read both origin values before writing either property.
        String originalId = original.getJMSMessageID();
        String originalDestination = original.getJMSDestination().toString();
        copy.setStringProperty(JBOSS_MESSAGING_ORIG_MESSAGE_ID, originalId);
        copy.setStringProperty(JBOSS_MESSAGING_ORIG_DESTINATION, originalDestination);

        if (expiry) {
            copy.setLongProperty(JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
        }
        return copy;
    }

    /**
     * Moves a message to another queue and acknowledges the original delivery in one transaction.
     *
     * When {@code queue} is {@code null} nothing is routed: a warning is logged and the delivery
     * is still acknowledged. On any failure the transaction is rolled back and the failure
     * rethrown. The message reference's memory reference is released in all cases. After a
     * successful commit, delivery is prompted on the target queue if there is one.
     *
     * @param msg message to route to {@code queue}
     * @param del original delivery to acknowledge
     * @param queue target queue, or {@code null}
     * @throws Throwable whatever routing, acknowledging, committing or rolling back throws
     */
    private void moveInTransaction(JBossMessage msg, Delivery del, Queue queue) throws Throwable {
        Transaction tx = tr.createTransaction();
        MessageReference ref = ms.reference(msg);
        boolean hasTarget = queue != null;
        try {
            if (hasTarget) {
                queue.handle(null, ref, tx);
            } else {
                log.warn("Cannot move to destination since destination has not been deployed! The message will be removed");
            }
            // The original delivery is acknowledged whether or not a target exists.
            del.acknowledge(tx);
            tx.commit();
        }
        catch (Throwable failure) {
            tx.rollback();
            throw failure;
        }
        finally {
            if (ref != null) {
                ref.releaseMemoryReference();
            }
        }
        if (hasTarget) {
            promptDelivery(queue);
        }
    }

    /**
     * Removes the delivery record named by the ack and acknowledges its delivery with no
     * transaction.
     *
     * An ack for an unknown delivery is logged at warn level and otherwise ignored.
     *
     * @param ack identifies the delivery to acknowledge
     * @throws Throwable whatever the delivery's {@code acknowledge} throws
     */
    private void acknowledgeDeliveryInternal(Ack ack) throws Throwable {
        if (trace) {
            log.trace(this + " acknowledging delivery " + ack);
        }
        DeliveryRecord record = (DeliveryRecord)deliveries.remove(Long.valueOf(ack.getDeliveryID()));
        if (record != null) {
            record.del.acknowledge(null);
            if (trace) {
                log.trace(this + " acknowledged delivery " + ack);
            }
            return;
        }
        log.warn("Cannot find " + ack + " to acknowledge, " + "maybe it was already acknowledged before failover!");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ConsumerDelegate createConsumerDelegateInternal(JBossDestination jmsDestination, String selectorString, boolean noLocal, String subscriptionName) throws Throwable {
        ConsumerAdvised advised;
        Object q;
        ManagedDestination mDest;
        if (this.closed) {
            throw new IllegalStateException("Session is closed");
        }
        if ("".equals(selectorString)) {
            selectorString = null;
        }
        if (this.trace) {
            log.trace(this + " creating consumer for " + jmsDestination + (selectorString == null ? "" : ", selector '" + selectorString + "'") + (subscriptionName == null ? "" : ", subscription '" + subscriptionName + "'") + (noLocal ? ", noLocal" : ""));
        }
        if ((mDest = this.dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue())) == null) {
            throw new InvalidDestinationException("No such destination: " + jmsDestination);
        }
        if (jmsDestination.isTemporary() && !this.connectionEndpoint.hasTemporaryDestination(jmsDestination)) {
            String msg = "Cannot create a message consumer on a different connection to that which created the temporary destination";
            throw new IllegalStateException(msg);
        }
        int consumerID = this.connectionEndpoint.getServerPeer().getNextObjectID();
        Binding binding = null;
        Selector selector = null;
        if (selectorString != null) {
            selector = new Selector(selectorString);
        }
        if (jmsDestination.isTopic()) {
            JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
            if (subscriptionName == null) {
                PagingFilteredQueue q2;
                if (log.isTraceEnabled()) {
                    log.trace(this + " creating new non-durable subscription on " + jmsDestination);
                }
                if (this.postOffice.isLocal() || !mDest.isClustered()) {
                    q2 = new PagingFilteredQueue(new GUID().toString(), this.idm.getID(), this.ms, this.pm, true, false, mDest.getMaxSize(), selector, mDest.getFullSize(), mDest.getPageSize(), mDest.getDownCacheSize());
                    binding = this.postOffice.bindQueue(topicCond, q2);
                } else {
                    q2 = new LocalClusteredQueue((ClusteredPostOffice)this.postOffice, this.nodeId, new GUID().toString(), this.idm.getID(), this.ms, this.pm, true, false, mDest.getMaxSize(), selector, this.tr, mDest.getFullSize(), mDest.getPageSize(), mDest.getDownCacheSize());
                    ClusteredPostOffice cpo = (ClusteredPostOffice)this.postOffice;
                    binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q2);
                }
                String counterName = "Subscription." + q2.getName();
                int dayLimitToUse = mDest.getMessageCounterHistoryDayLimit();
                if (dayLimitToUse == -1) {
                    dayLimitToUse = this.sp.getDefaultMessageCounterHistoryDayLimit();
                }
                MessageCounter counter = new MessageCounter(counterName, null, q2, true, false, dayLimitToUse);
                this.sp.getMessageCounterManager().registerMessageCounter(counterName, counter);
            } else {
                if (jmsDestination.isTemporary()) {
                    throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
                }
                String clientID = this.connectionEndpoint.getClientID();
                if (clientID == null) {
                    throw new JMSException("Cannot create durable subscriber without a valid client ID");
                }
                String name = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
                binding = this.postOffice.getBindingForQueueName(name);
                if (binding == null) {
                    PagingFilteredQueue q3;
                    if (this.trace) {
                        log.trace(this + " creating new durable subscription on " + jmsDestination);
                    }
                    if (this.postOffice.isLocal()) {
                        q3 = new PagingFilteredQueue(name, this.idm.getID(), this.ms, this.pm, true, true, mDest.getMaxSize(), selector, mDest.getFullSize(), mDest.getPageSize(), mDest.getDownCacheSize());
                        binding = this.postOffice.bindQueue(topicCond, q3);
                    } else {
                        q3 = new LocalClusteredQueue((ClusteredPostOffice)this.postOffice, this.nodeId, name, this.idm.getID(), this.ms, this.pm, true, true, mDest.getMaxSize(), selector, this.tr, mDest.getFullSize(), mDest.getPageSize(), mDest.getDownCacheSize());
                        ClusteredPostOffice cpo = (ClusteredPostOffice)this.postOffice;
                        binding = mDest.isClustered() ? cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q3) : cpo.bindQueue(topicCond, q3);
                    }
                    String counterName = "Subscription." + q3.getName();
                    MessageCounter counter = new MessageCounter(counterName, subscriptionName, q3, true, true, mDest.getMessageCounterHistoryDayLimit());
                    this.sp.getMessageCounterManager().registerMessageCounter(counterName, counter);
                } else {
                    JMSCondition cond;
                    boolean topicChanged;
                    boolean selectorChanged;
                    if (this.trace) {
                        log.trace(this + " subscription " + subscriptionName + " already exists");
                    }
                    String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
                    boolean bl = selectorChanged = selectorString == null && filterString != null || filterString == null && selectorString != null || filterString != null && selectorString != null && !filterString.equals(selectorString);
                    if (this.trace) {
                        log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed");
                    }
                    boolean bl2 = topicChanged = !(cond = (JMSCondition)binding.getCondition()).getName().equals(jmsDestination.getName());
                    if (log.isTraceEnabled()) {
                        log.trace("topic " + (topicChanged ? "has" : "has NOT") + " changed");
                    }
                    if (selectorChanged || topicChanged) {
                        if (this.trace) {
                            log.trace("topic or selector changed so deleting old subscription");
                        }
                        if (mDest.isClustered() && !this.postOffice.isLocal()) {
                            ClusteredPostOffice cpo = (ClusteredPostOffice)this.postOffice;
                            cpo.unbindClusteredQueue(name);
                        } else {
                            this.postOffice.unbindQueue(name);
                        }
                        if (this.postOffice.isLocal()) {
                            q = new PagingFilteredQueue(name, this.idm.getID(), this.ms, this.pm, true, true, mDest.getMaxSize(), selector, mDest.getFullSize(), mDest.getPageSize(), mDest.getDownCacheSize());
                            binding = this.postOffice.bindQueue(topicCond, (Queue)q);
                        } else {
                            q = new LocalClusteredQueue((ClusteredPostOffice)this.postOffice, this.nodeId, name, this.idm.getID(), this.ms, this.pm, true, true, mDest.getMaxSize(), selector, this.tr, mDest.getFullSize(), mDest.getPageSize(), mDest.getDownCacheSize());
                            ClusteredPostOffice cpo = (ClusteredPostOffice)this.postOffice;
                            binding = mDest.isClustered() ? cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q) : cpo.bindQueue(topicCond, (LocalClusteredQueue)q);
                        }
                        String counterName = "Subscription." + ((PagingFilteredQueue)q).getName();
                        MessageCounter counter = new MessageCounter(counterName, subscriptionName, (Queue)q, true, true, mDest.getMessageCounterHistoryDayLimit());
                        this.sp.getMessageCounterManager().registerMessageCounter(counterName, counter);
                    }
                }
            }
        } else {
            binding = this.postOffice.getBindingForQueueName(jmsDestination.getName());
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
            }
        }
        int prefetchSize = this.connectionEndpoint.getPrefetchSize();
        Queue dlqToUse = mDest.getDLQ() == null ? this.defaultDLQ : mDest.getDLQ();
        Queue expiryQueueToUse = mDest.getExpiryQueue() == null ? this.defaultExpiryQueue : mDest.getExpiryQueue();
        long redeliveryDelay = mDest.getRedeliveryDelay();
        if (redeliveryDelay == 0L) {
            redeliveryDelay = this.sp.getDefaultRedeliveryDelay();
        }
        ServerConsumerEndpoint ep = new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(), this, selectorString, noLocal, jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelay);
        q = AspectManager.instance();
        synchronized (q) {
            advised = new ConsumerAdvised(ep);
        }
        Dispatcher.instance.registerTarget(consumerID, (AdvisedSupport)advised);
        ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize, this.maxDeliveryAttempts);
        Map map = this.consumers;
        synchronized (map) {
            this.consumers.put(new Integer(consumerID), ep);
        }
        log.debug(this + " created and registered " + ep);
        return stub;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a server-side browser endpoint for the given queue and returns the
     * client-side delegate that refers to it.
     *
     * <p>The new endpoint is stored in {@code this.browsers} under the freshly
     * allocated browser id, and its advised wrapper is registered with
     * {@code Dispatcher.instance} under the same id.
     *
     * @param jmsDestination the destination to browse; must be a non-null queue known
     *                       to the destination manager
     * @param selector       optional message selector passed through to the browser
     *                       endpoint; may be {@code null}
     * @return a {@link ClientBrowserDelegate} carrying the id of the new browser
     * @throws IllegalStateException       if the session is closed, the destination is a
     *                                     topic, or no binding exists for the queue name
     * @throws InvalidDestinationException if the destination is {@code null} or is not
     *                                     known to the destination manager
     * @throws Throwable                   anything else raised by the collaborators
     */
    private BrowserDelegate createBrowserDelegateInternal(JBossDestination jmsDestination, String selector) throws Throwable {
        BrowserAdvised advised;
        if (this.closed) {
            throw new IllegalStateException("Session is closed");
        }
        if (jmsDestination == null) {
            throw new InvalidDestinationException("null destination");
        }
        if (jmsDestination.isTopic()) {
            throw new IllegalStateException("Cannot browse a topic");
        }
        if (this.dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue()) == null) {
            throw new InvalidDestinationException("No such destination: " + jmsDestination);
        }
        log.debug(this + " creating browser for " + jmsDestination + (selector == null ? "" : ", selector '" + selector + "'"));
        Binding binding = this.postOffice.getBindingForQueueName(jmsDestination.getName());
        // The destination manager knowing the queue does not guarantee that the post office
        // still has a binding for it; fail with a descriptive JMS exception (same message as
        // the consumer-creation path) instead of a NullPointerException on binding.getQueue().
        if (binding == null) {
            throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
        }
        int browserID = this.connectionEndpoint.getServerPeer().getNextObjectID();
        ServerBrowserEndpoint ep = new ServerBrowserEndpoint(this, browserID, (PagingFilteredQueue)binding.getQueue(), selector);
        Map map = this.browsers;
        synchronized (map) {
            this.browsers.put(new Integer(browserID), ep);
        }
        // The advised wrapper is built while holding the AspectManager monitor
        // (presumably because advisor initialisation is not thread-safe - TODO confirm).
        AspectManager aspectManager = AspectManager.instance();
        synchronized (aspectManager) {
            advised = new BrowserAdvised(ep);
        }
        Dispatcher.instance.registerTarget(browserID, (AdvisedSupport)advised);
        ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
        log.debug(this + " created and registered " + ep);
        return stub;
    }

    /**
     * Invokes the single-channel {@code promptDelivery} overload once for every
     * element of the given set, in the set's iteration order.
     *
     * @param channels the elements to prompt; each one is cast to
     *                 {@code DeliveryObserver} and then to {@code Channel}
     */
    private void promptDelivery(Set channels) {
        for (Iterator it = channels.iterator(); it.hasNext(); ) {
            // Both casts are kept: an element must be a DeliveryObserver as well as a Channel.
            DeliveryObserver current = (DeliveryObserver)it.next();
            this.promptDelivery((Channel)current);
        }
    }

    /**
     * Transaction callback that accumulates delivery ids and, after the transaction
     * commits, removes each of them from the enclosing session's {@code deliveries}.
     * All other transaction phases are no-ops.
     */
    private class DeliveryCallback
    implements TxCallback {
        /** Delivery ids ({@code Long}) recorded through {@link #addDeliveryId(Long)}. */
        List delList = new ArrayList();

        private DeliveryCallback() {
        }

        /**
         * Records a delivery id to be removed when the transaction commits.
         *
         * @param deliveryId the id to remember
         */
        synchronized void addDeliveryId(Long deliveryId) {
            this.delList.add(deliveryId);
        }

        /**
         * Removes every recorded delivery id from the enclosing session's
         * {@code deliveries}. The recorded list itself is left as it is.
         */
        public synchronized void afterCommit(boolean onePhase) throws TransactionException {
            for (Iterator ids = this.delList.iterator(); ids.hasNext(); ) {
                ServerSessionEndpoint.this.deliveries.remove((Long)ids.next());
            }
        }

        /** No action is taken after rollback. */
        public void afterRollback(boolean onePhase) throws TransactionException {
        }

        /** No action is taken before prepare. */
        public void beforePrepare() {
        }

        /** No action is taken after prepare. */
        public void afterPrepare() {
        }

        /** No action is taken before commit. */
        public void beforeCommit(boolean onePhase) {
        }

        /** No action is taken before rollback. */
        public void beforeRollback(boolean onePhase) {
        }
    }

    /**
     * Plain holder that groups a delivery with the consumer id, the two queues and
     * the redelivery delay supplied alongside it. Carries no behaviour of its own.
     */
    private static class DeliveryRecord {
        /** The delivery this record describes. */
        Delivery del;
        /** Id of the consumer associated with the delivery. */
        int consumerId;
        /** Presumably the dead-letter queue for this delivery - confirm against callers. */
        Queue dlq;
        /** Presumably the queue for expired messages - confirm against callers. */
        Queue expiryQueue;
        /** Redelivery delay value; units are not shown here. */
        long redeliveryDelay;

        /**
         * Stores each argument in the field of the same name.
         */
        DeliveryRecord(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay) {
            this.consumerId = consumerId;
            this.del = del;
            this.redeliveryDelay = redeliveryDelay;
            this.expiryQueue = expiryQueue;
            this.dlq = dlq;
        }
    }
}

