/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;

public class SuppressScenarioTest {
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
    private final Properties config = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)this.getClass().getSimpleName().toLowerCase(Locale.getDefault())), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"bootstrap.servers", (Object)"bogus")}));

    @Test
    public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.table("input", Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((k, v) -> new KeyValue(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)).count();
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ZERO, (Suppressed.BufferConfig)Suppressed.BufferConfig.unbounded())).toStream().to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v2", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k2", (Object)"v1", 2L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, 0L), new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L), new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, 0L), new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L), new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            driver.pipeInput(recordFactory.create("input", (Object)"x", (Object)"x", 3L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            driver.pipeInput(recordFactory.create("input", (Object)"x", (Object)"x", 4L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("x", 0L, 4L), new KeyValueTimestamp<String, Long>("x", 1L, 4L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("x", 0L, 4L), new KeyValueTimestamp<String, Long>("x", 1L, 4L)));
        }
    }

    @Test
    public void shouldSuppressIntermediateEventsWithTimeLimit() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.table("input", Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((k, v) -> new KeyValue(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)).count();
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(2L), (Suppressed.BufferConfig)Suppressed.BufferConfig.unbounded())).toStream().to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v2", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k2", (Object)"v1", 2L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, 0L), new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L), new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            driver.pipeInput(recordFactory.create("input", (Object)"tick", (Object)"tick", 3L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("tick", 1L, 3L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("v2", 1L, 1L)));
            driver.pipeInput(recordFactory.create("input", (Object)"tick", (Object)"tick", 4L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("tick", 0L, 4L), new KeyValueTimestamp<String, Long>("tick", 1L, 4L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.emptyList());
        }
    }

    @Test
    public void shouldSuppressIntermediateEventsWithRecordLimit() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.table("input", Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((k, v) -> new KeyValue(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)).count(Materialized.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)1L).emitEarlyWhenFull())).toStream().to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology.describe());
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v2", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k2", (Object)"v1", 2L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, 0L), new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L), new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L)));
            driver.pipeInput(recordFactory.create("input", (Object)"x", (Object)"x", 3L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
        }
    }

    @Test
    public void shouldSuppressIntermediateEventsWithBytesLimit() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.table("input", Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((k, v) -> new KeyValue(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)).count();
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxBytes((long)200L).emitEarlyWhenFull())).toStream().to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology.describe());
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v2", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k2", (Object)"v1", 2L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, 0L), new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L), new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 0L, 1L), new KeyValueTimestamp<String, Long>("v2", 1L, 1L)));
            driver.pipeInput(recordFactory.create("input", (Object)"x", (Object)"x", 3L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("x", 1L, 3L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Collections.singletonList(new KeyValueTimestamp<String, Long>("v1", 1L, 2L)));
        }
    }

    @Test
    public void shouldSupportFinalResultsForTimeWindows() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)).groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE)).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(2L)).grace(Duration.ofMillis(1L))).count(Materialized.as((String)"counts").withCachingDisabled());
        valueCounts.suppress(Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())).toStream().map((k, v) -> new KeyValue((Object)k.toString(), v)).to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().map((k, v) -> new KeyValue((Object)k.toString(), v)).to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology.describe());
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 2L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 5L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("[k1@0/2]", 1L, 0L), new KeyValueTimestamp<String, Long>("[k1@0/2]", 2L, 1L), new KeyValueTimestamp<String, Long>("[k1@2/4]", 1L, 2L), new KeyValueTimestamp<String, Long>("[k1@0/2]", 3L, 1L), new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 0L), new KeyValueTimestamp<String, Long>("[k1@4/6]", 1L, 5L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 0L), new KeyValueTimestamp<String, Long>("[k1@2/4]", 1L, 2L)));
        }
    }

    @Test
    public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)).groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE)).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(2L)).grace(Duration.ofMillis(2L))).count(Materialized.as((String)"counts").withCachingDisabled().withKeySerde(STRING_SERDE));
        valueCounts.suppress(Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())).toStream().map((k, v) -> new KeyValue((Object)k.toString(), v)).to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().map((k, v) -> new KeyValue((Object)k.toString(), v)).to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology.describe());
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 2L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 3L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 4L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 30L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("[k1@0/2]", 1L, 0L), new KeyValueTimestamp<String, Long>("[k1@0/2]", 2L, 1L), new KeyValueTimestamp<String, Long>("[k1@2/4]", 1L, 2L), new KeyValueTimestamp<String, Long>("[k1@0/2]", 3L, 0L), new KeyValueTimestamp<String, Long>("[k1@2/4]", 2L, 3L), new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 0L), new KeyValueTimestamp<String, Long>("[k1@4/6]", 1L, 4L), new KeyValueTimestamp<String, Long>("[k1@30/32]", 1L, 30L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("[k1@0/2]", 4L, 0L), new KeyValueTimestamp<String, Long>("[k1@2/4]", 2L, 3L), new KeyValueTimestamp<String, Long>("[k1@4/6]", 1L, 4L)));
        }
    }

    @Test
    public void shouldSupportFinalResultsForSessionWindows() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)).groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE)).windowedBy(SessionWindows.with((Duration)Duration.ofMillis(5L)).grace(Duration.ofMillis(0L))).count(Materialized.as((String)"counts").withCachingDisabled());
        valueCounts.suppress(Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())).toStream().map((k, v) -> new KeyValue((Object)k.toString(), v)).to("output-suppressed", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().map((k, v) -> new KeyValue((Object)k.toString(), v)).to("output-raw", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology.describe());
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)STRING_SERIALIZER, (Serializer)STRING_SERIALIZER);
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config);){
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 0L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 5L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 1L));
            driver.pipeInput(recordFactory.create("input", (Object)"k2", (Object)"v1", 6L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 5L));
            driver.pipeInput(recordFactory.create("input", (Object)"k1", (Object)"v1", 30L));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("[k1@0/0]", 1L, 0L), new KeyValueTimestamp<String, Object>("[k1@0/0]", null, 5L), new KeyValueTimestamp<String, Long>("[k1@0/5]", 2L, 5L), new KeyValueTimestamp<String, Object>("[k1@0/5]", null, 1L), new KeyValueTimestamp<String, Long>("[k1@0/5]", 3L, 1L), new KeyValueTimestamp<String, Long>("[k2@6/6]", 1L, 6L), new KeyValueTimestamp<String, Long>("[k1@30/30]", 1L, 30L)));
            SuppressScenarioTest.verify(SuppressScenarioTest.drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), Arrays.asList(new KeyValueTimestamp<String, Long>("[k1@0/5]", 3L, 1L), new KeyValueTimestamp<String, Long>("[k2@6/6]", 1L, 6L)));
        }
    }

    private static <K, V> void verify(List<ProducerRecord<K, V>> results, List<KeyValueTimestamp<K, V>> expectedResults) {
        if (results.size() != expectedResults.size()) {
            throw new AssertionError((Object)(SuppressScenarioTest.printRecords(results) + " != " + expectedResults));
        }
        Iterator<KeyValueTimestamp<K, V>> expectedIterator = expectedResults.iterator();
        for (ProducerRecord<K, V> result : results) {
            KeyValueTimestamp<K, V> expected = expectedIterator.next();
            try {
                OutputVerifier.compareKeyValueTimestamp(result, expected.key(), expected.value(), (long)expected.timestamp());
            }
            catch (AssertionError e) {
                throw new AssertionError(SuppressScenarioTest.printRecords(results) + " != " + expectedResults, (Throwable)((Object)e));
            }
        }
    }

    private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(TopologyTestDriver driver, String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        LinkedList<ProducerRecord> result = new LinkedList<ProducerRecord>();
        ProducerRecord next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
        while (next != null) {
            result.add(next);
            next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
        }
        return new ArrayList<ProducerRecord<K, V>>(result);
    }

    private static <K, V> String printRecords(List<ProducerRecord<K, V>> result) {
        StringBuilder resultStr = new StringBuilder();
        resultStr.append("[\n");
        for (ProducerRecord<K, V> record : result) {
            resultStr.append("  ").append(record).append("\n");
        }
        resultStr.append("]");
        return resultStr.toString();
    }
}

