/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class ProcessorNodeTest {
    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
        ProcessorNode node = new ProcessorNode("name", (Processor)new ExceptionalProcessor(), Collections.emptySet());
        node.init(null);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
        ProcessorNode node = new ProcessorNode("name", (Processor)new ExceptionalProcessor(), Collections.emptySet());
        node.close();
    }

    @Test
    public void testMetricsWithBuiltInMetricsVersionLatest() {
        this.testMetrics("latest");
    }

    @Test
    public void testMetricsWithBuiltInMetricsVersion0100To24() {
        this.testMetrics("0.10.0-2.4");
    }

    private void testMetrics(String builtInMetricsVersion) {
        Metrics metrics = new Metrics();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
        InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
        ProcessorNode node = new ProcessorNode("name", (Processor)new NoOpProcessor(), Collections.emptySet());
        node.init((InternalProcessorContext)context);
        String threadId = Thread.currentThread().getName();
        String[] latencyOperations = new String[]{"process", "punctuate", "create", "destroy"};
        String groupName = "stream-processor-node-metrics";
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
        String threadIdTagKey = "0.10.0-2.4".equals(builtInMetricsVersion) ? "client-id" : "thread-id";
        metricTags.put("processor-node-id", node.name());
        metricTags.put("task-id", context.taskId().toString());
        metricTags.put(threadIdTagKey, threadId);
        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            for (String opName : latencyOperations) {
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-avg", "stream-processor-node-metrics", metricTags));
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-max", "stream-processor-node-metrics", metricTags));
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-rate", "stream-processor-node-metrics", metricTags));
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-total", "stream-processor-node-metrics", metricTags));
            }
            metricTags.put("processor-node-id", "all");
            for (String opName : latencyOperations) {
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-avg", "stream-processor-node-metrics", metricTags));
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-max", "stream-processor-node-metrics", metricTags));
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-rate", "stream-processor-node-metrics", metricTags));
                Assert.assertTrue((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-total", "stream-processor-node-metrics", metricTags));
            }
        } else {
            for (String opName : latencyOperations) {
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-avg", "stream-processor-node-metrics", metricTags));
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-max", "stream-processor-node-metrics", metricTags));
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-rate", "stream-processor-node-metrics", metricTags));
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-total", "stream-processor-node-metrics", metricTags));
            }
            metricTags.put("processor-node-id", "all");
            for (String opName : latencyOperations) {
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-avg", "stream-processor-node-metrics", metricTags));
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-latency-max", "stream-processor-node-metrics", metricTags));
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-rate", "stream-processor-node-metrics", metricTags));
                Assert.assertFalse((boolean)StreamsTestUtils.containsMetric(metrics, opName + "-total", "stream-processor-node-metrics", metricTags));
            }
        }
    }

    @Test
    public void testTopologyLevelClassCastException() {
        Properties props = new Properties();
        props.put("application.id", "test");
        props.put("bootstrap.servers", "dummy:1234");
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("streams-plaintext-input").flatMapValues(value -> Arrays.asList(""));
        Topology topology = builder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        TestInputTopic topic = testDriver.createInputTopic("streams-plaintext-input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        StreamsException se = (StreamsException)Assert.assertThrows(StreamsException.class, () -> topic.pipeInput((Object)"a-key", (Object)"a value"));
        String msg = se.getMessage();
        Assert.assertTrue((String)"Error about class cast with serdes", (boolean)msg.contains("ClassCastException"));
        Assert.assertTrue((String)"Error about class cast with serdes", (boolean)msg.contains("Serdes"));
    }

    @Test
    public void testTopologyLevelClassCastExceptionDirect() {
        Metrics metrics = new Metrics();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "latest");
        InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
        ProcessorNode node = new ProcessorNode("name", (Processor)new ClassCastProcessor(), Collections.emptySet());
        node.init((InternalProcessorContext)context);
        StreamsException se = (StreamsException)Assert.assertThrows(StreamsException.class, () -> node.process((Object)"aKey", (Object)"aValue"));
        MatcherAssert.assertThat((Object)se.getCause(), (Matcher)CoreMatchers.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)se.getMessage(), (Matcher)CoreMatchers.containsString((String)"default Serdes"));
        MatcherAssert.assertThat((Object)se.getMessage(), (Matcher)CoreMatchers.containsString((String)"input types"));
    }

    private static class ClassCastProcessor
    extends ExceptionalProcessor {
        private ClassCastProcessor() {
        }

        @Override
        public void init(ProcessorContext context) {
        }

        @Override
        public void process(Object key, Object value) {
            throw new ClassCastException("Incompatible types simulation exception.");
        }
    }

    private static class NoOpProcessor
    implements Processor<Object, Object> {
        private NoOpProcessor() {
        }

        public void init(ProcessorContext context) {
        }

        public void process(Object key, Object value) {
        }

        public void close() {
        }
    }

    private static class ExceptionalProcessor
    implements Processor<Object, Object> {
        private ExceptionalProcessor() {
        }

        public void init(ProcessorContext context) {
            throw new RuntimeException();
        }

        public void process(Object key, Object value) {
            throw new RuntimeException();
        }

        public void close() {
            throw new RuntimeException();
        }
    }
}

