package org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.auditlog.AuditLoggingComponent;
import org.odpi.openmetadata.frameworks.connectors.ConnectorBase;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.repositoryservices.connectors.omrstopic.InternalOMRSEventProcessingContext;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/repositoryservices/connectors/openmetadatatopic/OpenMetadataTopicConnector.class */
public abstract class OpenMetadataTopicConnector extends ConnectorBase implements OpenMetadataTopic, Runnable, AuditLoggingComponent {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) OpenMetadataTopicConnector.class);
    private static final String defaultThreadName = "OpenMetadataTopicListener";
    private static final String defaultTopicName = "OpenMetadataTopic";
    private volatile boolean keepRunning = false;
    private List<OpenMetadataTopicListener> topicListeners = new ArrayList();
    private String listenerThreadName = defaultThreadName;
    private String topicName = defaultTopicName;
    private int sleepTime = 100;
    protected AuditLog auditLog = null;

    @Override // org.odpi.openmetadata.frameworks.auditlog.AuditLoggingComponent
    public void setAuditLog(AuditLog auditLog) {
        this.auditLog = auditLog;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.auditLog.logMessage(this.listenerThreadName, OMRSAuditCode.OPEN_METADATA_TOPIC_LISTENER_START.getMessageDefinition(this.topicName), getConnection().toString());
        while (this.keepRunning) {
            try {
                try {
                    List<IncomingEvent> checkForIncomingEvents = checkForIncomingEvents();
                    if (checkForIncomingEvents != null && !checkForIncomingEvents.isEmpty()) {
                        for (IncomingEvent incomingEvent : checkForIncomingEvents) {
                            if (incomingEvent != null) {
                                distributeEvent(incomingEvent);
                            }
                        }
                    }
                } catch (Throwable th) {
                    log.error("Bad exception from checkForEvents", th);
                }
                Thread.sleep(this.sleepTime);
            } catch (InterruptedException e) {
                log.info("Wake up for more events");
            }
        }
        this.auditLog.logMessage(this.listenerThreadName, OMRSAuditCode.OPEN_METADATA_TOPIC_LISTENER_SHUTDOWN.getMessageDefinition(this.topicName), getConnection().toString());
    }

    private void distributeEvent(IncomingEvent incomingEvent) {
        InternalOMRSEventProcessingContext.clear();
        InternalOMRSEventProcessingContext.getInstance().setCurrentMessageId(incomingEvent.getMessageId());
        Iterator<OpenMetadataTopicListener> it = this.topicListeners.iterator();
        while (it.hasNext()) {
            try {
                it.next().processEvent(incomingEvent.getJson());
            } catch (Throwable th) {
                this.auditLog.logException("distributeEvent", OMRSAuditCode.EVENT_PROCESSING_ERROR.getMessageDefinition(incomingEvent.getJson(), th.toString()), incomingEvent.getJson(), th);
            }
        }
        incomingEvent.setState(IncomingEventState.DISTRIBUTED_TO_ALL_TOPIC_LISTENERS);
        incomingEvent.addAsyncProcessingResult(InternalOMRSEventProcessingContext.getInstance().getOverallAsyncProcessingResult());
    }

    protected List<IncomingEvent> checkForIncomingEvents() {
        ArrayList arrayList = new ArrayList();
        for (String str : checkForEvents()) {
            arrayList.add(new IncomingEvent(str, String.valueOf(str.hashCode())));
        }
        return arrayList;
    }

    @Deprecated
    protected List<String> checkForEvents() {
        return Collections.emptyList();
    }

    @Override // org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopic
    public String registerListener(OpenMetadataTopicListener openMetadataTopicListener) {
        EndpointProperties endpoint;
        if (openMetadataTopicListener == null) {
            throw new OMRSLogicErrorException(OMRSErrorCode.NULL_OPEN_METADATA_TOPIC_LISTENER.getMessageDefinition(this.listenerThreadName, this.topicName), getClass().getName(), "registerListener");
        }
        this.topicListeners.add(openMetadataTopicListener);
        if (this.connectionProperties != null && (endpoint = this.connectionProperties.getEndpoint()) != null) {
            this.topicName = endpoint.getAddress();
        }
        return this.topicName;
    }

    @Override // org.odpi.openmetadata.frameworks.connectors.ConnectorBase, org.odpi.openmetadata.frameworks.connectors.Connector
    public void start() throws ConnectorCheckedException {
        super.start();
        this.keepRunning = true;
        if (this.connectionProperties != null) {
            EndpointProperties endpoint = this.connectionProperties.getEndpoint();
            if (endpoint != null) {
                this.topicName = endpoint.getAddress();
                this.listenerThreadName = "OpenMetadataTopicListener: " + this.topicName;
            }
            Map<String, Object> configurationProperties = this.connectionProperties.getConfigurationProperties();
            if (configurationProperties != null) {
                Object obj = configurationProperties.get("sleepTime");
                if (obj instanceof Integer) {
                    this.sleepTime = ((Integer) obj).intValue();
                }
            }
        }
        new Thread(this, this.listenerThreadName).start();
    }

    @Override // org.odpi.openmetadata.frameworks.connectors.ConnectorBase, org.odpi.openmetadata.frameworks.connectors.Connector
    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();
        this.keepRunning = false;
    }
}
