/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests joining a {@link KStream} against a {@link GlobalKTable}.
 *
 * <p>Each test builds a topology in which records from the stream topic are joined with the
 * global table, using the stream record's <em>value</em> as the lookup key into the table.
 * The joined records are captured by a {@link MockApiProcessorSupplier} and compared against
 * the expected value and timestamp per stream key.
 */
public class GlobalKTableJoinsTest {
    private final StreamsBuilder builder = new StreamsBuilder();
    private final String streamTopic = "stream";
    private final String globalTopic = "global";
    private GlobalKTable<String, String> global;
    private KStream<String, String> stream;
    private KeyValueMapper<String, String, String> keyValueMapper;

    /**
     * Registers the global table and the stream on the shared builder and defines the
     * key mapper used by both join tests.
     */
    @Before
    public void setUp() {
        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
        global = builder.globalTable(globalTopic, consumed);
        stream = builder.stream(streamTopic, consumed);
        // The stream record's value is used as the key to look up in the global table.
        keyValueMapper = (key, value) -> value;
    }

    /**
     * A left join must emit a result for every stream record, including key "3" whose
     * mapped key "c" is never written to the global table (expected joined value "c+null").
     */
    @Test
    public void shouldLeftJoinWithStream() {
        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        stream
            .leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
            .process(supplier);

        final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put("1", ValueAndTimestamp.make("a+A", 2L));
        expected.put("2", ValueAndTimestamp.make("b+B", 10L));
        expected.put("3", ValueAndTimestamp.make("c+null", 3L));

        verifyJoin(expected, supplier);
    }

    /**
     * An inner join must emit results only for stream records whose mapped key is present
     * in the global table, so no entry is expected for stream key "3".
     */
    @Test
    public void shouldInnerJoinWithStream() {
        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        stream
            .join(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
            .process(supplier);

        final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put("1", ValueAndTimestamp.make("a+A", 2L));
        expected.put("2", ValueAndTimestamp.make("b+B", 10L));

        verifyJoin(expected, supplier);
    }

    /**
     * Runs the built topology through a {@link TopologyTestDriver}, feeding the global topic
     * first and the stream topic second, then asserts on what the captured processor recorded.
     *
     * @param expected the expected value and timestamp for each stream key
     * @param supplier the processor supplier attached to the join output in the calling test
     */
    private void verifyJoin(final Map<String, ValueAndTimestamp<String>> expected,
                            final MockApiProcessorSupplier<String, String, Void, Void> supplier) {
        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            // Populate the global table before any stream record is piped.
            final TestInputTopic<String, String> globalInputTopic =
                driver.createInputTopic(globalTopic, new StringSerializer(), new StringSerializer());
            globalInputTopic.pipeInput("a", "A", 1L);
            globalInputTopic.pipeInput("b", "B", 5L);

            // Stream values "a", "b", "c" are the lookup keys; only "a" and "b" exist in the table.
            final TestInputTopic<String, String> streamInputTopic =
                driver.createInputTopic(streamTopic, new StringSerializer(), new StringSerializer());
            streamInputTopic.pipeInput("1", "a", 2L);
            streamInputTopic.pipeInput("2", "b", 10L);
            streamInputTopic.pipeInput("3", "c", 3L);
        }

        Assert.assertEquals(expected, supplier.theCapturedProcessor().lastValueAndTimestampPerKey());
    }
}

