/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.hcatalog.listener;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.hive.hcatalog.messaging.MessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationListener
extends MetaStoreEventListener {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
    protected Connection conn;
    private static MessageFactory messageFactory = MessageFactory.getInstance();
    public static final int NUM_RETRIES = 1;
    private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
    private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
    protected final ThreadLocal<Session> session = new ThreadLocal<Session>(){

        @Override
        protected Session initialValue() {
            try {
                return NotificationListener.this.createSession();
            }
            catch (Exception e) {
                LOG.error("Couldn't create JMS Session", (Throwable)e);
                return null;
            }
        }

        @Override
        public void remove() {
            if (this.get() != null) {
                try {
                    ((Session)this.get()).close();
                }
                catch (Exception e) {
                    LOG.error("Unable to close bad JMS session, ignored error", (Throwable)e);
                }
            }
            super.remove();
        }
    };

    public NotificationListener(Configuration conf) {
        super(conf);
        this.testAndCreateConnection();
    }

    private static String getTopicName(Table table, ListenerEvent partitionEvent) {
        return (String)table.getParameters().get("hcat.msgbus.topic.name");
    }

    public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
        if (partitionEvent.getStatus()) {
            Table table = partitionEvent.getTable();
            List partitions = partitionEvent.getPartitions();
            String topicName = NotificationListener.getTopicName(table, (ListenerEvent)partitionEvent);
            if (topicName != null && !topicName.equals("")) {
                this.send(messageFactory.buildAddPartitionMessage(table, partitions), topicName);
            } else {
                LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + ((Partition)partitions.get(0)).getDbName() + "." + ((Partition)partitions.get(0)).getTableName() + " To enable notifications for this table, please do alter table set properties (" + "hcat.msgbus.topic.name" + "=<dbname>.<tablename>) or whatever you want topic name to be.");
            }
        }
    }

    public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
        if (partitionEvent.getStatus()) {
            Partition partition = partitionEvent.getPartition();
            StorageDescriptor sd = partition.getSd();
            sd.setBucketCols(new ArrayList());
            sd.setSortCols(new ArrayList());
            sd.setParameters(new HashMap());
            sd.getSerdeInfo().setParameters(new HashMap());
            sd.getSkewedInfo().setSkewedColNames(new ArrayList());
            String topicName = NotificationListener.getTopicName(partitionEvent.getTable(), (ListenerEvent)partitionEvent);
            if (topicName != null && !topicName.equals("")) {
                this.send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName);
            } else {
                LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() + "." + partition.getTableName() + " To enable notifications for this table, please do alter table set properties (" + "hcat.msgbus.topic.name" + "=<dbname>.<tablename>) or whatever you want topic name to be.");
            }
        }
    }

    public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
        if (dbEvent.getStatus()) {
            String topicName = this.getTopicPrefix((Configuration)dbEvent.getHandler().getHiveConf());
            this.send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName);
        }
    }

    public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
        if (dbEvent.getStatus()) {
            String topicName = this.getTopicPrefix((Configuration)dbEvent.getHandler().getHiveConf());
            this.send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
        }
    }

    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
        if (tableEvent.getStatus()) {
            Table newTbl;
            Table tbl = tableEvent.getTable();
            HiveMetaStore.HMSHandler handler = tableEvent.getHandler();
            HiveConf conf = handler.getHiveConf();
            try {
                newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()).deepCopy();
                newTbl.getParameters().put("hcat.msgbus.topic.name", this.getTopicPrefix((Configuration)conf) + "." + newTbl.getDbName().toLowerCase() + "." + newTbl.getTableName().toLowerCase());
                handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
            }
            catch (InvalidOperationException e) {
                MetaException me = new MetaException(e.toString());
                me.initCause((Throwable)e);
                throw me;
            }
            catch (NoSuchObjectException e) {
                MetaException me = new MetaException(e.toString());
                me.initCause((Throwable)e);
                throw me;
            }
            String topicName = this.getTopicPrefix((Configuration)conf) + "." + newTbl.getDbName().toLowerCase();
            this.send(messageFactory.buildCreateTableMessage(newTbl), topicName);
        }
    }

    private String getTopicPrefix(Configuration conf) {
        return conf.get("hcat.msgbus.topic.prefix", "hcat");
    }

    public void onDropTable(DropTableEvent tableEvent) throws MetaException {
        if (tableEvent.getStatus()) {
            Table table = tableEvent.getTable();
            String topicName = this.getTopicPrefix((Configuration)tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase();
            this.send(messageFactory.buildDropTableMessage(table), topicName);
        }
    }

    protected void send(HCatEventMessage hCatEventMessage, String topicName) {
        this.send(hCatEventMessage, topicName, 1);
    }

    protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
        try {
            if (this.session.get() == null) {
                throw new JMSException("Invalid JMS session");
            }
            Topic topic = this.createTopic(topicName);
            TextMessage msg = this.session.get().createTextMessage(hCatEventMessage.toString());
            msg.setStringProperty("HCAT_EVENT", hCatEventMessage.getEventType().toString());
            msg.setStringProperty("HCAT_MESSAGE_VERSION", messageFactory.getVersion());
            msg.setStringProperty("HCAT_MESSAGE_FORMAT", messageFactory.getMessageFormat());
            MessageProducer producer = this.createProducer((Destination)topic);
            producer.send((Message)msg);
            this.session.get().commit();
        }
        catch (Exception e) {
            if (retries >= 0) {
                LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", (Throwable)e);
                this.testAndCreateConnection();
                this.send(hCatEventMessage, topicName, retries - 1);
            }
            LOG.error("Failed to send message on topic: " + topicName + " event: " + (Object)((Object)hCatEventMessage.getEventType()) + " after retries: " + 1, (Throwable)e);
        }
    }

    protected Topic createTopic(String topicName) throws JMSException {
        return this.session.get().createTopic(topicName);
    }

    protected synchronized void testAndCreateConnection() {
        if (this.conn != null) {
            this.session.remove();
            if (!this.isConnectionHealthy()) {
                try {
                    this.conn.close();
                }
                catch (Exception e) {
                    LOG.error("Unable to close bad JMS connection, ignored error", (Throwable)e);
                }
                this.conn = this.createConnection();
            }
        } else {
            this.conn = this.createConnection();
        }
        try {
            this.session.set(this.createSession());
        }
        catch (JMSException e) {
            LOG.error("Couldn't create JMS session, ignored the error", (Throwable)e);
        }
    }

    protected Connection createConnection() {
        LOG.info("Will create new JMS connection");
        Connection jmsConnection = null;
        try {
            InitialContext jndiCntxt = new InitialContext();
            ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
            jmsConnection = connFac.createConnection();
            jmsConnection.start();
            jmsConnection.setExceptionListener(new ExceptionListener(){

                public void onException(JMSException jmse) {
                    LOG.error("JMS Exception listener received exception. Ignored the error", (Throwable)jmse);
                }
            });
        }
        catch (NamingException e) {
            LOG.error("JNDI error while setting up Message Bus connection. Please make sure file named 'jndi.properties' is in classpath and contains appropriate key-value pairs.", (Throwable)e);
        }
        catch (JMSException e) {
            LOG.error("Failed to initialize connection to message bus", (Throwable)e);
        }
        catch (Throwable t) {
            LOG.error("Unable to connect to JMS provider", t);
        }
        return jmsConnection;
    }

    protected boolean isConnectionHealthy() {
        try {
            Topic topic = this.createTopic(this.getTopicPrefix(this.getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
            MessageProducer producer = this.createProducer((Destination)topic);
            TextMessage msg = this.session.get().createTextMessage(HEALTH_CHECK_MSG);
            producer.send((Message)msg, 1, 4, 0L);
        }
        catch (Exception e) {
            return false;
        }
        return true;
    }

    protected Session createSession() throws JMSException {
        return this.conn.createSession(true, 0);
    }

    protected MessageProducer createProducer(Destination topic) throws JMSException {
        return this.session.get().createProducer(topic);
    }

    protected void finalize() throws Throwable {
        if (this.conn != null) {
            try {
                this.conn.close();
            }
            catch (Exception e) {
                LOG.error("Couldn't close jms connection, ignored the error", (Throwable)e);
            }
        }
    }

    public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) throws MetaException {
    }

    public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
    }

    public void onAlterTable(AlterTableEvent ate) throws MetaException {
    }
}

