/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/**
 * Builds a small topology that chains a foreign-key join of table "A" with
 * table "B" into a second join of "A" with that result, then checks that the
 * topology can process one record per input table.
 *
 * <p>Every test builds the same topology; they differ only in the single place
 * where String serdes are supplied explicitly (nowhere, via {@link Consumed},
 * via the foreign-key join's {@link Materialized}, via the second join's
 * {@link Materialized}, or via {@link Produced}). Everywhere else the
 * topology falls back to the default serdes set in
 * {@link #validateTopologyCanProcessData(StreamsBuilder)}.
 */
public class KTableKTableForeignKeyJoinDefaultSerdeTest {

    /** No serde is specified anywhere in the topology. */
    @Test
    public void shouldWorkWithDefaultSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> aTable = builder.table("A");
        final KTable<String, String> bTable = builder.table("B");

        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            // The foreign key is the part of A's value before the first '-'.
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /** Table "A" is read with explicit String serdes passed through {@link Consumed}. */
    @Test
    public void shouldWorkWithDefaultAndConsumedSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> aTable =
            builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> bTable = builder.table("B");

        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /** The foreign-key join result is materialized with explicit String serdes. */
    @Test
    public void shouldWorkWithDefaultAndJoinResultSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> aTable = builder.table("A");
        final KTable<String, String> bTable = builder.table("B");

        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            // The explicit type witness is required: as the receiver of a call
            // chain, as(...) has no target type to infer K and V from, and
            // withKeySerde/withValueSerde would otherwise not accept Serde<String>.
            Materialized
                .<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
        );

        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /** The second (primary-key) join result is materialized with explicit String serdes. */
    @Test
    public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> aTable = builder.table("A");
        final KTable<String, String> bTable = builder.table("B");

        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
            Materialized.with(Serdes.String(), Serdes.String())
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /** The output topic is written with explicit String serdes passed through {@link Produced}. */
    @Test
    public void shouldWorkWithDefaultAndProducedSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> aTable = builder.table("A");
        final KTable<String, String> bTable = builder.table("B");

        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));

        validateTopologyCanProcessData(builder);
    }

    /**
     * Runs the topology held by {@code builder} in a {@link TopologyTestDriver}
     * configured with String default key/value serdes, pipes one record into
     * "A" and one into "B", and asserts the content of the "output" topic.
     *
     * @param builder the builder whose topology reads "A" and "B" and writes "output"
     */
    private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
        final Properties config = new Properties();
        // Random suffix gives every invocation its own application id.
        config.setProperty("application.id", "dummy-" + UUID.randomUUID());
        config.setProperty("bootstrap.servers", "dummy");
        config.setProperty("default.key.serde", Serdes.StringSerde.class.getName());
        config.setProperty("default.value.serde", Serdes.StringSerde.class.getName());
        config.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());

        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> aTopic =
                topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> bTopic =
                topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            // a1's value starts with "b1", so its extracted foreign key matches the key piped into B.
            aTopic.pipeInput("a1", "b1-alpha");
            bTopic.pipeInput("b1", "beta");

            final Map<String, String> x = output.readKeyValuesToMap();
            MatcherAssert.assertThat(x, Matchers.is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))")));
        }
    }
}

