/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

public class KTableKTableForeignKeyJoinDefaultSerdeTest {
    @Test
    public void shouldWorkWithDefaultSerdes() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable aTable = builder.table("A");
        KTable bTable = builder.table("B");
        KTable fkJoinResult = aTable.join(bTable, value -> Integer.parseInt(value.split("-")[0]), (aVal, bVal) -> "(" + aVal + "," + bVal + ")", Materialized.as((String)"asdf"));
        KTable finalJoinResult = aTable.join(fkJoinResult, (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")");
        finalJoinResult.toStream().to("output");
        this.validateTopologyCanProcessData(builder);
    }

    @Test
    public void shouldWorkWithDefaultAndConsumedSerdes() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable aTable = builder.table("A", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        KTable bTable = builder.table("B");
        KTable fkJoinResult = aTable.join(bTable, value -> Integer.parseInt(value.split("-")[0]), (aVal, bVal) -> "(" + aVal + "," + bVal + ")", Materialized.as((String)"asdf"));
        KTable finalJoinResult = aTable.join(fkJoinResult, (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")");
        finalJoinResult.toStream().to("output");
        this.validateTopologyCanProcessData(builder);
    }

    @Test
    public void shouldWorkWithDefaultAndJoinResultSerdes() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable aTable = builder.table("A");
        KTable bTable = builder.table("B");
        KTable fkJoinResult = aTable.join(bTable, value -> Integer.parseInt(value.split("-")[0]), (aVal, bVal) -> "(" + aVal + "," + bVal + ")", Materialized.as((String)"asdf").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.String()));
        KTable finalJoinResult = aTable.join(fkJoinResult, (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")");
        finalJoinResult.toStream().to("output");
        this.validateTopologyCanProcessData(builder);
    }

    @Test
    public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable aTable = builder.table("A");
        KTable bTable = builder.table("B");
        KTable fkJoinResult = aTable.join(bTable, value -> Integer.parseInt(value.split("-")[0]), (aVal, bVal) -> "(" + aVal + "," + bVal + ")", Materialized.as((String)"asdf"));
        KTable finalJoinResult = aTable.join(fkJoinResult, (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")", Materialized.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        finalJoinResult.toStream().to("output");
        this.validateTopologyCanProcessData(builder);
    }

    @Test
    public void shouldWorkWithDefaultAndProducedSerdes() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable aTable = builder.table("A");
        KTable bTable = builder.table("B");
        KTable fkJoinResult = aTable.join(bTable, value -> Integer.parseInt(value.split("-")[0]), (aVal, bVal) -> "(" + aVal + "," + bVal + ")", Materialized.as((String)"asdf"));
        KTable finalJoinResult = aTable.join(fkJoinResult, (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")");
        finalJoinResult.toStream().to("output", Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.validateTopologyCanProcessData(builder);
    }

    @Test
    public void shouldUseExpectedTopicsWithSerde() {
        String applicationId = "ktable-ktable-joinOnForeignKey";
        Properties streamsConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"ktable-ktable-joinOnForeignKey"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"asdf:0000"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath())}));
        UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        StreamsBuilder builder = new StreamsBuilder();
        String leftTable = "left_table";
        String rightTable = "right_table";
        String output = "output-topic";
        KTable left = builder.table("left_table", Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
        KTable right = builder.table("right_table", Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
        left.join(right, value -> Integer.parseInt(value.split("\\|")[1]), (value1, value2) -> "(" + value1 + "," + value2 + ")", Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))).toStream().to("output-topic");
        Topology topology = builder.build(streamsConfig);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig);){
            TestInputTopic leftInput = driver.createInputTopic("left_table", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
            TestInputTopic rightInput = driver.createInputTopic("right_table", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
            leftInput.pipeInput((Object)2, (Object)"lhsValue1|1");
            rightInput.pipeInput((Object)1, (Object)"rhsValue1");
        }
        MatcherAssert.assertThat(serdeScope.registeredTopics(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new String[]{"ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key", "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key", "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value", "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key", "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key", "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value", "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key", "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value", "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key", "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value", "output-topic--key", "output-topic--value"})));
    }

    private void validateTopologyCanProcessData(StreamsBuilder builder) {
        Properties config = new Properties();
        config.setProperty("application.id", "dummy-" + UUID.randomUUID());
        config.setProperty("bootstrap.servers", "dummy");
        config.setProperty("default.key.serde", Serdes.IntegerSerde.class.getName());
        config.setProperty("default.value.serde", Serdes.StringSerde.class.getName());
        config.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config);){
            TestInputTopic aTopic = topologyTestDriver.createInputTopic("A", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
            TestInputTopic bTopic = topologyTestDriver.createInputTopic("B", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic output = topologyTestDriver.createOutputTopic("output", (Deserializer)new IntegerDeserializer(), (Deserializer)new StringDeserializer());
            aTopic.pipeInput((Object)1, (Object)"999-alpha");
            bTopic.pipeInput((Object)999, (Object)"beta");
            Map x = output.readKeyValuesToMap();
            MatcherAssert.assertThat((Object)x, (Matcher)Matchers.is(Collections.singletonMap(1, "(999-alpha,(999-alpha,beta))")));
        }
    }
}

