/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Test;

/**
 * Checks that joining a topic with itself produces the same records whether the topology is
 * built without any configuration or built with {@code props}, which set
 * {@code topology.optimization=all}.
 *
 * <p>Each test registers two joins on a single {@link StreamsBuilder}: a reference join that
 * reads {@code topic2} and a join under test that reads {@code topic1}. The reference topology
 * is built and run first (no properties); its captured output becomes the expectation for the
 * second topology, which is built from the same builder with {@code props}.
 */
public class KStreamKStreamSelfJoinTest {
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /** Joins one {@code KStream} instance with itself, using a 100 ms window without grace. */
    @Test
    public void shouldMatchInnerJoinWithSelfJoinWithSingleStream() {
        assertSelfJoinMatchesInnerJoin(
            true,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)),
            KStreamKStreamSelfJoinTest::pipeInterleavedKeys
        );
    }

    /** Joins two {@code KStream} instances created from the same topic, 100 ms window without grace. */
    @Test
    public void shouldMatchInnerJoinWithSelfJoinWithTwoStreams() {
        assertSelfJoinMatchesInnerJoin(
            false,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)),
            KStreamKStreamSelfJoinTest::pipeInterleavedKeys
        );
    }

    /**
     * Joins two streams of the same topic with an 11 s time difference and a 10 s grace period.
     */
    @Test
    public void shouldMatchInnerJoinWithSelfJoinDifferentBeforeAfterWindows() {
        // NOTE(review): the test name mentions different before/after windows, but only a single
        // symmetric time difference is configured here - confirm this is the intended setup.
        assertSelfJoinMatchesInnerJoin(
            false,
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(11L), Duration.ofSeconds(10L)),
            inputTopic -> {
                inputTopic.pipeInput("A", "1", 0L);
                inputTopic.pipeInput("A", "2", 11000L);
                inputTopic.pipeInput("B", "1", 12000L);
                inputTopic.pipeInput("A", "3", 13000L);
                inputTopic.pipeInput("A", "4", 15000L);
                inputTopic.pipeInput("C", "1", 16000L);
                inputTopic.pipeInput("D", "1", 17000L);
                inputTopic.pipeInput("A", "5", 30000L);
            }
        );
    }

    /**
     * Joins two streams of the same topic with a 10 s window without grace; the last record is
     * piped with a timestamp lower than the records before it.
     */
    @Test
    public void shouldMatchInnerJoinWithSelfJoinOutOfOrderMessages() {
        assertSelfJoinMatchesInnerJoin(
            false,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10L)),
            inputTopic -> {
                inputTopic.pipeInput("A", "1", 0L);
                inputTopic.pipeInput("A", "2", 9999L);
                inputTopic.pipeInput("B", "1", 11000L);
                inputTopic.pipeInput("A", "3", 13000L);
                inputTopic.pipeInput("A", "4", 15000L);
                inputTopic.pipeInput("C", "1", 16000L);
                inputTopic.pipeInput("D", "1", 17000L);
                inputTopic.pipeInput("A", "5", 30000L);
                // Out of order: timestamp 6000 arrives after timestamp 30000.
                inputTopic.pipeInput("A", "5", 6000L);
            }
        );
    }

    /**
     * Runs the two-phase comparison shared by every test in this class.
     *
     * @param singleStream when {@code true} each join uses one {@code KStream} instance on both
     *                     sides; when {@code false} the topic is consumed twice and the two
     *                     resulting streams are joined
     * @param joinWindows  window specification used for both joins
     * @param recordFeeder pipes the test records into the input topic it is given; it is invoked
     *                     once per phase so both topologies see the same input
     */
    private void assertSelfJoinMatchesInnerJoin(final boolean singleStream,
                                                final JoinWindows joinWindows,
                                                final Consumer<TestInputTopic<String, String>> recordFeeder) {
        props.setProperty("built.in.metrics.version", "latest");
        // Only the second topology is built with these props.
        // NOTE(review): presumably this is what lets Kafka Streams apply its self-join rewrite - confirm.
        props.put("topology.optimization", "all");

        // Typed joiner: with a raw ValueJoiner the lambda parameters are Object and `v + v2` does not compile.
        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Phase 1: reference join on topic2, built and driven without any properties.
        final MockApiProcessorSupplier<String, String, Void, Void> innerJoinSupplier = new MockApiProcessorSupplier<>();
        addJoin(streamsBuilder, topic2, singleStream, valueJoiner, joinWindows, innerJoinSupplier);
        final Topology innerJoinTopology = streamsBuilder.build();
        final ArrayList<KeyValueTimestamp<String, String>> expected;
        try (final TopologyTestDriver driver = new TopologyTestDriver(innerJoinTopology)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
            final MockApiProcessor<String, String, Void, Void> processor = innerJoinSupplier.theCapturedProcessor();
            recordFeeder.accept(inputTopic);
            expected = processor.processed();
        }

        // Phase 2: join on topic1, added to the SAME builder and built with props.
        final MockApiProcessorSupplier<String, String, Void, Void> selfJoinSupplier = new MockApiProcessorSupplier<>();
        addJoin(streamsBuilder, topic1, singleStream, valueJoiner, joinWindows, selfJoinSupplier);
        final Topology selfJoinTopology = streamsBuilder.build(props);
        try (final TopologyTestDriver driver = new TopologyTestDriver(selfJoinTopology, props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
            final MockApiProcessor<String, String, Void, Void> processor = selfJoinSupplier.theCapturedProcessor();
            recordFeeder.accept(inputTopic);
            processor.checkAndClearProcessResult(expected.toArray(new KeyValueTimestamp[0]));
        }
    }

    /**
     * Adds "consume {@code topic} -> join with itself -> {@code supplier}" to the builder.
     *
     * @param streamsBuilder builder the sub-topology is added to
     * @param topic          topic read by both sides of the join
     * @param singleStream   {@code true} to reuse one stream for both sides, {@code false} to
     *                       consume the topic a second time for the right-hand side
     * @param valueJoiner    combines the two joined values
     * @param joinWindows    window specification of the join
     * @param supplier       processor supplier attached to the join result
     */
    private static void addJoin(final StreamsBuilder streamsBuilder,
                                final String topic,
                                final boolean singleStream,
                                final ValueJoiner<String, String, String> valueJoiner,
                                final JoinWindows joinWindows,
                                final MockApiProcessorSupplier<String, String, Void, Void> supplier) {
        final KStream<String, String> left =
            streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> right = singleStream
            ? left
            : streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> joined = left.join(
            right,
            valueJoiner,
            joinWindows,
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
        );
        joined.process(supplier);
    }

    /** Pipes five records for keys A and B with timestamps 1 to 5. */
    private static void pipeInterleavedKeys(final TestInputTopic<String, String> inputTopic) {
        inputTopic.pipeInput("A", "1", 1L);
        inputTopic.pipeInput("B", "1", 2L);
        inputTopic.pipeInput("A", "2", 3L);
        inputTopic.pipeInput("B", "2", 4L);
        inputTopic.pipeInput("B", "3", 5L);
    }
}

