/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Random;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class AbstractStreamTest {
    @Test
    public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
        ValueTransformerSupplier valueTransformerSupplier = (ValueTransformerSupplier)Mockito.mock(ValueTransformerSupplier.class);
        Mockito.when((Object)valueTransformerSupplier.get()).thenReturn(new NoopValueTransformer()).thenReturn(new NoopValueTransformer());
        ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = AbstractStream.toValueTransformerWithKeySupplier((ValueTransformerSupplier)valueTransformerSupplier);
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
    }

    @Test
    public void testToInternalValueTransformerWithKeySupplierSuppliesNewTransformers() {
        ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = (ValueTransformerWithKeySupplier)Mockito.mock(ValueTransformerWithKeySupplier.class);
        Mockito.when((Object)valueTransformerWithKeySupplier.get()).thenReturn(new NoopValueTransformerWithKey());
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
    }

    @Test
    public void testShouldBeExtensible() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        String topicName = "topic";
        ExtendedKStream stream = new ExtendedKStream(builder.stream("topic", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())));
        stream.randomFilter().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build());){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
            for (int expectedKey : expectedKeys) {
                inputTopic.pipeInput((Object)expectedKey, (Object)("V" + expectedKey));
            }
            Assertions.assertTrue((supplier.theCapturedProcessor().processed().size() <= expectedKeys.length ? 1 : 0) != 0);
        }
    }

    private static class ExtendedKStreamDummy<K, V>
    implements ProcessorSupplier<K, V, K, V> {
        private final Random rand = new Random();

        ExtendedKStreamDummy() {
        }

        public Processor<K, V, K, V> get() {
            return new ExtendedKStreamDummyProcessor();
        }

        private class ExtendedKStreamDummyProcessor
        extends ContextualProcessor<K, V, K, V> {
            private ExtendedKStreamDummyProcessor() {
            }

            public void process(Record<K, V> record) {
                if (ExtendedKStreamDummy.this.rand.nextBoolean()) {
                    this.context().forward(record);
                }
            }
        }
    }

    private static class ExtendedKStream<K, V>
    extends AbstractStream<K, V> {
        ExtendedKStream(KStream<K, V> stream) {
            super((AbstractStream)((KStreamImpl)stream));
        }

        KStream<K, V> randomFilter() {
            String name = this.builder.newProcessorName("RANDOM-FILTER-");
            ProcessorGraphNode processorNode = new ProcessorGraphNode(name, new ProcessorParameters(new ExtendedKStreamDummy(), name));
            this.builder.addGraphNode(this.graphNode, (GraphNode)processorNode);
            return new KStreamImpl(name, null, null, this.subTopologySourceNodes, false, (GraphNode)processorNode, this.builder);
        }
    }
}

