/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.jms.bolt;

import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.storm.jms.JmsMessageProducer;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsBolt
extends BaseTickTupleAwareRichBolt {
    private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
    private boolean autoAck = true;
    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;
    private boolean jmsTransactional = false;
    private int jmsAcknowledgeMode = 1;
    private JmsProvider jmsProvider;
    private JmsMessageProducer producer;
    private OutputCollector collector;

    public void setJmsProvider(JmsProvider provider) {
        this.jmsProvider = provider;
    }

    public void setJmsMessageProducer(JmsMessageProducer producer) {
        this.producer = producer;
    }

    public void setJmsAcknowledgeMode(int acknowledgeMode) {
        this.jmsAcknowledgeMode = acknowledgeMode;
    }

    public void setAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
    }

    protected void process(Tuple input) {
        LOG.debug("Tuple received. Sending JMS message.");
        try {
            Message msg = this.producer.toMessage(this.session, (ITuple)input);
            if (msg != null) {
                if (msg.getJMSDestination() != null) {
                    this.messageProducer.send(msg.getJMSDestination(), msg);
                } else {
                    this.messageProducer.send(msg);
                }
            }
            if (this.autoAck) {
                LOG.debug("ACKing tuple: " + input);
                this.collector.ack(input);
            }
        }
        catch (JMSException e) {
            LOG.warn("Failing tuple: " + input);
            LOG.warn("Exception: ", (Throwable)e);
            this.collector.fail(input);
        }
    }

    public void cleanup() {
        try {
            LOG.debug("Closing JMS connection.");
            this.session.close();
            this.connection.close();
        }
        catch (JMSException e) {
            LOG.warn("Error closing JMS connection.", (Throwable)e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        if (this.jmsProvider == null || this.producer == null) {
            throw new IllegalStateException("JMS Provider and MessageProducer not set.");
        }
        this.collector = collector;
        LOG.debug("Connecting JMS..");
        try {
            ConnectionFactory cf = this.jmsProvider.connectionFactory();
            Destination dest = this.jmsProvider.destination();
            this.connection = cf.createConnection();
            this.session = this.connection.createSession(this.jmsTransactional, this.jmsAcknowledgeMode);
            this.messageProducer = this.session.createProducer(dest);
            this.connection.start();
        }
        catch (Exception e) {
            LOG.warn("Error creating JMS connection.", (Throwable)e);
        }
    }
}

