/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category(value={IntegrationTest.class})
public class MetricsIntegrationTest {
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    private static final int NUM_BROKERS = 1;
    private static final int NUM_THREADS = 2;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final long timeout = 60000L;
    private static final String STREAM_CLIENT_NODE_METRICS = "stream-metrics";
    private static final String STREAM_THREAD_NODE_METRICS = "stream-thread-metrics";
    private static final String STREAM_TASK_NODE_METRICS = "stream-task-metrics";
    private static final String STREAM_PROCESSOR_NODE_METRICS = "stream-processor-node-metrics";
    private static final String STREAM_TOPIC_METRICS = "stream-topic-metrics";
    private static final String STREAM_CACHE_NODE_METRICS = "stream-record-cache-metrics";
    private static final String IN_MEMORY_KVSTORE_TAG_KEY = "in-memory-state-id";
    private static final String IN_MEMORY_LRUCACHE_TAG_KEY = "in-memory-lru-state-id";
    private static final String ROCKSDB_KVSTORE_TAG_KEY = "rocksdb-state-id";
    private static final String STATE_STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String VERSION = "version";
    private static final String COMMIT_ID = "commit-id";
    private static final String APPLICATION_ID = "application-id";
    private static final String TOPOLOGY_DESCRIPTION = "topology-description";
    private static final String STATE = "state";
    private static final String ALIVE_STREAM_THREADS = "alive-stream-threads";
    private static final String FAILED_STREAM_THREADS = "failed-stream-threads";
    private static final String PUT_LATENCY_AVG = "put-latency-avg";
    private static final String PUT_LATENCY_MAX = "put-latency-max";
    private static final String PUT_IF_ABSENT_LATENCY_AVG = "put-if-absent-latency-avg";
    private static final String PUT_IF_ABSENT_LATENCY_MAX = "put-if-absent-latency-max";
    private static final String GET_LATENCY_AVG = "get-latency-avg";
    private static final String GET_LATENCY_MAX = "get-latency-max";
    private static final String DELETE_LATENCY_AVG = "delete-latency-avg";
    private static final String DELETE_LATENCY_MAX = "delete-latency-max";
    private static final String REMOVE_LATENCY_AVG = "remove-latency-avg";
    private static final String REMOVE_LATENCY_MAX = "remove-latency-max";
    private static final String PUT_ALL_LATENCY_AVG = "put-all-latency-avg";
    private static final String PUT_ALL_LATENCY_MAX = "put-all-latency-max";
    private static final String ALL_LATENCY_AVG = "all-latency-avg";
    private static final String ALL_LATENCY_MAX = "all-latency-max";
    private static final String RANGE_LATENCY_AVG = "range-latency-avg";
    private static final String RANGE_LATENCY_MAX = "range-latency-max";
    private static final String FLUSH_LATENCY_AVG = "flush-latency-avg";
    private static final String FLUSH_LATENCY_MAX = "flush-latency-max";
    private static final String RESTORE_LATENCY_AVG = "restore-latency-avg";
    private static final String RESTORE_LATENCY_MAX = "restore-latency-max";
    private static final String PUT_RATE = "put-rate";
    private static final String PUT_TOTAL = "put-total";
    private static final String PUT_IF_ABSENT_RATE = "put-if-absent-rate";
    private static final String PUT_IF_ABSENT_TOTAL = "put-if-absent-total";
    private static final String GET_RATE = "get-rate";
    private static final String GET_TOTAL = "get-total";
    private static final String FETCH_RATE = "fetch-rate";
    private static final String FETCH_TOTAL = "fetch-total";
    private static final String FETCH_LATENCY_AVG = "fetch-latency-avg";
    private static final String FETCH_LATENCY_MAX = "fetch-latency-max";
    private static final String DELETE_RATE = "delete-rate";
    private static final String DELETE_TOTAL = "delete-total";
    private static final String REMOVE_RATE = "remove-rate";
    private static final String REMOVE_TOTAL = "remove-total";
    private static final String PUT_ALL_RATE = "put-all-rate";
    private static final String PUT_ALL_TOTAL = "put-all-total";
    private static final String ALL_RATE = "all-rate";
    private static final String ALL_TOTAL = "all-total";
    private static final String RANGE_RATE = "range-rate";
    private static final String RANGE_TOTAL = "range-total";
    private static final String FLUSH_RATE = "flush-rate";
    private static final String FLUSH_TOTAL = "flush-total";
    private static final String RESTORE_RATE = "restore-rate";
    private static final String RESTORE_TOTAL = "restore-total";
    private static final String PROCESS_LATENCY_AVG = "process-latency-avg";
    private static final String PROCESS_LATENCY_MAX = "process-latency-max";
    private static final String PUNCTUATE_LATENCY_AVG = "punctuate-latency-avg";
    private static final String PUNCTUATE_LATENCY_MAX = "punctuate-latency-max";
    private static final String CREATE_LATENCY_AVG = "create-latency-avg";
    private static final String CREATE_LATENCY_MAX = "create-latency-max";
    private static final String DESTROY_LATENCY_AVG = "destroy-latency-avg";
    private static final String DESTROY_LATENCY_MAX = "destroy-latency-max";
    private static final String PROCESS_RATE = "process-rate";
    private static final String PROCESS_TOTAL = "process-total";
    private static final String PROCESS_RATIO = "process-ratio";
    private static final String PROCESS_RECORDS_AVG = "process-records-avg";
    private static final String PROCESS_RECORDS_MAX = "process-records-max";
    private static final String PUNCTUATE_RATE = "punctuate-rate";
    private static final String PUNCTUATE_TOTAL = "punctuate-total";
    private static final String PUNCTUATE_RATIO = "punctuate-ratio";
    private static final String CREATE_RATE = "create-rate";
    private static final String CREATE_TOTAL = "create-total";
    private static final String DESTROY_RATE = "destroy-rate";
    private static final String DESTROY_TOTAL = "destroy-total";
    private static final String FORWARD_TOTAL = "forward-total";
    private static final String FORWARD_RATE = "forward-rate";
    private static final String STREAM_STRING = "stream";
    private static final String COMMIT_LATENCY_AVG = "commit-latency-avg";
    private static final String COMMIT_LATENCY_MAX = "commit-latency-max";
    private static final String POLL_LATENCY_AVG = "poll-latency-avg";
    private static final String POLL_LATENCY_MAX = "poll-latency-max";
    private static final String COMMIT_RATE = "commit-rate";
    private static final String COMMIT_TOTAL = "commit-total";
    private static final String COMMIT_RATIO = "commit-ratio";
    private static final String ENFORCED_PROCESSING_RATE = "enforced-processing-rate";
    private static final String ENFORCED_PROCESSING_TOTAL = "enforced-processing-total";
    private static final String POLL_RATE = "poll-rate";
    private static final String POLL_TOTAL = "poll-total";
    private static final String POLL_RATIO = "poll-ratio";
    private static final String POLL_RECORDS_AVG = "poll-records-avg";
    private static final String POLL_RECORDS_MAX = "poll-records-max";
    private static final String TASK_CREATED_RATE = "task-created-rate";
    private static final String TASK_CREATED_TOTAL = "task-created-total";
    private static final String TASK_CLOSED_RATE = "task-closed-rate";
    private static final String TASK_CLOSED_TOTAL = "task-closed-total";
    private static final String BLOCKED_TIME_TOTAL = "blocked-time-ns-total";
    private static final String THREAD_START_TIME = "thread-start-time";
    private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
    private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
    private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
    private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
    private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
    private static final String RECORD_LATENESS_MAX = "record-lateness-max";
    private static final String HIT_RATIO_AVG = "hit-ratio-avg";
    private static final String HIT_RATIO_MIN = "hit-ratio-min";
    private static final String HIT_RATIO_MAX = "hit-ratio-max";
    private static final String SUPPRESSION_BUFFER_SIZE_CURRENT = "suppression-buffer-size-current";
    private static final String SUPPRESSION_BUFFER_SIZE_AVG = "suppression-buffer-size-avg";
    private static final String SUPPRESSION_BUFFER_SIZE_MAX = "suppression-buffer-size-max";
    private static final String SUPPRESSION_BUFFER_COUNT_CURRENT = "suppression-buffer-count-current";
    private static final String SUPPRESSION_BUFFER_COUNT_AVG = "suppression-buffer-count-avg";
    private static final String SUPPRESSION_BUFFER_COUNT_MAX = "suppression-buffer-count-max";
    private static final String EXPIRED_WINDOW_RECORD_DROP_RATE = "expired-window-record-drop-rate";
    private static final String EXPIRED_WINDOW_RECORD_DROP_TOTAL = "expired-window-record-drop-total";
    private static final String RECORD_E2E_LATENCY_AVG = "record-e2e-latency-avg";
    private static final String RECORD_E2E_LATENCY_MIN = "record-e2e-latency-min";
    private static final String RECORD_E2E_LATENCY_MAX = "record-e2e-latency-max";
    private static final String BYTES_CONSUMED_TOTAL = "bytes-consumed-total";
    private static final String RECORDS_CONSUMED_TOTAL = "records-consumed-total";
    private static final String BYTES_PRODUCED_TOTAL = "bytes-produced-total";
    private static final String RECORDS_PRODUCED_TOTAL = "records-produced-total";
    private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
    private static final String SESSION_AGGREGATED_STREAM_STORE = "session-aggregated-stream-store";
    private static final String MY_STORE_IN_MEMORY = "myStoreInMemory";
    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
    private static final String MY_STORE_LRU_MAP = "myStoreLruMap";
    private static final String STREAM_INPUT = "STREAM_INPUT";
    private static final String STREAM_OUTPUT_1 = "STREAM_OUTPUT_1";
    private static final String STREAM_OUTPUT_2 = "STREAM_OUTPUT_2";
    private static final String STREAM_OUTPUT_3 = "STREAM_OUTPUT_3";
    private static final String STREAM_OUTPUT_4 = "STREAM_OUTPUT_4";
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String appId;
    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.appId = "app-" + safeTestName;
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", this.appId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name);
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0xA00000L);
        this.streamsConfiguration.put("num.stream.threads", (Object)2);
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
    }

    @After
    public void after() throws InterruptedException {
        CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
    }

    private void startApplication() throws InterruptedException {
        Topology topology = this.builder.build();
        this.kafkaStreams = new KafkaStreams(topology, this.streamsConfiguration);
        this.verifyAliveStreamThreadsMetric();
        this.verifyStateMetric(KafkaStreams.State.CREATED);
        this.verifyTopologyDescriptionMetric(topology.describe().toString());
        this.verifyApplicationIdMetric();
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> this.kafkaStreams.state() == KafkaStreams.State.RUNNING, (long)60000L, () -> "Kafka Streams application did not reach state RUNNING in 60000 ms");
        this.verifyAliveStreamThreadsMetric();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
    }

    private void produceRecordsForTwoSegments(Duration segmentInterval) {
        MockTime mockTime = new MockTime(Math.max(segmentInterval.toMillis(), 60000L));
        Properties props = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"A")), props, mockTime.milliseconds());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"B")), props, mockTime.milliseconds());
    }

    private void produceRecordsForClosingWindow(Duration windowSize) {
        MockTime mockTime = new MockTime(windowSize.toMillis() + 1L);
        Properties props = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"A")), props, mockTime.milliseconds());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"B")), props, mockTime.milliseconds());
    }

    private void closeApplication() throws Exception {
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
        long timeout = 60000L;
        TestUtils.waitForCondition(() -> this.kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING, (long)60000L, () -> "Kafka Streams application did not reach state NOT_RUNNING in 60000 ms");
    }

    @Test
    public void shouldAddMetricsOnAllLevels() throws Exception {
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).to(STREAM_OUTPUT_1, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.builder.table(STREAM_OUTPUT_1, Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)MY_STORE_IN_MEMORY)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_2);
        this.builder.table(STREAM_OUTPUT_2, Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_3);
        this.builder.table(STREAM_OUTPUT_3, Materialized.as((KeyValueBytesStoreSupplier)Stores.lruMap((String)MY_STORE_LRU_MAP, (int)10000)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_4);
        this.startApplication();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
        this.checkClientLevelMetrics();
        this.checkThreadLevelMetrics();
        this.checkTaskLevelMetrics();
        this.checkProcessorNodeLevelMetrics();
        this.checkTopicLevelMetrics();
        this.checkKeyValueStoreMetrics(IN_MEMORY_KVSTORE_TAG_KEY);
        this.checkKeyValueStoreMetrics(ROCKSDB_KVSTORE_TAG_KEY);
        this.checkKeyValueStoreMetrics(IN_MEMORY_LRUCACHE_TAG_KEY);
        this.checkCacheMetrics();
        this.closeApplication();
        this.checkMetricsDeregistration();
    }

    @Test
    public void shouldAddMetricsForWindowStoreAndSuppressionBuffer() throws Exception {
        Duration windowSize = Duration.ofMillis(50L);
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)windowSize).grace(Duration.ZERO)).aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, Materialized.as((String)TIME_WINDOWED_AGGREGATED_STREAM_STORE).withValueSerde(Serdes.Long()).withRetention(windowSize)).suppress(Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())).toStream().map((key, value) -> KeyValue.pair((Object)value, (Object)value)).to(STREAM_OUTPUT_1, Produced.with((Serde)Serdes.Long(), (Serde)Serdes.Long()));
        this.produceRecordsForClosingWindow(windowSize);
        this.startApplication();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
        this.checkWindowStoreAndSuppressionBufferMetrics();
        this.closeApplication();
        this.checkMetricsDeregistration();
    }

    @Test
    public void shouldAddMetricsForSessionStore() throws Exception {
        Duration inactivityGap = Duration.ofMillis(50L);
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).groupByKey().windowedBy(SessionWindows.with((Duration)inactivityGap).grace(Duration.ZERO)).aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, (aggKey, leftAggValue, rightAggValue) -> leftAggValue, Materialized.as((String)SESSION_AGGREGATED_STREAM_STORE).withValueSerde(Serdes.Long()).withRetention(inactivityGap)).toStream().map((key, value) -> KeyValue.pair((Object)value, (Object)value)).to(STREAM_OUTPUT_1, Produced.with((Serde)Serdes.Long(), (Serde)Serdes.Long()));
        this.produceRecordsForTwoSegments(inactivityGap);
        this.startApplication();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
        this.checkSessionStoreMetrics();
        this.closeApplication();
        this.checkMetricsDeregistration();
    }

    private void verifyAliveStreamThreadsMetric() {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(ALIVE_STREAM_THREADS) && m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS)).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)2));
    }

    private void verifyStateMetric(KafkaStreams.State state) {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(STATE) && m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS)).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)state));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue().toString(), (Matcher)CoreMatchers.is((Object)state.toString()));
    }

    private void verifyTopologyDescriptionMetric(String topologyDescription) {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(TOPOLOGY_DESCRIPTION) && m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS)).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)topologyDescription));
    }

    private void verifyApplicationIdMetric() {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(APPLICATION_ID) && m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS)).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)this.appId));
    }

    private void checkClientLevelMetrics() {
        List<Metric> listMetricThread = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricThread, VERSION, 1);
        this.checkMetricByName(listMetricThread, COMMIT_ID, 1);
        this.checkMetricByName(listMetricThread, APPLICATION_ID, 1);
        this.checkMetricByName(listMetricThread, TOPOLOGY_DESCRIPTION, 1);
        this.checkMetricByName(listMetricThread, STATE, 1);
        this.checkMetricByName(listMetricThread, ALIVE_STREAM_THREADS, 1);
        this.checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);
    }

    private void checkThreadLevelMetrics() {
        List<Metric> listMetricThread = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricThread, POLL_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricThread, POLL_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricThread, COMMIT_RATE, 2);
        this.checkMetricByName(listMetricThread, COMMIT_TOTAL, 2);
        this.checkMetricByName(listMetricThread, COMMIT_RATIO, 2);
        this.checkMetricByName(listMetricThread, POLL_RATE, 2);
        this.checkMetricByName(listMetricThread, POLL_TOTAL, 2);
        this.checkMetricByName(listMetricThread, POLL_RATIO, 2);
        this.checkMetricByName(listMetricThread, POLL_RECORDS_AVG, 2);
        this.checkMetricByName(listMetricThread, POLL_RECORDS_MAX, 2);
        this.checkMetricByName(listMetricThread, PROCESS_RATE, 2);
        this.checkMetricByName(listMetricThread, PROCESS_TOTAL, 2);
        this.checkMetricByName(listMetricThread, PROCESS_RATIO, 2);
        this.checkMetricByName(listMetricThread, PROCESS_RECORDS_AVG, 2);
        this.checkMetricByName(listMetricThread, PROCESS_RECORDS_MAX, 2);
        this.checkMetricByName(listMetricThread, PUNCTUATE_RATE, 2);
        this.checkMetricByName(listMetricThread, PUNCTUATE_TOTAL, 2);
        this.checkMetricByName(listMetricThread, PUNCTUATE_RATIO, 2);
        this.checkMetricByName(listMetricThread, TASK_CREATED_RATE, 2);
        this.checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, 2);
        this.checkMetricByName(listMetricThread, TASK_CLOSED_RATE, 2);
        this.checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 2);
        this.checkMetricByName(listMetricThread, BLOCKED_TIME_TOTAL, 2);
        this.checkMetricByName(listMetricThread, THREAD_START_TIME, 2);
    }

    private void checkTaskLevelMetrics() {
        List<Metric> listMetricTask = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricTask, ENFORCED_PROCESSING_RATE, 4);
        this.checkMetricByName(listMetricTask, ENFORCED_PROCESSING_TOTAL, 4);
        this.checkMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
        this.checkMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
        this.checkMetricByName(listMetricTask, ACTIVE_PROCESS_RATIO, 4);
        this.checkMetricByName(listMetricTask, ACTIVE_BUFFER_COUNT, 4);
        this.checkMetricByName(listMetricTask, PROCESS_LATENCY_AVG, 4);
        this.checkMetricByName(listMetricTask, PROCESS_LATENCY_MAX, 4);
        this.checkMetricByName(listMetricTask, PUNCTUATE_LATENCY_AVG, 4);
        this.checkMetricByName(listMetricTask, PUNCTUATE_LATENCY_MAX, 4);
        this.checkMetricByName(listMetricTask, PUNCTUATE_RATE, 4);
        this.checkMetricByName(listMetricTask, PUNCTUATE_TOTAL, 4);
        this.checkMetricByName(listMetricTask, PROCESS_RATE, 4);
        this.checkMetricByName(listMetricTask, PROCESS_TOTAL, 4);
    }

    private void checkProcessorNodeLevelMetrics() {
        List<Metric> listMetricProcessor = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
        int numberOfSourceNodes = 4;
        int numberOfTerminalNodes = 4;
        this.checkMetricByName(listMetricProcessor, PROCESS_RATE, 4);
        this.checkMetricByName(listMetricProcessor, PROCESS_TOTAL, 4);
        this.checkMetricByName(listMetricProcessor, RECORD_E2E_LATENCY_AVG, 8);
        this.checkMetricByName(listMetricProcessor, RECORD_E2E_LATENCY_MIN, 8);
        this.checkMetricByName(listMetricProcessor, RECORD_E2E_LATENCY_MAX, 8);
    }

    private void checkTopicLevelMetrics() {
        List<Metric> listMetricProcessor = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TOPIC_METRICS)).collect(Collectors.toList());
        int numberOfSourceTopics = 4;
        int numberOfSinkTopics = 4;
        this.checkMetricByName(listMetricProcessor, BYTES_CONSUMED_TOTAL, 4);
        this.checkMetricByName(listMetricProcessor, RECORDS_CONSUMED_TOTAL, 4);
        this.checkMetricByName(listMetricProcessor, BYTES_PRODUCED_TOTAL, 4);
        this.checkMetricByName(listMetricProcessor, RECORDS_PRODUCED_TOTAL, 4);
    }

    private void checkKeyValueStoreMetrics(String tagKey) {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().tags().containsKey(tagKey) && m.metricName().group().equals(STATE_STORE_LEVEL_GROUP)).collect(Collectors.toList());
        boolean expectedNumberOfLatencyMetrics = true;
        boolean expectedNumberOfRateMetrics = true;
        boolean expectedNumberOfTotalMetrics = false;
        this.checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, GET_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, GET_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, REMOVE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, REMOVE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, FETCH_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, FETCH_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, PUT_RATE, 1);
        this.checkMetricByName(listMetricStore, PUT_TOTAL, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
        this.checkMetricByName(listMetricStore, GET_RATE, 1);
        this.checkMetricByName(listMetricStore, GET_TOTAL, 0);
        this.checkMetricByName(listMetricStore, DELETE_RATE, 1);
        this.checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, REMOVE_RATE, 0);
        this.checkMetricByName(listMetricStore, REMOVE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_RATE, 1);
        this.checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, ALL_RATE, 1);
        this.checkMetricByName(listMetricStore, ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, RANGE_RATE, 1);
        this.checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, FLUSH_RATE, 1);
        this.checkMetricByName(listMetricStore, FLUSH_TOTAL, 0);
        this.checkMetricByName(listMetricStore, RESTORE_RATE, 1);
        this.checkMetricByName(listMetricStore, RESTORE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, FETCH_RATE, 0);
        this.checkMetricByName(listMetricStore, FETCH_TOTAL, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_CURRENT, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_AVG, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_MAX, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 1);
    }

    private void checkMetricsDeregistration() {
        List listMetricAfterClosingApp = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)listMetricAfterClosingApp.size(), (Matcher)CoreMatchers.is((Object)0));
    }

    private void checkCacheMetrics() {
        List<Metric> listMetricCache = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricCache, HIT_RATIO_AVG, 3);
        this.checkMetricByName(listMetricCache, HIT_RATIO_MIN, 3);
        this.checkMetricByName(listMetricCache, HIT_RATIO_MAX, 3);
    }

    private void checkWindowStoreAndSuppressionBufferMetrics() {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STATE_STORE_LEVEL_GROUP)).collect(Collectors.toList());
        this.checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, REMOVE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, REMOVE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, FETCH_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, FETCH_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, PUT_RATE, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
        this.checkMetricByName(listMetricStore, GET_RATE, 0);
        this.checkMetricByName(listMetricStore, GET_TOTAL, 0);
        this.checkMetricByName(listMetricStore, DELETE_RATE, 0);
        this.checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, REMOVE_RATE, 0);
        this.checkMetricByName(listMetricStore, REMOVE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, RANGE_RATE, 0);
        this.checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, FLUSH_RATE, 1);
        this.checkMetricByName(listMetricStore, RESTORE_RATE, 1);
        this.checkMetricByName(listMetricStore, FETCH_RATE, 1);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_AVG, 1);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_MAX, 1);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 1);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 1);
    }

    private void checkSessionStoreMetrics() {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STATE_STORE_LEVEL_GROUP)).collect(Collectors.toList());
        this.checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, REMOVE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, REMOVE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, FETCH_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, FETCH_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricStore, PUT_RATE, 1);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
        this.checkMetricByName(listMetricStore, GET_RATE, 0);
        this.checkMetricByName(listMetricStore, GET_TOTAL, 0);
        this.checkMetricByName(listMetricStore, DELETE_RATE, 0);
        this.checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, REMOVE_RATE, 1);
        this.checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, RANGE_RATE, 0);
        this.checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, FLUSH_RATE, 1);
        this.checkMetricByName(listMetricStore, RESTORE_RATE, 1);
        this.checkMetricByName(listMetricStore, FETCH_RATE, 1);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_CURRENT, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_AVG, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_COUNT_MAX, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
        this.checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 1);
        this.checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 1);
    }

    private void checkMetricByName(List<Metric> listMetric, String metricName, int numMetric) {
        List metrics = listMetric.stream().filter(m -> m.metricName().name().equals(metricName)).collect(Collectors.toList());
        Assert.assertEquals((String)("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size()), (long)numMetric, (long)metrics.size());
        for (Metric m2 : metrics) {
            Assert.assertNotNull((String)("Metric:'" + m2.metricName() + "' must be not null"), (Object)m2.metricValue());
        }
    }
}

