/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Random;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class AbstractStreamTest {
    private final String topicName = "topic";
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    @Test
    public void testShouldBeExtensible() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
        MockProcessorSupplier processor = new MockProcessorSupplier();
        String topicName = "topic";
        ExtendedKStream stream = new ExtendedKStream(builder.stream("topic", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())));
        stream.randomFilter().process(processor, new String[0]);
        this.driver.setUp(builder);
        for (int expectedKey : expectedKeys) {
            this.driver.process("topic", expectedKey, "V" + expectedKey);
        }
        Assert.assertTrue((processor.processed.size() <= expectedKeys.length ? 1 : 0) != 0);
    }

    private class ExtendedKStreamDummy<K, V>
    implements ProcessorSupplier<K, V> {
        private Random rand = new Random();

        ExtendedKStreamDummy() {
        }

        public Processor<K, V> get() {
            return new ExtendedKStreamDummyProcessor();
        }

        private class ExtendedKStreamDummyProcessor
        extends AbstractProcessor<K, V> {
            private ExtendedKStreamDummyProcessor() {
            }

            public void process(K key, V value) {
                if (ExtendedKStreamDummy.this.rand.nextBoolean()) {
                    this.context().forward(key, value);
                }
            }
        }
    }

    private class ExtendedKStream<K, V>
    extends AbstractStream<K> {
        ExtendedKStream(KStream<K, V> stream) {
            super((AbstractStream)((KStreamImpl)stream));
        }

        KStream<K, V> randomFilter() {
            String name = this.builder.newProcessorName("RANDOM-FILTER-");
            this.builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), new String[]{this.name});
            return new KStreamImpl(this.builder, name, this.sourceNodes, false);
        }
    }
}

