/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.mb.integration.common.clients;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.wso2.mb.integration.common.clients.AndesJMSBase;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSConsumerClientConfiguration;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientException;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientUtils;
import org.wso2.mb.integration.common.clients.operations.utils.ExchangeType;
import org.wso2.mb.integration.common.clients.operations.utils.JMSDeliveryStatus;

public class AndesJMSConsumer
extends AndesJMSBase
implements Runnable,
MessageListener {
    private static Logger log = Logger.getLogger(AndesJMSConsumer.class);
    private final AndesJMSConsumerClientConfiguration consumerConfig;
    private long firstMessageConsumedTimestamp;
    private long lastMessageConsumedTimestamp;
    private AtomicLong receivedMessageCount = new AtomicLong(0L);
    private long totalLatency;
    private Connection connection;
    private Session session;
    private MessageConsumer receiver;

    public AndesJMSConsumer(AndesJMSConsumerClientConfiguration config, boolean createConsumer) throws NamingException, JMSException {
        super(config);
        this.consumerConfig = config;
        if (createConsumer) {
            if (ExchangeType.QUEUE == this.consumerConfig.getExchangeType()) {
                this.createQueueConnection();
            } else if (ExchangeType.TOPIC == this.consumerConfig.getExchangeType()) {
                this.createTopicConnection();
            }
        }
    }

    private void createTopicConnection() throws NamingException, JMSException {
        TopicConnectionFactory connFactory = (TopicConnectionFactory)super.getInitialContext().lookup("andesConnectionfactory");
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.setClientID(this.consumerConfig.getSubscriptionID());
        topicConnection.start();
        TopicSession topicSession = 0 == this.consumerConfig.getAcknowledgeMode().getType() ? topicConnection.createTopicSession(true, this.consumerConfig.getAcknowledgeMode().getType()) : topicConnection.createTopicSession(false, this.consumerConfig.getAcknowledgeMode().getType());
        Topic topic = (Topic)super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
        this.connection = topicConnection;
        this.session = topicSession;
        this.receiver = this.consumerConfig.isDurable() ? (null != this.consumerConfig.getSelectors() ? topicSession.createDurableSubscriber(topic, this.consumerConfig.getSubscriptionID(), this.consumerConfig.getSelectors(), false) : topicSession.createDurableSubscriber(topic, this.consumerConfig.getSubscriptionID())) : (null != this.consumerConfig.getSelectors() ? topicSession.createSubscriber(topic, this.consumerConfig.getSelectors(), false) : topicSession.createSubscriber(topic));
    }

    private void createQueueConnection() throws NamingException, JMSException {
        QueueConnectionFactory connFactory = (QueueConnectionFactory)super.getInitialContext().lookup("andesConnectionfactory");
        QueueConnection queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession = 0 == this.consumerConfig.getAcknowledgeMode().getType() ? queueConnection.createQueueSession(true, this.consumerConfig.getAcknowledgeMode().getType()) : queueConnection.createQueueSession(false, this.consumerConfig.getAcknowledgeMode().getType());
        Queue queue = (Queue)super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
        this.connection = queueConnection;
        this.session = queueSession;
        this.receiver = null != this.consumerConfig.getSelectors() ? queueSession.createReceiver(queue, this.consumerConfig.getSelectors()) : queueSession.createReceiver(queue);
    }

    @Override
    public void startClient() throws AndesClientException, JMSException {
        if (null != this.connection && null != this.session && null != this.receiver) {
            log.info((Object)"Starting Consumer");
            if (this.consumerConfig.isAsync()) {
                this.receiver.setMessageListener((MessageListener)this);
            } else {
                Thread consumerThread = new Thread(this);
                consumerThread.start();
            }
        } else {
            throw new AndesClientException("The connection, session and message receiver is not assigned.");
        }
    }

    @Override
    public void stopClient() {
        Thread stopThread = new Thread(new Runnable(){

            @Override
            public void run() {
                if (null != AndesJMSConsumer.this.connection && null != AndesJMSConsumer.this.session && null != AndesJMSConsumer.this.receiver) {
                    try {
                        log.info((Object)"Closing Consumer");
                        if (ExchangeType.TOPIC == AndesJMSConsumer.this.consumerConfig.getExchangeType()) {
                            if (null != AndesJMSConsumer.this.receiver) {
                                TopicSubscriber topicSubscriber = (TopicSubscriber)AndesJMSConsumer.this.receiver;
                                topicSubscriber.close();
                            }
                            if (null != AndesJMSConsumer.this.session) {
                                TopicSession topicSession = (TopicSession)AndesJMSConsumer.this.session;
                                topicSession.close();
                            }
                            if (null != AndesJMSConsumer.this.connection) {
                                TopicConnection topicConnection = (TopicConnection)AndesJMSConsumer.this.connection;
                                topicConnection.close();
                            }
                        } else if (ExchangeType.QUEUE == AndesJMSConsumer.this.consumerConfig.getExchangeType()) {
                            if (null != AndesJMSConsumer.this.receiver) {
                                QueueReceiver queueReceiver = (QueueReceiver)AndesJMSConsumer.this.receiver;
                                queueReceiver.close();
                            }
                            if (null != AndesJMSConsumer.this.session) {
                                QueueSession queueSession = (QueueSession)AndesJMSConsumer.this.session;
                                queueSession.close();
                            }
                            if (null != AndesJMSConsumer.this.connection) {
                                QueueConnection queueConnection = (QueueConnection)AndesJMSConsumer.this.connection;
                                queueConnection.stop();
                                queueConnection.close();
                            }
                        }
                        AndesJMSConsumer.this.receiver = null;
                        AndesJMSConsumer.this.session = null;
                        AndesJMSConsumer.this.connection = null;
                        log.info((Object)"Consumer Closed");
                    }
                    catch (JMSException e) {
                        log.error((Object)"Error in stopping client.", (Throwable)e);
                        throw new RuntimeException("Error in stopping client.", e);
                    }
                }
            }
        });
        stopThread.start();
        try {
            stopThread.join();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Error waiting for subscriber to stop", e);
        }
    }

    public void stopClientSync() {
        if (null != this.connection && null != this.session && null != this.receiver) {
            try {
                log.info((Object)"Closing Consumer");
                if (ExchangeType.TOPIC == this.consumerConfig.getExchangeType()) {
                    if (null != this.receiver) {
                        TopicSubscriber topicSubscriber = (TopicSubscriber)this.receiver;
                        topicSubscriber.close();
                    }
                    if (null != this.session) {
                        TopicSession topicSession = (TopicSession)this.session;
                        topicSession.close();
                    }
                    if (null != this.connection) {
                        TopicConnection topicConnection = (TopicConnection)this.connection;
                        topicConnection.close();
                    }
                } else if (ExchangeType.QUEUE == this.consumerConfig.getExchangeType()) {
                    if (null != this.receiver) {
                        QueueReceiver queueReceiver = (QueueReceiver)this.receiver;
                        queueReceiver.close();
                    }
                    if (null != this.session) {
                        QueueSession queueSession = (QueueSession)this.session;
                        queueSession.close();
                    }
                    if (null != this.connection) {
                        QueueConnection queueConnection = (QueueConnection)this.connection;
                        queueConnection.stop();
                        queueConnection.close();
                    }
                }
                this.receiver = null;
                this.session = null;
                this.connection = null;
                log.info((Object)"Consumer Closed");
            }
            catch (JMSException e) {
                log.error((Object)"Error in stopping client.", (Throwable)e);
                throw new RuntimeException("Error in stopping client.", e);
            }
        }
    }

    public void unSubscribe(final boolean stopClient) throws JMSException {
        Thread unsubscribeThread = new Thread(new Runnable(){

            /*
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            @Override
            public void run() {
                if (null != AndesJMSConsumer.this.connection && null != AndesJMSConsumer.this.session && null != AndesJMSConsumer.this.receiver) {
                    try {
                        log.info((Object)"Un-subscribing Subscriber");
                        AndesJMSConsumer.this.session.unsubscribe(AndesJMSConsumer.this.consumerConfig.getSubscriptionID());
                        log.info((Object)"Subscriber Un-Subscribed");
                        if (!stopClient) return;
                        AndesJMSConsumer.this.stopClient();
                        return;
                    }
                    catch (JMSException e) {
                        log.error((Object)"Error in removing subscription(un-subscribing).", (Throwable)e);
                        throw new RuntimeException("JMSException : Error in removing subscription(un-subscribing).", e);
                    }
                } else {
                    AndesClientException andesClientException = new AndesClientException("The connection, session and message receiver is not assigned.");
                    log.error((Object)"The connection, session and message receiver is not assigned.", (Throwable)andesClientException);
                    throw new RuntimeException("The connection, session and message receiver is not assigned.", andesClientException);
                }
            }
        });
        unsubscribeThread.start();
        try {
            unsubscribeThread.join();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Error waiting for consumer to unsubscribe", e);
        }
    }

    @Override
    public void run() {
        try {
            Message message;
            boolean interrupted = false;
            do {
                if (null != (message = this.receiver.receive())) continue;
                interrupted = true;
                break;
            } while (!this.processReceivedMessage(message));
            if (!interrupted) {
                this.stopClientSync();
            }
        }
        catch (JMSException e) {
            log.error((Object)"Error while receiving messages ", (Throwable)e);
            throw new RuntimeException("JMSException : Error while listening to messages", e);
        }
        catch (IOException e) {
            log.error((Object)"Error while writing message to file", (Throwable)e);
            throw new RuntimeException("IOException : Error while writing message to file\"", e);
        }
    }

    public void onMessage(Message message) {
        try {
            boolean success = this.processReceivedMessage(message);
            if (success) {
                this.stopClient();
            }
        }
        catch (JMSException e) {
            log.error((Object)"Error while listening to messages", (Throwable)e);
            throw new RuntimeException("Error while listening to messages", e);
        }
        catch (IOException e) {
            log.error((Object)"Error while writing message to file", (Throwable)e);
            throw new RuntimeException("Error while listening to messages", e);
        }
    }

    private boolean processReceivedMessage(Message message) throws JMSException, IOException {
        if (null != message) {
            long threadID = Thread.currentThread().getId();
            long currentTimeStamp = System.currentTimeMillis();
            this.totalLatency += currentTimeStamp - message.getJMSTimestamp();
            if (0L == this.firstMessageConsumedTimestamp) {
                this.firstMessageConsumedTimestamp = currentTimeStamp;
            }
            this.lastMessageConsumedTimestamp = currentTimeStamp;
            this.receivedMessageCount.incrementAndGet();
            JMSDeliveryStatus deliveryStatus = message.getJMSRedelivered() ? JMSDeliveryStatus.REDELIVERED : JMSDeliveryStatus.ORIGINAL;
            if (0L == this.receivedMessageCount.get() % this.consumerConfig.getPrintsPerMessageCount()) {
                log.info((Object)("[RECEIVE] ThreadID:" + threadID + " Destination(" + this.consumerConfig.getExchangeType().getType() + "):" + this.consumerConfig.getDestinationName() + " ReceivedMessageCount:" + this.receivedMessageCount + " MessageToReceive:" + this.consumerConfig.getMaximumMessagesToReceived() + " Original/Redelivered:" + deliveryStatus.getStatus()));
            }
            if (null != this.consumerConfig.getFilePathToWriteStatistics()) {
                String statisticsString = Long.toString(currentTimeStamp) + "," + Double.toString(this.getConsumerTPS()) + "," + Double.toString(this.getAverageLatency());
                AndesClientUtils.writeStatisticsToFile(statisticsString, this.consumerConfig.getFilePathToWriteStatistics());
            }
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage)message;
                if (null != this.consumerConfig.getFilePathToWriteReceivedMessages()) {
                    AndesClientUtils.writeReceivedMessagesToFile(textMessage.getText(), this.consumerConfig.getFilePathToWriteReceivedMessages());
                } else {
                    this.consumerConfig.addReceivedMessage(textMessage.getText());
                }
            }
            if (0L == this.receivedMessageCount.get() % this.consumerConfig.getAcknowledgeAfterEachMessageCount() && 2 == this.session.getAcknowledgeMode()) {
                message.acknowledge();
                log.info((Object)("Acknowledging message : " + message.getJMSMessageID()));
            }
            if (0L == this.receivedMessageCount.get() % this.consumerConfig.getCommitAfterEachMessageCount()) {
                this.session.commit();
                log.info((Object)"Committed session");
            } else if (0L == this.receivedMessageCount.get() % this.consumerConfig.getRollbackAfterEachMessageCount()) {
                this.session.rollback();
                log.info((Object)"Roll-backed session");
            } else if (0L == this.receivedMessageCount.get() % this.consumerConfig.getRecoverAfterEachMessageCount()) {
                log.info((Object)"Recovering session");
                this.session.recover();
            }
            if (this.receivedMessageCount.get() >= this.consumerConfig.getUnSubscribeAfterEachMessageCount()) {
                this.unSubscribe(true);
                AndesClientUtils.sleepForInterval(1000L);
                return true;
            }
            if (this.receivedMessageCount.get() >= this.consumerConfig.getMaximumMessagesToReceived()) {
                return true;
            }
            if (0L < this.consumerConfig.getRunningDelay()) {
                try {
                    Thread.sleep(this.consumerConfig.getRunningDelay());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        return false;
    }

    public AtomicLong getReceivedMessageCount() {
        return this.receivedMessageCount;
    }

    public double getConsumerTPS() {
        if (0L == this.lastMessageConsumedTimestamp - this.firstMessageConsumedTimestamp) {
            return this.receivedMessageCount.doubleValue() / 0.001;
        }
        return this.receivedMessageCount.doubleValue() / ((double)(this.lastMessageConsumedTimestamp - this.firstMessageConsumedTimestamp) / 1000.0);
    }

    public double getAverageLatency() {
        if (0.0 == this.receivedMessageCount.doubleValue()) {
            log.warn((Object)"No messages were received to calculate average latency.");
            return 0.0;
        }
        return (double)this.totalLatency / 1000.0 / this.receivedMessageCount.doubleValue();
    }

    @Override
    public AndesJMSConsumerClientConfiguration getConfig() {
        return this.consumerConfig;
    }

    public Connection getConnection() {
        return this.connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return this.session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public MessageConsumer getReceiver() {
        return this.receiver;
    }

    public void setReceiver(MessageConsumer receiver) {
        this.receiver = receiver;
    }
}

