/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KTableKTableInnerJoinTest {
    /** Empty expectation array, passed to {@code checkAndClearProcessResult} when no record is expected. */
    private static final KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
    // Topic names for the two join inputs and the joined output.
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final String output = "output";
    // Integer-key / String-value serdes for the source tables and for the materialized join result.
    // Built without raw Serde casts so the generic types are checked by the compiler rather than
    // assigned through an unchecked conversion.
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.with(Serdes.Integer(), Serdes.String());
    // Configuration handed to each TopologyTestDriver / MockProcessorContext created in this class.
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());

    /**
     * Joins the two source tables without materializing the result, writes the joined table to the
     * output topic and runs the shared join scenario against that topology.
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final KTable<Integer, String> table1 = builder.table(this.topic1, this.consumed);
        final KTable<Integer, String> table2 = builder.table(this.topic2, this.consumed);
        final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
        joined.toStream().to(this.output);
        this.doTestJoin(builder, expectedKeys);
    }

    /**
     * Same scenario as {@code testJoin}, but the join result is materialized with the
     * Integer/String {@code materialized} configuration before being written to the output topic.
     */
    @Test
    public void testQueryableJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final KTable<Integer, String> table1 = builder.table(this.topic1, this.consumed);
        final KTable<Integer, String> table2 = builder.table(this.topic2, this.consumed);
        final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, this.materialized);
        joined.toStream().to(this.output);
        this.doTestJoin(builder, expectedKeys);
    }

    /**
     * Runs the "old values not forwarded" scenario against a join whose result is materialized.
     */
    @Test
    public void testQueryableNotSendingOldValues() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Integer, String> table1 = builder.table(this.topic1, this.consumed);
        final KTable<Integer, String> table2 = builder.table(this.topic2, this.consumed);
        final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, this.materialized);
        // Attach the capturing processor with the join node as its parent.
        // NOTE(review): the helper calls builder.build() again; this relies on build() handing back
        // the topology that "proc" was added to - confirm against StreamsBuilder.
        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
        this.doTestNotSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined);
    }

    /**
     * Runs the "old values not forwarded" scenario against a join whose result is not materialized.
     */
    @Test
    public void testNotSendingOldValues() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Integer, String> table1 = builder.table(this.topic1, this.consumed);
        final KTable<Integer, String> table2 = builder.table(this.topic2, this.consumed);
        final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
        // Attach the capturing processor with the join node as its parent.
        // NOTE(review): the helper calls builder.build() again; this relies on build() handing back
        // the topology that "proc" was added to - confirm against StreamsBuilder.
        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
        this.doTestNotSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined);
    }

    /**
     * Enables old-value forwarding on the joined table and checks that each {@link Change} captured
     * downstream of the join carries the new join result together with the previous one.
     *
     * <p>In every expectation below the timestamp equals the larger of the timestamps of the two
     * input records that take part in the result.
     */
    @Test
    public void testSendingOldValues() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Integer, String> table1 = builder.table(this.topic1, this.consumed);
        final KTable<Integer, String> table2 = builder.table(this.topic2, this.consumed);
        final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
        // Attach the capturing processor with the join node as its parent.
        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                this.topic1, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                this.topic2, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> proc = supplier.theCapturedProcessor();

            // Enabling old values on the join result is expected to be visible on both parents too.
            Assert.assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
            Assert.assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
            Assert.assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());

            // Two records on the left side only, plus a null-key record: nothing is expected yet.
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "X" + expectedKeys[i], 5L + i);
            }
            inputTopic1.pipeInput(null, "SomeVal", 42L);
            proc.checkAndClearProcessResult(EMPTY);

            // Matching records on the right side (and a null-key record): first join results, no old value.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "Y" + expectedKeys[i], 10L * i);
            }
            inputTopic2.pipeInput(null, "AnotherVal", 73L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5L),
                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10L));

            // Update all four keys on the left; only keys 0 and 1 have a right-side value.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XX" + expectedKey, 7L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", "X0+Y0"), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", "X1+Y1"), 10L));

            // Update all four keys on the right; keys 2 and 3 join for the first time.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "YY" + expectedKey, expectedKey * 5L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", "XX0+Y0"), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", "XX1+Y1"), 7L),
                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15L));

            // Update all four keys on the left again.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXX" + expectedKey, 6L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", "XX0+YY0"), 6L),
                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", "XX1+YY1"), 6L),
                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", "XX2+YY2"), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", "XX3+YY3"), 15L));

            // Null values for keys 0 and 1 on the right: the join result is retracted.
            inputTopic2.pipeInput(expectedKeys[0], null, 5L);
            inputTopic2.pipeInput(expectedKeys[1], null, 7L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>(null, "XXX0+YY0"), 6L),
                new KeyValueTimestamp<>(1, new Change<>(null, "XXX1+YY1"), 7L));

            // Update all four keys on the left; only keys 2 and 3 still have a right-side value.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXXX" + expectedKey, 13L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", "XXX2+YY2"), 13L),
                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", "XXX3+YY3"), 15L));

            // Null values for every key on the left; results are expected only for keys 2 and 3.
            inputTopic1.pipeInput(expectedKeys[0], null, 0L);
            inputTopic1.pipeInput(expectedKeys[1], null, 42L);
            inputTopic1.pipeInput(expectedKeys[2], null, 5L);
            inputTopic1.pipeInput(expectedKeys[3], null, 20L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, new Change<>(null, "XXXX2+YY2"), 10L),
                new KeyValueTimestamp<>(3, new Change<>(null, "XXXX3+YY3"), 20L));
        }
    }

    /**
     * Hands a record with a {@code null} key straight to the inner-join processor and checks that the
     * captured log output contains the "Skipping record due to null key" message carrying the record
     * metadata set on the mock context.
     */
    @Test
    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
        StreamsBuilder topologyBuilder = new StreamsBuilder();
        // Raw types are kept here as in the original; the join's generic parameters are not visible in this file.
        KTableImpl leftTable = (KTableImpl)topologyBuilder.table("left", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KTableImpl rightTable = (KTableImpl)topologyBuilder.table("right", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        Processor joinProcessor = new KTableKTableInnerJoin(leftTable, rightTable, null).get();

        MockProcessorContext processorContext = new MockProcessorContext(this.props);
        // Topic / partition / offset that the expected log line below echoes back.
        processorContext.setRecordMetadata("left", -1, -2L);
        joinProcessor.init((ProcessorContext)processorContext);

        try (LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(KTableKTableInnerJoin.class)) {
            Record nullKeyRecord = new Record(null, (Object)new Change((Object)"new", (Object)"old"), 0L);
            joinProcessor.process(nullKeyRecord);
            MatcherAssert.assertThat(
                logAppender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key. topic=[left] partition=[-1] offset=[-2]"));
        }
    }

    /**
     * Drives the topology held by {@code builder} and checks that every {@link Change} captured
     * downstream of the join has a {@code null} old value.
     *
     * <p>In every expectation below the timestamp equals the larger of the timestamps of the two
     * input records that take part in the result.
     *
     * @param builder      builder whose topology already contains the capturing processor
     * @param expectedKeys keys piped into both input topics (the first two are also accessed by index)
     * @param table1       left source table; must not have old-value forwarding enabled
     * @param table2       right source table; must not have old-value forwarding enabled
     * @param supplier     supplier whose captured processor records the join output
     * @param joined       join result table; must not have old-value forwarding enabled
     */
    private void doTestNotSendingOldValues(StreamsBuilder builder, int[] expectedKeys, KTable<Integer, String> table1, KTable<Integer, String> table2, MockApiProcessorSupplier<Integer, String, Void, Void> supplier, KTable<Integer, String> joined) {
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                this.topic1, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                this.topic2, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> proc = supplier.theCapturedProcessor();

            Assert.assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
            Assert.assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
            Assert.assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());

            // Two records on the left side only, plus a null-key record: nothing is expected yet.
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "X" + expectedKeys[i], 5L + i);
            }
            inputTopic1.pipeInput(null, "SomeVal", 42L);
            proc.checkAndClearProcessResult(EMPTY);

            // Matching records on the right side (and a null-key record): first join results.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "Y" + expectedKeys[i], 10L * i);
            }
            inputTopic2.pipeInput(null, "AnotherVal", 73L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5L),
                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10L));

            // Update all four keys on the left; only keys 0 and 1 have a right-side value.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XX" + expectedKey, 7L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", null), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", null), 10L));

            // Update all four keys on the right; keys 2 and 3 join for the first time.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "YY" + expectedKey, expectedKey * 5L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", null), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", null), 7L),
                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15L));

            // Update all four keys on the left again.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXX" + expectedKey, 6L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", null), 6L),
                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", null), 6L),
                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", null), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", null), 15L));

            // Null values for keys 0 and 1 on the right: both new and old value are null in the result.
            inputTopic2.pipeInput(expectedKeys[0], null, 5L);
            inputTopic2.pipeInput(expectedKeys[1], null, 7L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>(null, null), 6L),
                new KeyValueTimestamp<>(1, new Change<>(null, null), 7L));

            // Update all four keys on the left; only keys 2 and 3 still have a right-side value.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXXX" + expectedKey, 13L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", null), 13L),
                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", null), 15L));

            // Null values for every key on the left; results are expected only for keys 2 and 3.
            inputTopic1.pipeInput(expectedKeys[0], null, 0L);
            inputTopic1.pipeInput(expectedKeys[1], null, 42L);
            inputTopic1.pipeInput(expectedKeys[2], null, 5L);
            inputTopic1.pipeInput(expectedKeys[3], null, 20L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, new Change<>(null, null), 10L),
                new KeyValueTimestamp<>(3, new Change<>(null, null), 20L));
        }
    }

    /**
     * Checks that the two input topics form a single co-partition group, then drives the topology
     * held by {@code builder} and verifies the records that arrive on the output topic.
     *
     * <p>In every expectation below the timestamp equals the larger of the timestamps of the two
     * input records that take part in the result.
     *
     * @param builder      builder whose topology writes the join result to the output topic
     * @param expectedKeys keys piped into both input topics (the first two are also accessed by index)
     */
    private void doTestJoin(StreamsBuilder builder, int[] expectedKeys) {
        final Collection<?> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(this.topic1, this.topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                this.topic1, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                this.topic2, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestOutputTopic<Integer, String> outputTopic = driver.createOutputTopic(
                this.output, Serdes.Integer().deserializer(), Serdes.String().deserializer());

            // Two records on the left side only, plus a null-key record: no output yet.
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "X" + expectedKeys[i], 5L + i);
            }
            inputTopic1.pipeInput(null, "SomeVal", 42L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Matching records on the right side (and a null-key record): first join results.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "Y" + expectedKeys[i], 10L * i);
            }
            inputTopic2.pipeInput(null, "AnotherVal", 73L);
            this.assertOutputKeyValueTimestamp(outputTopic, 0, "X0+Y0", 5L);
            this.assertOutputKeyValueTimestamp(outputTopic, 1, "X1+Y1", 10L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Update all four keys on the left; only keys 0 and 1 have a right-side value.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XX" + expectedKey, 7L);
            }
            this.assertOutputKeyValueTimestamp(outputTopic, 0, "XX0+Y0", 7L);
            this.assertOutputKeyValueTimestamp(outputTopic, 1, "XX1+Y1", 10L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Update all four keys on the right; keys 2 and 3 join for the first time.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "YY" + expectedKey, expectedKey * 5L);
            }
            this.assertOutputKeyValueTimestamp(outputTopic, 0, "XX0+YY0", 7L);
            this.assertOutputKeyValueTimestamp(outputTopic, 1, "XX1+YY1", 7L);
            this.assertOutputKeyValueTimestamp(outputTopic, 2, "XX2+YY2", 10L);
            this.assertOutputKeyValueTimestamp(outputTopic, 3, "XX3+YY3", 15L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Update all four keys on the left again.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXX" + expectedKey, 6L);
            }
            this.assertOutputKeyValueTimestamp(outputTopic, 0, "XXX0+YY0", 6L);
            this.assertOutputKeyValueTimestamp(outputTopic, 1, "XXX1+YY1", 6L);
            this.assertOutputKeyValueTimestamp(outputTopic, 2, "XXX2+YY2", 10L);
            this.assertOutputKeyValueTimestamp(outputTopic, 3, "XXX3+YY3", 15L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Null values for keys 0 and 1 on the right: a null-valued record is expected for each.
            inputTopic2.pipeInput(expectedKeys[0], null, 5L);
            inputTopic2.pipeInput(expectedKeys[1], null, 7L);
            this.assertOutputKeyValueTimestamp(outputTopic, 0, null, 6L);
            this.assertOutputKeyValueTimestamp(outputTopic, 1, null, 7L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Update all four keys on the left; only keys 2 and 3 still have a right-side value.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXXX" + expectedKey, 13L);
            }
            this.assertOutputKeyValueTimestamp(outputTopic, 2, "XXXX2+YY2", 13L);
            this.assertOutputKeyValueTimestamp(outputTopic, 3, "XXXX3+YY3", 15L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Null values for every key on the left; output is expected only for keys 2 and 3.
            inputTopic1.pipeInput(expectedKeys[0], null, 0L);
            inputTopic1.pipeInput(expectedKeys[1], null, 42L);
            inputTopic1.pipeInput(expectedKeys[2], null, 5L);
            inputTopic1.pipeInput(expectedKeys[3], null, 20L);
            this.assertOutputKeyValueTimestamp(outputTopic, 2, null, 10L);
            this.assertOutputKeyValueTimestamp(outputTopic, 3, null, 20L);
            Assert.assertTrue(outputTopic.isEmpty());
        }
    }

    /**
     * Reads the next record from {@code outputTopic} and asserts that it equals a record built from
     * the expected key, value and timestamp, with {@code null} passed as the headers argument.
     *
     * @param outputTopic       topic to read the next record from
     * @param expectedKey       key the record must have
     * @param expectedValue     value the record must have; may be {@code null}
     * @param expectedTimestamp timestamp in milliseconds the record must have
     */
    private void assertOutputKeyValueTimestamp(TestOutputTopic<Integer, String> outputTopic, Integer expectedKey, String expectedValue, long expectedTimestamp) {
        // Read first, as the original did, so an empty topic fails before the expectation is built.
        final TestRecord<Integer, String> actual = outputTopic.readRecord();
        final TestRecord<Integer, String> expected = new TestRecord<>(expectedKey, expectedValue, null, Long.valueOf(expectedTimestamp));
        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
    }
}

