/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hcatalog.listener;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hcatalog.messaging.HCatEventMessage;
import org.apache.hcatalog.messaging.MessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationListener
extends MetaStoreEventListener {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
    protected Session session;
    protected Connection conn;
    private static MessageFactory messageFactory = MessageFactory.getInstance();

    public NotificationListener(Configuration conf) {
        super(conf);
        this.createConnection();
    }

    private static String getTopicName(Table table, ListenerEvent partitionEvent) {
        return (String)table.getParameters().get("hcat.msgbus.topic.name");
    }

    public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
        if (partitionEvent.getStatus()) {
            Table table = partitionEvent.getTable();
            List partitions = partitionEvent.getPartitions();
            String topicName = NotificationListener.getTopicName(table, (ListenerEvent)partitionEvent);
            if (topicName != null && !topicName.equals("")) {
                this.send(messageFactory.buildAddPartitionMessage(table, partitions), topicName);
            } else {
                LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + ((Partition)partitions.get(0)).getDbName() + "." + ((Partition)partitions.get(0)).getTableName() + " To enable notifications for this table, please do alter table set properties (" + "hcat.msgbus.topic.name" + "=<dbname>.<tablename>) or whatever you want topic name to be.");
            }
        }
    }

    public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
        if (partitionEvent.getStatus()) {
            Partition partition = partitionEvent.getPartition();
            StorageDescriptor sd = partition.getSd();
            sd.setBucketCols(new ArrayList());
            sd.setSortCols(new ArrayList());
            sd.setParameters(new HashMap());
            sd.getSerdeInfo().setParameters(new HashMap());
            sd.getSkewedInfo().setSkewedColNames(new ArrayList());
            String topicName = NotificationListener.getTopicName(partitionEvent.getTable(), (ListenerEvent)partitionEvent);
            if (topicName != null && !topicName.equals("")) {
                this.send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName);
            } else {
                LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() + "." + partition.getTableName() + " To enable notifications for this table, please do alter table set properties (" + "hcat.msgbus.topic.name" + "=<dbname>.<tablename>) or whatever you want topic name to be.");
            }
        }
    }

    public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
        if (dbEvent.getStatus()) {
            String topicName = this.getTopicPrefix(dbEvent.getHandler().getHiveConf());
            this.send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName);
        }
    }

    public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
        if (dbEvent.getStatus()) {
            String topicName = this.getTopicPrefix(dbEvent.getHandler().getHiveConf());
            this.send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
        }
    }

    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
        if (tableEvent.getStatus()) {
            Table newTbl;
            Table tbl = tableEvent.getTable();
            HiveMetaStore.HMSHandler handler = tableEvent.getHandler();
            HiveConf conf = handler.getHiveConf();
            try {
                newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()).deepCopy();
                newTbl.getParameters().put("hcat.msgbus.topic.name", this.getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." + newTbl.getTableName().toLowerCase());
                handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
            }
            catch (InvalidOperationException e) {
                MetaException me = new MetaException(e.toString());
                me.initCause((Throwable)e);
                throw me;
            }
            catch (NoSuchObjectException e) {
                MetaException me = new MetaException(e.toString());
                me.initCause((Throwable)e);
                throw me;
            }
            String topicName = this.getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase();
            this.send(messageFactory.buildCreateTableMessage(newTbl), topicName);
        }
    }

    private String getTopicPrefix(HiveConf conf) {
        return conf.get("hcat.msgbus.topic.prefix", "hcat");
    }

    public void onDropTable(DropTableEvent tableEvent) throws MetaException {
        if (tableEvent.getStatus()) {
            Table table = tableEvent.getTable();
            String topicName = this.getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase();
            this.send(messageFactory.buildDropTableMessage(table), topicName);
        }
    }

    protected void send(HCatEventMessage hCatEventMessage, String topicName) {
        try {
            Topic topic;
            if (null == this.session) {
                this.createConnection();
                if (null == this.session) {
                    LOG.error("Invalid session. Failed to send message on topic: " + topicName + " event: " + (Object)((Object)hCatEventMessage.getEventType()));
                    return;
                }
            }
            if (null == (topic = this.getTopic(topicName))) {
                LOG.error("Invalid session. Failed to send message on topic: " + topicName + " event: " + (Object)((Object)hCatEventMessage.getEventType()));
                return;
            }
            MessageProducer producer = this.session.createProducer((Destination)topic);
            TextMessage msg = this.session.createTextMessage(hCatEventMessage.toString());
            msg.setStringProperty("HCAT_EVENT", hCatEventMessage.getEventType().toString());
            msg.setStringProperty("HCAT_MESSAGE_VERSION", messageFactory.getVersion());
            msg.setStringProperty("HCAT_MESSAGE_FORMAT", messageFactory.getMessageFormat());
            producer.send((Message)msg);
            this.session.commit();
        }
        catch (Exception e) {
            LOG.error("Failed to send message on topic: " + topicName + " event: " + (Object)((Object)hCatEventMessage.getEventType()), (Throwable)e);
        }
    }

    protected Topic getTopic(String topicName) throws JMSException {
        Topic topic;
        try {
            topic = this.session.createTopic(topicName);
        }
        catch (IllegalStateException ise) {
            LOG.error("Seems like connection is lost. Retrying", (Throwable)ise);
            this.createConnection();
            topic = this.session.createTopic(topicName);
        }
        return topic;
    }

    protected void createConnection() {
        try {
            InitialContext jndiCntxt = new InitialContext();
            ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
            Connection conn = connFac.createConnection();
            conn.start();
            conn.setExceptionListener(new ExceptionListener(){

                public void onException(JMSException jmse) {
                    LOG.error(jmse.toString());
                }
            });
            this.session = conn.createSession(true, 0);
        }
        catch (NamingException e) {
            LOG.error("JNDI error while setting up Message Bus connection. Please make sure file named 'jndi.properties' is in classpath and contains appropriate key-value pairs.", (Throwable)e);
        }
        catch (JMSException e) {
            LOG.error("Failed to initialize connection to message bus", (Throwable)e);
        }
        catch (Throwable t) {
            LOG.error("Unable to connect to JMS provider", t);
        }
    }

    protected void finalize() throws Throwable {
        try {
            if (null != this.session) {
                this.session.close();
            }
            if (this.conn != null) {
                this.conn.close();
            }
        }
        catch (Exception ignore) {
            LOG.info("Failed to close message bus connection.", (Throwable)ignore);
        }
    }

    public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) throws MetaException {
    }

    public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
    }

    public void onAlterTable(AlterTableEvent ate) throws MetaException {
    }
}

