/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.jms.spout;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.jms.JmsTupleProducer;
import org.apache.storm.jms.spout.JmsMessageID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsSpout
extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
    private static final int POLL_INTERVAL_MS = 50;
    private int jmsAcknowledgeMode = 1;
    private boolean distributed = true;
    private MessageHandler messageHandler = new MessageHandler();
    private JmsTupleProducer tupleProducer;
    private JmsProvider jmsProvider;
    private long messageSequence = 0L;
    private SpoutOutputCollector collector;
    private transient Connection connection;
    private transient Session session;
    private MessageConsumer consumer;
    private boolean individualAcks;

    public void setJmsAcknowledgeMode(int mode) {
        switch (mode) {
            case 1: 
            case 3: {
                this.messageHandler = new MessageHandler();
                break;
            }
            case 2: {
                this.messageHandler = new ClientAckHandler();
                break;
            }
            case 0: {
                this.messageHandler = new TransactedSessionMessageHandler();
                break;
            }
            default: {
                LOG.warn("Unsupported Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
            }
        }
        this.jmsAcknowledgeMode = mode;
    }

    private void validateJmsAckMode() {
        if (this.jmsAcknowledgeMode != 1 && this.jmsAcknowledgeMode != 3 && this.jmsAcknowledgeMode != 2 && this.jmsAcknowledgeMode != 0) {
            LOG.warn("Unsupported Acknowledge mode: " + this.jmsAcknowledgeMode + " (See javax.jms.Session for valid values)");
            if (this.individualAcks) {
                LOG.warn("Allowing vendor specific mode due to setIndividualAcks");
            } else {
                throw new IllegalArgumentException("UnsupportedAcknowledge mode: " + this.jmsAcknowledgeMode);
            }
        }
    }

    public int getJmsAcknowledgeMode() {
        return this.jmsAcknowledgeMode;
    }

    public void setJmsProvider(JmsProvider provider) {
        this.jmsProvider = provider;
    }

    public void setJmsTupleProducer(JmsTupleProducer producer) {
        this.tupleProducer = producer;
    }

    public void setIndividualAcks() {
        this.individualAcks = true;
        this.messageHandler = new MessageAckHandler();
    }

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        if (this.jmsProvider == null) {
            throw new IllegalStateException("JMS provider has not been set.");
        }
        if (this.tupleProducer == null) {
            throw new IllegalStateException("JMS Tuple Producer has not been set.");
        }
        this.validateJmsAckMode();
        this.collector = spoutOutputCollector;
        try {
            ConnectionFactory cf = this.jmsProvider.connectionFactory();
            Destination dest = this.jmsProvider.destination();
            this.connection = cf.createConnection();
            this.session = this.messageHandler.createSession(this.connection);
            this.consumer = this.session.createConsumer(dest);
            this.connection.start();
        }
        catch (Exception e) {
            LOG.warn("Error creating JMS connection.", (Throwable)e);
        }
    }

    public void close() {
        try {
            LOG.debug("Closing JMS connection.");
            this.session.close();
            this.connection.close();
        }
        catch (JMSException e) {
            LOG.warn("Error closing JMS connection.", (Throwable)e);
        }
    }

    public void nextTuple() {
        try {
            Message msg = this.consumer.receive(50L);
            if (msg != null) {
                LOG.debug("sending tuple {}", (Object)msg);
                this.messageHandler.emit(msg);
            }
        }
        catch (JMSException ex) {
            LOG.warn("Got error trying to process tuple", (Throwable)ex);
        }
    }

    public void ack(Object msgId) {
        LOG.debug("Received ACK for message: {}", msgId);
        this.messageHandler.ack(msgId);
    }

    public void fail(Object msgId) {
        LOG.warn("Received fail for message {}", msgId);
        this.messageHandler.fail(msgId);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.tupleProducer.declareOutputFields(declarer);
    }

    public boolean isDistributed() {
        return this.distributed;
    }

    public void setDistributed(boolean isDistributed) {
        this.distributed = isDistributed;
    }

    protected Session getSession() {
        return this.session;
    }

    public Map<String, Object> getComponentConfiguration() {
        return this.distributed ? null : Collections.singletonMap("topology.max.task.parallelism", 1);
    }

    private class TransactedSessionMessageHandler
    extends MessageAckHandler {
        private TransactedSessionMessageHandler() {
        }

        @Override
        protected void doAck(Message msg) throws JMSException {
            if (this.getPendingAcks().isEmpty()) {
                JmsSpout.this.session.commit();
                LOG.debug("JMS session committed");
            } else {
                LOG.debug("Not committing the session since there are pending messages in the session");
            }
        }

        @Override
        protected void doFail() throws JMSException {
            LOG.info("Triggering session rollback");
            JmsSpout.this.session.rollback();
        }

        @Override
        Session createSession(Connection conn) throws JMSException {
            return conn.createSession(true, JmsSpout.this.jmsAcknowledgeMode);
        }
    }

    private class ClientAckHandler
    extends MessageAckHandler {
        private ClientAckHandler() {
        }

        @Override
        protected void doAck(Message msg) throws JMSException {
            if (this.getPendingAcks().isEmpty()) {
                msg.acknowledge();
                LOG.debug("JMS message acked");
            } else {
                LOG.debug("Not acknowledging the JMS message since there are pending messages in the session");
            }
        }
    }

    private class MessageAckHandler
    extends MessageHandler {
        private Map<JmsMessageID, Message> pendingAcks;

        private MessageAckHandler() {
            this.pendingAcks = new HashMap<JmsMessageID, Message>();
        }

        @Override
        void emit(Message msg) {
            LOG.debug("Received msg {}, Requesting acks.", (Object)msg);
            try {
                JmsMessageID messageId = new JmsMessageID(JmsSpout.this.messageSequence++, msg.getJMSMessageID());
                Values vals = JmsSpout.this.tupleProducer.toTuple(msg);
                JmsSpout.this.collector.emit((List)vals, (Object)messageId);
                this.pendingAcks.put(messageId, msg);
            }
            catch (JMSException ex) {
                LOG.warn("Error processing message {}", (Object)msg);
            }
        }

        @Override
        void ack(Object msgId) {
            if (this.pendingAcks.isEmpty()) {
                LOG.debug("Not processing the ACK, pendingAcks is empty");
            } else {
                Message msg = this.pendingAcks.remove(msgId);
                if (msg != null) {
                    try {
                        this.doAck(msg);
                    }
                    catch (JMSException e) {
                        LOG.warn("Error acknowledging JMS message: {}", msgId, (Object)e);
                    }
                } else {
                    LOG.warn("Couldn't acknowledge unknown JMS message: {}", msgId);
                }
            }
        }

        @Override
        void fail(Object msgId) {
            try {
                if (!this.pendingAcks.isEmpty()) {
                    this.pendingAcks.clear();
                    this.doFail();
                }
            }
            catch (JMSException ex) {
                LOG.warn("Error during session recovery", (Throwable)ex);
            }
        }

        protected void doAck(Message msg) throws JMSException {
            msg.acknowledge();
            LOG.debug("JMS message acked");
        }

        protected void doFail() throws JMSException {
            LOG.info("Triggering session recovery");
            JmsSpout.this.getSession().recover();
        }

        protected Map<JmsMessageID, Message> getPendingAcks() {
            return this.pendingAcks;
        }
    }

    private class MessageHandler
    implements Serializable {
        private MessageHandler() {
        }

        void emit(Message msg) {
            LOG.debug("Received msg {}", (Object)msg);
            try {
                Values vals = JmsSpout.this.tupleProducer.toTuple(msg);
                JmsSpout.this.collector.emit((List)vals);
            }
            catch (JMSException ex) {
                LOG.warn("Error processing message {}", (Object)msg);
            }
        }

        void ack(Object msgId) {
        }

        void fail(Object msgId) {
        }

        Session createSession(Connection conn) throws JMSException {
            return conn.createSession(false, JmsSpout.this.jmsAcknowledgeMode);
        }
    }
}

