/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Properties;
import java.util.Random;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

public class AbstractStreamTest {
    @Test
    public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
        ValueTransformerSupplier valueTransformerSupplier = (ValueTransformerSupplier)EasyMock.createMock(ValueTransformerSupplier.class);
        EasyMock.expect((Object)valueTransformerSupplier.get()).andReturn(null).times(3);
        ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = AbstractStream.toValueTransformerWithKeySupplier((ValueTransformerSupplier)valueTransformerSupplier);
        EasyMock.replay((Object[])new Object[]{valueTransformerSupplier});
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
        EasyMock.verify((Object[])new Object[]{valueTransformerSupplier});
    }

    @Test
    public void testToInternalValueTransformerWithKeySupplierSuppliesNewTransformers() {
        ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = (ValueTransformerWithKeySupplier)EasyMock.createMock(ValueTransformerWithKeySupplier.class);
        EasyMock.expect((Object)valueTransformerWithKeySupplier.get()).andReturn(null).times(3);
        EasyMock.replay((Object[])new Object[]{valueTransformerWithKeySupplier});
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
        valueTransformerWithKeySupplier.get();
        EasyMock.verify((Object[])new Object[]{valueTransformerWithKeySupplier});
    }

    @Test
    public void testShouldBeExtensible() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        String topicName = "topic";
        ExtendedKStream stream = new ExtendedKStream(builder.stream("topic", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())));
        stream.randomFilter().process(supplier, new String[0]);
        Properties props = new Properties();
        props.setProperty("application.id", "abstract-stream-test");
        props.setProperty("bootstrap.servers", "localhost:9091");
        props.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(recordFactory.create("topic", (Object)expectedKey, (Object)("V" + expectedKey)));
        }
        Assert.assertTrue((supplier.theCapturedProcessor().processed.size() <= expectedKeys.length ? 1 : 0) != 0);
    }

    private class ExtendedKStreamDummy<K, V>
    implements ProcessorSupplier<K, V> {
        private final Random rand = new Random();

        ExtendedKStreamDummy() {
        }

        public Processor<K, V> get() {
            return new ExtendedKStreamDummyProcessor();
        }

        private class ExtendedKStreamDummyProcessor
        extends AbstractProcessor<K, V> {
            private ExtendedKStreamDummyProcessor() {
            }

            public void process(K key, V value) {
                if (ExtendedKStreamDummy.this.rand.nextBoolean()) {
                    this.context().forward(key, value);
                }
            }
        }
    }

    private class ExtendedKStream<K, V>
    extends AbstractStream<K, V> {
        ExtendedKStream(KStream<K, V> stream) {
            super((AbstractStream)((KStreamImpl)stream));
        }

        KStream<K, V> randomFilter() {
            String name = this.builder.newProcessorName("RANDOM-FILTER-");
            ProcessorGraphNode processorNode = new ProcessorGraphNode(name, new ProcessorParameters(new ExtendedKStreamDummy(), name), false);
            this.builder.addGraphNode(this.streamsGraphNode, (StreamsGraphNode)processorNode);
            return new KStreamImpl(name, null, null, this.sourceNodes, false, (StreamsGraphNode)processorNode, this.builder);
        }
    }
}

