/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.metrics;

import io.debezium.connector.spanner.metrics.event.LatencyMetricEvent;
import io.debezium.connector.spanner.metrics.event.MetricEvent;
import io.debezium.connector.spanner.metrics.latency.LatencyCalculator;
import io.debezium.connector.spanner.processor.SourceRecordUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches {@link MetricEvent} instances to at most one subscriber per concrete event class
 * and derives latency metric events from published source records.
 *
 * <p>Subscribers are kept in a {@link ConcurrentHashMap} keyed by the exact event class;
 * lookup uses {@code metricEvent.getClass()}, so a subscriber registered for a supertype
 * is not invoked for events of a subclass.
 */
public class MetricsEventPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetricsEventPublisher.class);

    /**
     * Latency values strictly above this threshold are reported in the debug log.
     * NOTE(review): presumably milliseconds (i.e. 5 minutes) - confirm against LatencyCalculator.
     */
    private static final long HIGH_LATENCY_THRESHOLD = 300000L;

    private final Map<Class<? extends MetricEvent>, Consumer<? extends MetricEvent>> subscribes = new ConcurrentHashMap<>();

    /**
     * Passes the event to the subscriber registered for its exact runtime class, if there is one.
     *
     * <p>Any {@link Exception} thrown by the subscriber is logged at WARN level and not propagated.
     *
     * @param metricEvent the event to dispatch
     * @param <T> the event type
     */
    public <T extends MetricEvent> void publishMetricEvent(T metricEvent) {
        // The map cannot express the per-entry relation Class<T> -> Consumer<T>; subscribe()
        // only ever stores a Consumer<T> under Class<T>, which is what makes this cast sound.
        @SuppressWarnings("unchecked")
        Consumer<T> consumer = (Consumer<T>) this.subscribes.get(metricEvent.getClass());
        if (consumer == null) {
            return;
        }
        try {
            consumer.accept(metricEvent);
        }
        catch (Exception ex) {
            // A Throwable passed as the last argument is logged by SLF4J with its stack trace.
            LOGGER.warn("Failed to process metric event: {}", metricEvent, ex);
        }
    }

    /**
     * Registers the single subscriber for the given event class.
     *
     * @param clazz the concrete event class to subscribe to
     * @param consumer the callback invoked for every published event of that class
     * @param <T> the event type
     * @throws IllegalStateException if a subscriber is already registered for {@code clazz}
     */
    public <T extends MetricEvent> void subscribe(Class<T> clazz, Consumer<T> consumer) {
        // putIfAbsent makes the "already subscribed?" check and the insert a single atomic step,
        // so two concurrent registrations for the same class cannot both succeed.
        if (this.subscribes.putIfAbsent(clazz, consumer) != null) {
            throw new IllegalStateException("A subscriber is already registered for metric event " + clazz.getName());
        }
    }

    /**
     * Computes the latency values of a data change record via {@link LatencyCalculator},
     * writes a debug log entry for each value above {@link #HIGH_LATENCY_THRESHOLD}, and
     * publishes them as a {@link LatencyMetricEvent}.
     *
     * <p>Records for which {@link SourceRecordUtils#isDataChangeRecord} returns {@code false}
     * are ignored.
     *
     * @param sourceRecord the record whose latencies are evaluated
     */
    public void logLatency(SourceRecord sourceRecord) {
        if (!SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
            return;
        }

        Long totalLatency = LatencyCalculator.getTotalLatency(sourceRecord);
        logIfVeryHigh("total", sourceRecord, totalLatency);

        Long readToEmitLatency = LatencyCalculator.getReadToEmitLatency(sourceRecord);
        logIfVeryHigh("readToEmit", sourceRecord, readToEmitLatency);

        Long spannerLatency = LatencyCalculator.getSpannerLatency(sourceRecord);
        logIfVeryHigh("spannerLatency", sourceRecord, spannerLatency);

        Long commitToEmitLatency = LatencyCalculator.getCommitToEmitLatency(sourceRecord);
        logIfVeryHigh("commitToEmitLatency", sourceRecord, commitToEmitLatency);

        Long commitToPublishLatency = LatencyCalculator.getCommitToPublishLatency(sourceRecord);
        logIfVeryHigh("commitToPublishLatency", sourceRecord, commitToPublishLatency);

        Long emitToPublishLatency = LatencyCalculator.getEmitToPublishLatency(sourceRecord);
        logIfVeryHigh("emitToPublishLatency", sourceRecord, emitToPublishLatency);

        Long ownConnectorLatency = LatencyCalculator.getOwnConnectorLatency(sourceRecord);
        logIfVeryHigh("ownConnectorLatency", sourceRecord, ownConnectorLatency);

        Long lowWatermarkLag = LatencyCalculator.getLowWatermarkLag(sourceRecord);
        logIfVeryHigh("lowWatermarkLag", sourceRecord, lowWatermarkLag);

        // Argument order follows the existing LatencyMetricEvent constructor call:
        // lowWatermarkLag comes before ownConnectorLatency.
        this.publishMetricEvent(new LatencyMetricEvent(totalLatency, readToEmitLatency, spannerLatency, commitToEmitLatency, commitToPublishLatency, emitToPublishLatency, lowWatermarkLag, ownConnectorLatency));
    }

    /** Writes a debug log entry when the latency is present and above {@link #HIGH_LATENCY_THRESHOLD}. */
    private static void logIfVeryHigh(String latencyName, SourceRecord sourceRecord, Long latency) {
        if (latency != null && latency > HIGH_LATENCY_THRESHOLD) {
            LOGGER.debug("Published very high {} latency for source record {}:{}", latencyName, sourceRecord, latency);
        }
    }
}

