/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

public class SuppressScenarioTest {
    /** Deserializer for the String keys (and String values) read back from output topics. */
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    /** Serializer for the String keys and values piped into input topics. */
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    /** String serde used when declaring sources, groupings and sinks in the topologies. */
    private static final Serde<String> STRING_SERDE = Serdes.String();
    /** Deserializer for the Long counts read back from output topics. */
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();

    /**
     * Configuration handed to every {@code TopologyTestDriver} in this class: the application id is the
     * lower-cased simple class name, the state directory comes from {@code TestUtils.tempDirectory()},
     * and the bootstrap servers value is the literal "bogus".
     */
    private final Properties config = Utils.mkProperties(Utils.mkMap(
        // Locale.ROOT keeps the derived id identical regardless of the JVM's default locale.
        Utils.mkEntry("application.id", getClass().getSimpleName().toLowerCase(Locale.ROOT)),
        Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()),
        Utils.mkEntry("bootstrap.servers", "bogus")
    ));

    /**
     * With a zero time limit, every batch of input is expected to produce exactly the same records on
     * "output-suppressed" as on "output-raw".
     */
    @Test
    public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Re-key the input table by its value and count the entries per new key.
        // NOTE(review): the raw KTable and the Materialized chain without explicit type arguments are kept
        // as found in this decompiled source; confirm they type-check before tightening the generics.
        final KTable counts = streamsBuilder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            .groupBy((key, value) -> new KeyValue(value, key), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        counts
            .suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v2", 1L);
            input.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L),
                    new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L),
                    new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));

            input.pipeInput("x", "x", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));

            input.pipeInput("x", "y", 4L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("x", 0L, 4L),
                    new KeyValueTimestamp<String, Long>("y", 1L, 4L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("x", 0L, 4L),
                    new KeyValueTimestamp<String, Long>("y", 1L, 4L)));
        }
    }

    /**
     * With a 2 ms time limit, "output-raw" is expected to carry every update while "output-suppressed"
     * carries only the records pinned below after each input.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithTimeLimit() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Re-key the input table by its value and count the entries per new key.
        // NOTE(review): the raw KTable and the Materialized chain without explicit type arguments are kept
        // as found in this decompiled source; confirm they type-check before tightening the generics.
        final KTable counts = streamsBuilder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            .groupBy((key, value) -> new KeyValue(value, key), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        counts
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(2L), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v2", 1L);
            input.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L),
                    new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));

            // An unrelated key at timestamp 3; the suppressed output is then expected to show the v2 count.
            input.pipeInput("tick", "tick", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("tick", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("v2", 1L, 1L)));

            // Updating "tick" at timestamp 4 is expected to leave the suppressed output empty.
            input.pipeInput("tick", "tock", 4L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("tick", 0L, 4L),
                    new KeyValueTimestamp<String, Long>("tock", 1L, 4L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.emptyList());
        }
    }

    /**
     * Suppression configured with a time limit of {@code Long.MAX_VALUE} ms and a buffer of at most one
     * record that emits early when full; the assertions pin what reaches "output-suppressed" after each input.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithRecordLimit() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Re-key the input table by its value and count the entries per new key.
        // NOTE(review): the raw KTable and the first Materialized chain (no explicit type arguments) are kept
        // as found in this decompiled source; confirm they type-check before tightening the generics.
        final KTable counts = streamsBuilder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            .groupBy((key, value) -> new KeyValue(value, key), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count(Materialized.with(STRING_SERDE, Serdes.Long()));
        counts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxRecords(1L).emitEarlyWhenFull()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        // Writes the topology description to stdout.
        System.out.println(topology.describe());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v2", 1L);
            input.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L),
                    new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L)));

            // A new key arrives; the suppressed output is then expected to show the latest v1 count.
            input.pipeInput("x", "x", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
        }
    }

    /**
     * Suppression configured with a time limit of {@code Long.MAX_VALUE} ms and a 200-byte buffer that
     * emits early when full; the assertions pin what reaches "output-suppressed" after each input.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithBytesLimit() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Re-key the input table by its value and count the entries per new key.
        // NOTE(review): the raw KTable and the Materialized chain without explicit type arguments are kept
        // as found in this decompiled source; confirm they type-check before tightening the generics.
        final KTable counts = streamsBuilder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            .groupBy((key, value) -> new KeyValue(value, key), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        counts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxBytes(200L).emitEarlyWhenFull()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        // Writes the topology description to stdout.
        System.out.println(topology.describe());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v2", 1L);
            input.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L),
                    new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("v1", 0L, 1L),
                    new KeyValueTimestamp<String, Long>("v2", 1L, 1L)));

            // A new key arrives; the suppressed output is then expected to show the latest v1 count.
            input.pipeInput("x", "x", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
        }
    }

    /**
     * Counts records per key in 2 ms time windows with 1 ms grace and checks that, under
     * {@code untilWindowCloses}, "output-suppressed" carries a single record per window listed below
     * while "output-raw" carries every intermediate count.
     */
    @Test
    public void shouldSupportFinalResultsForTimeWindows() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // NOTE(review): the raw (Windows) cast is kept on purpose; it appears to be what lets the
        // Materialized below (no explicit type arguments) be accepted by count(...). Confirm before removing.
        final KTable counts = streamsBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((key, value) -> key, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy((Windows) TimeWindows.of(Duration.ofMillis(2L)).grace(Duration.ofMillis(1L)))
            .count(Materialized.as("counts").withCachingDisabled());
        // Windowed keys are rendered with toString() so both sinks can use the String serde.
        counts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((windowedKey, count) -> new KeyValue(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .map((windowedKey, count) -> new KeyValue(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        // Writes the topology description to stdout.
        System.out.println(topology.describe());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            // Timestamps are deliberately out of order: 0, 1, 2, 1, 0, 5, 0.
            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 1L);
            input.pipeInput("k1", "v1", 2L);
            input.pipeInput("k1", "v1", 1L);
            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 5L);
            input.pipeInput("k1", "v1", 0L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 2L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@2/4]", 1L, 2L),
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 3L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@4/6]", 1L, 5L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@2/4]", 1L, 2L)));
        }
    }

    /**
     * Same shape as the 2 ms time-window scenario but with 2 ms grace and a final record at timestamp 30;
     * "output-suppressed" is expected to carry one record for each of the three earlier windows.
     */
    @Test
    public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // NOTE(review): the raw (Windows) cast is kept on purpose; it appears to be what lets the
        // Materialized below (no explicit type arguments) be accepted by count(...). Confirm before removing.
        final KTable counts = streamsBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((key, value) -> key, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy((Windows) TimeWindows.of(Duration.ofMillis(2L)).grace(Duration.ofMillis(2L)))
            .count(Materialized.as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
        // Windowed keys are rendered with toString() so both sinks can use the String serde.
        counts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((windowedKey, count) -> new KeyValue(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .map((windowedKey, count) -> new KeyValue(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        // Writes the topology description to stdout.
        System.out.println(topology.describe());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            // Timestamps: 0, 1, 2, 0, 3, 0, 4, 0, then a jump to 30.
            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 1L);
            input.pipeInput("k1", "v1", 2L);
            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 3L);
            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 4L);
            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 30L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 1L, 0L),
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 2L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@2/4]", 1L, 2L),
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 3L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@2/4]", 2L, 3L),
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@4/6]", 1L, 4L),
                    new KeyValueTimestamp<String, Long>("[k1@30/32]", 1L, 30L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<String, Long>("[k1@2/4]", 2L, 3L),
                    new KeyValueTimestamp<String, Long>("[k1@4/6]", 1L, 4L)));
        }
    }

    /**
     * Counts records per key in session windows (5 ms gap, 0 ms grace) and checks that, under
     * {@code untilWindowCloses}, "output-suppressed" carries only the two session results listed below,
     * while "output-raw" also carries the intermediate counts and the null-valued records.
     */
    @Test
    public void shouldSupportFinalResultsForSessionWindows() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // NOTE(review): the Materialized chain carries no explicit type arguments (presumably lost in
        // decompilation); confirm it type-checks against the session-windowed count(...) before relying on it.
        final KTable counts = streamsBuilder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((key, value) -> key, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SessionWindows.with(Duration.ofMillis(5L)).grace(Duration.ofMillis(0L)))
            .count(Materialized.as("counts").withCachingDisabled());
        // Windowed keys are rendered with toString() so both sinks can use the String serde.
        counts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((windowedKey, count) -> new KeyValue(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        counts
            .toStream()
            .map((windowedKey, count) -> new KeyValue(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        final Topology topology = streamsBuilder.build();
        // Writes the topology description to stdout.
        System.out.println(topology.describe());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);

            input.pipeInput("k1", "v1", 0L);
            input.pipeInput("k1", "v1", 5L);
            input.pipeInput("k1", "v1", 1L);
            input.pipeInput("k2", "v1", 6L);
            input.pipeInput("k1", "v1", 5L);
            input.pipeInput("k1", "v1", 30L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("[k1@0/0]", 1L, 0L),
                    // Null-valued records are typed <String, Long> like their neighbours so the list
                    // has a single element type matching the <String, Long> records read from the topic.
                    new KeyValueTimestamp<String, Long>("[k1@0/0]", null, 0L),
                    new KeyValueTimestamp<String, Long>("[k1@0/5]", 2L, 5L),
                    new KeyValueTimestamp<String, Long>("[k1@0/5]", null, 5L),
                    new KeyValueTimestamp<String, Long>("[k1@0/5]", 3L, 5L),
                    new KeyValueTimestamp<String, Long>("[k2@6/6]", 1L, 6L),
                    new KeyValueTimestamp<String, Long>("[k1@30/30]", 1L, 30L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, Long>("[k1@0/5]", 3L, 5L),
                    new KeyValueTimestamp<String, Long>("[k2@6/6]", 1L, 6L)));
        }
    }

    /**
     * Places a 10 ms time-limited suppression directly on the source table, ahead of groupBy/count, and
     * expects the count for "A" on "output" once a record with timestamp 10 has been piped.
     */
    @Test
    public void shouldWorkBeforeGroupBy() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .table("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(10L), Suppressed.BufferConfig.unbounded()))
            .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
            .count()
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        try (TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), config)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", STRING_SERIALIZER, STRING_SERIALIZER);

            input.pipeInput("A", "a", 0L);
            input.pipeInput("tick", "tick", 10L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, Long>("A", 1L, 0L)));
        }
    }

    /**
     * Outer-joins an unsuppressed "left" table with a "right" table that has a 10 ms time-limited
     * suppression, and pins the join results on "output" after each input.
     */
    @Test
    public void shouldWorkBeforeJoinRight() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KTable leftTable = streamsBuilder.table("left", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable rightTable = streamsBuilder
            .table("right", Consumed.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(10L), Suppressed.BufferConfig.unbounded()));
        leftTable
            .outerJoin(rightTable, (leftValue, rightValue) -> String.format("(%s,%s)", leftValue, rightValue))
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), config)) {
            final TestInputTopic<String, String> rightInput =
                driver.createInputTopic("right", STRING_SERIALIZER, STRING_SERIALIZER);
            final TestInputTopic<String, String> leftInput =
                driver.createInputTopic("left", STRING_SERIALIZER, STRING_SERIALIZER);

            // Two right-side records at timestamp 0: nothing is expected downstream yet.
            rightInput.pipeInput("B", "1", 0L);
            rightInput.pipeInput("A", "1", 0L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            // A right-side record at timestamp 10: both earlier records are expected, A before B.
            rightInput.pipeInput("tick", "tick", 10L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, String>("A", "(null,1)", 0L),
                    new KeyValueTimestamp<String, String>("B", "(null,1)", 0L)));

            // A right-side update for A at timestamp 11 is not expected on the output yet.
            rightInput.pipeInput("A", "2", 11L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            // Left-side records are expected to join immediately against the right value "1".
            leftInput.pipeInput("A", "a", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, String>("A", "(a,1)", 12L)));

            leftInput.pipeInput("B", "b", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, String>("B", "(b,1)", 12L)));

            leftInput.pipeInput("A", "b", 13L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, String>("A", "(b,1)", 13L)));

            // A right-side record at timestamp 21: the join for A is now expected with right value "2".
            rightInput.pipeInput("tick", "tick1", 21L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, String>("tick", "(null,tick1)", 21L),
                    new KeyValueTimestamp<String, String>("A", "(b,2)", 13L)));
        }
    }

    /**
     * Outer-joins a "left" table that has a 10 ms time-limited suppression with an unsuppressed "right"
     * table, and pins the join results on "output" after each input.
     */
    @Test
    public void shouldWorkBeforeJoinLeft() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KTable leftTable = streamsBuilder
            .table("left", Consumed.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(10L), Suppressed.BufferConfig.unbounded()));
        final KTable rightTable = streamsBuilder.table("right", Consumed.with(Serdes.String(), Serdes.String()));
        leftTable
            .outerJoin(rightTable, (leftValue, rightValue) -> String.format("(%s,%s)", leftValue, rightValue))
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.String()));

        final Topology topology = streamsBuilder.build();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> rightInput =
                driver.createInputTopic("right", STRING_SERIALIZER, STRING_SERIALIZER);
            final TestInputTopic<String, String> leftInput =
                driver.createInputTopic("left", STRING_SERIALIZER, STRING_SERIALIZER);

            // Two left-side records at timestamp 0: nothing is expected downstream yet.
            leftInput.pipeInput("B", "1", 0L);
            leftInput.pipeInput("A", "1", 0L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            // A left-side record at timestamp 10: both earlier records are expected, A before B.
            leftInput.pipeInput("tick", "tick", 10L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, String>("A", "(1,null)", 0L),
                    new KeyValueTimestamp<String, String>("B", "(1,null)", 0L)));

            // A left-side update for A at timestamp 11 is not expected on the output yet.
            leftInput.pipeInput("A", "2", 11L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            // Right-side records are expected to join immediately against the left value "1".
            rightInput.pipeInput("A", "a", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, String>("A", "(1,a)", 12L)));

            rightInput.pipeInput("B", "b", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, String>("B", "(1,b)", 12L)));

            rightInput.pipeInput("A", "b", 13L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<String, String>("A", "(1,b)", 13L)));

            // A left-side record at timestamp 21: the join for A is now expected with left value "2".
            leftInput.pipeInput("tick", "tick1", 21L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<String, String>("tick", "(tick1,null)", 21L),
                    new KeyValueTimestamp<String, String>("A", "(2,b)", 13L)));
        }
    }

    /**
     * Asserts that {@code results} matches {@code expectedResults} position by position.
     *
     * <p>Each expected entry is turned into a {@code TestRecord} with the same key, value and timestamp
     * and {@code null} passed for headers, then compared to the actual record with {@code equalTo}.
     *
     * @param results         records actually read from an output topic
     * @param expectedResults expected key/value/timestamp triples, in order
     * @throws AssertionError if the sizes differ or any pair differs; the message lists the actual
     *                        records followed by the expected list, and a per-record mismatch is kept as the cause
     */
    private static <K, V> void verify(List<TestRecord<K, V>> results, List<KeyValueTimestamp<K, V>> expectedResults) {
        // Lists of different length can never match; report both sides right away.
        if (results.size() != expectedResults.size()) {
            throw new AssertionError(printRecords(results) + " != " + expectedResults);
        }

        final Iterator<TestRecord<K, V>> actualRecords = results.iterator();
        final Iterator<KeyValueTimestamp<K, V>> wantedEntries = expectedResults.iterator();
        while (actualRecords.hasNext()) {
            final TestRecord<K, V> actual = actualRecords.next();
            final KeyValueTimestamp<K, V> wanted = wantedEntries.next();
            final TestRecord<K, V> wantedRecord =
                new TestRecord<K, V>(wanted.key(), wanted.value(), null, Long.valueOf(wanted.timestamp()));
            try {
                MatcherAssert.assertThat(actual, CoreMatchers.equalTo(wantedRecord));
            } catch (final AssertionError mismatch) {
                // Re-throw with the full actual/expected listing, keeping the single-record failure as cause.
                throw new AssertionError(printRecords(results) + " != " + expectedResults, mismatch);
            }
        }
    }

    /**
     * Creates an output topic on {@code driver} for {@code topic} and returns the result of
     * {@code readRecordsToList()} on it. The tests call this repeatedly for the same topic and expect
     * each call to return only the records produced since the previous call.
     *
     * @param driver            test driver running the topology under test
     * @param topic             name of the output topic to read
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @return the records read from the topic
     */
    private static <K, V> List<TestRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver,
                                                                      final String topic,
                                                                      final Deserializer<K> keyDeserializer,
                                                                      final Deserializer<V> valueDeserializer) {
        return driver
            .createOutputTopic(topic, keyDeserializer, valueDeserializer)
            .readRecordsToList();
    }

    /**
     * Renders records for failure messages: an opening "[" line, one line per record indented by two
     * spaces, and a closing "]".
     *
     * @param result records to render
     * @return the multi-line rendering
     */
    private static <K, V> String printRecords(List<TestRecord<K, V>> result) {
        final StringBuilder rendered = new StringBuilder("[\n");
        result.forEach(entry -> rendered.append("  ").append(entry).append("\n"));
        return rendered.append("]").toString();
    }
}

