/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals.metrics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.TickerType;
import org.slf4j.Logger;

public class RocksDBMetricsRecorder {
    private final Logger logger;
    private Sensor bytesWrittenToDatabaseSensor;
    private Sensor bytesReadFromDatabaseSensor;
    private Sensor memtableBytesFlushedSensor;
    private Sensor memtableHitRatioSensor;
    private Sensor writeStallDurationSensor;
    private Sensor blockCacheDataHitRatioSensor;
    private Sensor blockCacheIndexHitRatioSensor;
    private Sensor blockCacheFilterHitRatioSensor;
    private Sensor bytesReadDuringCompactionSensor;
    private Sensor bytesWrittenDuringCompactionSensor;
    private Sensor numberOfOpenFilesSensor;
    private Sensor numberOfFileErrorsSensor;
    private final Map<String, Statistics> statisticsToRecord = new ConcurrentHashMap<String, Statistics>();
    private final String metricsScope;
    private final String storeName;
    private final String threadId;
    private TaskId taskId;
    private StreamsMetricsImpl streamsMetrics;

    public RocksDBMetricsRecorder(String metricsScope, String threadId, String storeName) {
        this.metricsScope = metricsScope;
        this.threadId = threadId;
        this.storeName = storeName;
        LogContext logContext = new LogContext(String.format("[RocksDB Metrics Recorder for %s] ", storeName));
        this.logger = logContext.logger(RocksDBMetricsRecorder.class);
    }

    public String storeName() {
        return this.storeName;
    }

    public TaskId taskId() {
        return this.taskId;
    }

    public void init(StreamsMetricsImpl streamsMetrics, TaskId taskId) {
        if (this.taskId != null && !this.taskId.equals(taskId)) {
            throw new IllegalStateException("Metrics recorder is re-initialised with different task: previous task is " + this.taskId + " whereas current task is " + taskId + ". This is a bug in Kafka Streams.");
        }
        if (this.streamsMetrics != null && this.streamsMetrics != streamsMetrics) {
            throw new IllegalStateException("Metrics recorder is re-initialised with different Streams metrics. This is a bug in Kafka Streams.");
        }
        this.initSensors(streamsMetrics, taskId);
        this.taskId = taskId;
        this.streamsMetrics = streamsMetrics;
    }

    public void addStatistics(String segmentName, Statistics statistics) {
        if (this.statisticsToRecord.isEmpty()) {
            this.logger.debug("Adding metrics recorder of task {} to metrics recording trigger", (Object)this.taskId);
            this.streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
        } else if (this.statisticsToRecord.containsKey(segmentName)) {
            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + this.taskId + " has been already added. This is a bug in Kafka Streams.");
        }
        statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
        this.logger.debug("Adding statistics for store {} of task {}", (Object)segmentName, (Object)this.taskId);
        this.statisticsToRecord.put(segmentName, statistics);
    }

    private void initSensors(StreamsMetricsImpl streamsMetrics, TaskId taskId) {
        RocksDBMetrics.RocksDBMetricContext metricContext = new RocksDBMetrics.RocksDBMetricContext(this.threadId, taskId.toString(), this.metricsScope, this.storeName);
        this.bytesWrittenToDatabaseSensor = RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricContext);
        this.bytesReadFromDatabaseSensor = RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricContext);
        this.memtableBytesFlushedSensor = RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricContext);
        this.memtableHitRatioSensor = RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricContext);
        this.writeStallDurationSensor = RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricContext);
        this.blockCacheDataHitRatioSensor = RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricContext);
        this.blockCacheIndexHitRatioSensor = RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricContext);
        this.blockCacheFilterHitRatioSensor = RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricContext);
        this.bytesWrittenDuringCompactionSensor = RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricContext);
        this.bytesReadDuringCompactionSensor = RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricContext);
        this.numberOfOpenFilesSensor = RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricContext);
        this.numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
    }

    public void removeStatistics(String segmentName) {
        this.logger.debug("Removing statistics for store {} of task {}", (Object)segmentName, (Object)this.taskId);
        Statistics removedStatistics = this.statisticsToRecord.remove(segmentName);
        if (removedStatistics == null) {
            throw new IllegalStateException("No statistics for store \"" + segmentName + "\" of task " + this.taskId + " could be found. This is a bug in Kafka Streams.");
        }
        removedStatistics.close();
        if (this.statisticsToRecord.isEmpty()) {
            this.logger.debug("Removing metrics recorder for store {} of task {} from metrics recording trigger", (Object)this.storeName, (Object)this.taskId);
            this.streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this);
        }
    }

    public void record() {
        this.logger.debug("Recording metrics for store {}", (Object)this.storeName);
        long bytesWrittenToDatabase = 0L;
        long bytesReadFromDatabase = 0L;
        long memtableBytesFlushed = 0L;
        long memtableHits = 0L;
        long memtableMisses = 0L;
        long blockCacheDataHits = 0L;
        long blockCacheDataMisses = 0L;
        long blockCacheIndexHits = 0L;
        long blockCacheIndexMisses = 0L;
        long blockCacheFilterHits = 0L;
        long blockCacheFilterMisses = 0L;
        long writeStallDuration = 0L;
        long bytesWrittenDuringCompaction = 0L;
        long bytesReadDuringCompaction = 0L;
        long numberOfOpenFiles = 0L;
        long numberOfFileErrors = 0L;
        for (Statistics statistics : this.statisticsToRecord.values()) {
            bytesWrittenToDatabase += statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN);
            bytesReadFromDatabase += statistics.getAndResetTickerCount(TickerType.BYTES_READ);
            memtableBytesFlushed += statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES);
            memtableHits += statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT);
            memtableMisses += statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS);
            blockCacheDataHits += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT);
            blockCacheDataMisses += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS);
            blockCacheIndexHits += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT);
            blockCacheIndexMisses += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS);
            blockCacheFilterHits += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT);
            blockCacheFilterMisses += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS);
            writeStallDuration += statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
            bytesWrittenDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
            bytesReadDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
            numberOfOpenFiles += statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS) - statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
            numberOfFileErrors += statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
        }
        this.bytesWrittenToDatabaseSensor.record((double)bytesWrittenToDatabase);
        this.bytesReadFromDatabaseSensor.record((double)bytesReadFromDatabase);
        this.memtableBytesFlushedSensor.record((double)memtableBytesFlushed);
        this.memtableHitRatioSensor.record(this.computeHitRatio(memtableHits, memtableMisses));
        this.blockCacheDataHitRatioSensor.record(this.computeHitRatio(blockCacheDataHits, blockCacheDataMisses));
        this.blockCacheIndexHitRatioSensor.record(this.computeHitRatio(blockCacheIndexHits, blockCacheIndexMisses));
        this.blockCacheFilterHitRatioSensor.record(this.computeHitRatio(blockCacheFilterHits, blockCacheFilterMisses));
        this.writeStallDurationSensor.record((double)writeStallDuration);
        this.bytesWrittenDuringCompactionSensor.record((double)bytesWrittenDuringCompaction);
        this.bytesReadDuringCompactionSensor.record((double)bytesReadDuringCompaction);
        this.numberOfOpenFilesSensor.record((double)numberOfOpenFiles);
        this.numberOfFileErrorsSensor.record((double)numberOfFileErrors);
    }

    private double computeHitRatio(long hits, long misses) {
        if (hits == 0L) {
            return 0.0;
        }
        return (double)hits / (double)(hits + misses);
    }
}

