/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.state.SessionStore;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Test;

/**
 * Checks how a {@code suppress()} operator appears in the textual topology description.
 *
 * <p>Each test builds a small topology with a {@link StreamsBuilder}, renders it with
 * {@code build().describe().toString()} and compares the result with an expected description.
 * A suppression that is given a name via {@code withName(...)} is expected to appear under that
 * name with a store called {@code <name>-store}; an anonymous suppression is expected to appear
 * as a numbered {@code KTABLE-SUPPRESS-*} node with a numbered
 * {@code KTABLE-SUPPRESS-STATE-STORE-*} store.
 */
public class SuppressTopologyTest {
    private static final Serde<String> STRING_SERDE = Serdes.String();

    /** Expected description of the windowed ("final results") topology with suppression named {@code myname}. */
    private static final String NAMED_FINAL_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-KEY-SELECT-0000000001\n"
            + "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n"
            + "      --> KSTREAM-FILTER-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-FILTER-0000000004 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000003\n"
            + "      <-- KSTREAM-KEY-SELECT-0000000001\n"
            + "    Sink: KSTREAM-SINK-0000000003 (topic: counts-repartition)\n"
            + "      <-- KSTREAM-FILTER-0000000004\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000005 (topics: [counts-repartition])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts])\n"
            + "      --> myname\n"
            + "      <-- KSTREAM-SOURCE-0000000005\n"
            + "    Processor: myname (stores: [myname-store])\n"
            + "      --> KTABLE-TOSTREAM-0000000006\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000006 (stores: [])\n"
            + "      --> KSTREAM-MAP-0000000007\n"
            + "      <-- myname\n"
            + "    Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000008\n"
            + "      <-- KTABLE-TOSTREAM-0000000006\n"
            + "    Sink: KSTREAM-SINK-0000000008 (topic: output-suppressed)\n"
            + "      <-- KSTREAM-MAP-0000000007\n"
            + "\n";

    /** Expected description of the windowed ("final results") topology with an anonymous suppression. */
    private static final String ANONYMOUS_FINAL_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-KEY-SELECT-0000000001\n"
            + "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n"
            + "      --> KSTREAM-FILTER-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-FILTER-0000000004 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000003\n"
            + "      <-- KSTREAM-KEY-SELECT-0000000001\n"
            + "    Sink: KSTREAM-SINK-0000000003 (topic: counts-repartition)\n"
            + "      <-- KSTREAM-FILTER-0000000004\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000005 (topics: [counts-repartition])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts])\n"
            + "      --> KTABLE-SUPPRESS-0000000006\n"
            + "      <-- KSTREAM-SOURCE-0000000005\n"
            + "    Processor: KTABLE-SUPPRESS-0000000006 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000007])\n"
            + "      --> KTABLE-TOSTREAM-0000000008\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n"
            + "      --> KSTREAM-MAP-0000000009\n"
            + "      <-- KTABLE-SUPPRESS-0000000006\n"
            + "    Processor: KSTREAM-MAP-0000000009 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000010\n"
            + "      <-- KTABLE-TOSTREAM-0000000008\n"
            + "    Sink: KSTREAM-SINK-0000000010 (topic: output-suppressed)\n"
            + "      <-- KSTREAM-MAP-0000000009\n"
            + "\n";

    /** Expected description of the non-windowed topology with suppression named {@code asdf}. */
    private static final String NAMED_INTERMEDIATE_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> asdf\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: asdf (stores: [asdf-store])\n"
            + "      --> KTABLE-TOSTREAM-0000000003\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000004\n"
            + "      <-- asdf\n"
            + "    Sink: KSTREAM-SINK-0000000004 (topic: output)\n"
            + "      <-- KTABLE-TOSTREAM-0000000003\n"
            + "\n";

