package org.apache.doris.datasource.hive.event;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSClientException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.class */
/**
 * Background daemon that periodically pulls notification events from every
 * {@link HMSExternalCatalog} registered in the catalog manager and applies them.
 *
 * <p>Each round walks all catalog ids, fetches the next batch of
 * {@link NotificationEvent}s for each HMS catalog, converts them into
 * {@link MetastoreEvent}s through {@link MetastoreEventFactory} and processes them in
 * order. The catalog's last-synced event id is advanced after a batch completes, or
 * rewound to just before the failing event when processing aborts.
 */
public class MetastoreEventsProcessor extends MasterDaemon {
    public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY = "hive.metastore.notifications.add.thrift.objects";
    private static final String GZIP_JSON_FORMAT_PREFIX = "gzip";
    private final MetastoreEventFactory metastoreEventFactory;
    // Guards against starting a new round while the previous one is still in progress.
    private boolean isRunning;
    private static final Logger LOG = LogManager.getLogger(MetastoreEventsProcessor.class);
    private static final MessageDeserializer JSON_MESSAGE_DESERIALIZER = new JSONMessageDeserializer();
    private static final MessageDeserializer GZIP_JSON_MESSAGE_DESERIALIZER = new GzipJSONMessageDeserializer();

    /**
     * Creates the daemon, named after this class, with the interval taken from
     * {@code Config.hms_events_polling_interval_ms}.
     */
    public MetastoreEventsProcessor() {
        super(MetastoreEventsProcessor.class.getName(), Config.hms_events_polling_interval_ms);
        this.metastoreEventFactory = new MetastoreEventFactory();
        this.isRunning = false;
    }

    /**
     * Fetches the next batch of notification events for the given catalog.
     *
     * @param hMSExternalCatalog catalog to pull events for
     * @return the events of the next response; never {@code null}. An empty list is
     *         returned when there is no response or the response carries no event list.
     */
    private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hMSExternalCatalog) {
        LOG.debug("Start to pull events on catalog [{}]", hMSExternalCatalog.getName());
        NotificationEventResponse nextEventResponse = hMSExternalCatalog.getNextEventResponse(hMSExternalCatalog);
        if (nextEventResponse == null) {
            return Collections.emptyList();
        }
        // Defensive: normalize a missing event list to empty so callers can rely on
        // isEmpty() instead of failing with a NullPointerException.
        List<NotificationEvent> events = nextEventResponse.getEvents();
        return events == null ? Collections.emptyList() : events;
    }

    /**
     * Processes the given events in order.
     *
     * <p>An {@link HMSClientException} caused by a {@link NoSuchObjectException} is
     * logged and the event is skipped. Any other exception rewinds the catalog's
     * last-synced event id to the one preceding the failing event and is rethrown, so
     * the remaining events of the batch are not processed.
     *
     * @param list               events to process
     * @param hMSExternalCatalog catalog whose last-synced event id is rewound on failure
     */
    private void doExecute(List<MetastoreEvent> list, HMSExternalCatalog hMSExternalCatalog) {
        for (MetastoreEvent metastoreEvent : list) {
            try {
                metastoreEvent.process();
            } catch (Exception e) {
                boolean skippable = e instanceof HMSClientException
                        && e.getCause() instanceof NoSuchObjectException;
                if (!skippable) {
                    // Point the sync position just before the failing event, then abort the batch.
                    hMSExternalCatalog.setLastSyncedEventId(metastoreEvent.getEventId() - 1);
                    throw e;
                }
                LOG.warn(metastoreEvent.debugString("Failed to process event and skip", new Object[0]), e);
            }
        }
    }

    /**
     * Converts raw notification events to {@link MetastoreEvent}s, processes them and,
     * when the whole batch completes, records the id of the last raw event as the
     * catalog's last-synced event id.
     *
     * @param list               raw notification events; nothing is done when null or empty
     * @param hMSExternalCatalog catalog the events belong to
     */
    private void processEvents(List<NotificationEvent> list, HMSExternalCatalog hMSExternalCatalog) {
        if (list == null || list.isEmpty()) {
            // Nothing to process; also avoids indexing element -1 below.
            return;
        }
        doExecute(this.metastoreEventFactory.getMetastoreEvents(list, hMSExternalCatalog), hMSExternalCatalog);
        hMSExternalCatalog.setLastSyncedEventId(list.get(list.size() - 1).getEventId());
    }

    /**
     * Runs one polling round unless the previous round is still marked as running.
     * Exceptions from the round are logged and not propagated.
     */
    @Override // org.apache.doris.common.util.MasterDaemon
    protected void runAfterCatalogReady() {
        if (this.isRunning) {
            LOG.warn("Last task not finished,ignore current task.");
            return;
        }
        this.isRunning = true;
        try {
            realRun();
        } catch (Exception e) {
            LOG.warn("Task failed", e);
        } finally {
            // Always clear the flag, even when a non-Exception Throwable escapes;
            // otherwise every later round would be skipped as "not finished".
            this.isRunning = false;
        }
    }

    /**
     * Walks all catalogs known to the catalog manager and, for each
     * {@link HMSExternalCatalog}, fetches and processes its pending events. A failure
     * on one catalog is logged and does not stop the remaining catalogs.
     */
    private void realRun() {
        for (long catalogId : Env.getCurrentEnv().getCatalogMgr().getCatalogIds()) {
            CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
            if (!(catalog instanceof HMSExternalCatalog)) {
                continue;
            }
            HMSExternalCatalog hMSExternalCatalog = (HMSExternalCatalog) catalog;
            try {
                List<NotificationEvent> nextHMSEvents = getNextHMSEvents(hMSExternalCatalog);
                if (!nextHMSEvents.isEmpty()) {
                    LOG.info("Events size are {} on catalog [{}]", nextHMSEvents.size(), hMSExternalCatalog.getName());
                    processEvents(nextHMSEvents, hMSExternalCatalog);
                }
            } catch (MetastoreNotificationFetchException e) {
                LOG.warn("Failed to fetch hms events on {}. msg: ", hMSExternalCatalog.getName(), e);
            } catch (Exception e2) {
                LOG.warn("Failed to process hive metastore [{}] events .", hMSExternalCatalog.getName(), e2);
            }
        }
    }

    /**
     * Selects the deserializer for an event message format.
     *
     * @param str message format name; may be {@code null}
     * @return the gzip JSON deserializer when {@code str} starts with {@code "gzip"},
     *         otherwise (including {@code null}) the plain JSON deserializer
     */
    public static MessageDeserializer getMessageDeserializer(String str) {
        if (str != null && str.startsWith(GZIP_JSON_FORMAT_PREFIX)) {
            return GZIP_JSON_MESSAGE_DESERIALIZER;
        }
        return JSON_MESSAGE_DESERIALIZER;
    }
}
