/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class RepartitionTopicNamingTest {
    private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k + v;
    private static final String INPUT_TOPIC = "input";
    private static final String COUNT_TOPIC = "outputTopic_0";
    private static final String AGGREGATION_TOPIC = "outputTopic_1";
    private static final String REDUCE_TOPIC = "outputTopic_2";
    private static final String JOINED_TOPIC = "outputTopicForJoin";
    private final String firstRepartitionTopicName = "count-stream";
    private final String secondRepartitionTopicName = "aggregate-stream";
    private final String thirdRepartitionTopicName = "reduced-stream";
    private final String fourthRepartitionTopicName = "joined-stream";
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000002, count-stream-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: count-stream-repartition-filter (stores: [])\n      --> count-stream-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: count-stream-repartition-sink (topic: count-stream-repartition)\n      <-- count-stream-repartition-filter\n\n  Sub-topology: 1\n    Source: count-stream-repartition-source (topics: [count-stream-repartition])\n      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- count-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> joined-stream-other-windowed, KSTREAM-SINK-0000000012\n      <-- KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- count-stream-repartition-source\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> joined-stream-this-windowed\n      <-- count-stream-repartition-source\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> KSTREAM-REDUCE-0000000023\n      <-- KSTREAM-FILTER-0000000020\n    Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n      --> joined-stream-other-join\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n      --> joined-stream-this-join\n      <-- KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- count-stream-repartition-source\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- KSTREAM-PEEK-0000000021\n    Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-other-windowed\n    Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-this-windowed\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Processor: joined-stream-merge (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- joined-stream-this-join, joined-stream-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n      <-- joined-stream-merge\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000029, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000020, aggregate-stream-repartition-filter, count-stream-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> joined-stream-left-repartition-filter\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> reduced-stream-repartition-filter\n      <-- KSTREAM-FILTER-0000000020\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: aggregate-stream-repartition-filter (stores: [])\n      --> aggregate-stream-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Processor: count-stream-repartition-filter (stores: [])\n      --> count-stream-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Processor: joined-stream-left-repartition-filter (stores: [])\n      --> joined-stream-left-repartition-sink\n      <-- KSTREAM-FILTER-0000000029\n    Processor: reduced-stream-repartition-filter (stores: [])\n      --> reduced-stream-repartition-sink\n      <-- KSTREAM-PEEK-0000000021\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: aggregate-stream-repartition-sink (topic: aggregate-stream-repartition)\n      <-- aggregate-stream-repartition-filter\n    Sink: count-stream-repartition-sink (topic: count-stream-repartition)\n      <-- count-stream-repartition-filter\n    Sink: joined-stream-left-repartition-sink (topic: joined-stream-left-repartition)\n      <-- joined-stream-left-repartition-filter\n    Sink: reduced-stream-repartition-sink (topic: reduced-stream-repartition)\n      <-- reduced-stream-repartition-filter\n\n  Sub-topology: 1\n    Source: count-stream-repartition-source (topics: [count-stream-repartition])\n      --> KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- count-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012, joined-stream-other-windowed\n      <-- KSTREAM-AGGREGATE-0000000007\n    Source: joined-stream-left-repartition-source (topics: [joined-stream-left-repartition])\n      --> joined-stream-this-windowed\n    Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n      --> joined-stream-other-join\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n      --> joined-stream-this-join\n      <-- joined-stream-left-repartition-source\n    Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-other-windowed\n    Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-this-windowed\n    Processor: joined-stream-merge (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- joined-stream-this-join, joined-stream-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n      <-- joined-stream-merge\n\n  Sub-topology: 2\n    Source: aggregate-stream-repartition-source (topics: [aggregate-stream-repartition])\n      --> KSTREAM-AGGREGATE-0000000014\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- aggregate-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n\n  Sub-topology: 3\n    Source: reduced-stream-repartition-source (topics: [reduced-stream-repartition])\n      --> KSTREAM-REDUCE-0000000023\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- reduced-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n\n";

    @Test
    public void shouldReuseFirstRepartitionTopicNameWhenOptimizing() {
        String optimizedTopology = this.buildTopology("all").describe().toString();
        String unOptimizedTopology = this.buildTopology("none").describe().toString();
        MatcherAssert.assertThat((Object)optimizedTopology, (Matcher)CoreMatchers.is((Object)EXPECTED_OPTIMIZED_TOPOLOGY));
        MatcherAssert.assertThat((Object)1, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(optimizedTopology, this.repartitionTopicPattern)));
        Assertions.assertTrue((boolean)optimizedTopology.contains("count-stream-repartition"));
        MatcherAssert.assertThat((Object)unOptimizedTopology, (Matcher)CoreMatchers.is((Object)EXPECTED_UNOPTIMIZED_TOPOLOGY));
        MatcherAssert.assertThat((Object)4, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(unOptimizedTopology, this.repartitionTopicPattern)));
        Assertions.assertTrue((boolean)unOptimizedTopology.contains("count-stream-repartition"));
        Assertions.assertTrue((boolean)unOptimizedTopology.contains("aggregate-stream-repartition"));
        Assertions.assertTrue((boolean)unOptimizedTopology.contains("reduced-stream-repartition"));
        Assertions.assertTrue((boolean)unOptimizedTopology.contains("joined-stream-left-repartition"));
    }

    @Test
    public void shouldFailWithSameRepartitionTopicName() {
        try {
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping")).count().toStream();
            builder.stream("topicII").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping")).count().toStream();
            builder.build();
            Assertions.fail((String)"Should not build re-using repartition topic name");
        }
        catch (TopologyException topologyException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStream() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping"));
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L))).count().toStream().to("output-one");
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(30L))).count().toStream().to("output-two");
        String topologyString = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)1, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(topologyString, this.repartitionTopicPattern)));
        Assertions.assertTrue((boolean)topologyString.contains("grouping-repartition"));
    }

    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameTimeWindowStream() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping"));
        TimeWindowedKStream timeWindowedKStream = kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L)));
        timeWindowedKStream.count().toStream().to("output-one");
        timeWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two");
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(30L))).count().toStream().to("output-two");
        String topologyString = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)1, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(topologyString, this.repartitionTopicPattern)));
        Assertions.assertTrue((boolean)topologyString.contains("grouping-repartition"));
    }

    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameSessionWindowStream() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping"));
        SessionWindowedKStream sessionWindowedKStream = kGroupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(10L)));
        sessionWindowedKStream.count().toStream().to("output-one");
        sessionWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two");
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(30L))).count().toStream().to("output-two");
        String topologyString = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)1, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(topologyString, this.repartitionTopicPattern)));
        Assertions.assertTrue((boolean)topologyString.contains("grouping-repartition"));
    }

    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedTable() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedTable kGroupedTable = builder.table("topic").groupBy(KeyValue::pair, Grouped.as((String)"grouping"));
        kGroupedTable.count().toStream().to("output-count");
        kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce");
        String topologyString = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)1, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(topologyString, this.repartitionTopicPattern)));
        Assertions.assertTrue((boolean)topologyString.contains("grouping-repartition"));
    }

    @Test
    public void shouldNotReuseRepartitionNodeWithUnnamedRepartitionTopics() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey();
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L))).count().toStream().to("output-one");
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(30L))).count().toStream().to("output-two");
        String topologyString = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)2, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(topologyString, this.repartitionTopicPattern)));
    }

    @Test
    public void shouldNotReuseRepartitionNodeWithUnnamedRepartitionTopicsKGroupedTable() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedTable kGroupedTable = builder.table("topic").groupBy(KeyValue::pair);
        kGroupedTable.count().toStream().to("output-count");
        kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce");
        String topologyString = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)2, (Matcher)CoreMatchers.is((Object)this.getCountOfRepartitionTopicsFound(topologyString, this.repartitionTopicPattern)));
    }

    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimizationsOn() {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping"));
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L))).count();
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(30L))).count();
        Properties properties = new Properties();
        properties.put("topology.optimization", "all");
        Topology topology = builder.build(properties);
        MatcherAssert.assertThat((Object)this.getCountOfRepartitionTopicsFound(topology.describe().toString(), this.repartitionTopicPattern), (Matcher)CoreMatchers.is((Object)1));
    }

    @Test
    public void shouldFailWithSameRepartitionTopicNameInJoin() {
        try {
            StreamsBuilder builder = new StreamsBuilder();
            KStream stream1 = builder.stream("topic").selectKey((k, v) -> k);
            KStream stream2 = builder.stream("topic2").selectKey((k, v) -> k);
            KStream stream3 = builder.stream("topic3").selectKey((k, v) -> k);
            KStream joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(30L)), StreamJoined.as((String)"join-store").withName("join-repartition"));
            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(30L)), StreamJoined.as((String)"join-store").withName("join-repartition"));
            builder.build();
            Assertions.fail((String)"Should not build re-using repartition topic name");
        }
        catch (TopologyException topologyException) {
            // empty catch block
        }
    }

    @Test
    public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
        StreamsBuilder builder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", "all");
        KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as((String)"grouping"));
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L))).count();
        kGroupedStream.windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(30L))).count();
        builder.build(properties);
    }

    @Test
    public void shouldKeepRepartitionTopicNameForJoins() {
        String expectedLeftRepartitionTopic = "(topic: my-join-left-repartition)";
        String expectedRightRepartitionTopic = "(topic: my-join-right-repartition)";
        String joinTopologyFirst = this.buildStreamJoin(false);
        Assertions.assertTrue((boolean)joinTopologyFirst.contains("(topic: my-join-left-repartition)"));
        Assertions.assertTrue((boolean)joinTopologyFirst.contains("(topic: my-join-right-repartition)"));
        String joinTopologyUpdated = this.buildStreamJoin(true);
        Assertions.assertTrue((boolean)joinTopologyUpdated.contains("(topic: my-join-left-repartition)"));
        Assertions.assertTrue((boolean)joinTopologyUpdated.contains("(topic: my-join-right-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionTopicNameForGroupByKeyTimeWindows() {
        String expectedTimeWindowRepartitionTopic = "(topic: time-window-grouping-repartition)";
        String timeWindowGroupingRepartitionTopology = this.buildStreamGroupByKeyTimeWindows(false, true);
        Assertions.assertTrue((boolean)timeWindowGroupingRepartitionTopology.contains("(topic: time-window-grouping-repartition)"));
        String timeWindowGroupingUpdatedTopology = this.buildStreamGroupByKeyTimeWindows(true, true);
        Assertions.assertTrue((boolean)timeWindowGroupingUpdatedTopology.contains("(topic: time-window-grouping-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionTopicNameForGroupByTimeWindows() {
        String expectedTimeWindowRepartitionTopic = "(topic: time-window-grouping-repartition)";
        String timeWindowGroupingRepartitionTopology = this.buildStreamGroupByKeyTimeWindows(false, false);
        Assertions.assertTrue((boolean)timeWindowGroupingRepartitionTopology.contains("(topic: time-window-grouping-repartition)"));
        String timeWindowGroupingUpdatedTopology = this.buildStreamGroupByKeyTimeWindows(true, false);
        Assertions.assertTrue((boolean)timeWindowGroupingUpdatedTopology.contains("(topic: time-window-grouping-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionTopicNameForGroupByKeyNoWindows() {
        String expectedNoWindowRepartitionTopic = "(topic: kstream-grouping-repartition)";
        String noWindowGroupingRepartitionTopology = this.buildStreamGroupByKeyNoWindows(false, true);
        Assertions.assertTrue((boolean)noWindowGroupingRepartitionTopology.contains("(topic: kstream-grouping-repartition)"));
        String noWindowGroupingUpdatedTopology = this.buildStreamGroupByKeyNoWindows(true, true);
        Assertions.assertTrue((boolean)noWindowGroupingUpdatedTopology.contains("(topic: kstream-grouping-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionTopicNameForGroupByNoWindows() {
        String expectedNoWindowRepartitionTopic = "(topic: kstream-grouping-repartition)";
        String noWindowGroupingRepartitionTopology = this.buildStreamGroupByKeyNoWindows(false, false);
        Assertions.assertTrue((boolean)noWindowGroupingRepartitionTopology.contains("(topic: kstream-grouping-repartition)"));
        String noWindowGroupingUpdatedTopology = this.buildStreamGroupByKeyNoWindows(true, false);
        Assertions.assertTrue((boolean)noWindowGroupingUpdatedTopology.contains("(topic: kstream-grouping-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionTopicNameForGroupByKeySessionWindows() {
        String expectedSessionWindowRepartitionTopic = "(topic: session-window-grouping-repartition)";
        String sessionWindowGroupingRepartitionTopology = this.buildStreamGroupByKeySessionWindows(false, true);
        Assertions.assertTrue((boolean)sessionWindowGroupingRepartitionTopology.contains("(topic: session-window-grouping-repartition)"));
        String sessionWindowGroupingUpdatedTopology = this.buildStreamGroupByKeySessionWindows(true, true);
        Assertions.assertTrue((boolean)sessionWindowGroupingUpdatedTopology.contains("(topic: session-window-grouping-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionTopicNameForGroupBySessionWindows() {
        String expectedSessionWindowRepartitionTopic = "(topic: session-window-grouping-repartition)";
        String sessionWindowGroupingRepartitionTopology = this.buildStreamGroupByKeySessionWindows(false, false);
        Assertions.assertTrue((boolean)sessionWindowGroupingRepartitionTopology.contains("(topic: session-window-grouping-repartition)"));
        String sessionWindowGroupingUpdatedTopology = this.buildStreamGroupByKeySessionWindows(true, false);
        Assertions.assertTrue((boolean)sessionWindowGroupingUpdatedTopology.contains("(topic: session-window-grouping-repartition)"));
    }

    @Test
    public void shouldKeepRepartitionNameForGroupByKTable() {
        String expectedKTableGroupByRepartitionTopic = "(topic: ktable-group-by-repartition)";
        String ktableGroupByTopology = this.buildKTableGroupBy(false);
        Assertions.assertTrue((boolean)ktableGroupByTopology.contains("(topic: ktable-group-by-repartition)"));
        String ktableUpdatedGroupByTopology = this.buildKTableGroupBy(true);
        Assertions.assertTrue((boolean)ktableUpdatedGroupByTopology.contains("(topic: ktable-group-by-repartition)"));
    }

    private String buildKTableGroupBy(boolean otherOperations) {
        String ktableGroupByTopicName = "ktable-group-by";
        StreamsBuilder builder = new StreamsBuilder();
        KTable ktable = builder.table("topic");
        if (otherOperations) {
            ktable.filter((k, v) -> true).groupBy(KeyValue::pair, Grouped.as((String)"ktable-group-by")).count();
        } else {
            ktable.groupBy(KeyValue::pair, Grouped.as((String)"ktable-group-by")).count();
        }
        return builder.build().describe().toString();
    }

    private String buildStreamGroupByKeyTimeWindows(boolean otherOperations, boolean isGroupByKey) {
        String groupedTimeWindowRepartitionTopicName = "time-window-grouping";
        StreamsBuilder builder = new StreamsBuilder();
        KStream selectKeyStream = builder.stream("topic").selectKey((k, v) -> k + v);
        if (isGroupByKey) {
            if (otherOperations) {
                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as((String)"time-window-grouping")).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L))).count();
            } else {
                selectKeyStream.groupByKey(Grouped.as((String)"time-window-grouping")).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L))).count();
            }
        } else if (otherOperations) {
            selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(this.kvMapper, Grouped.as((String)"time-window-grouping")).count();
        } else {
            selectKeyStream.groupBy(this.kvMapper, Grouped.as((String)"time-window-grouping")).count();
        }
        return builder.build().describe().toString();
    }

    private String buildStreamGroupByKeySessionWindows(boolean otherOperations, boolean isGroupByKey) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream selectKeyStream = builder.stream("topic").selectKey((k, v) -> k + v);
        String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
        if (isGroupByKey) {
            if (otherOperations) {
                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as((String)"session-window-grouping")).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(10L))).count();
            } else {
                selectKeyStream.groupByKey(Grouped.as((String)"session-window-grouping")).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(10L))).count();
            }
        } else if (otherOperations) {
            selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(this.kvMapper, Grouped.as((String)"session-window-grouping")).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(10L))).count();
        } else {
            selectKeyStream.groupBy(this.kvMapper, Grouped.as((String)"session-window-grouping")).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(10L))).count();
        }
        return builder.build().describe().toString();
    }

    private String buildStreamGroupByKeyNoWindows(boolean otherOperations, boolean isGroupByKey) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream selectKeyStream = builder.stream("topic").selectKey((k, v) -> k + v);
        String groupByAndCountRepartitionTopicName = "kstream-grouping";
        if (isGroupByKey) {
            if (otherOperations) {
                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as((String)"kstream-grouping")).count();
            } else {
                selectKeyStream.groupByKey(Grouped.as((String)"kstream-grouping")).count();
            }
        } else if (otherOperations) {
            selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(this.kvMapper, Grouped.as((String)"kstream-grouping")).count();
        } else {
            selectKeyStream.groupBy(this.kvMapper, Grouped.as((String)"kstream-grouping")).count();
        }
        return builder.build().describe().toString();
    }

    private String buildStreamJoin(boolean includeOtherOperations) {
        KStream updatedStreamTwo;
        KStream updatedStreamOne;
        StreamsBuilder builder = new StreamsBuilder();
        KStream initialStreamOne = builder.stream("topic-one");
        KStream initialStreamTwo = builder.stream("topic-two");
        if (includeOtherOperations) {
            updatedStreamOne = initialStreamOne.selectKey((k, v) -> k + v).filter((k, v) -> true).peek((k, v) -> System.out.println(k + v));
            updatedStreamTwo = initialStreamTwo.selectKey((k, v) -> k + v).filter((k, v) -> true).peek((k, v) -> System.out.println(k + v));
        } else {
            updatedStreamOne = initialStreamOne.selectKey((k, v) -> k + v);
            updatedStreamTwo = initialStreamTwo.selectKey((k, v) -> k + v);
        }
        String joinRepartitionTopicName = "my-join";
        updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1000L)), StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.String()).withName("my-join"));
        return builder.build().describe().toString();
    }

    private int getCountOfRepartitionTopicsFound(String topologyString, Pattern repartitionTopicPattern) {
        java.util.regex.Matcher matcher = repartitionTopicPattern.matcher(topologyString);
        ArrayList<String> repartitionTopicsFound = new ArrayList<String>();
        while (matcher.find()) {
            repartitionTopicsFound.add(matcher.group());
        }
        return repartitionTopicsFound.size();
    }

    private Topology buildTopology(String optimizationConfig) {
        Initializer initializer = () -> 0;
        Aggregator aggregator = (k, v, agg) -> agg + v.length();
        Reducer reducer = (v1, v2) -> v1 + ":" + v2;
        ArrayList processorValueCollector = new ArrayList();
        StreamsBuilder builder = new StreamsBuilder();
        KStream sourceStream = builder.stream(INPUT_TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream mappedStream = sourceStream.map((k, v) -> KeyValue.pair((Object)k.toUpperCase(Locale.getDefault()), (Object)v));
        mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.getDefault())).process(() -> new SimpleProcessor(processorValueCollector), new String[0]);
        KStream countStream = mappedStream.groupByKey(Grouped.as((String)"count-stream")).count(Materialized.with((Serde)Serdes.String(), (Serde)Serdes.Long())).toStream();
        countStream.to(COUNT_TOPIC, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        mappedStream.groupByKey(Grouped.as((String)"aggregate-stream")).aggregate(initializer, aggregator, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.Integer())).toStream().to(AGGREGATION_TOPIC, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey(Grouped.as((String)"reduced-stream")).reduce(reducer, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).toStream().to(REDUCE_TOPIC, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        mappedStream.filter((k, v) -> k.equals("A")).join(countStream, (v1, v2) -> v1 + ":" + v2.toString(), JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(5000L)), StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.Long()).withStoreName("joined-stream").withName("joined-stream")).to(JOINED_TOPIC);
        Properties properties = new Properties();
        properties.put("topology.optimization", optimizationConfig);
        return builder.build(properties);
    }

    private static class SimpleProcessor
    implements Processor<String, String, Void, Void> {
        final List<String> valueList;

        SimpleProcessor(List<String> valueList) {
            this.valueList = valueList;
        }

        public void process(Record<String, String> record) {
            this.valueList.add((String)record.value());
        }
    }
}

