/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
public class ProcessorNodeTest {
    // Coordinates of the synthetic record that the mocked processor context reports.
    private static final String TOPIC = "topic";
    private static final int PARTITION = 0;
    private static final Long OFFSET = 0L;
    private static final Long TIMESTAMP = 0L;
    // Task id returned by the mocked processor context.
    private static final TaskId TASK_ID = new TaskId(0, 0);
    // Name given to the processor nodes built by most tests.
    private static final String NAME = "name";
    // Key and value of the record piped through the node under test.
    private static final String KEY = "key";
    private static final String VALUE = "value";

    /**
     * Initialising a node that wraps {@code ExceptionalProcessor} (whose {@code init} always
     * throws) with a {@code null} context must fail with a {@link StreamsException}.
     */
    @Test
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
        final ProcessorNode<Object, Object, Object, Object> failingNode =
            new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet());

        Assertions.assertThrows(StreamsException.class, () -> failingNode.init(null));
    }

    /**
     * Closing a node whose processor throws from {@code close()} must fail with a
     * {@link StreamsException}.
     *
     * <p>The node is initialised successfully first, so the failure observed here can only
     * come from {@code close()}; the previous body called {@code init(null)} and therefore
     * never exercised the close path at all.
     */
    @Test
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
        // Same context set-up that testTopologyLevelClassCastExceptionDirect uses to init a node.
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(new Metrics(), "test-client", "latest", new MockTime());
        final InternalMockProcessorContext<Object, Object> context =
            new InternalMockProcessorContext<>(streamsMetrics);

        // Override init() with a no-op so that initialisation succeeds; process() and
        // close() are inherited from ExceptionalProcessor and still throw.
        final Processor<Object, Object, Object, Object> failsOnClose = new ExceptionalProcessor() {
            @Override
            public void init(final ProcessorContext<Object, Object> processorContext) {
                // intentionally empty
            }
        };

        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>(NAME, failsOnClose, Collections.emptySet());
        node.init(context);

        Assertions.assertThrows(StreamsException.class, node::close);
    }

    /**
     * When the processing exception handler answers {@code FAIL}, {@code process} must throw a
     * {@link FailedProcessingException} whose cause is the processor's original exception.
     */
    @Test
    public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRepliesWithFail() {
        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());

        final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
        node.init(
            internalProcessorContext,
            new ProcessingExceptionHandlerMock(
                ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
                internalProcessorContext,
                false));

        final FailedProcessingException failedProcessingException = Assertions.assertThrows(
            FailedProcessingException.class,
            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));

        // assertInstanceOf reports the actual type on failure, unlike assertTrue(x instanceof T),
        // and matches the style of the sibling "handler throws" test.
        Assertions.assertInstanceOf(RuntimeException.class, failedProcessingException.getCause());
        Assertions.assertEquals(
            "Processing exception should be caught and handled by the processing exception handler.",
            failedProcessingException.getCause().getMessage());
    }

    /**
     * When the processing exception handler answers {@code CONTINUE}, the exception raised by
     * the processor must not escape {@code process}.
     */
    @Test
    public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRepliesWithContinue() {
        final InternalProcessorContext<Object, Object> mockedContext = mockInternalProcessorContext();
        final ProcessingExceptionHandler continuingHandler = new ProcessingExceptionHandlerMock(
            ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
            mockedContext,
            false);

        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
        node.init(mockedContext, continuingHandler);

        Assertions.assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
    }

    /**
     * Exceptions selected by the record key in {@code IgnoredInternalExceptionsProcessor} must
     * propagate out of {@code process} without the processing exception handler being invoked.
     *
     * @param ignoredExceptionName         record key; selects which exception the processor throws
     * @param ignoredExceptionCause        expected class of the propagated exception's cause
     * @param ignoredExceptionCauseMessage expected message of the propagated exception's cause
     */
    @ParameterizedTest
    @CsvSource({
        "FailedProcessingException,java.lang.RuntimeException,Fail processing",
        "TaskCorruptedException,org.apache.kafka.streams.processor.internals.ProcessorNodeTest$IgnoredInternalExceptionsProcessor$1,Invalid offset",
        "TaskMigratedException,java.lang.RuntimeException,Task migrated cause"
    })
    public void shouldNotHandleInternalExceptionsThrownDuringProcessing(String ignoredExceptionName, Class<?> ignoredExceptionCause, String ignoredExceptionCauseMessage) {
        final ProcessingExceptionHandler handler = Mockito.mock(ProcessingExceptionHandler.class);
        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
        final InternalProcessorContext<Object, Object> mockedContext = mockInternalProcessorContext();
        node.init(mockedContext, handler);

        final RuntimeException thrown = Assertions.assertThrows(
            RuntimeException.class,
            () -> node.process(new Record<>(ignoredExceptionName, VALUE, TIMESTAMP)));

        final Throwable cause = thrown.getCause();
        Assertions.assertEquals(ignoredExceptionCause, cause.getClass());
        Assertions.assertEquals(ignoredExceptionCauseMessage, cause.getMessage());

        // The handler must never have been consulted for these exceptions.
        Mockito.verify(handler, Mockito.never())
            .handle(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * If the processing exception handler itself throws, {@code process} must fail with a
     * {@link FailedProcessingException} whose cause is the handler's exception ("KABOOM!").
     */
    @Test
    public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() {
        final InternalProcessorContext<Object, Object> mockedContext = mockInternalProcessorContext();
        // Third argument "true" makes the mock handler throw instead of returning its response.
        final ProcessingExceptionHandler throwingHandler = new ProcessingExceptionHandlerMock(
            ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
            mockedContext,
            true);

        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
        node.init(mockedContext, throwingHandler);

        final FailedProcessingException thrown = Assertions.assertThrows(
            FailedProcessingException.class,
            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));

        final Throwable cause = thrown.getCause();
        Assertions.assertInstanceOf(RuntimeException.class, cause);
        Assertions.assertEquals("KABOOM!", cause.getMessage());
    }

    /**
     * After initialising a node with the "latest" built-in metrics version, none of the
     * per-operation latency/rate/total metrics may be registered in the
     * "stream-processor-node-metrics" group, either for the node itself or for the "all" node id.
     */
    @Test
    public void testMetricsWithBuiltInMetricsVersionLatest() {
        final Metrics metrics = new Metrics();
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(metrics, "test-client", "latest", new MockTime());
        final InternalMockProcessorContext<Object, Object> context =
            new InternalMockProcessorContext<>(streamsMetrics);
        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
        node.init(context);

        final String threadId = Thread.currentThread().getName();
        final String groupName = "stream-processor-node-metrics";
        final String nodeIdTagKey = "processor-node-id";
        final String threadIdTagKey = "client-id";
        final String[] latencyOperations = {"process", "punctuate", "create", "destroy"};
        final String[] metricSuffixes = {"-latency-avg", "-latency-max", "-rate", "-total"};

        final Map<String, String> metricTags = new LinkedHashMap<>();
        metricTags.put(nodeIdTagKey, node.name());
        metricTags.put("task-id", context.taskId().toString());
        metricTags.put(threadIdTagKey, threadId);

        // First pass checks the node's own id, second pass the "all" id.
        for (final String nodeIdTag : new String[] {node.name(), "all"}) {
            metricTags.put(nodeIdTagKey, nodeIdTag);
            for (final String opName : latencyOperations) {
                for (final String suffix : metricSuffixes) {
                    Assertions.assertFalse(
                        StreamsTestUtils.containsMetric(metrics, opName + suffix, groupName, metricTags));
                }
            }
        }
    }

    /**
     * Piping String-serialised input into a topology configured with byte-array default serdes
     * must fail with a {@link StreamsException} whose message mentions both
     * "ClassCastException" and "Serdes".
     */
    @Test
    public void testTopologyLevelClassCastException() {
        final String inputTopicName = "streams-plaintext-input";
        final String failureHint = "Error about class cast with serdes";

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopicName).flatMapValues(value -> Collections.singletonList(""));
        final Topology topology = builder.build();

        final Properties config = new Properties();
        config.put("default.key.serde", Serdes.ByteArraySerde.class);
        config.put("default.value.serde", Serdes.ByteArraySerde.class);

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> topic =
                testDriver.createInputTopic(inputTopicName, new StringSerializer(), new StringSerializer());

            final StreamsException se =
                Assertions.assertThrows(StreamsException.class, () -> topic.pipeInput(KEY, VALUE));

            final String msg = se.getMessage();
            Assertions.assertTrue(msg.contains("ClassCastException"), failureHint);
            Assertions.assertTrue(msg.contains("Serdes"), failureHint);
        }
    }

    /**
     * Constructing a {@link TopologyTestDriver} for a topology without any configured serdes
     * must fail with a {@link StreamsException} that names the source node key serdes, and
     * whose cause explains how to supply a key serde.
     */
    @Test
    public void testTopologyLevelConfigException() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("streams-plaintext-input").flatMapValues(value -> Collections.singletonList(""));
        final Topology topology = builder.build();

        final StreamsException se =
            Assertions.assertThrows(StreamsException.class, () -> new TopologyTestDriver(topology));

        final String message = se.getMessage();
        final String causeMessage = se.getCause().getMessage();
        Assertions.assertTrue(message.contains("Failed to initialize key serdes for source node"));
        Assertions.assertTrue(causeMessage.contains(
            "Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
    }

    /**
     * A {@link ClassCastException} thrown by the processor must surface from {@code process}
     * as a {@link StreamsException} that keeps the original exception as its cause and whose
     * message mentions "default Serdes", "input types" and the node name.
     */
    @Test
    public void testTopologyLevelClassCastExceptionDirect() {
        final Metrics metrics = new Metrics();
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(metrics, "test-client", "latest", new MockTime());
        final InternalMockProcessorContext<Object, Object> context =
            new InternalMockProcessorContext<>(streamsMetrics);
        final ProcessorNode<Object, Object, Object, Object> node =
            new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
        node.init(context);

        final StreamsException se = Assertions.assertThrows(
            StreamsException.class,
            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));

        // assertInstanceOf reports the actual cause type on failure, unlike assertTrue(instanceof).
        Assertions.assertInstanceOf(ClassCastException.class, se.getCause());
        final String message = se.getMessage();
        Assertions.assertTrue(message.contains("default Serdes"));
        Assertions.assertTrue(message.contains("input types"));
        Assertions.assertTrue(message.contains("pname"));
    }

    /**
     * Builds a lenient Mockito mock of {@link InternalProcessorContext} that reports the test's
     * fixed task id, metrics, topic, partition, offset, record context and current node.
     *
     * @return the stubbed processor context
     */
    @SuppressWarnings("unchecked")
    private InternalProcessorContext<Object, Object> mockInternalProcessorContext() {
        final InternalProcessorContext<Object, Object> mockedContext = Mockito.mock(
            InternalProcessorContext.class,
            Mockito.withSettings().strictness(Strictness.LENIENT));

        Mockito.when(mockedContext.taskId()).thenReturn(TASK_ID);
        Mockito.when(mockedContext.metrics()).thenReturn(
            new StreamsMetricsImpl(new Metrics(), "test-client", "latest", new MockTime()));
        Mockito.when(mockedContext.topic()).thenReturn(TOPIC);
        Mockito.when(mockedContext.partition()).thenReturn(PARTITION);
        Mockito.when(mockedContext.offset()).thenReturn(OFFSET);
        Mockito.when(mockedContext.recordContext()).thenReturn(
            new ProcessorRecordContext(TIMESTAMP, OFFSET, PARTITION, TOPIC, new RecordHeaders()));
        Mockito.when(mockedContext.currentNode()).thenReturn(new ProcessorNode<>(NAME));

        return mockedContext;
    }

    public static class ProcessingExceptionHandlerMock
    implements ProcessingExceptionHandler {
        private final ProcessingExceptionHandler.ProcessingHandlerResponse response;
        private final InternalProcessorContext<Object, Object> internalProcessorContext;
        private final boolean shouldThrowException;

        public ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse response, InternalProcessorContext<Object, Object> internalProcessorContext, boolean shouldThrowException) {
            this.response = response;
            this.internalProcessorContext = internalProcessorContext;
            this.shouldThrowException = shouldThrowException;
        }

        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
            Assertions.assertEquals((Object)this.internalProcessorContext.topic(), (Object)context.topic());
            Assertions.assertEquals((int)this.internalProcessorContext.partition(), (int)context.partition());
            Assertions.assertEquals((long)this.internalProcessorContext.offset(), (long)context.offset());
            Assertions.assertEquals((Object)this.internalProcessorContext.currentNode().name(), (Object)context.processorNodeId());
            Assertions.assertEquals((Object)this.internalProcessorContext.taskId(), (Object)context.taskId());
            Assertions.assertEquals((long)this.internalProcessorContext.timestamp(), (long)context.timestamp());
            Assertions.assertEquals((Object)ProcessorNodeTest.KEY, (Object)record.key());
            Assertions.assertEquals((Object)ProcessorNodeTest.VALUE, (Object)record.value());
            Assertions.assertInstanceOf(RuntimeException.class, (Object)exception);
            Assertions.assertEquals((Object)"Processing exception should be caught and handled by the processing exception handler.", (Object)exception.getMessage());
            if (this.shouldThrowException) {
                throw new RuntimeException("KABOOM!");
            }
            return this.response;
        }

        public void configure(Map<String, ?> configs) {
        }
    }

    /**
     * Variant of {@code ExceptionalProcessor} whose {@code init} succeeds and whose
     * {@code process} always throws a {@link ClassCastException}.
     */
    private static class ClassCastProcessor extends ExceptionalProcessor {

        @Override
        public void init(final ProcessorContext<Object, Object> context) {
            // deliberately a no-op so that the node can be initialised
        }

        @Override
        public void process(final Record<Object, Object> record) {
            throw new ClassCastException("Incompatible types simulation exception.");
        }
    }

    private static class IgnoredInternalExceptionsProcessor
    implements Processor<Object, Object, Object, Object> {
        private IgnoredInternalExceptionsProcessor() {
        }

        public void process(Record<Object, Object> record) {
            if (record.key().equals("FailedProcessingException")) {
                throw new FailedProcessingException((Exception)new RuntimeException("Fail processing"));
            }
            if (record.key().equals("TaskCorruptedException")) {
                HashSet<TaskId> tasksIds = new HashSet<TaskId>();
                tasksIds.add(new TaskId(0, 0));
                throw new TaskCorruptedException(tasksIds, new InvalidOffsetException("Invalid offset"){

                    public Set<TopicPartition> partitions() {
                        return new HashSet<TopicPartition>(Collections.singletonList(new TopicPartition(ProcessorNodeTest.TOPIC, 0)));
                    }
                });
            }
            if (record.key().equals("TaskMigratedException")) {
                throw new TaskMigratedException("TaskMigratedException", (Throwable)new RuntimeException("Task migrated cause"));
            }
            throw new RuntimeException("Processing exception should be caught and handled by the processing exception handler.");
        }
    }

    private static class NoOpProcessor
    implements Processor<Object, Object, Object, Object> {
        private NoOpProcessor() {
        }

        public void process(Record<Object, Object> record) {
        }
    }

    private static class ExceptionalProcessor
    implements Processor<Object, Object, Object, Object> {
        private ExceptionalProcessor() {
        }

        public void init(ProcessorContext<Object, Object> context) {
            throw new RuntimeException();
        }

        public void process(Record<Object, Object> record) {
            throw new RuntimeException();
        }

        public void close() {
            throw new RuntimeException();
        }
    }
}

