/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class KStreamGlobalKTableLeftJoinTest {
    private final String streamTopic = "streamTopic";
    private final String globalTableTopic = "globalTableTopic";
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Serde<String> stringSerde = Serdes.String();
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();
    private File stateDir = null;
    private MockProcessorSupplier<Integer, String> processor;
    private final int[] expectedKeys = new int[]{0, 1, 2, 3};
    private StreamsBuilder builder;

    @Before
    public void setUp() throws IOException {
        this.stateDir = TestUtils.tempDirectory((String)"kafka-test");
        this.builder = new StreamsBuilder();
        this.processor = new MockProcessorSupplier();
        Consumed streamConsumed = Consumed.with(this.intSerde, this.stringSerde);
        Consumed tableConsumed = Consumed.with(this.stringSerde, this.stringSerde);
        KStream stream = this.builder.stream("streamTopic", streamConsumed);
        GlobalKTable table = this.builder.globalTable("globalTableTopic", tableConsumed);
        KeyValueMapper<Integer, String, String> keyMapper = new KeyValueMapper<Integer, String, String>(){

            public String apply(Integer key, String value) {
                String[] tokens = value.split(",");
                return tokens.length > 1 ? tokens[1] : null;
            }
        };
        stream.leftJoin(table, (KeyValueMapper)keyMapper, MockValueJoiner.TOSTRING_JOINER).process(this.processor, new String[0]);
        this.driver.setUp(this.builder, this.stateDir);
        this.driver.setTime(0L);
    }

    private void pushToStream(int messageCount, String valuePrefix, boolean includeForeignKey) {
        for (int i = 0; i < messageCount; ++i) {
            String value = valuePrefix + this.expectedKeys[i];
            if (includeForeignKey) {
                value = value + ",FKey" + this.expectedKeys[i];
            }
            this.driver.process("streamTopic", this.expectedKeys[i], value);
        }
    }

    private void pushToGlobalTable(int messageCount, String valuePrefix) {
        for (int i = 0; i < messageCount; ++i) {
            this.driver.process("globalTableTopic", "FKey" + this.expectedKeys[i], valuePrefix + this.expectedKeys[i]);
        }
    }

    private void pushNullValueToGlobalTable(int messageCount) {
        for (int i = 0; i < messageCount; ++i) {
            this.driver.process("globalTableTopic", "FKey" + this.expectedKeys[i], null);
        }
    }

    @Test
    public void shouldNotRequireCopartitioning() {
        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(this.builder);
        Assert.assertEquals((String)"KStream-GlobalKTable joins do not need to be co-partitioned", (long)0L, (long)copartitionGroups.size());
    }

    @Test
    public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
        this.pushToStream(2, "X", true);
        this.processor.checkAndClearProcessResult("0:X0,FKey0+null", "1:X1,FKey1+null");
    }

    @Test
    public void shouldNotJoinOnGlobalTableUpdates() {
        this.pushToStream(2, "X", true);
        this.processor.checkAndClearProcessResult("0:X0,FKey0+null", "1:X1,FKey1+null");
        this.pushToGlobalTable(2, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X", true);
        this.processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+null", "3:X3,FKey3+null");
        this.pushToGlobalTable(4, "YY");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X", true);
        this.processor.checkAndClearProcessResult("0:X0,FKey0+YY0", "1:X1,FKey1+YY1", "2:X2,FKey2+YY2", "3:X3,FKey3+YY3");
        this.pushToGlobalTable(4, "YYY");
        this.processor.checkAndClearProcessResult(new String[0]);
    }

    @Test
    public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
        this.pushToGlobalTable(2, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X", true);
        this.processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+null", "3:X3,FKey3+null");
    }

    @Test
    public void shouldClearGlobalTableEntryOnNullValueUpdates() {
        this.pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X", true);
        this.processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+Y2", "3:X3,FKey3+Y3");
        this.pushNullValueToGlobalTable(2);
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "XX", true);
        this.processor.checkAndClearProcessResult("0:XX0,FKey0+null", "1:XX1,FKey1+null", "2:XX2,FKey2+Y2", "3:XX3,FKey3+Y3");
    }

    @Test
    public void shouldJoinOnNullKeyMapperValues() {
        this.pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "XXX", false);
        this.processor.checkAndClearProcessResult("0:XXX0+null", "1:XXX1+null", "2:XXX2+null", "3:XXX3+null");
    }
}

