/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Unit tests for the {@link KStream} DSL operators.
 *
 * <p>The tests fall into three groups:
 * <ul>
 *   <li>topology-shape tests that build a topology and inspect the resulting
 *       {@link ProcessorTopology} (processor count, timestamp extractors);</li>
 *   <li>data-flow tests that pipe records through {@link KStreamTestDriver} and check what a
 *       {@link MockProcessorSupplier} received;</li>
 *   <li>argument-validation tests that pass {@code null} (or otherwise invalid) arguments to each
 *       operator and expect an exception.</li>
 * </ul>
 */
public class KStreamImplTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    /** Stream over the "source" topic, rebuilt before every test; used by the validation tests. */
    private KStream<String, String> testStream;
    /** Fresh builder per test; already contains the "source" stream after {@link #before()}. */
    private StreamsBuilder builder;
    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /** Creates a new builder and registers the "source" stream used by the validation tests. */
    @Before
    public void before() {
        builder = new StreamsBuilder();
        testStream = builder.stream("source");
    }

    /**
     * Builds a topology that exercises filter, filterNot, mapValues, flatMapValues, branch, join,
     * to, through and process, then checks the total number of processor nodes created.
     */
    @Test
    @SuppressWarnings("unchecked") // generic varargs array creation for branch(Predicate...)
    public void testNumProcesses() {
        // A dedicated builder is required here: the shared field already contains the "source"
        // stream registered in before(), which would change the processor count.
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        final KStream<String, String> source1 = topologyBuilder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
        final KStream<String, String> source2 = topologyBuilder.stream(Arrays.asList("topic-3", "topic-4"), consumed);

        final KStream<String, String> stream1 = source1.filter(new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return true;
            }
        }).filterNot(new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return false;
            }
        });

        final KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                return Integer.valueOf(value);
            }
        });

        final KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(String value) {
                return Collections.singletonList(Integer.valueOf(value));
            }
        });

        final KStream<String, Integer>[] streams2 = stream2.branch(
            new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return value % 2 == 0;
                }
            },
            new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return true;
                }
            });

        final KStream<String, Integer>[] streams3 = stream3.branch(
            new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return value % 2 == 0;
                }
            },
            new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return true;
                }
            });

        // The window size is irrelevant for this test; only the topology shape is inspected.
        final long anyWindowSize = 1L;
        final Joined<String, Integer, Integer> joined = Joined.with(stringSerde, intSerde, intSerde);

        final KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer value1, Integer value2) {
                return value1 + value2;
            }
        }, JoinWindows.of(anyWindowSize), joined);

        streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer value1, Integer value2) {
                return value1 + value2;
            }
        }, JoinWindows.of(anyWindowSize), joined);

        stream4.to("topic-5");
        streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>());

        // NOTE(review): the per-operator breakdown below is reconstructed so that it sums to the
        // original expected total of 26; the individual node counts are Kafka Streams internals
        // and should be confirmed against the built topology if this assertion ever fails.
        final int expectedProcessors =
            2           // sources
            + 2         // stream1: filter + filterNot
            + 1         // stream2: mapValues
            + 1         // stream3: flatMapValues
            + 1 + 2     // streams2: branch + two children
            + 1 + 2     // streams3: branch + two children
            + 5 * 2     // two stream-stream joins
            + 1         // to
            + 2         // through
            + 1;        // process
        Assert.assertEquals(
            expectedProcessors,
            StreamsBuilderTest.internalTopologyBuilder(topologyBuilder).setApplicationId("X").build(null).processors().size());
    }

    /**
     * The source node created for a {@code through()} topic must use
     * {@link FailOnInvalidTimestamp}, while the source nodes of the user-declared input topics
     * carry no explicit timestamp extractor.
     */
    @Test
    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        final KStream<String, String> stream1 = topologyBuilder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
        final KStream<String, String> stream2 = topologyBuilder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
        stream1.to("topic-5");
        stream2.through("topic-6");

        final ProcessorTopology processorTopology =
            StreamsBuilderTest.internalTopologyBuilder(topologyBuilder).setApplicationId("X").build(null);

        Assert.assertThat(
            processorTopology.source("topic-6").getTimestampExtractor(),
            IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        Assert.assertNull(processorTopology.source("topic-4").getTimestampExtractor());
        Assert.assertNull(processorTopology.source("topic-3").getTimestampExtractor());
        Assert.assertNull(processorTopology.source("topic-2").getTimestampExtractor());
        Assert.assertNull(processorTopology.source("topic-1").getTimestampExtractor());
    }

    /** A record sent into a stream must reach a processor attached after {@code through(topic, Produced)}. */
    @Test
    public void shouldSendDataThroughTopicUsingProduced() {
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        final String input = "topic";
        final KStream<String, String> stream = topologyBuilder.stream(input, consumed);
        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);

        driver.setUp(topologyBuilder);
        driver.process(input, "a", "b");

        Assert.assertThat(processorSupplier.processed, CoreMatchers.equalTo(Collections.singletonList("a:b")));
    }

    /** A record written with {@code to(topic, Produced)} must be readable from a stream on that topic. */
    @Test
    public void shouldSendDataToTopicUsingProduced() {
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        final String input = "topic";
        final KStream<String, String> stream = topologyBuilder.stream(input, consumed);
        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        stream.to("to-topic", Produced.with(stringSerde, stringSerde));
        topologyBuilder.stream("to-topic", consumed).process(processorSupplier);

        driver.setUp(topologyBuilder);
        driver.process(input, "e", "f");

        Assert.assertThat(processorSupplier.processed, CoreMatchers.equalTo(Collections.singletonList("e:f")));
    }

    /**
     * Every source node other than the one reading the original input topic must use
     * {@link FailOnInvalidTimestamp}; the original source node has no explicit extractor.
     * Uses the legacy {@link KStreamBuilder} API.
     */
    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
        final KStreamBuilder legacyBuilder = new KStreamBuilder();
        final KStream<String, String> kStream = legacyBuilder.stream(stringSerde, stringSerde, "topic-1");
        final ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
        final long windowSize = TimeUnit.DAYS.toMillis(1L);

        // Re-keying with map() before the join; presumably this is what forces the internal
        // repartition topic named in the test method — confirm against KStreamImpl.
        final KStream<String, String> stream = kStream.map(
            new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
                @Override
                public KeyValue<? extends String, ? extends String> apply(String key, String value) {
                    return KeyValue.pair(value, value);
                }
            });

        stream.join(
                kStream,
                valueJoiner,
                JoinWindows.of(windowSize).until(3L * windowSize),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to(Serdes.String(), Serdes.String(), "output-topic");

        final ProcessorTopology processorTopology = legacyBuilder.setApplicationId("X").build(null);
        final SourceNode originalSourceNode = processorTopology.source("topic-1");

        for (final SourceNode sourceNode : processorTopology.sources()) {
            if (sourceNode.name().equals(originalSourceNode.name())) {
                Assert.assertNull(sourceNode.getTimestampExtractor());
            } else {
                Assert.assertThat(
                    sourceNode.getTimestampExtractor(),
                    IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
            }
        }
    }

    /** Passing a {@code null} value serde to {@code to()} must not throw; the test passes if no exception escapes. */
    @Test
    public void testToWithNullValueSerdeDoesntNPE() {
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        final KStream<String, String> inputStream = topologyBuilder.stream(Collections.singleton("input"), consumed);
        inputStream.to(stringSerde, null, "output");
    }

    // ---------------------------------------------------------------------------------------
    // Argument validation: each operator must reject null / invalid arguments eagerly.
    // ---------------------------------------------------------------------------------------

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        testStream.filter(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        testStream.filterNot(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnSelectKey() {
        testStream.selectKey(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMap() {
        testStream.map(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        testStream.mapValues(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullFilePathOnWriteAsText() {
        testStream.writeAsText(null);
    }

    /** A whitespace-only file path is expected to be rejected with a {@link TopologyException}. */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowEmptyFilePathOnWriteAsText() {
        testStream.writeAsText("\t    \t");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMap() {
        testStream.flatMap(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValues() {
        testStream.flatMapValues(null);
    }

    /** Branching with an empty predicate array must be rejected. */
    @Test(expected=IllegalArgumentException.class)
    public void shouldHaveAtLeastOnPredicateWhenBranching() {
        testStream.branch(new Predicate[0]);
    }

    /** Branching with a predicate array that contains a {@code null} element must be rejected. */
    @Test(expected=NullPointerException.class)
    public void shouldCantHaveNullPredicate() {
        testStream.branch(new Predicate[]{null});
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicOnThrough() {
        testStream.through(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicOnTo() {
        testStream.to(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransform() {
        testStream.transform(null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransformValues() {
        testStream.transformValues(null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessSupplier() {
        testStream.process(null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherStreamOnJoin() {
        testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullValueJoinerOnJoin() {
        testStream.join(testStream, null, JoinWindows.of(10L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinWindowsOnJoin() {
        testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTableOnTableJoin() {
        testStream.leftJoin((KTable)null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullValueMapperOnTableJoin() {
        testStream.leftJoin(builder.table("topic", stringConsumed), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        testStream.groupBy(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullActionOnForEach() {
        testStream.foreach(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
        testStream.join((GlobalKTable)null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
        testStream.join(builder.globalTable("global", stringConsumed), null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
        testStream.join(builder.globalTable("global", stringConsumed), MockMapper.selectValueMapper(), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
        testStream.leftJoin((GlobalKTable)null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
        testStream.leftJoin(builder.globalTable("global", stringConsumed), null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
        testStream.leftJoin(builder.globalTable("global", stringConsumed), MockMapper.selectValueMapper(), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
        testStream.print((Printed)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
        testStream.through("topic", null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnToWhenProducedIsNull() {
        testStream.to("topic", null);
    }

    /**
     * Uses try/fail/catch rather than {@code @Test(expected=...)} so that only an exception thrown
     * by the {@code leftJoin} call itself (not by the table setup) satisfies the test.
     */
    @Test
    public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
        final KTable<String, String> table = builder.table("blah", consumed);
        try {
            testStream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
            Assert.fail("Should have thrown NullPointerException");
        } catch (final NullPointerException expected) {
            // expected: a null Joined must be rejected
        }
    }

    /**
     * Uses try/fail/catch rather than {@code @Test(expected=...)} so that only an exception thrown
     * by the {@code join} call itself (not by the table setup) satisfies the test.
     */
    @Test
    public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
        final KTable<String, String> table = builder.table("blah", consumed);
        try {
            testStream.join(table, MockValueJoiner.TOSTRING_JOINER, null);
            Assert.fail("Should have thrown NullPointerException");
        } catch (final NullPointerException expected) {
            // expected: a null Joined must be rejected
        }
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWithStreamWhenJoinedIsNull() {
        testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
        testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L), null);
    }

    // ---------------------------------------------------------------------------------------
    // merge()
    // ---------------------------------------------------------------------------------------

    /** Records from two merged streams must arrive at the downstream processor in processing order. */
    @Test
    public void shouldMergeTwoStreams() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";
        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        final KStream<String, String> merged = source1.merge(source2);
        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        merged.process(processorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process(topic1, "A", "aa");
        driver.process(topic2, "B", "bb");
        driver.process(topic2, "C", "cc");
        driver.process(topic1, "D", "dd");

        Assert.assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
    }

    /** Chained merges of four streams must deliver every record, in processing order, to one processor. */
    @Test
    public void shouldMergeMultipleStreams() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";
        final String topic3 = "topic-3";
        final String topic4 = "topic-4";
        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        final KStream<String, String> source3 = builder.stream(topic3);
        final KStream<String, String> source4 = builder.stream(topic4);
        final KStream<String, String> merged = source1.merge(source2).merge(source3).merge(source4);
        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        merged.process(processorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process(topic1, "A", "aa");
        driver.process(topic2, "B", "bb");
        driver.process(topic3, "C", "cc");
        driver.process(topic4, "D", "dd");
        driver.process(topic4, "E", "ee");
        driver.process(topic3, "F", "ff");
        driver.process(topic2, "G", "gg");
        driver.process(topic1, "H", "hh");

        Assert.assertEquals(
            Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
            processorSupplier.processed);
    }
}

