/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KStreamKStreamJoinTest {
    private static final String[] EMPTY = new String[0];
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Consumed<Integer, String> consumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory((Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), 0L);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream left = builder.stream("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        KStream right = builder.stream("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new IntegerSerializer());
        left.join(right, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L)), Joined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            driver.pipeInput(recordFactory.create("left", (Object)"A", null));
            LogCaptureAppender.unregister(appender);
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
        }
    }

    @Test
    public void testJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            MockProcessor processor = supplier.theCapturedProcessor();
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult(EMPTY);
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i], (Object)("a" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("B" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("b" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("C" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
            for (int i2 = 0; i2 < 2; ++i2) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i2], (Object)("c" + expectedKeys[i2])));
            }
            processor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
        }
    }

    @Test
    public void testOuterJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            MockProcessor processor = supplier.theCapturedProcessor();
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i], (Object)("a" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("B" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)", "3:B3+null (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("b" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("C" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
            for (int i2 = 0; i2 < 2; ++i2) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i2], (Object)("c" + expectedKeys[i2])));
            }
            processor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            MockProcessor processor = supplier.theCapturedProcessor();
            long time = 0L;
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i], (Object)("a" + expectedKeys[i]), time));
            }
            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
            time = 1000L;
            for (i = 0; i < expectedKeys.length; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("B" + expectedKeys[i]), time + (long)i));
            }
            processor.checkAndClearProcessResult(EMPTY);
            time += 100L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("b" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("c" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("1:B1+c1 (ts: 1101)", "2:B2+c2 (ts: 1101)", "3:B3+c3 (ts: 1101)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("d" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("2:B2+d2 (ts: 1102)", "3:B3+d3 (ts: 1102)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("e" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("3:B3+e3 (ts: 1103)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("f" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 899L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("g" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("h" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:B0+h0 (ts: 1000)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("i" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:B0+i0 (ts: 1000)", "1:B1+i1 (ts: 1001)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("j" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:B0+j0 (ts: 1000)", "1:B1+j1 (ts: 1001)", "2:B2+j2 (ts: 1002)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("k" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:B0+k0 (ts: 1000)", "1:B1+k1 (ts: 1001)", "2:B2+k2 (ts: 1002)", "3:B3+k3 (ts: 1003)");
            time = 2000L;
            for (int i2 = 0; i2 < expectedKeys.length; ++i2) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i2], (Object)("l" + expectedKeys[i2]), time + (long)i2));
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 2100L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("C" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:C0+l0 (ts: 2100)", "1:C1+l1 (ts: 2100)", "2:C2+l2 (ts: 2100)", "3:C3+l3 (ts: 2100)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("D" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("1:D1+l1 (ts: 2101)", "2:D2+l2 (ts: 2101)", "3:D3+l3 (ts: 2101)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("E" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("2:E2+l2 (ts: 2102)", "3:E3+l3 (ts: 2102)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("F" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("3:F3+l3 (ts: 2103)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("G" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 1899L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("H" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("I" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:I0+l0 (ts: 2000)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("J" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:J0+l0 (ts: 2000)", "1:J1+l1 (ts: 2001)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("K" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:K0+l0 (ts: 2000)", "1:K1+l1 (ts: 2001)", "2:K2+l2 (ts: 2002)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("L" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:L0+l0 (ts: 2000)", "1:L1+l1 (ts: 2001)", "2:L2+l2 (ts: 2002)", "3:L3+l3 (ts: 2003)");
        }
    }

    @Test
    public void testAsymmetricWindowingAfter() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(0L)).after(Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            MockProcessor processor = supplier.theCapturedProcessor();
            long time = 1000L;
            for (int i = 0; i < expectedKeys.length; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), time + (long)i));
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 999L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("a" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("b" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("c" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+c0 (ts: 1001)", "1:A1+c1 (ts: 1001)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("d" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+d0 (ts: 1002)", "1:A1+d1 (ts: 1002)", "2:A2+d2 (ts: 1002)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("e" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+e0 (ts: 1003)", "1:A1+e1 (ts: 1003)", "2:A2+e2 (ts: 1003)", "3:A3+e3 (ts: 1003)");
            time = 1100L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("f" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+f0 (ts: 1100)", "1:A1+f1 (ts: 1100)", "2:A2+f2 (ts: 1100)", "3:A3+f3 (ts: 1100)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("g" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("1:A1+g1 (ts: 1101)", "2:A2+g2 (ts: 1101)", "3:A3+g3 (ts: 1101)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("h" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("2:A2+h2 (ts: 1102)", "3:A3+h3 (ts: 1102)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("i" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("3:A3+i3 (ts: 1103)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("j" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
        }
    }

    @Test
    public void testAsymmetricWindowingBefore() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(0L)).before(Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            MockProcessor processor = supplier.theCapturedProcessor();
            long time = 1000L;
            for (int i = 0; i < expectedKeys.length; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), time + (long)i));
            }
            processor.checkAndClearProcessResult(EMPTY);
            time = 899L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("a" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("b" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("c" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+c0 (ts: 1000)", "1:A1+c1 (ts: 1001)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("d" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+d0 (ts: 1000)", "1:A1+d1 (ts: 1001)", "2:A2+d2 (ts: 1002)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("e" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+e0 (ts: 1000)", "1:A1+e1 (ts: 1001)", "2:A2+e2 (ts: 1002)", "3:A3+e3 (ts: 1003)");
            time = 1000L;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("f" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("0:A0+f0 (ts: 1000)", "1:A1+f1 (ts: 1001)", "2:A2+f2 (ts: 1002)", "3:A3+f3 (ts: 1003)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("g" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("1:A1+g1 (ts: 1001)", "2:A2+g2 (ts: 1002)", "3:A3+g3 (ts: 1003)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("h" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("2:A2+h2 (ts: 1002)", "3:A3+h3 (ts: 1003)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("i" + expectedKey), time));
            }
            processor.checkAndClearProcessResult("3:A3+i3 (ts: 1003)");
            ++time;
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("j" + expectedKey), time));
            }
            processor.checkAndClearProcessResult(EMPTY);
        }
    }
}

