/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KGroupedStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String INVALID_STORE_NAME = "~foo bar~";
    private final StreamsBuilder builder = new StreamsBuilder();
    private KGroupedStream<String, String> groupedStream;
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Before
    public void before() {
        KStream stream = this.builder.stream(TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.groupedStream = stream.groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String()));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAggregatorOnCogroup() {
        this.groupedStream.cogroup(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerOnReduce() {
        this.groupedStream.reduce(null);
    }

    @Test(expected=TopologyException.class)
    public void shouldNotHaveInvalidStoreNameOnReduce() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).reduce(null, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows)null);
    }

    @Test(expected=TopologyException.class)
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).reduce(MockReducer.STRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnAggregate() {
        this.groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnAggregate() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as((String)"store"));
    }

    @Test(expected=TopologyException.class)
    public void shouldNotHaveInvalidStoreNameOnAggregate() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, null, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)null);
    }

    @Test(expected=TopologyException.class)
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    private void doAggregateSessionWindows(MockProcessorSupplier<Windowed<String>, Integer> supplier) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"1", (Object)"1", 10L);
            inputTopic.pipeInput((Object)"2", (Object)"2", 15L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 30L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 70L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 100L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 90L);
        }
        Map result = supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)2, (long)30L), result.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)15L), result.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)3, (long)100L), result.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldAggregateSessionWindows() {
        MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<Windowed<String>, Integer>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(() -> 0, (aggKey, value, aggregate) -> aggregate + 1, (aggKey, aggOne, aggTwo) -> aggOne + aggTwo, Materialized.as((String)"session-store").withValueSerde(Serdes.Integer()));
        table.toStream().process(supplier, new String[0]);
        this.doAggregateSessionWindows(supplier);
        Assert.assertEquals((Object)table.queryableStoreName(), (Object)"session-store");
    }

    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<Windowed<String>, Integer>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(() -> 0, (aggKey, value, aggregate) -> aggregate + 1, (aggKey, aggOne, aggTwo) -> aggOne + aggTwo, Materialized.with(null, (Serde)Serdes.Integer()));
        table.toStream().process(supplier, new String[0]);
        this.doAggregateSessionWindows(supplier);
    }

    private void doCountSessionWindows(MockProcessorSupplier<Windowed<String>, Long> supplier) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"1", (Object)"1", 10L);
            inputTopic.pipeInput((Object)"2", (Object)"2", 15L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 30L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 70L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 100L);
            inputTopic.pipeInput((Object)"1", (Object)"1", 90L);
        }
        Map result = supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)2L, (long)30L), result.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1L, (long)15L), result.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)3L, (long)100L), result.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldCountSessionWindows() {
        MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<Windowed<String>, Long>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).count(Materialized.as((String)"session-store"));
        table.toStream().process(supplier, new String[0]);
        this.doCountSessionWindows(supplier);
        Assert.assertEquals((Object)table.queryableStoreName(), (Object)"session-store");
    }

    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<Windowed<String>, Long>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).count();
        table.toStream().process(supplier, new String[0]);
        this.doCountSessionWindows(supplier);
        Assert.assertNull((Object)table.queryableStoreName());
    }

    private void doReduceSessionWindows(MockProcessorSupplier<Windowed<String>, String> supplier) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"1", (Object)"A", 10L);
            inputTopic.pipeInput((Object)"2", (Object)"Z", 15L);
            inputTopic.pipeInput((Object)"1", (Object)"B", 30L);
            inputTopic.pipeInput((Object)"1", (Object)"A", 70L);
            inputTopic.pipeInput((Object)"1", (Object)"B", 100L);
            inputTopic.pipeInput((Object)"1", (Object)"C", 90L);
        }
        Map result = supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"A:B", (long)30L), result.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"Z", (long)15L), result.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"A:B:C", (long)100L), result.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldReduceSessionWindows() {
        MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<Windowed<String>, String>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce((value1, value2) -> value1 + ":" + value2, Materialized.as((String)"session-store"));
        table.toStream().process(supplier, new String[0]);
        this.doReduceSessionWindows(supplier);
        Assert.assertEquals((Object)table.queryableStoreName(), (Object)"session-store");
    }

    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<Windowed<String>, String>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce((value1, value2) -> value1 + ":" + value2);
        table.toStream().process(supplier, new String[0]);
        this.doReduceSessionWindows(supplier);
        Assert.assertNull((Object)table.queryableStoreName());
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce(null, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
        this.groupedStream.windowedBy((SessionWindows)null);
    }

    @Test(expected=TopologyException.class)
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce(MockReducer.STRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce(null, Materialized.as(null));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(null, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as((String)"storeName"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(MockInitializer.STRING_INIT, null, (aggKey, aggOne, aggTwo) -> null, Materialized.as((String)"storeName"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as((String)"storeName"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy((SessionWindows)null);
    }

    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.String()));
    }

    @Test(expected=TopologyException.class)
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
        this.groupedStream.count((Materialized)null);
    }

    @Test
    public void shouldCountAndMaterializeResults() {
        this.groupedStream.count(Materialized.as((String)"count").withKeySerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            KeyValueStore count = driver.getKeyValueStore("count");
            MatcherAssert.assertThat((Object)count.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)3L));
            MatcherAssert.assertThat((Object)count.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)1L));
            MatcherAssert.assertThat((Object)count.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)2L));
            count = driver.getTimestampedKeyValueStore("count");
            MatcherAssert.assertThat((Object)count.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)3L, (long)10L)));
            MatcherAssert.assertThat((Object)count.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)1L, (long)1L)));
            MatcherAssert.assertThat((Object)count.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)2L, (long)9L)));
        }
    }

    @Test
    public void shouldLogAndMeasureSkipsInAggregateWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeasureSkipsInAggregate("0.10.0-2.4");
    }

    @Test
    public void shouldLogAndMeasureSkipsInAggregateWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeasureSkipsInAggregate("latest");
    }

    private void shouldLogAndMeasureSkipsInAggregate(String builtInMetricsVersion) {
        this.groupedStream.count(Materialized.as((String)"count").withKeySerde(Serdes.String()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            LogCaptureAppender.unregister(appender);
            if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
                Map metrics = driver.metrics();
                Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
                Assert.assertNotEquals((Object)0.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
            }
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
        }
    }

    @Test
    public void shouldReduceAndMaterializeResults() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)"reduce").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            KeyValueStore reduced = driver.getKeyValueStore("reduce");
            MatcherAssert.assertThat((Object)reduced.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)"A+C+D"));
            MatcherAssert.assertThat((Object)reduced.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)"B"));
            MatcherAssert.assertThat((Object)reduced.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)"E+F"));
            reduced = driver.getTimestampedKeyValueStore("reduce");
            MatcherAssert.assertThat((Object)reduced.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"A+C+D", (long)10L)));
            MatcherAssert.assertThat((Object)reduced.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"B", (long)1L)));
            MatcherAssert.assertThat((Object)reduced.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"E+F", (long)9L)));
        }
    }

    @Test
    public void shouldLogAndMeasureSkipsInReduceWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeasureSkipsInReduce("0.10.0-2.4");
    }

    @Test
    public void shouldLogAndMeasureSkipsInReduceWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeasureSkipsInReduce("latest");
    }

    private void shouldLogAndMeasureSkipsInReduce(String builtInMetricsVersion) {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)"reduce").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            LogCaptureAppender.unregister(appender);
            if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
                Map metrics = driver.metrics();
                Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
                Assert.assertNotEquals((Object)0.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
            }
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
        }
    }

    @Test
    public void shouldAggregateAndMaterializeResults() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"aggregate").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            KeyValueStore aggregate = driver.getKeyValueStore("aggregate");
            MatcherAssert.assertThat((Object)aggregate.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)"0+A+C+D"));
            MatcherAssert.assertThat((Object)aggregate.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)"0+B"));
            MatcherAssert.assertThat((Object)aggregate.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)"0+E+F"));
            aggregate = driver.getTimestampedKeyValueStore("aggregate");
            MatcherAssert.assertThat((Object)aggregate.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"0+A+C+D", (long)10L)));
            MatcherAssert.assertThat((Object)aggregate.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"0+B", (long)1L)));
            MatcherAssert.assertThat((Object)aggregate.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"0+E+F", (long)9L)));
        }
    }

    @Test
    public void shouldAggregateWithDefaultSerdes() {
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            MatcherAssert.assertThat(supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("1"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"0+A+C+D", (long)10L)));
            MatcherAssert.assertThat(supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("2"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"0+B", (long)1L)));
            MatcherAssert.assertThat(supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("3"), (Matcher)CoreMatchers.equalTo((Object)ValueAndTimestamp.make((Object)"0+E+F", (long)9L)));
        }
    }

    private void processData(TopologyTestDriver driver) {
        TestInputTopic inputTopic = driver.createInputTopic(TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        inputTopic.pipeInput((Object)"1", (Object)"A", 5L);
        inputTopic.pipeInput((Object)"2", (Object)"B", 1L);
        inputTopic.pipeInput((Object)"1", (Object)"C", 3L);
        inputTopic.pipeInput((Object)"1", (Object)"D", 10L);
        inputTopic.pipeInput((Object)"3", (Object)"E", 8L);
        inputTopic.pipeInput((Object)"3", (Object)"F", 9L);
        inputTopic.pipeInput((Object)"3", (Object)null);
    }

    private void doCountWindowed(MockProcessorSupplier<Windowed<String>, Long> supplier) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"1", (Object)"A", 0L);
            inputTopic.pipeInput((Object)"1", (Object)"A", 499L);
            inputTopic.pipeInput((Object)"1", (Object)"A", 100L);
            inputTopic.pipeInput((Object)"2", (Object)"B", 0L);
            inputTopic.pipeInput((Object)"2", (Object)"B", 100L);
            inputTopic.pipeInput((Object)"2", (Object)"B", 200L);
            inputTopic.pipeInput((Object)"3", (Object)"C", 1L);
            inputTopic.pipeInput((Object)"1", (Object)"A", 500L);
            inputTopic.pipeInput((Object)"1", (Object)"A", 500L);
            inputTopic.pipeInput((Object)"2", (Object)"B", 500L);
            inputTopic.pipeInput((Object)"2", (Object)"B", 500L);
            inputTopic.pipeInput((Object)"3", (Object)"B", 100L);
        }
        MatcherAssert.assertThat(supplier.theCapturedProcessor().processed, (Matcher)CoreMatchers.equalTo(Arrays.asList(new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new TimeWindow(0L, 500L)), 1L, 0L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new TimeWindow(0L, 500L)), 2L, 499L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new TimeWindow(0L, 500L)), 3L, 499L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new TimeWindow(0L, 500L)), 1L, 0L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new TimeWindow(0L, 500L)), 2L, 100L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new TimeWindow(0L, 500L)), 3L, 200L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"3", (Window)new TimeWindow(0L, 500L)), 1L, 1L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new TimeWindow(500L, 1000L)), 1L, 500L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new TimeWindow(500L, 1000L)), 2L, 500L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new TimeWindow(500L, 1000L)), 1L, 500L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new TimeWindow(500L, 1000L)), 2L, 500L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"3", (Window)new TimeWindow(0L, 500L)), 2L, 100L))));
    }

    @Test
    public void shouldCountWindowed() {
        MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<Windowed<String>, Long>();
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).count(Materialized.as((String)"aggregate-by-key-windowed")).toStream().process(supplier, new String[0]);
        this.doCountWindowed(supplier);
    }

    @Test
    public void shouldCountWindowedWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<Windowed<String>, Long>();
        ArrayList results = new ArrayList();
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).count().toStream().process(supplier, new String[0]);
        this.doCountWindowed(supplier);
    }
}

