/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamImplTest {
    private final Consumed<String, String> stringConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier();
    private KStream<String, String> testStream;
    private StreamsBuilder builder;
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final Serde<String> mySerde = new Serdes.StringSerde();
    private final StreamJoined nullStreamJoinedForTest = null;

    @Before
    public void before() {
        this.builder = new StreamsBuilder();
        this.testStream = this.builder.stream("source");
    }

    @Test
    public void testNumProcesses() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        KStream stream1 = source1.filter((key, value) -> true).filterNot((key, value) -> false);
        KStream stream2 = stream1.mapValues(Integer::new);
        KStream stream3 = source2.flatMapValues(value -> Collections.singletonList(new Integer((String)value)));
        KStream[] streams2 = stream2.branch(new Predicate[]{(key, value) -> value % 2 == 0, (key, value) -> true});
        KStream[] streams3 = stream3.branch(new Predicate[]{(key, value) -> value % 2 == 0, (key, value) -> true});
        boolean anyWindowSize = true;
        StreamJoined joined = StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
        KStream stream4 = streams2[0].join(streams3[0], (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(1L)), joined);
        streams2[1].join(streams3[1], (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(1L)), joined);
        stream4.to("topic-5");
        streams2[1].through("topic-6").process(new MockProcessorSupplier(), new String[0]);
        Assert.assertEquals((long)26L, (long)TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
    }

    @Test
    public void shouldPreserveSerdesForOperators() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream(Collections.singleton("topic-1"), this.stringConsumed);
        KTable table1 = builder.table("topic-2", this.stringConsumed);
        GlobalKTable table2 = builder.globalTable("topic-2", this.stringConsumed);
        ConsumedInternal consumedInternal = new ConsumedInternal(this.stringConsumed);
        KeyValueMapper selector = (key, value) -> key;
        KeyValueMapper flatSelector = (key, value) -> Collections.singleton(new KeyValue(key, value));
        ValueMapper mapper = value -> value;
        ValueMapper flatMapper = Collections::singleton;
        ValueJoiner joiner = (value1, value2) -> value1;
        TransformerSupplier transformerSupplier = () -> new Transformer<String, String, KeyValue<String, String>>(){

            public void init(ProcessorContext context) {
            }

            public KeyValue<String, String> transform(String key, String value) {
                return new KeyValue((Object)key, (Object)value);
            }

            public void close() {
            }
        };
        ValueTransformerSupplier valueTransformerSupplier = () -> new ValueTransformer<String, String>(){

            public void init(ProcessorContext context) {
            }

            public String transform(String value) {
                return value;
            }

            public void close() {
            }
        };
        Assert.assertEquals((Object)((AbstractStream)stream1.filter((key, value) -> false)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.filter((key, value) -> false)).valueSerde(), (Object)consumedInternal.valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.filterNot((key, value) -> false)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.filterNot((key, value) -> false)).valueSerde(), (Object)consumedInternal.valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.selectKey(selector)).keySerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.selectKey(selector)).valueSerde(), (Object)consumedInternal.valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.map(KeyValue::new)).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.map(KeyValue::new)).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.mapValues(mapper)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.mapValues(mapper)).valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.flatMap(flatSelector)).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.flatMap(flatSelector)).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.flatMapValues(flatMapper)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.flatMapValues(flatMapper)).valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.transform(transformerSupplier, new String[0])).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.transform(transformerSupplier, new String[0])).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.transformValues(valueTransformerSupplier, new String[0])).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.transformValues(valueTransformerSupplier, new String[0])).valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.merge(stream1)).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.merge(stream1)).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.through("topic-3")).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.through("topic-3")).valueSerde(), (Object)consumedInternal.valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.through("topic-3", Produced.with(this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertEquals((Object)((AbstractStream)stream1.through("topic-3", Produced.with(this.mySerde, this.mySerde))).valueSerde(), this.mySerde);
        Assert.assertEquals((Object)((AbstractStream)stream1.groupByKey()).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.groupByKey()).valueSerde(), (Object)consumedInternal.valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.groupByKey(Grouped.with(this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertEquals((Object)((AbstractStream)stream1.groupByKey(Grouped.with(this.mySerde, this.mySerde))).valueSerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.groupBy(selector)).keySerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.groupBy(selector)).valueSerde(), (Object)consumedInternal.valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.groupBy(selector, Grouped.with(this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertEquals((Object)((AbstractStream)stream1.groupBy(selector, Grouped.with(this.mySerde, this.mySerde))).valueSerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.join(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)))).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.join(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)))).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.join(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.join(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde))).valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.leftJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)))).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.leftJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)))).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.leftJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.leftJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde))).valueSerde());
        Assert.assertNull((Object)((AbstractStream)stream1.outerJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)))).keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.outerJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)))).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.outerJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.outerJoin(stream1, joiner, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde))).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.join(table1, joiner)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.join(table1, joiner)).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.join(table1, joiner, Joined.with(this.mySerde, this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.join(table1, joiner, Joined.with(this.mySerde, this.mySerde, this.mySerde))).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.leftJoin(table1, joiner)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.leftJoin(table1, joiner)).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.leftJoin(table1, joiner, Joined.with(this.mySerde, this.mySerde, this.mySerde))).keySerde(), this.mySerde);
        Assert.assertNull((Object)((AbstractStream)stream1.leftJoin(table1, joiner, Joined.with(this.mySerde, this.mySerde, this.mySerde))).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.join(table2, selector, joiner)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.join(table2, selector, joiner)).valueSerde());
        Assert.assertEquals((Object)((AbstractStream)stream1.leftJoin(table2, selector, joiner)).keySerde(), (Object)consumedInternal.keySerde());
        Assert.assertNull((Object)((AbstractStream)stream1.leftJoin(table2, selector, joiner)).valueSerde());
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        stream1.to("topic-5");
        stream2.through("topic-6");
        ProcessorTopology processorTopology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null);
        MatcherAssert.assertThat((Object)processorTopology.source("topic-6").getTimestampExtractor(), (org.hamcrest.Matcher)IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        Assert.assertNull((Object)processorTopology.source("topic-4").getTimestampExtractor());
        Assert.assertNull((Object)processorTopology.source("topic-3").getTimestampExtractor());
        Assert.assertNull((Object)processorTopology.source("topic-2").getTimestampExtractor());
        Assert.assertNull((Object)processorTopology.source("topic-1").getTimestampExtractor());
    }

    @Test
    public void shouldSendDataThroughTopicUsingProduced() {
        StreamsBuilder builder = new StreamsBuilder();
        String input = "topic";
        KStream stream = builder.stream("topic", this.stringConsumed);
        stream.through("through-topic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String())).process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"a", (Object)"b");
        }
        MatcherAssert.assertThat(this.processorSupplier.theCapturedProcessor().processed, (org.hamcrest.Matcher)CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp<String, String>("a", "b", 0L))));
    }

    @Test
    public void shouldSendDataToTopicUsingProduced() {
        StreamsBuilder builder = new StreamsBuilder();
        String input = "topic";
        KStream stream = builder.stream("topic", this.stringConsumed);
        stream.to("to-topic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        builder.stream("to-topic", this.stringConsumed).process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"e", (Object)"f");
        }
        MatcherAssert.assertThat(this.processorSupplier.theCapturedProcessor().processed, (org.hamcrest.Matcher)CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp<String, String>("e", "f", 0L))));
    }

    @Test
    public void shouldSendDataToDynamicTopics() {
        StreamsBuilder builder = new StreamsBuilder();
        String input = "topic";
        KStream stream = builder.stream("topic", this.stringConsumed);
        stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1), Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        builder.stream("topic-a-v", this.stringConsumed).process(this.processorSupplier, new String[0]);
        builder.stream("topic-b-v", this.stringConsumed).process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"a", (Object)"v1");
            inputTopic.pipeInput((Object)"a", (Object)"v2");
            inputTopic.pipeInput((Object)"b", (Object)"v1");
        }
        List<MockProcessor<String, String>> mockProcessors = this.processorSupplier.capturedProcessors(2);
        MatcherAssert.assertThat(mockProcessors.get((int)0).processed, (org.hamcrest.Matcher)CoreMatchers.equalTo(Arrays.asList(new KeyValueTimestamp<String, String>("a", "v1", 0L), new KeyValueTimestamp<String, String>("a", "v2", 0L))));
        MatcherAssert.assertThat(mockProcessors.get((int)1).processed, (org.hamcrest.Matcher)CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp<String, String>("b", "v1", 0L))));
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream kStream = builder.stream("topic-1", this.stringConsumed);
        ValueJoiner valueJoiner = MockValueJoiner.instance(":");
        long windowSize = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
        KStream stream = kStream.map((key, value) -> KeyValue.pair((Object)value, (Object)value));
        stream.join(kStream, valueJoiner, JoinWindows.of((Duration)Duration.ofMillis(windowSize)).grace(Duration.ofMillis(3L * windowSize)), Joined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.String())).to("output-topic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
        SourceNode originalSourceNode = topology.source("topic-1");
        for (SourceNode sourceNode : topology.sources()) {
            if (sourceNode.name().equals(originalSourceNode.name())) {
                Assert.assertNull((Object)sourceNode.getTimestampExtractor());
                continue;
            }
            MatcherAssert.assertThat((Object)sourceNode.getTimestampExtractor(), (org.hamcrest.Matcher)IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        }
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream kStream = builder.stream("topic-1", this.stringConsumed);
        ValueJoiner valueJoiner = MockValueJoiner.instance(":");
        long windowSize = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
        KStream stream = kStream.map((key, value) -> KeyValue.pair((Object)value, (Object)value));
        stream.join(kStream, valueJoiner, JoinWindows.of((Duration)Duration.ofMillis(windowSize)).grace(Duration.ofMillis(3L * windowSize)), StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.String())).to("output-topic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
        SourceNode originalSourceNode = topology.source("topic-1");
        for (SourceNode sourceNode : topology.sources()) {
            if (sourceNode.name().equals(originalSourceNode.name())) {
                Assert.assertNull((Object)sourceNode.getTimestampExtractor());
                continue;
            }
            MatcherAssert.assertThat((Object)sourceNode.getTimestampExtractor(), (org.hamcrest.Matcher)IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        }
    }

    @Test
    public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable globalKTable = builder.globalTable("globalTopic");
        KeyValueMapper kvMappper = (k, v) -> k + v;
        ValueJoiner valueJoiner = (v1, v2) -> v1 + v2;
        builder.stream("topic").selectKey((k, v) -> v).join(globalKTable, kvMappper, valueJoiner).groupByKey().count();
        Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
        String topology = builder.build().describe().toString();
        Matcher matcher = repartitionTopicPattern.matcher(topology);
        Assert.assertTrue((boolean)matcher.find());
        String match = matcher.group();
        MatcherAssert.assertThat((Object)match, (org.hamcrest.Matcher)CoreMatchers.notNullValue());
        Assert.assertTrue((boolean)match.endsWith("repartition"));
    }

    @Test
    public void testToWithNullValueSerdeDoesntNPE() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
        KStream inputStream = builder.stream(Collections.singleton("input"), consumed);
        inputStream.to("output", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        this.testStream.filter(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        this.testStream.filterNot(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnSelectKey() {
        this.testStream.selectKey(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMap() {
        this.testStream.map(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        this.testStream.mapValues((ValueMapper)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValuesWithKey() {
        this.testStream.mapValues((ValueMapperWithKey)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMap() {
        this.testStream.flatMap(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValues() {
        this.testStream.flatMapValues((ValueMapper)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
        this.testStream.flatMapValues((ValueMapperWithKey)null);
    }

    @Test(expected=IllegalArgumentException.class)
    public void shouldHaveAtLeastOnPredicateWhenBranching() {
        this.testStream.branch(new Predicate[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldCantHaveNullPredicate() {
        this.testStream.branch(new Predicate[]{null});
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicOnThrough() {
        this.testStream.through(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicOnTo() {
        this.testStream.to((String)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicChooserOnTo() {
        this.testStream.to((TopicNameExtractor)null);
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnTransform() {
        Exception e = (Exception)Assert.assertThrows(NullPointerException.class, () -> this.testStream.transform(null, new String[0]));
        Assert.assertEquals((Object)"transformerSupplier can't be null", (Object)e.getMessage());
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
        Exception e = (Exception)Assert.assertThrows(NullPointerException.class, () -> this.testStream.flatTransform(null, new String[0]));
        Assert.assertEquals((Object)"transformerSupplier can't be null", (Object)e.getMessage());
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() {
        Exception e = (Exception)Assert.assertThrows(NullPointerException.class, () -> this.testStream.transformValues((ValueTransformerWithKeySupplier)null, new String[0]));
        Assert.assertEquals((Object)"valueTransformerSupplier can't be null", (Object)e.getMessage());
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
        Exception e = (Exception)Assert.assertThrows(NullPointerException.class, () -> this.testStream.transformValues((ValueTransformerSupplier)null, new String[0]));
        Assert.assertEquals((Object)"valueTransformerSupplier can't be null", (Object)e.getMessage());
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() {
        Exception e = (Exception)Assert.assertThrows(NullPointerException.class, () -> this.testStream.flatTransformValues((ValueTransformerWithKeySupplier)null, new String[0]));
        Assert.assertEquals((Object)"valueTransformerSupplier can't be null", (Object)e.getMessage());
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
        Exception e = (Exception)Assert.assertThrows(NullPointerException.class, () -> this.testStream.flatTransformValues((ValueTransformerSupplier)null, new String[0]));
        Assert.assertEquals((Object)"valueTransformerSupplier can't be null", (Object)e.getMessage());
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessSupplier() {
        this.testStream.process(null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherStreamOnJoin() {
        this.testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(10L)));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullValueJoinerOnJoin() {
        this.testStream.join(this.testStream, null, JoinWindows.of((Duration)Duration.ofMillis(10L)));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinWindowsOnJoin() {
        this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTableOnTableJoin() {
        this.testStream.leftJoin((KTable)null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullValueMapperOnTableJoin() {
        this.testStream.leftJoin(this.builder.table("topic", this.stringConsumed), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        this.testStream.groupBy(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullActionOnForEach() {
        this.testStream.foreach(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
        this.testStream.join((GlobalKTable)null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
        this.testStream.join(this.builder.globalTable("global", this.stringConsumed), null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
        this.testStream.join(this.builder.globalTable("global", this.stringConsumed), MockMapper.selectValueMapper(), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
        this.testStream.leftJoin((GlobalKTable)null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
        this.testStream.leftJoin(this.builder.globalTable("global", this.stringConsumed), null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
        this.testStream.leftJoin(this.builder.globalTable("global", this.stringConsumed), MockMapper.selectValueMapper(), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
        this.testStream.print(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
        this.testStream.through("topic", null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnToWhenProducedIsNull() {
        this.testStream.to("topic", null);
    }

    @Test
    public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
        KTable table = this.builder.table("blah", this.stringConsumed);
        try {
            this.testStream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
            Assert.fail((String)"Should have thrown NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
        KTable table = this.builder.table("blah", this.stringConsumed);
        try {
            this.testStream.join(table, MockValueJoiner.TOSTRING_JOINER, null);
            Assert.fail((String)"Should have thrown NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWithStreamWhenStreamJoinedIsNull() {
        this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(10L)), this.nullStreamJoinedForTest);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinStreamJoinedIsNull() {
        this.testStream.outerJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(10L)), this.nullStreamJoinedForTest);
    }

    @Test
    public void shouldMergeTwoStreams() {
        String topic1 = "topic-1";
        String topic2 = "topic-2";
        KStream source1 = this.builder.stream("topic-1");
        KStream source2 = this.builder.stream("topic-2");
        KStream merged = source1.merge(source2);
        merged.process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic-1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic-2", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic1.pipeInput((Object)"A", (Object)"aa");
            inputTopic2.pipeInput((Object)"B", (Object)"bb");
            inputTopic2.pipeInput((Object)"C", (Object)"cc");
            inputTopic1.pipeInput((Object)"D", (Object)"dd");
        }
        Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<String, String>("A", "aa", 0L), new KeyValueTimestamp<String, String>("B", "bb", 0L), new KeyValueTimestamp<String, String>("C", "cc", 0L), new KeyValueTimestamp<String, String>("D", "dd", 0L)), this.processorSupplier.theCapturedProcessor().processed);
    }

    @Test
    public void shouldMergeMultipleStreams() {
        String topic1 = "topic-1";
        String topic2 = "topic-2";
        String topic3 = "topic-3";
        String topic4 = "topic-4";
        KStream source1 = this.builder.stream("topic-1");
        KStream source2 = this.builder.stream("topic-2");
        KStream source3 = this.builder.stream("topic-3");
        KStream source4 = this.builder.stream("topic-4");
        KStream merged = source1.merge(source2).merge(source3).merge(source4);
        merged.process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic-1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic-2", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic3 = driver.createInputTopic("topic-3", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic4 = driver.createInputTopic("topic-4", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic1.pipeInput((Object)"A", (Object)"aa", 1L);
            inputTopic2.pipeInput((Object)"B", (Object)"bb", 9L);
            inputTopic3.pipeInput((Object)"C", (Object)"cc", 2L);
            inputTopic4.pipeInput((Object)"D", (Object)"dd", 8L);
            inputTopic4.pipeInput((Object)"E", (Object)"ee", 3L);
            inputTopic3.pipeInput((Object)"F", (Object)"ff", 7L);
            inputTopic2.pipeInput((Object)"G", (Object)"gg", 4L);
            inputTopic1.pipeInput((Object)"H", (Object)"hh", 6L);
        }
        Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<String, String>("A", "aa", 1L), new KeyValueTimestamp<String, String>("B", "bb", 9L), new KeyValueTimestamp<String, String>("C", "cc", 2L), new KeyValueTimestamp<String, String>("D", "dd", 8L), new KeyValueTimestamp<String, String>("E", "ee", 3L), new KeyValueTimestamp<String, String>("F", "ff", 7L), new KeyValueTimestamp<String, String>("G", "gg", 4L), new KeyValueTimestamp<String, String>("H", "hh", 6L)), this.processorSupplier.theCapturedProcessor().processed);
    }

    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        KStream pattern2Source = this.builder.stream(Pattern.compile("topic-\\d"));
        pattern2Source.process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic3 = driver.createInputTopic("topic-3", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic4 = driver.createInputTopic("topic-4", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic5 = driver.createInputTopic("topic-5", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic6 = driver.createInputTopic("topic-6", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic7 = driver.createInputTopic("topic-7", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic3.pipeInput((Object)"A", (Object)"aa", 1L);
            inputTopic4.pipeInput((Object)"B", (Object)"bb", 5L);
            inputTopic5.pipeInput((Object)"C", (Object)"cc", 10L);
            inputTopic6.pipeInput((Object)"D", (Object)"dd", 8L);
            inputTopic7.pipeInput((Object)"E", (Object)"ee", 3L);
        }
        Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<String, String>("A", "aa", 1L), new KeyValueTimestamp<String, String>("B", "bb", 5L), new KeyValueTimestamp<String, String>("C", "cc", 10L), new KeyValueTimestamp<String, String>("D", "dd", 8L), new KeyValueTimestamp<String, String>("E", "ee", 3L)), this.processorSupplier.theCapturedProcessor().processed);
    }

    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        String topic3 = "topic-without-pattern";
        KStream pattern2Source1 = this.builder.stream(Pattern.compile("topic-\\d"));
        KStream pattern2Source2 = this.builder.stream(Pattern.compile("topic-[A-Z]"));
        KStream source3 = this.builder.stream("topic-without-pattern");
        KStream merged = pattern2Source1.merge(pattern2Source2).merge(source3);
        merged.process(this.processorSupplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic3 = driver.createInputTopic("topic-3", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic4 = driver.createInputTopic("topic-4", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopicA = driver.createInputTopic("topic-A", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopicZ = driver.createInputTopic("topic-Z", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic = driver.createInputTopic("topic-without-pattern", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic3.pipeInput((Object)"A", (Object)"aa", 1L);
            inputTopic4.pipeInput((Object)"B", (Object)"bb", 5L);
            inputTopicA.pipeInput((Object)"C", (Object)"cc", 10L);
            inputTopicZ.pipeInput((Object)"D", (Object)"dd", 8L);
            inputTopic.pipeInput((Object)"E", (Object)"ee", 3L);
        }
        Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<String, String>("A", "aa", 1L), new KeyValueTimestamp<String, String>("B", "bb", 5L), new KeyValueTimestamp<String, String>("C", "cc", 10L), new KeyValueTimestamp<String, String>("D", "dd", 8L), new KeyValueTimestamp<String, String>("E", "ee", 3L)), this.processorSupplier.theCapturedProcessor().processed);
    }
}

