/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepartitionWithMergeOptimizingTest {
    private final Logger log = LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class);
    private static final String INPUT_A_TOPIC = "inputA";
    private static final String INPUT_B_TOPIC = "inputB";
    private static final String COUNT_TOPIC = "outputTopic_0";
    private static final String STRING_COUNT_TOPIC = "outputTopic_1";
    private static final int ONE_REPARTITION_TOPIC = 1;
    private static final int TWO_REPARTITION_TOPICS = 2;
    private final Serializer<String> stringSerializer = new StringSerializer();
    private final Deserializer<String> stringDeserializer = new StringDeserializer();
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    private Properties streamsConfiguration;
    private TopologyTestDriver topologyTestDriver;
    private final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair((Object)"A", (Object)6L), KeyValue.pair((Object)"B", (Object)6L), KeyValue.pair((Object)"C", (Object)6L));
    private final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair((Object)"A", (Object)"6"), KeyValue.pair((Object)"B", (Object)"6"), KeyValue.pair((Object)"C", (Object)"6"));
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceAStream (topics: [inputA])\n      --> mappedAStream\n    Source: sourceBStream (topics: [inputB])\n      --> mappedBStream\n    Processor: mappedAStream (stores: [])\n      --> mergedStream\n      <-- sourceAStream\n    Processor: mappedBStream (stores: [])\n      --> mergedStream\n      <-- sourceBStream\n    Processor: mergedStream (stores: [])\n      --> long-groupByKey-repartition-filter\n      <-- mappedAStream, mappedBStream\n    Processor: long-groupByKey-repartition-filter (stores: [])\n      --> long-groupByKey-repartition-sink\n      <-- mergedStream\n    Sink: long-groupByKey-repartition-sink (topic: long-groupByKey-repartition)\n      <-- long-groupByKey-repartition-filter\n\n  Sub-topology: 1\n    Source: long-groupByKey-repartition-source (topics: [long-groupByKey-repartition])\n      --> long-count, string-count\n    Processor: string-count (stores: [string-store])\n      --> string-toStream\n      <-- long-groupByKey-repartition-source\n    Processor: long-count (stores: [long-store])\n      --> long-toStream\n      <-- long-groupByKey-repartition-source\n    Processor: string-toStream (stores: [])\n      --> string-mapValues\n      <-- string-count\n    Processor: long-toStream (stores: [])\n      --> long-to\n      <-- long-count\n    Processor: string-mapValues (stores: [])\n      --> string-to\n      <-- string-toStream\n    Sink: long-to (topic: outputTopic_0)\n      <-- long-toStream\n    Sink: string-to (topic: outputTopic_1)\n      <-- string-mapValues\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceAStream (topics: [inputA])\n      --> mappedAStream\n    Source: sourceBStream (topics: [inputB])\n      --> mappedBStream\n    Processor: mappedAStream (stores: [])\n      --> mergedStream\n      <-- sourceAStream\n    Processor: mappedBStream (stores: [])\n      --> mergedStream\n      <-- sourceBStream\n    Processor: mergedStream (stores: [])\n      --> long-groupByKey-repartition-filter, string-groupByKey-repartition-filter\n      <-- mappedAStream, mappedBStream\n    Processor: long-groupByKey-repartition-filter (stores: [])\n      --> long-groupByKey-repartition-sink\n      <-- mergedStream\n    Processor: string-groupByKey-repartition-filter (stores: [])\n      --> string-groupByKey-repartition-sink\n      <-- mergedStream\n    Sink: long-groupByKey-repartition-sink (topic: long-groupByKey-repartition)\n      <-- long-groupByKey-repartition-filter\n    Sink: string-groupByKey-repartition-sink (topic: string-groupByKey-repartition)\n      <-- string-groupByKey-repartition-filter\n\n  Sub-topology: 1\n    Source: long-groupByKey-repartition-source (topics: [long-groupByKey-repartition])\n      --> long-count\n    Processor: long-count (stores: [long-store])\n      --> long-toStream\n      <-- long-groupByKey-repartition-source\n    Processor: long-toStream (stores: [])\n      --> long-to\n      <-- long-count\n    Sink: long-to (topic: outputTopic_0)\n      <-- long-toStream\n\n  Sub-topology: 2\n    Source: string-groupByKey-repartition-source (topics: [string-groupByKey-repartition])\n      --> string-count\n    Processor: string-count (stores: [string-store])\n      --> string-toStream\n      <-- string-groupByKey-repartition-source\n    Processor: string-toStream (stores: [])\n      --> string-mapValues\n      <-- string-count\n    Processor: string-mapValues (stores: [])\n      --> string-to\n      <-- string-toStream\n    Sink: string-to (topic: outputTopic_1)\n      <-- string-mapValues\n\n";

    @Before
    public void setUp() {
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
        this.streamsConfiguration.setProperty("cache.max.bytes.buffering", Integer.toString(10240));
        this.streamsConfiguration.setProperty("commit.interval.ms", Long.toString(5000L));
    }

    @After
    public void tearDown() {
        try {
            this.topologyTestDriver.close();
        }
        catch (RuntimeException e) {
            this.log.warn("The following exception was thrown while trying to close the TopologyTestDriver (note that KAFKA-6647 causes this when running on Windows):", (Throwable)e);
        }
    }

    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() {
        this.runTest("all", 1);
    }

    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() {
        this.runTest("none", 2);
    }

    private void runTest(String optimizationConfig, int expectedNumberRepartitionTopics) {
        this.streamsConfiguration.setProperty("topology.optimization", optimizationConfig);
        StreamsBuilder builder = new StreamsBuilder();
        KStream sourceAStream = builder.stream(INPUT_A_TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()).withName("sourceAStream"));
        KStream sourceBStream = builder.stream(INPUT_B_TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()).withName("sourceBStream"));
        KStream mappedAStream = sourceAStream.map((k, v) -> KeyValue.pair((Object)v.split(":")[0], (Object)v), Named.as((String)"mappedAStream"));
        KStream mappedBStream = sourceBStream.map((k, v) -> KeyValue.pair((Object)v.split(":")[0], (Object)v), Named.as((String)"mappedBStream"));
        KStream mergedStream = mappedAStream.merge(mappedBStream, Named.as((String)"mergedStream"));
        mergedStream.groupByKey(Grouped.as((String)"long-groupByKey")).count(Named.as((String)"long-count"), Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"long-store"))).toStream(Named.as((String)"long-toStream")).to(COUNT_TOPIC, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()).withName("long-to"));
        mergedStream.groupByKey(Grouped.as((String)"string-groupByKey")).count(Named.as((String)"string-count"), Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"string-store"))).toStream(Named.as((String)"string-toStream")).mapValues(v -> v.toString(), Named.as((String)"string-mapValues")).to(STRING_COUNT_TOPIC, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()).withName("string-to"));
        Topology topology = builder.build(this.streamsConfiguration);
        this.topologyTestDriver = new TopologyTestDriver(topology, this.streamsConfiguration);
        TestInputTopic inputTopicA = this.topologyTestDriver.createInputTopic(INPUT_A_TOPIC, this.stringSerializer, this.stringSerializer);
        TestInputTopic inputTopicB = this.topologyTestDriver.createInputTopic(INPUT_B_TOPIC, this.stringSerializer, this.stringSerializer);
        TestOutputTopic countOutputTopic = this.topologyTestDriver.createOutputTopic(COUNT_TOPIC, this.stringDeserializer, (Deserializer)new LongDeserializer());
        TestOutputTopic stringCountOutputTopic = this.topologyTestDriver.createOutputTopic(STRING_COUNT_TOPIC, this.stringDeserializer, this.stringDeserializer);
        inputTopicA.pipeKeyValueList(this.getKeyValues());
        inputTopicB.pipeKeyValueList(this.getKeyValues());
        String topologyString = topology.describe().toString();
        if (optimizationConfig.equals("all")) {
            Assert.assertEquals((Object)EXPECTED_OPTIMIZED_TOPOLOGY, (Object)topologyString);
        } else {
            Assert.assertEquals((Object)EXPECTED_UNOPTIMIZED_TOPOLOGY, (Object)topologyString);
        }
        Assert.assertEquals((long)expectedNumberRepartitionTopics, (long)this.getCountOfRepartitionTopicsFound(topologyString));
        MatcherAssert.assertThat((Object)countOutputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.equalTo(this.keyValueListToMap(this.expectedCountKeyValues)));
        MatcherAssert.assertThat((Object)stringCountOutputTopic.readKeyValuesToMap(), (Matcher)CoreMatchers.equalTo(this.keyValueListToMap(this.expectedStringCountKeyValues)));
    }

    private <K, V> Map<K, V> keyValueListToMap(List<KeyValue<K, V>> keyValuePairs) {
        HashMap<Object, Object> map = new HashMap<Object, Object>();
        for (KeyValue<K, V> pair : keyValuePairs) {
            map.put(pair.key, pair.value);
        }
        return map;
    }

    private int getCountOfRepartitionTopicsFound(String topologyString) {
        java.util.regex.Matcher matcher = this.repartitionTopicPattern.matcher(topologyString);
        ArrayList<String> repartitionTopicsFound = new ArrayList<String>();
        while (matcher.find()) {
            repartitionTopicsFound.add(matcher.group());
        }
        return repartitionTopicsFound.size();
    }

    private List<KeyValue<String, String>> getKeyValues() {
        ArrayList<KeyValue<String, String>> keyValueList = new ArrayList<KeyValue<String, String>>();
        String[] keys = new String[]{"X", "Y", "Z"};
        String[] values = new String[]{"A:foo", "B:foo", "C:foo"};
        for (String key : keys) {
            for (String value : values) {
                keyValueList.add((KeyValue<String, String>)KeyValue.pair((Object)key, (Object)value));
            }
        }
        return keyValueList;
    }
}

