/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class KTableKTableForeignKeyJoinIntegrationTest {
    // Topic names for the two input tables consumed by the topology built in getTopology().
    private static final String LEFT_TABLE = "left_table";
    private static final String RIGHT_TABLE = "right_table";
    // Topic receiving the results of the foreign-key join.
    private static final String OUTPUT = "output-topic";
    // Topic receiving the results of the second join; only written when rejoin is true.
    private static final String REJOIN_OUTPUT = "rejoin-output-topic";
    // Test parameters injected by the Parameterized runner through the constructor.
    private final boolean leftJoin;
    private final boolean materialized;
    private final String optimization;
    private final boolean rejoin;
    // Streams configuration rebuilt before every test in before().
    private Properties streamsConfig;
    // Exposes the running test's name; used in before() to derive the application id.
    @Rule
    public TestName testName = new TestName();

    /**
     * Receives one parameter combination produced by {@link #data()}.
     *
     * @param leftJoin     {@code true} to build the topology with {@code leftJoin}, {@code false} for {@code join}
     * @param optimization value stored under the {@code topology.optimization} config key
     * @param materialized {@code true} to materialize the join result into the store named {@code "store"}
     * @param rejoin       {@code true} to join the result with the left table again and write to the rejoin topic
     */
    public KTableKTableForeignKeyJoinIntegrationTest(boolean leftJoin, String optimization, boolean materialized, boolean rejoin) {
        // Assignments follow the parameter order; the fields are independent of each other.
        this.leftJoin = leftJoin;
        this.optimization = optimization;
        this.materialized = materialized;
        this.rejoin = rejoin;
    }

    /**
     * Rebuilds {@link #streamsConfig} before each test: an application id derived from the test name,
     * a placeholder bootstrap address, a fresh temporary state directory, and the optimization
     * setting of the current parameter combination.
     */
    @Before
    public void before() {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        String applicationId = "app-" + safeTestName;
        String stateDir = TestUtils.tempDirectory().getPath();
        this.streamsConfig = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", applicationId),
            Utils.mkEntry("bootstrap.servers", "asdf:0000"),
            Utils.mkEntry("state.dir", stateDir),
            Utils.mkEntry("topology.optimization", this.optimization)
        ));
    }

    /**
     * Supplies the parameter matrix: every combination of leftJoin, optimization ("all"/"none"),
     * materialized and rejoin, in the order expected by the constructor.
     *
     * @return one {@code Object[]} of constructor arguments per combination
     */
    @Parameterized.Parameters(name="leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}")
    public static Collection<Object[]> data() {
        List<String> optimizationModes = Arrays.asList("all", "none");
        List<Boolean> flags = Arrays.asList(true, false);
        return KTableKTableForeignKeyJoinIntegrationTest.buildParameters(flags, optimizationModes, flags, flags);
    }

    /**
     * Builds the cartesian product of the given option lists.
     *
     * @param argOptions one list of candidate values per argument position
     * @return every combination as an {@code Object[]}; a single empty array when no lists are given
     */
    private static Collection<Object[]> buildParameters(List<?> ... argOptions) {
        // Start from one empty argument array and extend it by one position per option list.
        List<Object[]> combinations = new LinkedList<Object[]>();
        combinations.add(new Object[0]);
        for (int position = 0; position < argOptions.length; position++) {
            combinations = KTableKTableForeignKeyJoinIntegrationTest.times(combinations, argOptions[position]);
        }
        return combinations;
    }

    /**
     * Extends every argument array in {@code left} by each element of {@code right}.
     *
     * @param left  argument arrays built so far
     * @param right candidate values for the next argument position
     * @return a new list holding {@code left.size() * right.size()} arrays, grouped by {@code left} entry
     */
    private static List<Object[]> times(List<Object[]> left, List<?> right) {
        List<Object[]> product = new LinkedList<Object[]>();
        for (Object[] prefix : left) {
            final int slot = prefix.length;
            for (Object candidate : right) {
                // Copy into a fresh Object[] one element longer, then fill the new last slot.
                Object[] extended = Arrays.copyOf(prefix, slot + 1, Object[].class);
                extended[slot] = candidate;
                product.add(extended);
            }
        }
        return product;
    }

    /**
     * Loads the right table first, then adds left records that reference it, and finally deletes
     * one left record. After each step the output topic (and, depending on the parameters, the
     * rejoin output topic and the materialized store) is compared with the expected contents.
     */
    @Test
    public void doJoinFromLeftThenDeleteLeftEntity() {
        Topology topology = KTableKTableForeignKeyJoinIntegrationTest.getTopology(this.streamsConfig, this.materialized ? "store" : null, this.leftJoin, this.rejoin);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.streamsConfig);){
            TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic left = driver.createInputTopic(LEFT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            // The rejoin topic is only created when the topology actually writes to it.
            TestOutputTopic rejoinOutputTopic = this.rejoin ? driver.createOutputTopic(REJOIN_OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer()) : null;
            // Looked up unconditionally, but only dereferenced below when materialized is true.
            KeyValueStore store = driver.getKeyValueStore("store");
            // Step 1: three right-side records and no left records yet; nothing is expected anywhere.
            right.pipeInput((Object)"rhs1", (Object)"rhsValue1");
            right.pipeInput((Object)"rhs2", (Object)"rhsValue2");
            right.pipeInput((Object)"rhs3", (Object)"rhsValue3");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.rejoin) {
                MatcherAssert.assertThat((Object)rejoinOutputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            }
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            }
            // Step 2: two left records whose values carry the foreign keys rhs1 and rhs2 after the '|'.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs1");
            left.pipeInput((Object)"lhs2", (Object)"lhsValue2|rhs2");
            Map expected = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)")});
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)expected));
            if (this.rejoin) {
                MatcherAssert.assertThat((Object)rejoinOutputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"), Utils.mkEntry((Object)"lhs2", (Object)"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")})));
            }
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)expected));
            }
            // Step 3: a third left record that also references rhs1. The topic reads are expected
            // to return only lhs3, while the store is expected to hold all three results.
            left.pipeInput((Object)"lhs3", (Object)"lhsValue3|rhs1");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")})));
            if (this.rejoin) {
                MatcherAssert.assertThat((Object)rejoinOutputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs3", (Object)"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")})));
            }
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")})));
            }
            // Step 4: delete lhs1 by piping a null value; a null-valued record for lhs1 is expected
            // on the output topics and lhs1 is expected to be gone from the store.
            left.pipeInput((Object)"lhs1", (Object)null);
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", null)})));
            if (this.rejoin) {
                MatcherAssert.assertThat((Object)rejoinOutputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", null)})));
            }
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")})));
            }
        }
    }

    /**
     * Loads the left table first, then adds the right-side records it references, and finally
     * deletes one right record. Expectations differ between left join and inner join wherever a
     * left record has no matching right record.
     */
    @Test
    public void doJoinFromRightThenDeleteRightEntity() {
        Topology topology = KTableKTableForeignKeyJoinIntegrationTest.getTopology(this.streamsConfig, this.materialized ? "store" : null, this.leftJoin, this.rejoin);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.streamsConfig);){
            TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic left = driver.createInputTopic(LEFT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            // Looked up unconditionally, but only dereferenced below when materialized is true.
            KeyValueStore store = driver.getKeyValueStore("store");
            // Step 1: three left records with no right side yet. A left join is expected to emit
            // results with a "null" right part; an inner join is expected to emit nothing.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs1");
            left.pipeInput((Object)"lhs2", (Object)"lhsValue2|rhs2");
            left.pipeInput((Object)"lhs3", (Object)"lhsValue3|rhs1");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,null)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,null)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,null)")}) : Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,null)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,null)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,null)")}) : Collections.emptyMap()));
            }
            // Step 2: rhs1 arrives; both left records referencing it (lhs1, lhs3) are expected to be joined.
            right.pipeInput((Object)"rhs1", (Object)"rhsValue1");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")})));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,null)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")}) : Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")}))));
            }
            // Step 3: rhs2 arrives; lhs2 is expected to be joined, completing the store contents.
            right.pipeInput((Object)"rhs2", (Object)"rhsValue2");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)")})));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")})));
            }
            // Step 4: rhs3 is referenced by no left record; no output and an unchanged store are expected.
            right.pipeInput((Object)"rhs3", (Object)"rhsValue3");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,rhsValue1)")})));
            }
            // Step 5: delete rhs1. For lhs1 and lhs3 a left join is expected to fall back to the
            // "null" right part, while an inner join is expected to emit null values and drop
            // them from the store.
            right.pipeInput((Object)"rhs1", (Object)null);
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)(this.leftJoin ? "(lhsValue1|rhs1,null)" : null)), Utils.mkEntry((Object)"lhs3", (Object)(this.leftJoin ? "(lhsValue3|rhs1,null)" : null))})));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,null)"), Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry((Object)"lhs3", (Object)"(lhsValue3|rhs1,null)")}) : Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs2", (Object)"(lhsValue2|rhs2,rhsValue2)")}))));
            }
        }
    }

    /**
     * Inserts a left record that has no matching right record, deletes it, and then deletes it a
     * second time, checking which of those deletes is expected to produce a null-valued output record.
     */
    @Test
    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
        Topology topology = KTableKTableForeignKeyJoinIntegrationTest.getTopology(this.streamsConfig, this.materialized ? "store" : null, this.leftJoin, this.rejoin);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.streamsConfig);){
            TestInputTopic left = driver.createInputTopic(LEFT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            // Looked up unconditionally, but only dereferenced below when materialized is true.
            KeyValueStore store = driver.getKeyValueStore("store");
            // Step 1: a left record with no right side. Left join expects a result with a "null"
            // right part; inner join expects nothing.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs1");
            Map expected = this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,null)")}) : Collections.emptyMap();
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(expected));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)expected));
            }
            // Step 2: delete the record. A null-valued record for lhs1 is expected for left joins,
            // and for inner joins only when neither materialized nor rejoin is set; otherwise no output.
            left.pipeInput((Object)"lhs1", (Object)null);
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(this.leftJoin || !this.materialized && !this.rejoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", null)}) : Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            }
            // Step 3: delete the same key again; no further output is expected.
            left.pipeInput((Object)"lhs1", (Object)null);
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            }
        }
    }

    /**
     * Pipes a delete (null value) for a left key that was never inserted and expects no output
     * record and, when materialized, an empty store.
     */
    @Test
    public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
        Topology topology = KTableKTableForeignKeyJoinIntegrationTest.getTopology(this.streamsConfig, this.materialized ? "store" : null, this.leftJoin, this.rejoin);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.streamsConfig);){
            TestInputTopic left = driver.createInputTopic(LEFT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            // Looked up unconditionally, but only dereferenced below when materialized is true.
            KeyValueStore store = driver.getKeyValueStore("store");
            // The very first input is already a delete for lhs1.
            left.pipeInput((Object)"lhs1", (Object)null);
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            }
        }
    }

    /**
     * Repeatedly changes the foreign key of a single left record (lhs1) between keys that do and
     * do not exist in the right table, checking the expected output and store contents after
     * every change.
     */
    @Test
    public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() {
        Topology topology = KTableKTableForeignKeyJoinIntegrationTest.getTopology(this.streamsConfig, this.materialized ? "store" : null, this.leftJoin, this.rejoin);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.streamsConfig);){
            TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic left = driver.createInputTopic(LEFT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            // Looked up unconditionally, but only dereferenced below when materialized is true.
            KeyValueStore store = driver.getKeyValueStore("store");
            // Step 1: lhs1 references rhs1, which does not exist yet. Left join expects a result
            // with a "null" right part; inner join expects nothing.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs1");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,null)")}) : Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,null)")}) : Collections.emptyMap()));
            }
            // Step 2: switch the foreign key to rhs2 (also missing). For an inner join, a
            // null-valued record is expected only when neither materialized nor rejoin is set.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs2");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs2,null)")}) : (this.materialized || this.rejoin ? Collections.emptyMap() : Collections.singletonMap("lhs1", null))));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs2,null)")}) : Collections.emptyMap()));
            }
            // Step 3: switch the foreign key to rhs3 (also missing); same expectations as step 2.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs3");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs3,null)")}) : (this.materialized || this.rejoin ? Collections.emptyMap() : Collections.singletonMap("lhs1", null))));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs3,null)")}) : Collections.emptyMap()));
            }
            // Step 4: rhs1 arrives while lhs1 currently references rhs3; no output is expected and
            // the store is expected to be unchanged.
            right.pipeInput((Object)"rhs1", (Object)"rhsValue1");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs3,null)")}) : Collections.emptyMap()));
            }
            // Step 5: point lhs1 back at rhs1, which now exists; a joined result is expected.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs1");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)")})));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)")})));
            }
            // Step 6: point lhs1 at the missing rhs2 again. Left join expects the "null" right
            // part; inner join expects a null-valued record and an empty store.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs2");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)(this.leftJoin ? "(lhsValue1|rhs2,null)" : null))})));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(this.leftJoin ? Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs2,null)")}) : Collections.emptyMap()));
            }
        }
    }

    /**
     * Moves a left record from foreign key rhs1 to rhs2 and then updates rhs1, expecting that the
     * later rhs1 update no longer produces any output for that left record.
     */
    @Test
    public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
        Topology topology = KTableKTableForeignKeyJoinIntegrationTest.getTopology(this.streamsConfig, this.materialized ? "store" : null, this.leftJoin, this.rejoin);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.streamsConfig);){
            TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic left = driver.createInputTopic(LEFT_TABLE, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            // Looked up unconditionally, but only dereferenced below when materialized is true.
            KeyValueStore store = driver.getKeyValueStore("store");
            // Step 1: two right records and no left records; nothing is expected.
            right.pipeInput((Object)"rhs1", (Object)"rhsValue1");
            right.pipeInput((Object)"rhs2", (Object)"rhsValue2");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            }
            // Step 2: lhs1 references rhs1; a joined result with rhsValue1 is expected.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs1");
            Map expected = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs1,rhsValue1)")});
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)expected));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)expected));
            }
            // Step 3: lhs1 now references rhs2; the result is expected to switch to rhsValue2.
            left.pipeInput((Object)"lhs1", (Object)"lhsValue1|rhs2");
            expected = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs2,rhsValue2)")});
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is((Object)expected));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)expected));
            }
            // Step 4: update rhs1, which lhs1 no longer references; no output is expected and the
            // store is expected to still hold the rhs2-based result.
            right.pipeInput((Object)"rhs1", (Object)"rhsValue1Delta");
            MatcherAssert.assertThat((Object)outputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.is(Collections.emptyMap()));
            if (this.materialized) {
                MatcherAssert.assertThat(KTableKTableForeignKeyJoinIntegrationTest.asMap((KeyValueStore<String, String>)store), (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"lhs1", (Object)"(lhsValue1|rhs2,rhsValue2)")})));
            }
        }
    }

    /**
     * Copies every entry currently in {@code store} into a new {@link HashMap} so it can be
     * compared against an expected map.
     *
     * @param store the key-value store to snapshot
     * @return a new map holding one entry per record returned by {@code store.all()}
     */
    private static Map<String, String> asMap(KeyValueStore<String, String> store) {
        Map<String, String> snapshot = new HashMap<>();
        // NOTE(review): the iterator returned by store.all() is never closed here - confirm
        // whether the store implementation in use requires it.
        store.all().forEachRemaining(entry -> snapshot.put(entry.key, entry.value));
        return snapshot;
    }

    /**
     * Builds the topology under test: a foreign-key join of {@code left_table} against
     * {@code right_table} written to {@code output-topic}, optionally followed by a second join
     * of that result with the left table written to {@code rejoin-output-topic}.
     *
     * <p>The foreign key is the part of the left value after the first {@code '|'}; the joiner
     * renders {@code "(left,right)"} and the rejoiner renders {@code "rejoin(joined,left)"}.
     *
     * @param streamsConfig      configuration handed to the serde decorators and to {@code builder.build}
     * @param queryableStoreName name of the in-memory store for the join result, or {@code null}
     *                           to leave the result store unnamed; the rejoin store, if any, is
     *                           named {@code queryableStoreName + "-rejoin"}
     * @param leftJoin           {@code true} to use {@code leftJoin}, {@code false} to use {@code join}
     * @param rejoin             {@code true} to add the second join and its output topic
     * @return the built topology
     */
    // Raw KTable/ValueJoiner/Materialized types are kept to match the rest of this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Topology getTopology(Properties streamsConfig, String queryableStoreName, boolean leftJoin, boolean rejoin) {
        UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        StreamsBuilder builder = new StreamsBuilder();
        KTable left = builder.table(LEFT_TABLE, Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
        KTable right = builder.table(RIGHT_TABLE, Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));

        // Left values look like "payload|foreignKey"; index 1 is the foreign key.
        Function<String, String> extractor = value -> value.split("\\|")[1];
        ValueJoiner joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
        ValueJoiner rejoiner = rejoin ? (value1, value2) -> "rejoin(" + value1 + "," + value2 + ")" : null;

        // Caching is disabled on the main result so every update reaches the output topic.
        Materialized mainMaterialized;
        if (queryableStoreName == null) {
            mainMaterialized = Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
                .withCachingDisabled();
        } else {
            // Explicit <String, String> is required: without it the receiver is inferred as
            // Materialized<Object, Object, ...> and withValueSerde rejects the String serde.
            mainMaterialized = Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
                .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
                .withCachingDisabled();
        }

        // Assigned on every path so the variable is definitely assigned where it is read below.
        Materialized rejoinMaterialized = null;
        if (rejoin) {
            if (queryableStoreName == null) {
                rejoinMaterialized = Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false));
            } else {
                rejoinMaterialized = Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName + "-rejoin"))
                    .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
                    .withCachingDisabled();
            }
        }

        KTable fkJoin = leftJoin
            ? left.leftJoin(right, extractor, joiner, mainMaterialized)
            : left.join(right, extractor, joiner, mainMaterialized);
        fkJoin.toStream().to(OUTPUT);

        if (rejoin) {
            KTable rejoined = leftJoin
                ? fkJoin.leftJoin(left, rejoiner, rejoinMaterialized)
                : fkJoin.join(left, rejoiner, rejoinMaterialized);
            rejoined.toStream().to(REJOIN_OUTPUT);
        }
        return builder.build(streamsConfig);
    }
}

