/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.metrics;

import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceReaderMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderMetrics.class);
    public static final long UNDEFINED = -1L;
    public static final String NAMESPACE_GROUP_KEY = "namespace";
    public static final String SCHEMA_GROUP_KEY = "schema";
    public static final String TABLE_GROUP_KEY = "table";
    public static final String NUM_SNAPSHOT_RECORDS = "numSnapshotRecords";
    public static final String NUM_INSERT_DML_RECORDS = "numInsertDMLRecords";
    public static final String NUM_UPDATE_DML_RECORDS = "numUpdateDMLRecords";
    public static final String NUM_DELETE_DML_RECORDS = "numDeleteDMLRecords";
    public static final String NUM_DDL_RECORDS = "numDDLRecords";
    public static final String CURRENT_EVENT_TIME_LAG = "currentEventTimeLag";
    private final SourceReaderMetricGroup metricGroup;
    private final Counter snapshotCounter;
    private final Counter insertCounter;
    private final Counter updateCounter;
    private final Counter deleteCounter;
    private final Counter schemaChangeCounter;
    private final Map<TableId, TableMetrics> tableMetricsMap = new HashMap<TableId, TableMetrics>();
    private volatile long fetchDelay = -1L;
    private final Counter numRecordsInErrorsCounter;
    private volatile long lastReceivedEventTime = -1L;

    public SourceReaderMetrics(SourceReaderMetricGroup metricGroup) {
        this.metricGroup = metricGroup;
        this.numRecordsInErrorsCounter = metricGroup.getNumRecordsInErrorsCounter();
        metricGroup.gauge("currentFetchEventTimeLag", this::getFetchDelay);
        metricGroup.gauge(CURRENT_EVENT_TIME_LAG, this::getCurrentEventTimeLag);
        this.snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS);
        this.insertCounter = metricGroup.counter(NUM_INSERT_DML_RECORDS);
        this.updateCounter = metricGroup.counter(NUM_UPDATE_DML_RECORDS);
        this.deleteCounter = metricGroup.counter(NUM_DELETE_DML_RECORDS);
        this.schemaChangeCounter = metricGroup.counter(NUM_DDL_RECORDS);
    }

    public long getFetchDelay() {
        return this.fetchDelay;
    }

    public void recordFetchDelay(long fetchDelay) {
        this.fetchDelay = fetchDelay;
    }

    public void addNumRecordsInErrors(long delta) {
        this.numRecordsInErrorsCounter.inc(delta);
    }

    public void updateLastReceivedEventTime(Long eventTimestamp) {
        if (eventTimestamp != null && eventTimestamp > 0L) {
            this.lastReceivedEventTime = eventTimestamp;
        }
    }

    public void markRecord() {
        try {
            this.metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
        }
        catch (Exception e) {
            LOG.warn("Failed to update record counters.", e);
        }
    }

    public void updateRecordCounters(SourceRecord record) {
        this.catchAndWarnLogAllExceptions(() -> {
            if (SourceRecordUtils.isDataChangeRecord(record)) {
                TableMetrics tableMetrics = this.getTableMetrics(SourceRecordUtils.getTableId(record));
                Envelope.Operation op = Envelope.operationFor(record);
                switch (op) {
                    case READ: {
                        this.snapshotCounter.inc();
                        tableMetrics.markSnapshotRecord();
                        break;
                    }
                    case CREATE: {
                        this.insertCounter.inc();
                        tableMetrics.markInsertRecord();
                        break;
                    }
                    case DELETE: {
                        this.deleteCounter.inc();
                        tableMetrics.markDeleteRecord();
                        break;
                    }
                    case UPDATE: {
                        this.updateCounter.inc();
                        tableMetrics.markUpdateRecord();
                    }
                }
            } else if (SourceRecordUtils.isSchemaChangeEvent(record)) {
                this.schemaChangeCounter.inc();
                TableId tableId = SourceRecordUtils.getTableId(record);
                if (tableId != null) {
                    this.getTableMetrics(tableId).markSchemaChangeRecord();
                }
            }
        });
    }

    private TableMetrics getTableMetrics(TableId tableId) {
        return this.tableMetricsMap.computeIfAbsent(tableId, id -> new TableMetrics(id.catalog(), id.schema(), id.table(), (MetricGroup)this.metricGroup));
    }

    private void catchAndWarnLogAllExceptions(Runnable runnable) {
        try {
            runnable.run();
        }
        catch (Exception e) {
            LOG.warn("Failed to update metrics", e);
        }
    }

    private long getCurrentEventTimeLag() {
        if (this.lastReceivedEventTime == -1L) {
            return -1L;
        }
        return SystemClock.getInstance().absoluteTimeMillis() - this.lastReceivedEventTime;
    }

    private static class TableMetrics {
        private final Counter recordsCounter;
        private final Counter snapshotCounter;
        private final Counter insertCounter;
        private final Counter updateCounter;
        private final Counter deleteCounter;
        private final Counter schemaChangeCounter;

        public TableMetrics(String databaseName, String schemaName, String tableName, MetricGroup parentGroup) {
            databaseName = this.processNull(databaseName);
            schemaName = this.processNull(schemaName);
            tableName = this.processNull(tableName);
            MetricGroup metricGroup = parentGroup.addGroup(SourceReaderMetrics.NAMESPACE_GROUP_KEY, databaseName).addGroup(SourceReaderMetrics.SCHEMA_GROUP_KEY, schemaName).addGroup(SourceReaderMetrics.TABLE_GROUP_KEY, tableName);
            this.recordsCounter = metricGroup.counter("numRecordsIn");
            this.snapshotCounter = metricGroup.counter(SourceReaderMetrics.NUM_SNAPSHOT_RECORDS);
            this.insertCounter = metricGroup.counter(SourceReaderMetrics.NUM_INSERT_DML_RECORDS);
            this.updateCounter = metricGroup.counter(SourceReaderMetrics.NUM_UPDATE_DML_RECORDS);
            this.deleteCounter = metricGroup.counter(SourceReaderMetrics.NUM_DELETE_DML_RECORDS);
            this.schemaChangeCounter = metricGroup.counter(SourceReaderMetrics.NUM_DDL_RECORDS);
        }

        private String processNull(String name) {
            if (StringUtils.isBlank(name)) {
                return "";
            }
            return name;
        }

        public void markSnapshotRecord() {
            this.recordsCounter.inc();
            this.snapshotCounter.inc();
        }

        public void markInsertRecord() {
            this.recordsCounter.inc();
            this.insertCounter.inc();
        }

        public void markDeleteRecord() {
            this.recordsCounter.inc();
            this.deleteCounter.inc();
        }

        public void markUpdateRecord() {
            this.recordsCounter.inc();
            this.updateCounter.inc();
        }

        public void markSchemaChangeRecord() {
            this.recordsCounter.inc();
            this.schemaChangeCounter.inc();
        }
    }
}

