/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Category(value={IntegrationTest.class})
@RunWith(value=Parameterized.class)
public class RocksDBMetricsIntegrationTest {
    private static final int NUM_BROKERS = 3;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
    private static final String STREAM_INPUT = "STREAM_INPUT";
    private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
    private static final Duration WINDOW_SIZE = Duration.ofMillis(50L);
    private static final long TIMEOUT = 60000L;
    private static final String METRICS_GROUP = "stream-state-metrics";
    private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
    private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
    private static final String BYTES_READ_RATE = "bytes-read-rate";
    private static final String BYTES_READ_TOTAL = "bytes-read-total";
    private static final String MEMTABLE_BYTES_FLUSHED_RATE = "memtable-bytes-flushed-rate";
    private static final String MEMTABLE_BYTES_FLUSHED_TOTAL = "memtable-bytes-flushed-total";
    private static final String MEMTABLE_HIT_RATIO = "memtable-hit-ratio";
    private static final String WRITE_STALL_DURATION_AVG = "write-stall-duration-avg";
    private static final String WRITE_STALL_DURATION_TOTAL = "write-stall-duration-total";
    private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit-ratio";
    private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit-ratio";
    private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit-ratio";
    private static final String BYTES_READ_DURING_COMPACTION_RATE = "bytes-read-compaction-rate";
    private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
    private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
    private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
    @Parameterized.Parameter
    public String processingGuarantee;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"exactly_once"}, {"at_least_once"});
    }

    @Before
    public void before() throws Exception {
        CLUSTER.createTopic(STREAM_INPUT, 1, 3);
    }

    @After
    public void after() throws Exception {
        CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
    }

    @Test
    public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
        Properties streamsConfiguration = this.streamsConfig();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        StreamsBuilder builder = this.builderForNonSegmentedStateStore();
        String metricsScope = "rocksdb-state-id";
        this.cleanUpStateRunVerifyAndClose(builder, streamsConfiguration, IntegerDeserializer.class, StringDeserializer.class, this::verifyThatRocksDBMetricsAreExposed, "rocksdb-state-id");
        this.cleanUpStateRunVerifyAndClose(builder, streamsConfiguration, IntegerDeserializer.class, StringDeserializer.class, this::verifyThatRocksDBMetricsAreExposed, "rocksdb-state-id");
    }

    @Test
    public void shouldExposeRocksDBMetricsForSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
        Properties streamsConfiguration = this.streamsConfig();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        StreamsBuilder builder = this.builderForSegmentedStateStore();
        String metricsScope = "rocksdb-window-state-id";
        this.cleanUpStateRunVerifyAndClose(builder, streamsConfiguration, LongDeserializer.class, LongDeserializer.class, this::verifyThatRocksDBMetricsAreExposed, "rocksdb-window-state-id");
        this.cleanUpStateRunVerifyAndClose(builder, streamsConfiguration, LongDeserializer.class, LongDeserializer.class, this::verifyThatRocksDBMetricsAreExposed, "rocksdb-window-state-id");
    }

    @Test
    public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForNonSegmentedStateStore() throws Exception {
        Properties streamsConfiguration = this.streamsConfig();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        StreamsBuilder builder = this.builderForNonSegmentedStateStore();
        String metricsScope = "rocksdb-state-id";
        this.cleanUpStateRunVerifyAndClose(builder, streamsConfiguration, IntegerDeserializer.class, StringDeserializer.class, this::verifyThatBytesWrittenTotalIncreases, "rocksdb-state-id");
    }

    @Test
    public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForSegmentedStateStore() throws Exception {
        Properties streamsConfiguration = this.streamsConfig();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        StreamsBuilder builder = this.builderForSegmentedStateStore();
        String metricsScope = "rocksdb-window-state-id";
        this.cleanUpStateRunVerifyAndClose(builder, streamsConfiguration, LongDeserializer.class, LongDeserializer.class, this::verifyThatBytesWrittenTotalIncreases, "rocksdb-window-state-id");
    }

    private Properties streamsConfig() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "test-application");
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name);
        streamsConfiguration.put("processing.guarantee", this.processingGuarantee);
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        return streamsConfiguration;
    }

    private StreamsBuilder builderForNonSegmentedStateStore() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(STREAM_INPUT, Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()).toStream().to(STREAM_OUTPUT);
        return builder;
    }

    private StreamsBuilder builderForSegmentedStateStore() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)WINDOW_SIZE).grace(Duration.ZERO)).aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, Materialized.as((String)"time-windowed-aggregated-stream-store").withValueSerde(Serdes.Long()).withRetention(WINDOW_SIZE)).toStream().map((key, value) -> KeyValue.pair((Object)value, (Object)value)).to(STREAM_OUTPUT, Produced.with((Serde)Serdes.Long(), (Serde)Serdes.Long()));
        return builder;
    }

    private void cleanUpStateRunVerifyAndClose(StreamsBuilder builder, Properties streamsConfiguration, Class outputKeyDeserializer, Class outputValueDeserializer, MetricsVerifier metricsVerifier, String metricsScope) throws Exception {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.cleanUp();
        this.produceRecords();
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, 60000L);
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)"consumerApp", (Class)outputKeyDeserializer, (Class)outputValueDeserializer, (Properties)new Properties()), STREAM_OUTPUT, 1);
        metricsVerifier.verify(kafkaStreams, metricsScope);
        kafkaStreams.close();
    }

    private void produceRecords() throws Exception {
        MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"A")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), mockTime.milliseconds());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"B")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), mockTime.milliseconds());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"C")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), mockTime.milliseconds());
    }

    private void verifyThatRocksDBMetricsAreExposed(KafkaStreams kafkaStreams, String metricsScope) {
        List<Metric> listMetricStore = this.getRocksDBMetrics(kafkaStreams, metricsScope);
        this.checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, 1);
        this.checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, 1);
        this.checkMetricByName(listMetricStore, BYTES_READ_RATE, 1);
        this.checkMetricByName(listMetricStore, BYTES_READ_TOTAL, 1);
        this.checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_RATE, 1);
        this.checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_TOTAL, 1);
        this.checkMetricByName(listMetricStore, MEMTABLE_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, WRITE_STALL_DURATION_AVG, 1);
        this.checkMetricByName(listMetricStore, WRITE_STALL_DURATION_TOTAL, 1);
        this.checkMetricByName(listMetricStore, BLOCK_CACHE_DATA_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, BLOCK_CACHE_INDEX_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, BLOCK_CACHE_FILTER_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, BYTES_READ_DURING_COMPACTION_RATE, 1);
        this.checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
        this.checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
        this.checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
    }

    private void checkMetricByName(List<Metric> listMetric, String metricName, int numMetric) {
        List metrics = listMetric.stream().filter(m -> m.metricName().name().equals(metricName)).collect(Collectors.toList());
        MatcherAssert.assertThat((String)("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size()), (Object)metrics.size(), (Matcher)Matchers.is((Object)numMetric));
        for (Metric metric : metrics) {
            MatcherAssert.assertThat((String)("Metric:'" + metric.metricName() + "' must be not null"), (Object)metric.metricValue(), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        }
    }

    private void verifyThatBytesWrittenTotalIncreases(KafkaStreams kafkaStreams, String metricsScope) throws InterruptedException {
        List metric = this.getRocksDBMetrics(kafkaStreams, metricsScope).stream().filter(m -> BYTES_WRITTEN_TOTAL.equals(m.metricName().name())).collect(Collectors.toList());
        TestUtils.waitForCondition(() -> (Double)((Metric)metric.get(0)).metricValue() > 0.0, (long)60000L, () -> "RocksDB metric bytes.written.total did not increase in 60000 ms");
    }

    private List<Metric> getRocksDBMetrics(KafkaStreams kafkaStreams, String metricsScope) {
        return new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(METRICS_GROUP) && m.metricName().tags().containsKey(metricsScope)).collect(Collectors.toList());
    }

    @FunctionalInterface
    private static interface MetricsVerifier {
        public void verify(KafkaStreams var1, String var2) throws Exception;
    }
}

