/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamGlobalKTableLeftJoinTest {
    private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
    private final String streamTopic = "streamTopic";
    private final String globalTableTopic = "globalTableTopic";
    private final int[] expectedKeys = new int[]{0, 1, 2, 3};
    private MockApiProcessor<Integer, String, Void, Void> processor;
    private TopologyTestDriver driver;
    private StreamsBuilder builder;

    @Before
    public void setUp() {
        this.builder = new StreamsBuilder();
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        Consumed streamConsumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
        Consumed tableConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
        KStream stream = this.builder.stream("streamTopic", streamConsumed);
        GlobalKTable table = this.builder.globalTable("globalTableTopic", tableConsumed);
        KeyValueMapper keyMapper = (key, value) -> {
            String[] tokens = value.split(",");
            return tokens.length > 1 ? tokens[1] : null;
        };
        stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier, new String[0]);
        Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
        this.driver = new TopologyTestDriver(this.builder.build(), props);
        this.processor = supplier.theCapturedProcessor();
    }

    @After
    public void cleanup() {
        this.driver.close();
    }

    private void pushToStream(int messageCount, String valuePrefix, boolean includeForeignKey, boolean includeNullKey) {
        TestInputTopic inputTopic = this.driver.createInputTopic("streamTopic", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
        for (int i = 0; i < messageCount; ++i) {
            String value = valuePrefix + this.expectedKeys[i];
            if (includeForeignKey) {
                value = value + ",FKey" + this.expectedKeys[i];
            }
            Integer key = this.expectedKeys[i];
            if (includeNullKey && i == 0) {
                key = null;
            }
            inputTopic.pipeInput((Object)key, (Object)value);
        }
    }

    private void pushToGlobalTable(int messageCount, String valuePrefix) {
        TestInputTopic inputTopic = this.driver.createInputTopic("globalTableTopic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
        for (int i = 0; i < messageCount; ++i) {
            inputTopic.pipeInput((Object)("FKey" + this.expectedKeys[i]), (Object)(valuePrefix + this.expectedKeys[i]));
        }
    }

    private void pushNullValueToGlobalTable(int messageCount) {
        TestInputTopic inputTopic = this.driver.createInputTopic("globalTableTopic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
        for (int i = 0; i < messageCount; ++i) {
            inputTopic.pipeInput((Object)("FKey" + this.expectedKeys[i]), (Object)null);
        }
    }

    @Test
    public void shouldNotRequireCopartitioning() {
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(this.builder.build()).copartitionGroups();
        Assert.assertEquals((String)"KStream-GlobalKTable joins do not need to be co-partitioned", (long)0L, (long)copartitionGroups.size());
    }

    @Test
    public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
        this.pushToStream(2, "X", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "X0,FKey0+null", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+null", 1L));
    }

    @Test
    public void shouldNotJoinOnGlobalTableUpdates() {
        this.pushToStream(2, "X", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "X0,FKey0+null", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+null", 1L));
        this.pushToGlobalTable(2, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "X0,FKey0+Y0", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+Y1", 1L), new KeyValueTimestamp<Integer, String>(2, "X2,FKey2+null", 2L), new KeyValueTimestamp<Integer, String>(3, "X3,FKey3+null", 3L));
        this.pushToGlobalTable(4, "YY");
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "X0,FKey0+YY0", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+YY1", 1L), new KeyValueTimestamp<Integer, String>(2, "X2,FKey2+YY2", 2L), new KeyValueTimestamp<Integer, String>(3, "X3,FKey3+YY3", 3L));
        this.pushToGlobalTable(4, "YYY");
        this.processor.checkAndClearProcessResult(EMPTY);
    }

    @Test
    public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
        this.pushToGlobalTable(2, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "X0,FKey0+Y0", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+Y1", 1L), new KeyValueTimestamp<Integer, String>(2, "X2,FKey2+null", 2L), new KeyValueTimestamp<Integer, String>(3, "X3,FKey3+null", 3L));
    }

    @Test
    public void shouldClearGlobalTableEntryOnNullValueUpdates() {
        this.pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "X0,FKey0+Y0", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+Y1", 1L), new KeyValueTimestamp<Integer, String>(2, "X2,FKey2+Y2", 2L), new KeyValueTimestamp<Integer, String>(3, "X3,FKey3+Y3", 3L));
        this.pushNullValueToGlobalTable(2);
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "XX", true, false);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "XX0,FKey0+null", 0L), new KeyValueTimestamp<Integer, String>(1, "XX1,FKey1+null", 1L), new KeyValueTimestamp<Integer, String>(2, "XX2,FKey2+Y2", 2L), new KeyValueTimestamp<Integer, String>(3, "XX3,FKey3+Y3", 3L));
    }

    @Test
    public void shouldNotJoinOnNullKeyMapperValues() {
        this.pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "XXX", false, false);
        this.processor.checkAndClearProcessResult(EMPTY);
    }

    @Test
    public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
        this.pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);
        this.pushToStream(4, "X", true, true);
        this.processor.checkAndClearProcessResult(new KeyValueTimestamp<Object, String>(null, "X0,FKey0+Y0", 0L), new KeyValueTimestamp<Integer, String>(1, "X1,FKey1+Y1", 1L), new KeyValueTimestamp<Integer, String>(2, "X2,FKey2+Y2", 2L), new KeyValueTimestamp<Integer, String>(3, "X3,FKey3+Y3", 3L));
    }
}

