/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KTableKTableOuterJoinTest {
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final String output = "output";
    private final Consumed<Integer, String> consumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());

    @Test
    public void testJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = builder.table("topic2", this.consumed);
        KTable joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
        joined.toStream().to("output");
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestOutputTopic outputTopic = driver.createOutputTopic("output", Serdes.Integer().deserializer(), Serdes.String().deserializer());
            for (i = 0; i < 2; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("X" + expectedKeys[i]), 5L + (long)i);
            }
            inputTopic1.pipeInput(null, (Object)"SomeVal", 42L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "X0+null", 5L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "X1+null", 6L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            for (i = 0; i < 2; ++i) {
                inputTopic2.pipeInput((Object)expectedKeys[i], (Object)("Y" + expectedKeys[i]), 10L * (long)i);
            }
            inputTopic2.pipeInput(null, (Object)"AnotherVal", 73L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "X0+Y0", 5L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "X1+Y1", 10L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XX" + expectedKey), 7L);
            }
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "XX0+Y0", 7L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "XX1+Y1", 10L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 2, "XX2+null", 7L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 3, "XX3+null", 7L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("YY" + expectedKey), (long)expectedKey * 5L);
            }
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "XX0+YY0", 7L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "XX1+YY1", 7L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 2, "XX2+YY2", 10L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 3, "XX3+YY3", 15L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XXX" + expectedKey), 6L);
            }
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "XXX0+YY0", 6L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "XXX1+YY1", 6L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 2, "XXX2+YY2", 10L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 3, "XXX3+YY3", 15L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            inputTopic2.pipeInput((Object)expectedKeys[0], null, 5L);
            inputTopic2.pipeInput((Object)expectedKeys[1], null, 7L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "XXX0+null", 6L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "XXX1+null", 7L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XXXX" + expectedKey), 13L);
            }
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, "XXXX0+null", 13L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, "XXXX1+null", 13L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 2, "XXXX2+YY2", 13L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 3, "XXXX3+YY3", 15L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
            inputTopic1.pipeInput((Object)expectedKeys[0], null, 0L);
            inputTopic1.pipeInput((Object)expectedKeys[1], null, 42L);
            inputTopic1.pipeInput((Object)expectedKeys[2], null, 5L);
            inputTopic1.pipeInput((Object)expectedKeys[3], null, 20L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 0, null, 0L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 1, null, 42L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 2, "null+YY2", 10L);
            this.assertOutputKeyValueTimestamp((TestOutputTopic<Integer, String>)outputTopic, 3, "null+YY3", 20L);
            Assert.assertTrue((boolean)outputTopic.isEmpty());
        }
    }

    @Test
    public void testNotSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = builder.table("topic2", this.consumed);
        KTable joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc", supplier, new String[]{((KTableImpl)joined).name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            int i;
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor proc = supplier.theCapturedProcessor();
            Assert.assertTrue((boolean)((KTableImpl)table1).sendingOldValueEnabled());
            Assert.assertTrue((boolean)((KTableImpl)table2).sendingOldValueEnabled());
            Assert.assertFalse((boolean)((KTableImpl)joined).sendingOldValueEnabled());
            for (i = 0; i < 2; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("X" + expectedKeys[i]), 5L + (long)i);
            }
            inputTopic1.pipeInput(null, (Object)"SomeVal", 42L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"X0+null", null), 5L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"X1+null", null), 6L));
            for (i = 0; i < 2; ++i) {
                inputTopic2.pipeInput((Object)expectedKeys[i], (Object)("Y" + expectedKeys[i]), 10L * (long)i);
            }
            inputTopic2.pipeInput(null, (Object)"AnotherVal", 73L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"X0+Y0", null), 5L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"X1+Y1", null), 10L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XX" + expectedKey), 7L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XX0+Y0", null), 7L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XX1+Y1", null), 10L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XX2+null", null), 7L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XX3+null", null), 7L));
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("YY" + expectedKey), (long)expectedKey * 5L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XX0+YY0", null), 7L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XX1+YY1", null), 7L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XX2+YY2", null), 10L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XX3+YY3", null), 15L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XXX" + expectedKey), 6L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XXX0+YY0", null), 6L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XXX1+YY1", null), 6L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XXX2+YY2", null), 10L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XXX3+YY3", null), 15L));
            inputTopic2.pipeInput((Object)expectedKeys[0], null, 5L);
            inputTopic2.pipeInput((Object)expectedKeys[1], null, 7L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XXX0+null", null), 6L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XXX1+null", null), 7L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XXXX" + expectedKey), 13L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XXXX0+null", null), 13L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XXXX1+null", null), 13L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XXXX2+YY2", null), 13L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XXXX3+YY3", null), 15L));
            inputTopic1.pipeInput((Object)expectedKeys[0], null, 0L);
            inputTopic1.pipeInput((Object)expectedKeys[1], null, 42L);
            inputTopic1.pipeInput((Object)expectedKeys[2], null, 5L);
            inputTopic1.pipeInput((Object)expectedKeys[3], null, 20L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change(null, null), 0L), new KeyValueTimestamp<Integer, Change>(1, new Change(null, null), 42L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"null+YY2", null), 10L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"null+YY3", null), 20L));
        }
    }

    @Test
    public void testSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = builder.table("topic2", this.consumed);
        KTable joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
        ((KTableImpl)joined).enableSendingOldValues();
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc", supplier, new String[]{((KTableImpl)joined).name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            int i;
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor proc = supplier.theCapturedProcessor();
            Assert.assertTrue((boolean)((KTableImpl)table1).sendingOldValueEnabled());
            Assert.assertTrue((boolean)((KTableImpl)table2).sendingOldValueEnabled());
            Assert.assertTrue((boolean)((KTableImpl)joined).sendingOldValueEnabled());
            for (i = 0; i < 2; ++i) {
                inputTopic1.pipeInput((Object)expectedKeys[i], (Object)("X" + expectedKeys[i]), 5L + (long)i);
            }
            inputTopic1.pipeInput(null, (Object)"SomeVal", 42L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"X0+null", null), 5L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"X1+null", null), 6L));
            for (i = 0; i < 2; ++i) {
                inputTopic2.pipeInput((Object)expectedKeys[i], (Object)("Y" + expectedKeys[i]), 10L * (long)i);
            }
            inputTopic2.pipeInput(null, (Object)"AnotherVal", 73L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"X0+Y0", (Object)"X0+null"), 5L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"X1+Y1", (Object)"X1+null"), 10L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XX" + expectedKey), 7L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XX0+Y0", (Object)"X0+Y0"), 7L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XX1+Y1", (Object)"X1+Y1"), 10L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XX2+null", null), 7L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XX3+null", null), 7L));
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput((Object)expectedKey, (Object)("YY" + expectedKey), (long)expectedKey * 5L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XX0+YY0", (Object)"XX0+Y0"), 7L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XX1+YY1", (Object)"XX1+Y1"), 7L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XX2+YY2", (Object)"XX2+null"), 10L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XX3+YY3", (Object)"XX3+null"), 15L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XXX" + expectedKey), 6L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XXX0+YY0", (Object)"XX0+YY0"), 6L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XXX1+YY1", (Object)"XX1+YY1"), 6L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XXX2+YY2", (Object)"XX2+YY2"), 10L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XXX3+YY3", (Object)"XX3+YY3"), 15L));
            inputTopic2.pipeInput((Object)expectedKeys[0], null, 5L);
            inputTopic2.pipeInput((Object)expectedKeys[1], null, 7L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XXX0+null", (Object)"XXX0+YY0"), 6L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XXX1+null", (Object)"XXX1+YY1"), 7L));
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput((Object)expectedKey, (Object)("XXXX" + expectedKey), 13L);
            }
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change((Object)"XXXX0+null", (Object)"XXX0+null"), 13L), new KeyValueTimestamp<Integer, Change>(1, new Change((Object)"XXXX1+null", (Object)"XXX1+null"), 13L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"XXXX2+YY2", (Object)"XXX2+YY2"), 13L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"XXXX3+YY3", (Object)"XXX3+YY3"), 15L));
            inputTopic1.pipeInput((Object)expectedKeys[0], null, 0L);
            inputTopic1.pipeInput((Object)expectedKeys[1], null, 42L);
            inputTopic1.pipeInput((Object)expectedKeys[2], null, 5L);
            inputTopic1.pipeInput((Object)expectedKeys[3], null, 20L);
            proc.checkAndClearProcessResult(new KeyValueTimestamp<Integer, Change>(0, new Change(null, (Object)"XXXX0+null"), 0L), new KeyValueTimestamp<Integer, Change>(1, new Change(null, (Object)"XXXX1+null"), 42L), new KeyValueTimestamp<Integer, Change>(2, new Change((Object)"null+YY2", (Object)"XXXX2+YY2"), 10L), new KeyValueTimestamp<Integer, Change>(3, new Change((Object)"null+YY3", (Object)"XXXX3+YY3"), 20L));
        }
    }

    @Test
    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKeyWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeterSkippedRecordsDueToNullLeftKey("latest");
    }

    @Test
    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKeyWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeterSkippedRecordsDueToNullLeftKey("0.10.0-2.4");
    }

    private void shouldLogAndMeterSkippedRecordsDueToNullLeftKey(String builtInMetricsVersion) {
        StreamsBuilder builder = new StreamsBuilder();
        Processor join = new KTableKTableOuterJoin((KTableImpl)builder.table("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())), (KTableImpl)builder.table("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())), null).get();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        MockProcessorContext context = new MockProcessorContext(this.props);
        context.setRecordMetadata("left", -1, -2L, null, -3L);
        join.init((ProcessorContext)context);
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        join.process(null, (Object)new Change((Object)"new", (Object)"old"));
        LogCaptureAppender.unregister(appender);
        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
        }
        MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
    }

    private void assertOutputKeyValueTimestamp(TestOutputTopic<Integer, String> outputTopic, Integer expectedKey, String expectedValue, long expectedTimestamp) {
        MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)expectedKey, (Object)expectedValue, null, Long.valueOf(expectedTimestamp))));
    }
}

