/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.SensorAccessor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SourceNodeTest {
    @Test
    public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
        MockSourceNode<String, String> sourceNode = new MockSourceNode<String, String>(new TheDeserializer(), new TheDeserializer());
        RecordHeaders headers = new RecordHeaders();
        String deserializeKey = (String)sourceNode.deserializeKey("topic", (Headers)headers, "data".getBytes(StandardCharsets.UTF_8));
        MatcherAssert.assertThat((Object)deserializeKey, (Matcher)CoreMatchers.is((Object)("topic" + headers + "data")));
    }

    @Test
    public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
        MockSourceNode<String, String> sourceNode = new MockSourceNode<String, String>(new TheDeserializer(), new TheDeserializer());
        RecordHeaders headers = new RecordHeaders();
        String deserializedValue = (String)sourceNode.deserializeValue("topic", (Headers)headers, "data".getBytes(StandardCharsets.UTF_8));
        MatcherAssert.assertThat((Object)deserializedValue, (Matcher)CoreMatchers.is((Object)("topic" + headers + "data")));
    }

    @Test
    public void shouldExposeProcessMetricsWithBuiltInMetricsVersionLatest() {
        this.shouldExposeProcessMetrics("latest");
    }

    @Test
    public void shouldExposeProcessWithBuiltInMetricsVersion0100To24() {
        this.shouldExposeProcessMetrics("0.10.0-2.4");
    }

    private void shouldExposeProcessMetrics(String builtInMetricsVersion) {
        Metrics metrics = new Metrics();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
        InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
        SourceNode node = new SourceNode(context.currentNode().name(), (Deserializer)new TheDeserializer(), (Deserializer)new TheDeserializer());
        node.init((InternalProcessorContext)context);
        String threadId = Thread.currentThread().getName();
        String groupName = "stream-processor-node-metrics";
        String threadIdTagKey = "0.10.0-2.4".equals(builtInMetricsVersion) ? "client-id" : "thread-id";
        Map metricTags = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)threadIdTagKey, (Object)threadId), Utils.mkEntry((Object)"task-id", (Object)context.taskId().toString()), Utils.mkEntry((Object)"processor-node-id", (Object)node.name())});
        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "forward-rate", "stream-processor-node-metrics", metricTags));
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "forward-total", "stream-processor-node-metrics", metricTags));
            metricTags.put("processor-node-id", "all");
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "forward-rate", "stream-processor-node-metrics", metricTags));
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "forward-total", "stream-processor-node-metrics", metricTags));
        } else {
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "process-rate", "stream-processor-node-metrics", metricTags));
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "process-total", "stream-processor-node-metrics", metricTags));
            String parentGroupName = "stream-task-metrics";
            metricTags.remove("processor-node-id");
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "process-rate", "stream-task-metrics", metricTags));
            Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, "process-total", "stream-task-metrics", metricTags));
            String sensorNamePrefix = "internal." + threadId + ".task." + context.taskId().toString();
            Sensor processSensor = metrics.getSensor(sensorNamePrefix + ".node." + context.currentNode().name() + ".s.process");
            SensorAccessor sensorAccessor = new SensorAccessor(processSensor);
            MatcherAssert.assertThat(sensorAccessor.parents().stream().map(Sensor::name).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new String[]{sensorNamePrefix + ".s.process"}));
        }
    }

    public static class TheDeserializer
    implements Deserializer<String> {
        public String deserialize(String topic, Headers headers, byte[] data) {
            return topic + headers + new String(data, StandardCharsets.UTF_8);
        }

        public String deserialize(String topic, byte[] data) {
            return this.deserialize(topic, null, data);
        }
    }
}

