/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.tool;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.tool.AbstractJmsMeasurableClient;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsConsumerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsConsumerClient
extends AbstractJmsMeasurableClient {
    private static final Logger LOG = LoggerFactory.getLogger(JmsConsumerClient.class);
    protected MessageConsumer jmsConsumer;
    protected JmsConsumerProperties client;

    public JmsConsumerClient(ConnectionFactory factory) {
        this(new JmsConsumerProperties(), factory);
    }

    public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) {
        super(factory);
        this.client = clientProps;
    }

    public void receiveMessages() throws JMSException {
        if (this.client.isAsyncRecv()) {
            if (this.client.getRecvType().equalsIgnoreCase("time")) {
                this.receiveAsyncTimeBasedMessages(this.client.getRecvDuration());
            } else {
                this.receiveAsyncCountBasedMessages(this.client.getRecvCount());
            }
        } else if (this.client.getRecvType().equalsIgnoreCase("time")) {
            this.receiveSyncTimeBasedMessages(this.client.getRecvDuration());
        } else {
            this.receiveSyncCountBasedMessages(this.client.getRecvCount());
        }
    }

    public void receiveMessages(int destCount) throws JMSException {
        this.destCount = destCount;
        this.receiveMessages();
    }

    public void receiveMessages(int destIndex, int destCount) throws JMSException {
        this.destIndex = destIndex;
        this.receiveMessages(destCount);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void receiveSyncTimeBasedMessages(long duration) throws JMSException {
        if (this.getJmsConsumer() == null) {
            this.createJmsConsumer();
        }
        try {
            this.getConnection().start();
            LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
            long endTime = System.currentTimeMillis() + duration;
            while (System.currentTimeMillis() - endTime < 0L) {
                this.getJmsConsumer().receive();
                this.incThroughput();
                this.sleep();
                this.commitTxIfNecessary();
            }
        }
        finally {
            if (this.client.isDurable() && this.client.isUnsubscribe()) {
                LOG.info("Unsubscribing durable subscriber: " + this.getClientName());
                this.getJmsConsumer().close();
                this.getSession().unsubscribe(this.getClientName());
            }
            this.getConnection().close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void receiveSyncCountBasedMessages(long count) throws JMSException {
        if (this.getJmsConsumer() == null) {
            this.createJmsConsumer();
        }
        try {
            this.getConnection().start();
            LOG.info("Starting to synchronously receive " + count + " messages...");
            int recvCount = 0;
            while ((long)recvCount < count) {
                this.getJmsConsumer().receive();
                this.incThroughput();
                ++recvCount;
                this.sleep();
                this.commitTxIfNecessary();
            }
        }
        finally {
            if (this.client.isDurable() && this.client.isUnsubscribe()) {
                LOG.info("Unsubscribing durable subscriber: " + this.getClientName());
                this.getJmsConsumer().close();
                this.getSession().unsubscribe(this.getClientName());
            }
            this.getConnection().close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void receiveAsyncTimeBasedMessages(long duration) throws JMSException {
        if (this.getJmsConsumer() == null) {
            this.createJmsConsumer();
        }
        this.getJmsConsumer().setMessageListener(new MessageListener(){

            public void onMessage(Message msg) {
                JmsConsumerClient.this.incThroughput();
                JmsConsumerClient.this.sleep();
                try {
                    JmsConsumerClient.this.commitTxIfNecessary();
                }
                catch (JMSException ex) {
                    LOG.error("Error committing transaction: " + ex.getMessage());
                }
            }
        });
        try {
            this.getConnection().start();
            LOG.info("Starting to asynchronously receive messages for " + duration + " ms...");
            try {
                Thread.sleep(duration);
            }
            catch (InterruptedException e) {
                throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
            }
        }
        finally {
            if (this.client.isDurable() && this.client.isUnsubscribe()) {
                LOG.info("Unsubscribing durable subscriber: " + this.getClientName());
                this.getJmsConsumer().close();
                this.getSession().unsubscribe(this.getClientName());
            }
            this.getConnection().close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void receiveAsyncCountBasedMessages(long count) throws JMSException {
        if (this.getJmsConsumer() == null) {
            this.createJmsConsumer();
        }
        final AtomicInteger recvCount = new AtomicInteger(0);
        this.getJmsConsumer().setMessageListener(new MessageListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onMessage(Message msg) {
                JmsConsumerClient.this.incThroughput();
                JmsConsumerClient.this.sleep();
                recvCount.incrementAndGet();
                AtomicInteger atomicInteger = recvCount;
                synchronized (atomicInteger) {
                    recvCount.notify();
                }
                try {
                    JmsConsumerClient.this.commitTxIfNecessary();
                }
                catch (JMSException ex) {
                    LOG.error("Error committing transaction: " + ex.getMessage());
                }
            }
        });
        try {
            this.getConnection().start();
            LOG.info("Starting to asynchronously receive " + this.client.getRecvCount() + " messages...");
            try {
                while ((long)recvCount.get() < count) {
                    AtomicInteger atomicInteger = recvCount;
                    synchronized (atomicInteger) {
                        recvCount.wait();
                    }
                }
                return;
            }
            catch (InterruptedException e) {
                throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
            }
        }
        finally {
            if (this.client.isDurable() && this.client.isUnsubscribe()) {
                LOG.info("Unsubscribing durable subscriber: " + this.getClientName());
                this.getJmsConsumer().close();
                this.getSession().unsubscribe(this.getClientName());
            }
            this.getConnection().close();
        }
    }

    public MessageConsumer createJmsConsumer() throws JMSException {
        Destination[] dest = this.createDestinations(this.destCount);
        Destination consumedDestination = dest[0];
        if (dest.length > 1) {
            String destinationName = ((ActiveMQDestination)consumedDestination).getPhysicalName();
            LOG.warn("Multiple destinations requested for consumer; using only first: {}", (Object)destinationName);
        }
        if (this.client.getMessageSelector() == null) {
            return this.createJmsConsumer(consumedDestination);
        }
        return this.createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false);
    }

    public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
        if (this.client.isDurable()) {
            String clientName = this.getClientName();
            if (clientName == null) {
                clientName = "JmsConsumer";
                this.setClientName(clientName);
            }
            LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
            this.jmsConsumer = this.getSession().createDurableSubscriber((Topic)dest, clientName);
        } else {
            LOG.info("Creating non-durable consumer to: " + dest.toString());
            this.jmsConsumer = this.getSession().createConsumer(dest);
        }
        return this.jmsConsumer;
    }

    public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
        if (this.client.isDurable()) {
            String clientName = this.getClientName();
            if (clientName == null) {
                clientName = "JmsConsumer";
                this.setClientName(clientName);
            }
            LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
            this.jmsConsumer = this.getSession().createDurableSubscriber((Topic)dest, clientName, selector, noLocal);
        } else {
            LOG.info("Creating non-durable consumer to: " + dest.toString());
            this.jmsConsumer = this.getSession().createConsumer(dest, selector, noLocal);
        }
        return this.jmsConsumer;
    }

    public MessageConsumer getJmsConsumer() {
        return this.jmsConsumer;
    }

    @Override
    public JmsClientProperties getClient() {
        return this.client;
    }

    @Override
    public void setClient(JmsClientProperties clientProps) {
        this.client = (JmsConsumerProperties)clientProps;
    }

    protected void sleep() {
        if (this.client.getRecvDelay() > 0L) {
            try {
                LOG.trace("Sleeping for " + this.client.getRecvDelay() + " milliseconds");
                Thread.sleep(this.client.getRecvDelay());
            }
            catch (InterruptedException ex) {
                LOG.warn(ex.getMessage());
            }
        }
    }
}

