/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.tool;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Topic;
import org.apache.activemq.tool.AbstractJmsMeasurableClient;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsConsumerProperties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * JMS client that consumes messages and reports every received message
 * through {@code incThroughput()}.
 *
 * <p>Four receive modes are offered, selected by {@link #receiveMessages()}
 * from the consumer properties: synchronous or asynchronous delivery, each
 * bounded either by a duration in milliseconds or by a message count.
 *
 * <p>Every receive method closes the connection before returning, whether it
 * completes normally or throws.
 */
public class JmsConsumerClient extends AbstractJmsMeasurableClient {
    private static final Log LOG = LogFactory.getLog(JmsConsumerClient.class);

    /** Receive-type value that selects the duration-bounded receive modes. */
    private static final String RECV_TYPE_TIME = "time";

    /** Name given to a durable subscriber when no client name has been set. */
    private static final String DEFAULT_CLIENT_NAME = "JmsConsumer";

    /** Consumer most recently created by one of the {@code createJmsConsumer} overloads. */
    protected MessageConsumer jmsConsumer;

    /** Settings that drive receive mode, durability and unsubscribe behaviour. */
    protected JmsConsumerProperties client;

    /**
     * Creates a consumer client with a fresh {@link JmsConsumerProperties}.
     *
     * @param factory connection factory handed to the superclass
     */
    public JmsConsumerClient(ConnectionFactory factory) {
        this(new JmsConsumerProperties(), factory);
    }

    /**
     * Creates a consumer client with the given settings.
     *
     * @param clientProps consumer settings to use
     * @param factory connection factory handed to the superclass
     */
    public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) {
        super(factory);
        this.client = clientProps;
    }

    /**
     * Receives messages using the mode described by the consumer properties.
     *
     * <p>A receive type of {@code "time"} (case-insensitive) selects the
     * duration-bounded variant; any other value, including an unset one,
     * selects the count-bounded variant.
     *
     * @throws JMSException if the selected receive method fails
     */
    public void receiveMessages() throws JMSException {
        boolean async = this.client.isAsyncRecv();
        // Literal-first comparison: an unset receive type falls back to the
        // count-based mode instead of raising a NullPointerException.
        boolean timeBased = RECV_TYPE_TIME.equalsIgnoreCase(this.client.getRecvType());

        if (async) {
            if (timeBased) {
                this.receiveAsyncTimeBasedMessages(this.client.getRecvDuration());
            } else {
                this.receiveAsyncCountBasedMessages(this.client.getRecvCount());
            }
        } else if (timeBased) {
            this.receiveSyncTimeBasedMessages(this.client.getRecvDuration());
        } else {
            this.receiveSyncCountBasedMessages(this.client.getRecvCount());
        }
    }

    /**
     * Stores {@code destCount} and then receives messages.
     *
     * @param destCount value assigned to the {@code destCount} field
     * @throws JMSException if receiving fails
     */
    public void receiveMessages(int destCount) throws JMSException {
        this.destCount = destCount;
        this.receiveMessages();
    }

    /**
     * Stores {@code destIndex} and {@code destCount} and then receives messages.
     *
     * @param destIndex value assigned to the {@code destIndex} field
     * @param destCount value assigned to the {@code destCount} field
     * @throws JMSException if receiving fails
     */
    public void receiveMessages(int destIndex, int destCount) throws JMSException {
        this.destIndex = destIndex;
        this.receiveMessages(destCount);
    }

    /**
     * Synchronously receives messages until {@code duration} milliseconds
     * have elapsed, counting each received message.
     *
     * @param duration how long to keep receiving, in milliseconds; a
     *        non-positive value receives nothing
     * @throws JMSException if starting the connection, receiving, or the
     *         final cleanup fails
     */
    public void receiveSyncTimeBasedMessages(long duration) throws JMSException {
        this.ensureConsumer();
        try {
            this.getConnection().start();
            LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
            long endTime = System.currentTimeMillis() + duration;
            long remaining = duration;
            // The wait is bounded by the time left so an idle destination
            // cannot hold the caller past the deadline. The loop guard keeps
            // the timeout strictly positive because receive(0) never times out.
            while (remaining > 0) {
                Message received = this.getJmsConsumer().receive(remaining);
                if (received != null) {
                    this.incThroughput();
                }
                remaining = endTime - System.currentTimeMillis();
            }
        } finally {
            this.unsubscribeIfRequestedAndClose();
        }
    }

    /**
     * Synchronously receives until {@code count} receive calls have returned,
     * counting each one.
     *
     * @param count number of messages to receive; a non-positive value
     *        receives nothing
     * @throws JMSException if starting the connection, receiving, or the
     *         final cleanup fails
     */
    public void receiveSyncCountBasedMessages(long count) throws JMSException {
        this.ensureConsumer();
        try {
            this.getConnection().start();
            LOG.info("Starting to synchronously receive " + count + " messages...");
            // A long counter is required: an int would wrap before reaching a
            // target above Integer.MAX_VALUE and the loop would never finish.
            for (long received = 0; received < count; received++) {
                this.getJmsConsumer().receive();
                this.incThroughput();
            }
        } finally {
            this.unsubscribeIfRequestedAndClose();
        }
    }

    /**
     * Registers a listener that counts every delivered message, then sleeps
     * the calling thread for {@code duration} milliseconds.
     *
     * @param duration how long to keep the listener active, in milliseconds
     * @throws JMSException if registering the listener, starting the
     *         connection, or the final cleanup fails, or if the calling
     *         thread is interrupted while sleeping (the interrupt flag is
     *         restored and the interruption is attached as the linked
     *         exception)
     */
    public void receiveAsyncTimeBasedMessages(long duration) throws JMSException {
        this.ensureConsumer();
        this.getJmsConsumer().setMessageListener(new MessageListener() {

            public void onMessage(Message msg) {
                JmsConsumerClient.this.incThroughput();
            }
        });
        try {
            this.getConnection().start();
            LOG.info("Starting to asynchronously receive messages for " + duration + " ms...");
            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                throw interrupted(
                    "JMS consumer thread sleep has been interrupted. Message: " + e.getMessage(), e);
            }
        } finally {
            this.unsubscribeIfRequestedAndClose();
        }
    }

    /**
     * Registers a listener that counts every delivered message, then blocks
     * the calling thread until {@code count} messages have been delivered.
     *
     * @param count number of messages to wait for; a non-positive value
     *        returns without waiting
     * @throws JMSException if registering the listener, starting the
     *         connection, or the final cleanup fails, or if the calling
     *         thread is interrupted while waiting (the interrupt flag is
     *         restored and the interruption is attached as the linked
     *         exception)
     */
    public void receiveAsyncCountBasedMessages(long count) throws JMSException {
        this.ensureConsumer();
        // The counter doubles as the monitor the waiting thread parks on.
        final AtomicLong recvCount = new AtomicLong(0);
        this.getJmsConsumer().setMessageListener(new MessageListener() {

            public void onMessage(Message msg) {
                JmsConsumerClient.this.incThroughput();
                // wait()/notify() require ownership of the monitor; calling
                // them unsynchronized throws IllegalMonitorStateException.
                synchronized (recvCount) {
                    recvCount.incrementAndGet();
                    recvCount.notifyAll();
                }
            }
        });
        try {
            this.getConnection().start();
            LOG.info("Starting to asynchronously receive " + count + " messages...");
            try {
                synchronized (recvCount) {
                    // Re-check after every wake-up to tolerate spurious wake-ups.
                    while (recvCount.get() < count) {
                        recvCount.wait();
                    }
                }
            } catch (InterruptedException e) {
                throw interrupted(
                    "JMS consumer thread wait has been interrupted. Message: " + e.getMessage(), e);
            }
        } finally {
            this.unsubscribeIfRequestedAndClose();
        }
    }

    /**
     * Creates a consumer on the first destination returned by
     * {@code createDestination(destIndex, destCount)}.
     *
     * @return the newly created consumer
     * @throws JMSException if the destination or consumer cannot be created
     */
    public MessageConsumer createJmsConsumer() throws JMSException {
        Destination[] dest = this.createDestination(this.destIndex, this.destCount);
        return this.createJmsConsumer(dest[0]);
    }

    /**
     * Creates a consumer on {@code dest} and remembers it in
     * {@link #jmsConsumer}. When the properties request durability a durable
     * subscriber is created instead, which requires {@code dest} to be a
     * {@link Topic}.
     *
     * @param dest destination to consume from
     * @return the newly created consumer
     * @throws JMSException if the consumer cannot be created
     * @throws ClassCastException if durability is requested and {@code dest}
     *         is not a {@link Topic}
     */
    public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
        if (this.client.isDurable()) {
            String clientName = this.resolveDurableClientName();
            LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
            this.jmsConsumer = this.getSession().createDurableSubscriber((Topic) dest, clientName);
        } else {
            LOG.info("Creating non-durable consumer to: " + dest.toString());
            this.jmsConsumer = this.getSession().createConsumer(dest);
        }
        return this.jmsConsumer;
    }

    /**
     * Creates a consumer on {@code dest} with a message selector and
     * no-local flag, and remembers it in {@link #jmsConsumer}. When the
     * properties request durability a durable subscriber is created instead,
     * which requires {@code dest} to be a {@link Topic}.
     *
     * @param dest destination to consume from
     * @param selector message selector passed to the session
     * @param noLocal no-local flag passed to the session
     * @return the newly created consumer
     * @throws JMSException if the consumer cannot be created
     * @throws ClassCastException if durability is requested and {@code dest}
     *         is not a {@link Topic}
     */
    public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
        if (this.client.isDurable()) {
            String clientName = this.resolveDurableClientName();
            LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
            this.jmsConsumer = this.getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
        } else {
            LOG.info("Creating non-durable consumer to: " + dest.toString());
            this.jmsConsumer = this.getSession().createConsumer(dest, selector, noLocal);
        }
        return this.jmsConsumer;
    }

    /**
     * @return the consumer created so far, or {@code null} if none has been created
     */
    public MessageConsumer getJmsConsumer() {
        return this.jmsConsumer;
    }

    /**
     * @return the consumer properties in use
     */
    public JmsClientProperties getClient() {
        return this.client;
    }

    /**
     * Replaces the consumer properties.
     *
     * @param clientProps new settings; must be a {@link JmsConsumerProperties}
     * @throws ClassCastException if {@code clientProps} is of another type
     */
    public void setClient(JmsClientProperties clientProps) {
        this.client = (JmsConsumerProperties) clientProps;
    }

    /** Creates the default consumer if none has been created yet. */
    private void ensureConsumer() throws JMSException {
        if (this.getJmsConsumer() == null) {
            this.createJmsConsumer();
        }
    }

    /** Returns the client name for a durable subscription, installing the default when unset. */
    private String resolveDurableClientName() {
        String clientName = this.getClientName();
        if (clientName == null) {
            clientName = DEFAULT_CLIENT_NAME;
            this.setClientName(clientName);
        }
        return clientName;
    }

    /**
     * Removes the durable subscription when the properties ask for it, then
     * closes the connection. The connection is closed even if closing the
     * consumer or unsubscribing fails.
     */
    private void unsubscribeIfRequestedAndClose() throws JMSException {
        try {
            if (this.client.isDurable() && this.client.isUnsubscribe()) {
                LOG.info("Unsubscribing durable subscriber: " + this.getClientName());
                this.getJmsConsumer().close();
                this.getSession().unsubscribe(this.getClientName());
            }
        } finally {
            this.getConnection().close();
        }
    }

    /**
     * Builds the JMSException reported for an interrupted wait, restoring the
     * thread's interrupt flag and keeping the interruption as the linked exception.
     */
    private static JMSException interrupted(String reason, InterruptedException cause) {
        Thread.currentThread().interrupt();
        JMSException failure = new JMSException(reason);
        failure.setLinkedException(cause);
        return failure;
    }
}

