/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class SinkNodeTest {
    private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes((String)"anyName", Bytes.class, Bytes.class);
    private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
    private final RecordCollector recordCollector = new MockRecordCollector();
    private final InternalMockProcessorContext<Void, Void> context = new InternalMockProcessorContext(this.anyStateSerde, this.recordCollector);
    private final SinkNode<byte[], byte[]> sink = new SinkNode("anyNodeName", (TopicNameExtractor)new StaticTopicNameExtractor("any-output-topic"), this.anySerializer, this.anySerializer, null);
    private final SinkNode<Object, Object> illTypedSink = this.sink;
    private MockedStatic<WrappingNullableUtils> utilsMock;

    @BeforeEach
    public void setup() {
        this.utilsMock = Mockito.mockStatic(WrappingNullableUtils.class);
    }

    @AfterEach
    public void cleanup() {
        this.utilsMock.close();
    }

    @Test
    public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
        this.sink.init(this.context);
        this.context.setTime(-1L);
        try {
            this.illTypedSink.process(new Record((Object)"any key".getBytes(), (Object)"any value".getBytes(), -1L));
            Assertions.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowStreamsExceptionOnUndefinedKeySerde() {
        this.utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer((Serializer)((Serializer)ArgumentMatchers.any()), (ProcessorContext)((ProcessorContext)ArgumentMatchers.any()), (String)((String)ArgumentMatchers.any()))).thenThrow(new Throwable[]{new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")});
        Throwable exception = Assertions.assertThrows(StreamsException.class, () -> this.sink.init(this.context));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.equalTo((Object)"Failed to initialize key serdes for sink node anyNodeName"));
        MatcherAssert.assertThat((Object)exception.getCause().getMessage(), (Matcher)Matchers.equalTo((Object)"Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnUndefinedValueSerde() {
        this.utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer((Serializer)((Serializer)ArgumentMatchers.any()), (ProcessorContext)((ProcessorContext)ArgumentMatchers.any()), (String)((String)ArgumentMatchers.any()))).thenThrow(new Throwable[]{new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")});
        Throwable exception = Assertions.assertThrows(StreamsException.class, () -> this.sink.init(this.context));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.equalTo((Object)"Failed to initialize value serdes for sink node anyNodeName"));
        MatcherAssert.assertThat((Object)exception.getCause().getMessage(), (Matcher)Matchers.equalTo((Object)"Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void shouldThrowStreamsExceptionWithExplicitErrorMessage() {
        this.utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer((Serializer)((Serializer)ArgumentMatchers.any()), (ProcessorContext)((ProcessorContext)ArgumentMatchers.any()), (String)((String)ArgumentMatchers.any()))).thenThrow(new Throwable[]{new StreamsException("")});
        Throwable exception = Assertions.assertThrows(StreamsException.class, () -> this.sink.init(this.context));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.equalTo((Object)"Failed to initialize key serdes for sink node anyNodeName"));
    }
}