    /** Expected description of the non-windowed topology with an anonymous suppression. */
    private static final String ANONYMOUS_INTERMEDIATE_TOPOLOGY =
        "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> KTABLE-SUPPRESS-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KTABLE-SUPPRESS-0000000003 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000004])\n"
            + "      --> KTABLE-TOSTREAM-0000000005\n"
            + "      <-- KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000006\n"
            + "      <-- KTABLE-SUPPRESS-0000000003\n"
            + "    Sink: KSTREAM-SINK-0000000006 (topic: output)\n"
            + "      <-- KTABLE-TOSTREAM-0000000005\n"
            + "\n";

    /**
     * An unnamed {@code untilWindowCloses} suppression on a session-windowed count must be
     * described with generated, numbered node and store names.
     */
    @Test
    public void shouldUseNumberingForAnonymousFinalSuppressionNode() {
        final StreamsBuilder anonymousNodeBuilder = new StreamsBuilder();
        anonymousNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SessionWindows.with(Duration.ofMillis(5L)).grace(Duration.ofMillis(5L)))
            // The explicit type witness is required: as a method-call receiver, as(...) has no
            // target type to infer the key/value/store types from.
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled())
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .<String, Long>map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        final String anonymousNodeTopology = anonymousNodeBuilder.build().describe().toString();

        // Without a name, the suppression node and its store each take a generated index
        // (0000000006 and 0000000007 in the expected description).
        MatcherAssert.assertThat(anonymousNodeTopology, Is.is(ANONYMOUS_FINAL_TOPOLOGY));
    }

    /**
     * An {@code untilWindowCloses} suppression named {@code myname} must be described under
     * that name, with a store named {@code myname-store}.
     */
    @Test
    public void shouldApplyNameToFinalSuppressionNode() {
        final StreamsBuilder namedNodeBuilder = new StreamsBuilder();
        namedNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SessionWindows.with(Duration.ofMillis(5L)).grace(Duration.ofMillis(5L)))
            // Same type witness as in the anonymous variant; see the comment there is not
            // needed to understand this line: as(...) is a receiver and cannot be inferred.
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled())
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).withName("myname"))
            .toStream()
            .<String, Long>map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        final String namedNodeTopology = namedNodeBuilder.build().describe().toString();

        // With a name, the node following the suppression takes index 0000000006 in the
        // expected description.
        MatcherAssert.assertThat(namedNodeTopology, Is.is(NAMED_FINAL_TOPOLOGY));
    }

    /**
     * An unnamed {@code untilTimeLimit} suppression on a plain count must be described with
     * generated, numbered node and store names.
     */
    @Test
    public void shouldUseNumberingForAnonymousSuppressionNode() {
        final StreamsBuilder anonymousNodeBuilder = new StreamsBuilder();
        anonymousNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupByKey()
            .count()
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1L), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output", Produced.with(STRING_SERDE, Serdes.Long()));
        final String anonymousNodeTopology = anonymousNodeBuilder.build().describe().toString();

        // Without a name, the suppression node and its store each take a generated index
        // (0000000003 and 0000000004 in the expected description).
        MatcherAssert.assertThat(anonymousNodeTopology, Is.is(ANONYMOUS_INTERMEDIATE_TOPOLOGY));
    }

    /**
     * An {@code untilTimeLimit} suppression named {@code asdf} must be described under that
     * name, with a store named {@code asdf-store}.
     */
    @Test
    public void shouldApplyNameToSuppressionNode() {
        final StreamsBuilder namedNodeBuilder = new StreamsBuilder();
        namedNodeBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupByKey()
            .count()
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1L), Suppressed.BufferConfig.unbounded()).withName("asdf"))
            .toStream()
            .to("output", Produced.with(STRING_SERDE, Serdes.Long()));
        final String namedNodeTopology = namedNodeBuilder.build().describe().toString();

        // With a name, the node following the suppression takes index 0000000003 in the
        // expected description.
        MatcherAssert.assertThat(namedNodeTopology, Is.is(NAMED_INTERMEDIATE_TOPOLOGY));
    }
}

