/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class MetricsIntegrationTest {
    /** Number of brokers started by the embedded cluster. */
    private static final int NUM_BROKERS = 1;

    /** Embedded Kafka cluster, registered as a JUnit class rule. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    // Metric group names, compared against metricName().group() by the check methods.
    private static final String STREAM_THREAD_NODE_METRICS = "stream-metrics";
    private static final String STREAM_TASK_NODE_METRICS = "stream-task-metrics";
    private static final String STREAM_PROCESSOR_NODE_METRICS = "stream-processor-node-metrics";
    private static final String STREAM_CACHE_NODE_METRICS = "stream-record-cache-metrics";
    private static final String STREAM_STORE_IN_MEMORY_STATE_METRICS = "stream-in-memory-state-metrics";
    private static final String STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS = "stream-in-memory-lru-state-metrics";
    private static final String STREAM_STORE_ROCKSDB_STATE_METRICS = "stream-rocksdb-state-metrics";
    private static final String STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS = "stream-rocksdb-window-state-metrics";
    private static final String STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS = "stream-rocksdb-session-state-metrics";

    // State-store metric names: latency (avg/max) per store operation.
    private static final String PUT_LATENCY_AVG = "put-latency-avg";
    private static final String PUT_LATENCY_MAX = "put-latency-max";
    private static final String PUT_IF_ABSENT_LATENCY_AVG = "put-if-absent-latency-avg";
    private static final String PUT_IF_ABSENT_LATENCY_MAX = "put-if-absent-latency-max";
    private static final String GET_LATENCY_AVG = "get-latency-avg";
    private static final String GET_LATENCY_MAX = "get-latency-max";
    private static final String DELETE_LATENCY_AVG = "delete-latency-avg";
    private static final String DELETE_LATENCY_MAX = "delete-latency-max";
    private static final String PUT_ALL_LATENCY_AVG = "put-all-latency-avg";
    private static final String PUT_ALL_LATENCY_MAX = "put-all-latency-max";
    private static final String ALL_LATENCY_AVG = "all-latency-avg";
    private static final String ALL_LATENCY_MAX = "all-latency-max";
    private static final String RANGE_LATENCY_AVG = "range-latency-avg";
    private static final String RANGE_LATENCY_MAX = "range-latency-max";
    private static final String FLUSH_LATENCY_AVG = "flush-latency-avg";
    private static final String FLUSH_LATENCY_MAX = "flush-latency-max";
    private static final String RESTORE_LATENCY_AVG = "restore-latency-avg";
    private static final String RESTORE_LATENCY_MAX = "restore-latency-max";

    // State-store metric names: rate/total per store operation.
    private static final String PUT_RATE = "put-rate";
    private static final String PUT_TOTAL = "put-total";
    private static final String PUT_IF_ABSENT_RATE = "put-if-absent-rate";
    private static final String PUT_IF_ABSENT_TOTAL = "put-if-absent-total";
    private static final String GET_RATE = "get-rate";
    private static final String DELETE_RATE = "delete-rate";
    private static final String DELETE_TOTAL = "delete-total";
    private static final String PUT_ALL_RATE = "put-all-rate";
    private static final String PUT_ALL_TOTAL = "put-all-total";
    private static final String ALL_RATE = "all-rate";
    private static final String ALL_TOTAL = "all-total";
    private static final String RANGE_RATE = "range-rate";
    private static final String RANGE_TOTAL = "range-total";
    private static final String FLUSH_RATE = "flush-rate";
    private static final String FLUSH_TOTAL = "flush-total";
    private static final String RESTORE_RATE = "restore-rate";
    private static final String RESTORE_TOTAL = "restore-total";

    // Processor-node metric names (the process/punctuate names are also checked at thread level).
    private static final String PROCESS_LATENCY_AVG = "process-latency-avg";
    private static final String PROCESS_LATENCY_MAX = "process-latency-max";
    private static final String PUNCTUATE_LATENCY_AVG = "punctuate-latency-avg";
    private static final String PUNCTUATE_LATENCY_MAX = "punctuate-latency-max";
    private static final String CREATE_LATENCY_AVG = "create-latency-avg";
    private static final String CREATE_LATENCY_MAX = "create-latency-max";
    private static final String DESTROY_LATENCY_AVG = "destroy-latency-avg";
    private static final String DESTROY_LATENCY_MAX = "destroy-latency-max";
    private static final String PROCESS_RATE = "process-rate";
    private static final String PROCESS_TOTAL = "process-total";
    private static final String PUNCTUATE_RATE = "punctuate-rate";
    private static final String PUNCTUATE_TOTAL = "punctuate-total";
    private static final String CREATE_RATE = "create-rate";
    private static final String CREATE_TOTAL = "create-total";
    private static final String DESTROY_RATE = "destroy-rate";
    private static final String DESTROY_TOTAL = "destroy-total";
    private static final String FORWARD_TOTAL = "forward-total";

    /** Substring used by checkMetricDeregistration() to recognise metric groups that must be gone after close. */
    private static final String STREAM_STRING = "stream";

    // Thread- and task-level metric names.
    private static final String COMMIT_LATENCY_AVG = "commit-latency-avg";
    private static final String COMMIT_LATENCY_MAX = "commit-latency-max";
    private static final String POLL_LATENCY_AVG = "poll-latency-avg";
    private static final String POLL_LATENCY_MAX = "poll-latency-max";
    private static final String COMMIT_RATE = "commit-rate";
    private static final String COMMIT_TOTAL = "commit-total";
    private static final String POLL_RATE = "poll-rate";
    private static final String POLL_TOTAL = "poll-total";
    private static final String TASK_CREATED_RATE = "task-created-rate";
    private static final String TASK_CREATED_TOTAL = "task-created-total";
    private static final String TASK_CLOSED_RATE = "task-closed-rate";
    private static final String TASK_CLOSED_TOTAL = "task-closed-total";
    private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
    private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
    private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
    private static final String RECORD_LATENESS_MAX = "record-lateness-max";

    // Record-cache metric names.
    private static final String HIT_RATIO_AVG = "hitRatio-avg";
    private static final String HIT_RATIO_MIN = "hitRatio-min";
    private static final String HIT_RATIO_MAX = "hitRatio-max";

    // State store names used by the topologies built in the tests.
    private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
    private static final String SESSION_AGGREGATED_STREAM_STORE = "session-aggregated-stream-store";
    private static final String MY_STORE_IN_MEMORY = "myStoreInMemory";
    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
    private static final String MY_STORE_LRU_MAP = "myStoreLruMap";

    // Per-test state: (re)initialised in before() and startApplication().
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;

    // Topics created in before() and deleted in after().
    private static final String STREAM_INPUT = "STREAM_INPUT";
    private static final String STREAM_OUTPUT_1 = "STREAM_OUTPUT_1";
    private static final String STREAM_OUTPUT_2 = "STREAM_OUTPUT_2";
    private static final String STREAM_OUTPUT_3 = "STREAM_OUTPUT_3";
    private static final String STREAM_OUTPUT_4 = "STREAM_OUTPUT_4";

    private KStream<Integer, String> stream;
    private KStream<Integer, String> stream2;

    /** Application id of the Streams instance under test. */
    private final String appId = "stream-metrics-test";

    /**
     * Creates the input/output topics and builds a fresh Streams configuration
     * (and topology builder) for the test about to run.
     *
     * @throws InterruptedException if topic creation on the embedded cluster is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);

        streamsConfiguration = new Properties();
        // Use the StreamsConfig key constants rather than hand-typed strings, and the
        // appId field rather than a second copy of the same literal.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // DEBUG recording level is set so that DEBUG-level sensors are recorded too.
        streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
        // 10 MiB record cache (same value as the previous 0xA00000L literal).
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    }

    /**
     * Tears down after each test: makes sure the Streams instance is stopped, then
     * deletes the topics created in {@link #before()}.
     *
     * @throws InterruptedException if topic deletion on the embedded cluster is interrupted
     */
    @After
    public void after() throws InterruptedException {
        // A test that fails before reaching closeApplication() would otherwise leave a
        // running Streams instance behind. Closing an already-closed instance is harmless.
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
        CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
    }

    /**
     * Builds the topology accumulated in {@code builder}, creates the Streams
     * instance with the per-test configuration and starts it.
     */
    private void startApplication() {
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

    /**
     * Stops the Streams instance, cleans up its local state and purges the
     * state directory referenced by the test configuration.
     *
     * @throws Exception if purging the local state fails
     */
    private void closeApplication() throws Exception {
        kafkaStreams.close();
        // cleanUp() is called only after close(), i.e. once the instance is no longer running.
        kafkaStreams.cleanUp();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Waits until no metric whose group name contains {@code STREAM_STRING} is
     * reported by the Streams instance any more.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    private void checkMetricDeregistration() throws InterruptedException {
        TestUtils.waitForCondition(
            // The explicit <Metric> matters: with a raw ArrayList the lambda parameter
            // would be Object and m.metricName() would not compile.
            () -> new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .noneMatch(m -> m.metricName().group().contains(STREAM_STRING)),
            10000L,
            "de-registration of metrics");
    }

    /**
     * Builds a topology chaining three tables (in-memory, persistent and LRU
     * key-value stores, all with caching enabled), starts it and verifies that
     * thread, task, processor-node, key-value store and cache metrics are
     * registered, then that they are de-registered after the application is closed.
     *
     * @throws Exception if a wait times out or application shutdown fails
     */
    @Test
    public void testStreamMetric() throws Exception {
        final StringBuilder errorMessage = new StringBuilder();

        // No raw (Serde) casts: let the compiler check key/value types end to end.
        stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
        stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
        builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_2);
        builder.table(STREAM_OUTPUT_2, Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_3);
        builder.table(STREAM_OUTPUT_3, Materialized.as(Stores.lruMap(MY_STORE_LRU_MAP, 10000)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_4);

        startApplication();

        // Each check is retried until it passes or the wait expires; the message
        // supplier reports the failure recorded by the last attempt.
        TestUtils.waitForCondition(
            () -> testThreadMetric(errorMessage),
            10000L,
            () -> "testThreadMetric -> " + errorMessage.toString());
        TestUtils.waitForCondition(
            () -> testTaskMetric(errorMessage),
            10000L,
            () -> "testTaskMetric -> " + errorMessage.toString());
        TestUtils.waitForCondition(
            () -> testProcessorMetric(errorMessage),
            10000L,
            () -> "testProcessorMetric -> " + errorMessage.toString());
        TestUtils.waitForCondition(
            () -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_STATE_METRICS, errorMessage),
            10000L,
            () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_STATE_METRICS + " -> " + errorMessage.toString());
        TestUtils.waitForCondition(
            () -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS, errorMessage),
            10000L,
            () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS + " -> " + errorMessage.toString());
        TestUtils.waitForCondition(
            () -> testStoreMetricKeyValueByType(STREAM_STORE_ROCKSDB_STATE_METRICS, errorMessage),
            10000L,
            () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_ROCKSDB_STATE_METRICS + " -> " + errorMessage.toString());
        TestUtils.waitForCondition(
            () -> testCacheMetric(errorMessage),
            10000L,
            () -> "testCacheMetric -> " + errorMessage.toString());

        closeApplication();
        checkMetricDeregistration();
    }

    /**
     * Builds a time-windowed aggregation (50 ms windows) backed by a named window
     * store, starts it and verifies the window-store metrics are registered and
     * later de-registered once the application is closed.
     *
     * @throws Exception if a wait times out or application shutdown fails
     */
    @Test
    public void testStreamMetricOfWindowStore() throws Exception {
        final StringBuilder errorMessage = new StringBuilder();

        stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
        final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
        groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMillis(50L)))
            .aggregate(
                () -> 0L,
                (aggKey, newValue, aggValue) -> aggValue,
                // The explicit type witness is required: without it K/V are inferred as
                // Object and withValueSerde(Serdes.Long()) does not type-check.
                Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
                    .withValueSerde(Serdes.Long()));

        startApplication();

        TestUtils.waitForCondition(
            () -> testStoreMetricWindow(errorMessage),
            10000L,
            () -> "testStoreMetricWindow -> " + errorMessage.toString());

        closeApplication();
        checkMetricDeregistration();
    }

    /**
     * Builds a session-windowed aggregation (50 ms inactivity gap) backed by a named
     * session store, starts it and verifies the session-store metrics are registered
     * and later de-registered once the application is closed.
     *
     * @throws Exception if a wait times out or application shutdown fails
     */
    @Test
    public void testStreamMetricOfSessionStore() throws Exception {
        final StringBuilder errorMessage = new StringBuilder();

        stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
        final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
        groupedStream
            .windowedBy(SessionWindows.with(Duration.ofMillis(50L)))
            .aggregate(
                () -> 0L,
                (aggKey, newValue, aggValue) -> aggValue,
                (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
                // The explicit type witness is required: without it K/V are inferred as
                // Object and withValueSerde(Serdes.Long()) does not type-check.
                Materialized.<Integer, Long, SessionStore<Bytes, byte[]>>as(SESSION_AGGREGATED_STREAM_STORE)
                    .withValueSerde(Serdes.Long()));

        startApplication();

        TestUtils.waitForCondition(
            () -> testStoreMetricSession(errorMessage),
            10000L,
            () -> "testStoreMetricSession -> " + errorMessage.toString());

        closeApplication();
        checkMetricDeregistration();
    }

    /**
     * Checks that each expected thread-level metric (group
     * {@code STREAM_THREAD_NODE_METRICS}) is currently registered exactly once
     * with a non-null value.
     *
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testThreadMetric(StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS))
                .collect(Collectors.toList());
            final String[] expectedNames = {
                COMMIT_LATENCY_AVG, COMMIT_LATENCY_MAX,
                POLL_LATENCY_AVG, POLL_LATENCY_MAX,
                PROCESS_LATENCY_AVG, PROCESS_LATENCY_MAX,
                PUNCTUATE_LATENCY_AVG, PUNCTUATE_LATENCY_MAX,
                COMMIT_RATE, COMMIT_TOTAL,
                POLL_RATE, POLL_TOTAL,
                PROCESS_RATE, PROCESS_TOTAL,
                PUNCTUATE_RATE, PUNCTUATE_TOTAL,
                TASK_CREATED_RATE, TASK_CREATED_TOTAL,
                TASK_CLOSED_RATE, TASK_CLOSED_TOTAL,
                SKIPPED_RECORDS_RATE, SKIPPED_RECORDS_TOTAL
            };
            for (final String name : expectedNames) {
                testMetricByName(listMetricThread, name, 1);
            }
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Checks the number of registered task-level metrics (group
     * {@code STREAM_TASK_NODE_METRICS}): 5 of each commit metric and 4 of each
     * record-lateness metric, all with non-null values.
     * NOTE(review): the expected counts presumably follow from the topology built
     * in testStreamMetric() - confirm before changing that topology.
     *
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testTaskMetric(StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS))
                .collect(Collectors.toList());
            testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
            testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
            testMetricByName(listMetricTask, COMMIT_RATE, 5);
            testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
            testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
            testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Checks that each expected processor-node metric (group
     * {@code STREAM_PROCESSOR_NODE_METRICS}) is currently registered 18 times
     * with non-null values.
     * NOTE(review): 18 presumably is the number of processor nodes in the topology
     * built in testStreamMetric() - confirm before changing that topology.
     *
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testProcessorMetric(StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS))
                .collect(Collectors.toList());
            final String[] expectedNames = {
                PROCESS_LATENCY_AVG, PROCESS_LATENCY_MAX,
                PUNCTUATE_LATENCY_AVG, PUNCTUATE_LATENCY_MAX,
                CREATE_LATENCY_AVG, CREATE_LATENCY_MAX,
                DESTROY_LATENCY_AVG, DESTROY_LATENCY_MAX,
                PROCESS_RATE, PROCESS_TOTAL,
                PUNCTUATE_RATE, PUNCTUATE_TOTAL,
                CREATE_RATE, CREATE_TOTAL,
                DESTROY_RATE, DESTROY_TOTAL,
                FORWARD_TOTAL
            };
            for (final String name : expectedNames) {
                testMetricByName(listMetricProcessor, name, 18);
            }
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Checks the registered metrics of the window store (group
     * {@code STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS}): put, flush and restore
     * metrics are expected twice each, every other store metric is expected to be absent.
     *
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testStoreMetricWindow(StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
                .collect(Collectors.toList());

            // Latency metrics.
            testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
            testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
            testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
            testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
            testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);

            // Rate/total metrics.
            testMetricByName(listMetricStore, PUT_RATE, 2);
            testMetricByName(listMetricStore, PUT_TOTAL, 2);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
            testMetricByName(listMetricStore, GET_RATE, 0);
            testMetricByName(listMetricStore, DELETE_RATE, 0);
            testMetricByName(listMetricStore, DELETE_TOTAL, 0);
            testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
            testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
            testMetricByName(listMetricStore, ALL_RATE, 0);
            testMetricByName(listMetricStore, ALL_TOTAL, 0);
            testMetricByName(listMetricStore, RANGE_RATE, 0);
            testMetricByName(listMetricStore, RANGE_TOTAL, 0);
            testMetricByName(listMetricStore, FLUSH_RATE, 2);
            testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
            testMetricByName(listMetricStore, RESTORE_RATE, 2);
            testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Checks the registered metrics of the session store (group
     * {@code STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS}): put, flush and restore
     * metrics are expected twice each, every other store metric is expected to be absent.
     *
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testStoreMetricSession(StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
                .collect(Collectors.toList());

            // Latency metrics.
            testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
            testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
            testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
            testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
            testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
            testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
            testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);

            // Rate/total metrics.
            testMetricByName(listMetricStore, PUT_RATE, 2);
            testMetricByName(listMetricStore, PUT_TOTAL, 2);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
            testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
            testMetricByName(listMetricStore, GET_RATE, 0);
            testMetricByName(listMetricStore, DELETE_RATE, 0);
            testMetricByName(listMetricStore, DELETE_TOTAL, 0);
            testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
            testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
            testMetricByName(listMetricStore, ALL_RATE, 0);
            testMetricByName(listMetricStore, ALL_TOTAL, 0);
            testMetricByName(listMetricStore, RANGE_RATE, 0);
            testMetricByName(listMetricStore, RANGE_TOTAL, 0);
            testMetricByName(listMetricStore, FLUSH_RATE, 2);
            testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
            testMetricByName(listMetricStore, RESTORE_RATE, 2);
            testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Checks that each expected key-value store metric in the given metric group
     * is currently registered twice with non-null values.
     *
     * @param storeType    metric group name of the store type to check
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testStoreMetricKeyValueByType(String storeType, StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(storeType))
                .collect(Collectors.toList());
            final String[] expectedNames = {
                // Latency metrics.
                PUT_LATENCY_AVG, PUT_LATENCY_MAX,
                PUT_IF_ABSENT_LATENCY_AVG, PUT_IF_ABSENT_LATENCY_MAX,
                GET_LATENCY_AVG, GET_LATENCY_MAX,
                DELETE_LATENCY_AVG, DELETE_LATENCY_MAX,
                PUT_ALL_LATENCY_AVG, PUT_ALL_LATENCY_MAX,
                ALL_LATENCY_AVG, ALL_LATENCY_MAX,
                RANGE_LATENCY_AVG, RANGE_LATENCY_MAX,
                FLUSH_LATENCY_AVG, FLUSH_LATENCY_MAX,
                RESTORE_LATENCY_AVG, RESTORE_LATENCY_MAX,
                // Rate/total metrics.
                PUT_RATE, PUT_TOTAL,
                PUT_IF_ABSENT_RATE, PUT_IF_ABSENT_TOTAL,
                GET_RATE,
                DELETE_RATE, DELETE_TOTAL,
                PUT_ALL_RATE, PUT_ALL_TOTAL,
                ALL_RATE, ALL_TOTAL,
                RANGE_RATE, RANGE_TOTAL,
                FLUSH_RATE, FLUSH_TOTAL,
                RESTORE_RATE, RESTORE_TOTAL
            };
            for (final String name : expectedNames) {
                testMetricByName(listMetricStore, name, 2);
            }
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Checks that each record-cache hit-ratio metric (group
     * {@code STREAM_CACHE_NODE_METRICS}) is currently registered 6 times with
     * non-null values.
     * NOTE(review): 6 presumably follows from the cached stores/tasks in the topology
     * built in testStreamMetric() - confirm before changing that topology.
     *
     * @param errorMessage buffer that is cleared on entry and, on failure, receives
     *                     the message of the first failed check
     * @return {@code true} if every check passed, {@code false} otherwise
     */
    private boolean testCacheMetric(StringBuilder errorMessage) {
        errorMessage.setLength(0);
        try {
            // <Metric> keeps the stream typed; a raw ArrayList would make the lambda parameter an Object.
            final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
                .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS))
                .collect(Collectors.toList());
            final String[] expectedNames = {HIT_RATIO_AVG, HIT_RATIO_MIN, HIT_RATIO_MAX};
            for (final String name : expectedNames) {
                testMetricByName(listMetricCache, name, 6);
            }
            return true;
        }
        catch (Throwable e) {
            // Throwable on purpose: JUnit assertion failures are Errors, and the caller
            // retries this check until it passes or times out.
            errorMessage.append(e.getMessage());
            return false;
        }
    }

    /**
     * Asserts that exactly {@code numMetric} metrics in {@code listMetric} carry the
     * given name and that each of them reports a non-null value.
     *
     * @param listMetric metrics to search (already filtered to one metric group by the callers)
     * @param metricName metric name to match exactly
     * @param numMetric  expected number of metrics with that name
     * @throws AssertionError if the count differs or a matching metric has a null value
     */
    private void testMetricByName(List<Metric> listMetric, String metricName, int numMetric) {
        // Typed list: with a raw List the enhanced for loop over Metric below does not compile.
        final List<Metric> matching = listMetric.stream()
            .filter(m -> m.metricName().name().equals(metricName))
            .collect(Collectors.toList());
        Assert.assertEquals(
            "Size of metrics of type:'" + metricName + "' must be equal to:" + numMetric + " but it's equal to " + matching.size(),
            numMetric,
            matching.size());
        for (final Metric metric : matching) {
            Assert.assertNotNull("Metric:'" + metric.metricName() + "' must be not null", metric.metricValue());
        }
    }
}

