/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KStreamKStreamJoinTest {
    private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Consumed<Integer, String> consumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final JoinWindows joinWindows = JoinWindows.of((Duration)Duration.ofMillis(50L)).grace(Duration.ofMillis(50L));
    private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
    private final String errorMessagePrefix = "Window settings mismatch. WindowBytesStoreSupplier settings";

    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeterOnSkippedRecordsWithNullValue("0.10.0-2.4");
    }

    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeterOnSkippedRecordsWithNullValue("latest");
    }

    private void shouldLogAndMeterOnSkippedRecordsWithNullValue(String builtInMetricsVersion) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream left = builder.stream("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        KStream right = builder.stream("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        left.join(right, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("left", (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer());
            inputTopic.pipeInput((Object)"A", null);
            LogCaptureAppender.unregister(appender);
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
            if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
                Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            }
        }
    }

    @Test
    public void shouldThrowExceptionThisStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 500L, 100L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionThisStoreSupplierWindowSizeDoesNotMatchJoinWindowsWindowSize() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 150L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionWhenThisJoinStoreSetsRetainDuplicatesFalse() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, false);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), this.joinWindows, "The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
    }

    @Test
    public void shouldThrowExceptionOtherStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 500L, 100L, true);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionOtherStoreSupplierWindowSizeDoesNotMatchJoinWindowsWindowSize() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 150L, true);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionWhenOtherJoinStoreSetsRetainDuplicatesFalse() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, false);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), this.joinWindows, "The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
    }

    @Test
    public void shouldBuildJoinWithCustomStoresAndCorrectWindowSettings() {
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);
        StreamsBuilder builder = new StreamsBuilder();
        KStream left = builder.stream("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        KStream right = builder.stream("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        left.join(right, (value1, value2) -> value1 + value2, this.joinWindows, this.streamJoined);
        builder.build();
    }

    @Test
    public void shouldExceptionWhenJoinStoresDontHaveUniqueNames() {
        JoinWindows joinWindows = JoinWindows.of((Duration)Duration.ofMillis(100L)).grace(Duration.ofMillis(50L));
        StreamJoined streamJoined = StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
        WindowBytesStoreSupplier thisStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        WindowBytesStoreSupplier otherStoreSupplier = this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        this.buildStreamsJoinThatShouldThrow((StreamJoined<String, Integer, Integer>)streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows, "Both StoreSuppliers have the same name.  StoreSuppliers must provide unique names");
    }

    @Test
    public void shouldJoinWithCustomStoreSuppliers() {
        JoinWindows joinWindows = JoinWindows.of((Duration)Duration.ofMillis(100L));
        WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store", (Duration)Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), (Duration)Duration.ofMillis(joinWindows.size()), (boolean)true);
        WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store-other", (Duration)Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), (Duration)Duration.ofMillis(joinWindows.size()), (boolean)true);
        StreamJoined streamJoined = StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
        this.runJoin((StreamJoined<String, Integer, Integer>)streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
        this.runJoin((StreamJoined<String, Integer, Integer>)streamJoined.withThisStoreSupplier(thisStoreSupplier), joinWindows);
        this.runJoin((StreamJoined<String, Integer, Integer>)streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
    }

    private void runJoin(StreamJoined<String, Integer, Integer> streamJoined, JoinWindows joinWindows) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream left = builder.stream("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        KStream right = builder.stream("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream joinedStream = left.join(right, (value1, value2) -> value1 + value2, joinWindows, streamJoined);
        joinedStream.process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopicLeft = driver.createInputTopic("left", (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopicRight = driver.createInputTopic("right", (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor processor = supplier.theCapturedProcessor();
            inputTopicLeft.pipeInput((Object)"A", (Object)1, 1L);
            inputTopicLeft.pipeInput((Object)"B", (Object)1, 2L);
            inputTopicRight.pipeInput((Object)"A", (Object)1, 1L);
            inputTopicRight.pipeInput((Object)"B", (Object)2, 2L);
            processor.checkAndClearProcessResult(new KeyValueTimestamp<String, Integer>("A", 2, 1L), new KeyValueTimestamp<String, Integer>("B", 3, 2L));
        }
    }

    @Test
    public void testJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor processor = supplier.theCapturedProcessor();
            for (i = 0; i < 2; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("A" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult(EMPTY);
            for (i = 0; i < 2; ++i) {
                inputTopic2.pipeInput((Object)expectedKeys[i], (Object)("a" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+a1", 0L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("B" + expectedKey));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+a0", 0L), new KeyValueTimestamp<Integer, String>(1, "B1+a1", 0L));
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("b" + expectedKey));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+b0", 0L), new KeyValueTimestamp<Integer, String>(0, "B0+b0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+b1", 0L), new KeyValueTimestamp<Integer, String>(1, "B1+b1", 0L), new KeyValueTimestamp<Integer, String>(2, "B2+b2", 0L), new KeyValueTimestamp<Integer, String>(3, "B3+b3", 0L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("C" + expectedKey));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "C0+a0", 0L), new KeyValueTimestamp<Integer, String>(0, "C0+b0", 0L), new KeyValueTimestamp<Integer, String>(1, "C1+a1", 0L), new KeyValueTimestamp<Integer, String>(1, "C1+b1", 0L), new KeyValueTimestamp<Integer, String>(2, "C2+b2", 0L), new KeyValueTimestamp<Integer, String>(3, "C3+b3", 0L));
            for (int i2 = 0; i2 < 2; ++i2) {
                inputTopic2.pipeInput((Object)expectedKeys[i2], (Object)("c" + expectedKeys[i2]));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+c0", 0L), new KeyValueTimestamp<Integer, String>(0, "B0+c0", 0L), new KeyValueTimestamp<Integer, String>(0, "C0+c0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+c1", 0L), new KeyValueTimestamp<Integer, String>(1, "B1+c1", 0L), new KeyValueTimestamp<Integer, String>(1, "C1+c1", 0L));
        }
    }

    @Test
    public void testOuterJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor processor = supplier.theCapturedProcessor();
            for (i = 0; i < 2; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("A" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+null", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+null", 0L));
            for (i = 0; i < 2; ++i) {
                inputTopic2.pipeInput((Object)expectedKeys[i], (Object)("a" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+a1", 0L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("B" + expectedKey));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+a0", 0L), new KeyValueTimestamp<Integer, String>(1, "B1+a1", 0L), new KeyValueTimestamp<Integer, String>(2, "B2+null", 0L), new KeyValueTimestamp<Integer, String>(3, "B3+null", 0L));
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("b" + expectedKey));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+b0", 0L), new KeyValueTimestamp<Integer, String>(0, "B0+b0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+b1", 0L), new KeyValueTimestamp<Integer, String>(1, "B1+b1", 0L), new KeyValueTimestamp<Integer, String>(2, "B2+b2", 0L), new KeyValueTimestamp<Integer, String>(3, "B3+b3", 0L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("C" + expectedKey));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "C0+a0", 0L), new KeyValueTimestamp<Integer, String>(0, "C0+b0", 0L), new KeyValueTimestamp<Integer, String>(1, "C1+a1", 0L), new KeyValueTimestamp<Integer, String>(1, "C1+b1", 0L), new KeyValueTimestamp<Integer, String>(2, "C2+b2", 0L), new KeyValueTimestamp<Integer, String>(3, "C3+b3", 0L));
            for (int i2 = 0; i2 < 2; ++i2) {
                inputTopic2.pipeInput((Object)expectedKeys[i2], (Object)("c" + expectedKeys[i2]));
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+c0", 0L), new KeyValueTimestamp<Integer, String>(0, "B0+c0", 0L), new KeyValueTimestamp<Integer, String>(0, "C0+c0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+c1", 0L), new KeyValueTimestamp<Integer, String>(1, "B1+c1", 0L), new KeyValueTimestamp<Integer, String>(1, "C1+c1", 0L));
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor processor = supplier.theCapturedProcessor();
            long time = 0L;
            for (i = 0; i < 2; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            for (i = 0; i < 2; ++i) {
                inputTopic2.pipeInput((Object)expectedKeys[i], (Object)("a" + expectedKeys[i]), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L), new KeyValueTimestamp<Integer, String>(1, "A1+a1", 0L));
            time = 1000L;
            for (i = 0; i < expectedKeys.length; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("B" + expectedKeys[i]), time + (long)i);
            }
            processor.checkAndClearProcessResult(EMPTY);
            time += 100L;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("b" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+b0", 1100L), new KeyValueTimestamp<Integer, String>(1, "B1+b1", 1100L), new KeyValueTimestamp<Integer, String>(2, "B2+b2", 1100L), new KeyValueTimestamp<Integer, String>(3, "B3+b3", 1100L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("c" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "B1+c1", 1101L), new KeyValueTimestamp<Integer, String>(2, "B2+c2", 1101L), new KeyValueTimestamp<Integer, String>(3, "B3+c3", 1101L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("d" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(2, "B2+d2", 1102L), new KeyValueTimestamp<Integer, String>(3, "B3+d3", 1102L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("e" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(3, "B3+e3", 1103L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("f" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 899L;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("g" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("h" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+h0", 1000L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("i" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+i0", 1000L), new KeyValueTimestamp<Integer, String>(1, "B1+i1", 1001L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("j" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+j0", 1000L), new KeyValueTimestamp<Integer, String>(1, "B1+j1", 1001L), new KeyValueTimestamp<Integer, String>(2, "B2+j2", 1002L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("k" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "B0+k0", 1000L), new KeyValueTimestamp<Integer, String>(1, "B1+k1", 1001L), new KeyValueTimestamp<Integer, String>(2, "B2+k2", 1002L), new KeyValueTimestamp<Integer, String>(3, "B3+k3", 1003L));
            time = 2000L;
            for (int i2 = 0; i2 < expectedKeys.length; ++i2) {
                inputTopic2.pipeInput((Object)expectedKeys[i2], (Object)("l" + expectedKeys[i2]), time + (long)i2);
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 2100L;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("C" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "C0+l0", 2100L), new KeyValueTimestamp<Integer, String>(1, "C1+l1", 2100L), new KeyValueTimestamp<Integer, String>(2, "C2+l2", 2100L), new KeyValueTimestamp<Integer, String>(3, "C3+l3", 2100L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("D" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "D1+l1", 2101L), new KeyValueTimestamp<Integer, String>(2, "D2+l2", 2101L), new KeyValueTimestamp<Integer, String>(3, "D3+l3", 2101L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("E" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(2, "E2+l2", 2102L), new KeyValueTimestamp<Integer, String>(3, "E3+l3", 2102L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("F" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(3, "F3+l3", 2103L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("G" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 1899L;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("H" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("I" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "I0+l0", 2000L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("J" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "J0+l0", 2000L), new KeyValueTimestamp<Integer, String>(1, "J1+l1", 2001L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("K" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "K0+l0", 2000L), new KeyValueTimestamp<Integer, String>(1, "K1+l1", 2001L), new KeyValueTimestamp<Integer, String>(2, "K2+l2", 2002L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("L" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "L0+l0", 2000L), new KeyValueTimestamp<Integer, String>(1, "L1+l1", 2001L), new KeyValueTimestamp<Integer, String>(2, "L2+l2", 2002L), new KeyValueTimestamp<Integer, String>(3, "L3+l3", 2003L));
        }
    }

    @Test
    public void testAsymmetricWindowingAfter() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(0L)).after(Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor processor = supplier.theCapturedProcessor();
            long time = 1000L;
            for (int i = 0; i < expectedKeys.length; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), time + (long)i);
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 999L;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("a" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("b" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+b0", 1000L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("c" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+c0", 1001L), new KeyValueTimestamp<Integer, String>(1, "A1+c1", 1001L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("d" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+d0", 1002L), new KeyValueTimestamp<Integer, String>(1, "A1+d1", 1002L), new KeyValueTimestamp<Integer, String>(2, "A2+d2", 1002L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("e" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+e0", 1003L), new KeyValueTimestamp<Integer, String>(1, "A1+e1", 1003L), new KeyValueTimestamp<Integer, String>(2, "A2+e2", 1003L), new KeyValueTimestamp<Integer, String>(3, "A3+e3", 1003L));
            time = 1100L;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("f" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+f0", 1100L), new KeyValueTimestamp<Integer, String>(1, "A1+f1", 1100L), new KeyValueTimestamp<Integer, String>(2, "A2+f2", 1100L), new KeyValueTimestamp<Integer, String>(3, "A3+f3", 1100L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("g" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "A1+g1", 1101L), new KeyValueTimestamp<Integer, String>(2, "A2+g2", 1101L), new KeyValueTimestamp<Integer, String>(3, "A3+g3", 1101L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("h" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(2, "A2+h2", 1102L), new KeyValueTimestamp<Integer, String>(3, "A3+h3", 1102L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("i" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(3, "A3+i3", 1103L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("j" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
        }
    }

    @Test
    public void testAsymmetricWindowingBefore() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(0L)).before(Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor processor = supplier.theCapturedProcessor();
            long time = 1000L;
            for (int i = 0; i < expectedKeys.length; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), time + (long)i);
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 899L;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("a" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("b" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+b0", 1000L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("c" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+c0", 1000L), new KeyValueTimestamp<Integer, String>(1, "A1+c1", 1001L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("d" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+d0", 1000L), new KeyValueTimestamp<Integer, String>(1, "A1+d1", 1001L), new KeyValueTimestamp<Integer, String>(2, "A2+d2", 1002L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("e" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+e0", 1000L), new KeyValueTimestamp<Integer, String>(1, "A1+e1", 1001L), new KeyValueTimestamp<Integer, String>(2, "A2+e2", 1002L), new KeyValueTimestamp<Integer, String>(3, "A3+e3", 1003L));
            time = 1000L;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("f" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+f0", 1000L), new KeyValueTimestamp<Integer, String>(1, "A1+f1", 1001L), new KeyValueTimestamp<Integer, String>(2, "A2+f2", 1002L), new KeyValueTimestamp<Integer, String>(3, "A3+f3", 1003L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("g" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "A1+g1", 1001L), new KeyValueTimestamp<Integer, String>(2, "A2+g2", 1002L), new KeyValueTimestamp<Integer, String>(3, "A3+g3", 1003L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("h" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(2, "A2+h2", 1002L), new KeyValueTimestamp<Integer, String>(3, "A3+h3", 1003L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("i" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(3, "A3+i3", 1003L));
            ++time;
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("j" + expectedKey), time);
            }
            processor.checkAndClearProcessResult(EMPTY);
        }
    }

    private void buildStreamsJoinThatShouldThrow(StreamJoined<String, Integer, Integer> streamJoined, JoinWindows joinWindows, String expectedExceptionMessagePrefix) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream left = builder.stream("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        KStream right = builder.stream("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        StreamsException streamsException = (StreamsException)Assert.assertThrows(StreamsException.class, () -> left.join(right, (value1, value2) -> value1 + value2, joinWindows, streamJoined));
        Assert.assertTrue((boolean)streamsException.getMessage().startsWith(expectedExceptionMessagePrefix));
    }

    private WindowBytesStoreSupplier buildWindowBytesStoreSupplier(String name, long retentionPeriod, long windowSize, boolean retainDuplicates) {
        return Stores.inMemoryWindowStore((String)name, (Duration)Duration.ofMillis(retentionPeriod), (Duration)Duration.ofMillis(windowSize), (boolean)retainDuplicates);
    }
}

