/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicSubscription
extends AbstractSubscription {
    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0L);
    protected PendingMessageCursor matched;
    protected final SystemUsage usageManager;
    protected AtomicLong dispatchedCounter = new AtomicLong();
    boolean singleDestination = true;
    Destination destination;
    private final Scheduler scheduler;
    private int maximumPendingMessages = -1;
    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
    private int discarded;
    private final Object matchedListMutex = new Object();
    private final AtomicLong enqueueCounter = new AtomicLong(0L);
    private final AtomicLong dequeueCounter = new AtomicLong(0L);
    private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
    private int memoryUsageHighWaterMark = 95;
    protected int maxProducersToAudit = 1024;
    protected int maxAuditDepth = 1000;
    protected boolean enableAudit = false;
    protected ActiveMQMessageAudit audit;
    protected boolean active = false;

    public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
        super(broker, context, info);
        this.usageManager = usageManager;
        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
        this.matched = info.getDestination().isTemporary() || broker.getTempDataStore() == null ? new VMPendingMessageCursor(false) : new FilePendingMessageCursor(broker, matchedName, false);
        this.scheduler = broker.getScheduler();
    }

    public void init() throws Exception {
        this.matched.setSystemUsage(this.usageManager);
        this.matched.setMemoryUsageHighWaterMark(this.getCursorMemoryHighWaterMark());
        this.matched.start();
        if (this.enableAudit) {
            this.audit = new ActiveMQMessageAudit(this.maxAuditDepth, this.maxProducersToAudit);
        }
        this.active = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void add(MessageReference node) throws Exception {
        if (this.isDuplicate(node)) {
            return;
        }
        node = new IndirectMessageReference(node.getMessage());
        this.enqueueCounter.incrementAndGet();
        if (!this.isFull() && this.matched.isEmpty()) {
            this.dispatch(node);
            this.setSlowConsumer(false);
        } else {
            if (this.info.getPrefetchSize() > 1 && this.matched.size() > this.info.getPrefetchSize() && !this.isSlowConsumer()) {
                LOG.warn(this.toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
                this.setSlowConsumer(true);
                for (Destination dest : this.destinations) {
                    dest.slowConsumer(this.getContext(), this);
                }
            }
            if (this.maximumPendingMessages != 0) {
                Object object;
                boolean warnedAboutWait = false;
                while (this.active) {
                    object = this.matchedListMutex;
                    synchronized (object) {
                        while (this.matched.isFull()) {
                            if (this.getContext().getStopping().get()) {
                                LOG.warn(this.toString() + ": stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
                                this.enqueueCounter.decrementAndGet();
                                return;
                            }
                            if (!warnedAboutWait) {
                                LOG.info(this.toString() + ": Pending message cursor [" + this.matched + "] is full, temp usage (" + this.matched.getSystemUsage().getTempUsage().getPercentUsage() + "%) or memory usage (" + this.matched.getSystemUsage().getMemoryUsage().getPercentUsage() + "%) limit reached, blocking message add() pending the release of resources.");
                                warnedAboutWait = true;
                            }
                            this.matchedListMutex.wait(20L);
                        }
                        if (this.matched.tryAddMessageLast(node, 10L)) {
                            break;
                        }
                    }
                }
                object = this.matchedListMutex;
                synchronized (object) {
                    if (this.maximumPendingMessages > 0) {
                        int max = this.messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
                        if (this.maximumPendingMessages > 0 && this.maximumPendingMessages < max) {
                            max = this.maximumPendingMessages;
                        }
                        if (!this.matched.isEmpty() && this.matched.size() > max) {
                            this.removeExpiredMessages();
                        }
                        while (!this.matched.isEmpty() && this.matched.size() > this.maximumPendingMessages) {
                            int pageInSize = this.matched.size() - this.maximumPendingMessages;
                            pageInSize = Math.max(1000, pageInSize);
                            LinkedList<MessageReference> list = null;
                            MessageReference[] oldMessages = null;
                            PendingMessageCursor pendingMessageCursor = this.matched;
                            synchronized (pendingMessageCursor) {
                                list = this.matched.pageInList(pageInSize);
                                oldMessages = this.messageEvictionStrategy.evictMessages(list);
                                for (MessageReference ref : list) {
                                    ref.decrementReferenceCount();
                                }
                            }
                            int messagesToEvict = 0;
                            if (oldMessages != null) {
                                messagesToEvict = oldMessages.length;
                                for (int i = 0; i < messagesToEvict; ++i) {
                                    MessageReference oldMessage = oldMessages[i];
                                    this.discard(oldMessage);
                                }
                            }
                            if (messagesToEvict != 0) continue;
                            LOG.warn("No messages to evict returned for " + this.destination + " from eviction strategy: " + this.messageEvictionStrategy + " out of " + list.size() + " candidates");
                            break;
                        }
                    }
                }
                this.dispatchMatched();
            }
        }
    }

    private boolean isDuplicate(MessageReference node) {
        boolean duplicate = false;
        if (this.enableAudit && this.audit != null) {
            duplicate = this.audit.isDuplicate(node);
            if (LOG.isDebugEnabled() && duplicate) {
                LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
            }
        }
        return duplicate;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void removeExpiredMessages() throws IOException {
        try {
            this.matched.reset();
            while (this.matched.hasNext()) {
                MessageReference node = this.matched.next();
                node.decrementReferenceCount();
                if (!this.broker.isExpired(node)) continue;
                this.matched.remove();
                this.dispatchedCounter.incrementAndGet();
                node.decrementReferenceCount();
                ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
                this.broker.messageExpired(this.getContext(), node, this);
                break;
            }
        }
        finally {
            this.matched.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
        Object object = this.matchedListMutex;
        synchronized (object) {
            try {
                this.matched.reset();
                while (this.matched.hasNext()) {
                    MessageReference node = this.matched.next();
                    node.decrementReferenceCount();
                    if (!node.getMessageId().equals(mdn.getMessageId())) continue;
                    this.matched.remove();
                    this.dispatchedCounter.incrementAndGet();
                    node.decrementReferenceCount();
                    break;
                }
            }
            finally {
                this.matched.release();
            }
        }
    }

    @Override
    public synchronized void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception {
        super.acknowledge(context, ack);
        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
            if (context.isInTransaction()) {
                context.getTransaction().addSynchronization(new Synchronization(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void afterCommit() throws Exception {
                        TopicSubscription topicSubscription = TopicSubscription.this;
                        synchronized (topicSubscription) {
                            if (TopicSubscription.this.singleDestination && TopicSubscription.this.destination != null) {
                                TopicSubscription.this.destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                            }
                        }
                        TopicSubscription.this.dequeueCounter.addAndGet(ack.getMessageCount());
                        TopicSubscription.this.dispatchMatched();
                    }
                });
            } else {
                if (this.singleDestination && this.destination != null) {
                    this.destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                    this.destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
                }
                this.dequeueCounter.addAndGet(ack.getMessageCount());
            }
            this.dispatchMatched();
            return;
        }
        if (ack.isDeliveredAck()) {
            if (this.destination != null && !ack.isInTransaction()) {
                this.destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                this.destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
            }
            this.dequeueCounter.addAndGet(ack.getMessageCount());
            this.dispatchMatched();
            return;
        }
        if (ack.isRedeliveredAck()) {
            return;
        }
        throw new JMSException("Invalid acknowledgment: " + ack);
    }

    @Override
    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
        if (this.getPrefetchSize() == 0) {
            this.prefetchWindowOpen.set(true);
            this.dispatchMatched();
            if (this.prefetchWindowOpen.get()) {
                if (pull.getTimeout() == -1L) {
                    this.prefetchWindowOpen.set(false);
                    this.dispatch(null);
                }
                if (pull.getTimeout() > 0L) {
                    this.scheduler.executeAfterDelay(new Runnable(){

                        @Override
                        public void run() {
                            TopicSubscription.this.pullTimeout();
                        }
                    }, pull.getTimeout());
                }
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private final void pullTimeout() {
        Object object = this.matchedListMutex;
        synchronized (object) {
            if (this.prefetchWindowOpen.compareAndSet(true, false)) {
                try {
                    this.dispatch(null);
                }
                catch (Exception e) {
                    this.context.getConnection().serviceException(e);
                }
            }
        }
    }

    @Override
    public int getPendingQueueSize() {
        return this.matched();
    }

    @Override
    public int getDispatchedQueueSize() {
        return (int)(this.dispatchedCounter.get() - this.dequeueCounter.get());
    }

    public int getMaximumPendingMessages() {
        return this.maximumPendingMessages;
    }

    @Override
    public long getDispatchedCounter() {
        return this.dispatchedCounter.get();
    }

    @Override
    public long getEnqueueCounter() {
        return this.enqueueCounter.get();
    }

    @Override
    public long getDequeueCounter() {
        return this.dequeueCounter.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int discarded() {
        Object object = this.matchedListMutex;
        synchronized (object) {
            return this.discarded;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int matched() {
        Object object = this.matchedListMutex;
        synchronized (object) {
            return this.matched.size();
        }
    }

    public void setMaximumPendingMessages(int maximumPendingMessages) {
        this.maximumPendingMessages = maximumPendingMessages;
    }

    public MessageEvictionStrategy getMessageEvictionStrategy() {
        return this.messageEvictionStrategy;
    }

    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
        this.messageEvictionStrategy = messageEvictionStrategy;
    }

    public int getMaxProducersToAudit() {
        return this.maxProducersToAudit;
    }

    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
        if (this.audit != null) {
            this.audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
        }
    }

    public int getMaxAuditDepth() {
        return this.maxAuditDepth;
    }

    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
        if (this.audit != null) {
            this.audit.setAuditDepth(maxAuditDepth);
        }
    }

    public boolean isEnableAudit() {
        return this.enableAudit;
    }

    public synchronized void setEnableAudit(boolean enableAudit) {
        this.enableAudit = enableAudit;
        if (enableAudit && this.audit == null) {
            this.audit = new ActiveMQMessageAudit(this.maxAuditDepth, this.maxProducersToAudit);
        }
    }

    @Override
    public boolean isFull() {
        return this.getDispatchedQueueSize() >= this.info.getPrefetchSize() && !this.prefetchWindowOpen.get();
    }

    @Override
    public int getInFlightSize() {
        return this.getDispatchedQueueSize();
    }

    @Override
    public boolean isLowWaterMark() {
        return (double)this.getDispatchedQueueSize() <= (double)this.info.getPrefetchSize() * 0.4;
    }

    @Override
    public boolean isHighWaterMark() {
        return (double)this.getDispatchedQueueSize() >= (double)this.info.getPrefetchSize() * 0.9;
    }

    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
    }

    public int getMemoryUsageHighWaterMark() {
        return this.memoryUsageHighWaterMark;
    }

    public SystemUsage getUsageManager() {
        return this.usageManager;
    }

    public PendingMessageCursor getMatched() {
        return this.matched;
    }

    public void setMatched(PendingMessageCursor matched) {
        this.matched = matched;
    }

    @Override
    public void updateConsumerPrefetch(int newPrefetch) {
        if (this.context != null && this.context.getConnection() != null && this.context.getConnection().isManageable()) {
            ConsumerControl cc = new ConsumerControl();
            cc.setConsumerId(this.info.getConsumerId());
            cc.setPrefetch(newPrefetch);
            this.context.getConnection().dispatchAsync(cc);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchMatched() throws IOException {
        Object object = this.matchedListMutex;
        synchronized (object) {
            if (!this.matched.isEmpty() && !this.isFull()) {
                try {
                    this.matched.reset();
                    while (this.matched.hasNext() && !this.isFull()) {
                        MessageReference message = this.matched.next();
                        message.decrementReferenceCount();
                        this.matched.remove();
                        if (message.isExpired()) {
                            this.discard(message);
                            continue;
                        }
                        this.dispatch(message);
                        this.prefetchWindowOpen.set(false);
                    }
                }
                finally {
                    this.matched.release();
                }
            }
        }
    }

    private void dispatch(final MessageReference node) throws IOException {
        Message message = node.getMessage();
        if (node != null) {
            node.incrementReferenceCount();
        }
        MessageDispatch md = new MessageDispatch();
        md.setMessage(message);
        md.setConsumerId(this.info.getConsumerId());
        if (node != null) {
            md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
            this.dispatchedCounter.incrementAndGet();
            if (this.singleDestination) {
                if (this.destination == null) {
                    this.destination = (Destination)node.getRegionDestination();
                } else if (this.destination != node.getRegionDestination()) {
                    this.singleDestination = false;
                }
            }
        }
        if (this.info.isDispatchAsync()) {
            if (node != null) {
                md.setTransmitCallback(new TransmitCallback(){

                    @Override
                    public void onSuccess() {
                        Destination regionDestination = (Destination)node.getRegionDestination();
                        regionDestination.getDestinationStatistics().getDispatched().increment();
                        regionDestination.getDestinationStatistics().getInflight().increment();
                        node.decrementReferenceCount();
                    }

                    @Override
                    public void onFailure() {
                        Destination regionDestination = (Destination)node.getRegionDestination();
                        regionDestination.getDestinationStatistics().getDispatched().increment();
                        regionDestination.getDestinationStatistics().getInflight().increment();
                        node.decrementReferenceCount();
                    }
                });
            }
            this.context.getConnection().dispatchAsync(md);
        } else {
            this.context.getConnection().dispatchSync(md);
            if (node != null) {
                Destination regionDestination = (Destination)node.getRegionDestination();
                regionDestination.getDestinationStatistics().getDispatched().increment();
                regionDestination.getDestinationStatistics().getInflight().increment();
                node.decrementReferenceCount();
            }
        }
    }

    private void discard(MessageReference message) {
        Destination dest;
        message.decrementReferenceCount();
        this.matched.remove(message);
        ++this.discarded;
        if (this.destination != null) {
            this.destination.getDestinationStatistics().getDequeues().increment();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(this + ", discarding message " + message);
        }
        if ((dest = (Destination)message.getRegionDestination()) != null) {
            dest.messageDiscarded(this.getContext(), this, message);
        }
        this.broker.getRoot().sendToDeadLetterQueue(this.getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + this.info.getConsumerId()));
    }

    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + this.getDispatchedQueueSize() + ", delivered=" + this.getDequeueCounter() + ", matched=" + this.matched() + ", discarded=" + this.discarded();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void destroy() {
        this.active = false;
        Object object = this.matchedListMutex;
        synchronized (object) {
            try {
                this.matched.destroy();
            }
            catch (Exception e) {
                LOG.warn("Failed to destroy cursor", e);
            }
        }
        this.setSlowConsumer(false);
    }

    @Override
    public int getPrefetchSize() {
        return this.info.getPrefetchSize();
    }

    @Override
    public void setPrefetchSize(int newSize) {
        this.info.setPrefetchSize(newSize);
        try {
            this.dispatchMatched();
        }
        catch (Exception e) {
            LOG.trace("Caught exception on dispatch after prefetch size change.");
        }
    }
}

