/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.tools.ConsoleConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class KStreamAggregationIntegrationTest {
    /** Number of brokers in the embedded cluster used by this test class. */
    private static final int NUM_BROKERS = 1;
    /** Embedded Kafka cluster shared by all tests; started in {@code startCluster()} and stopped in {@code closeCluster()}. */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    /** Clock obtained from {@link #CLUSTER} in the constructor; tests read and advance it to choose record timestamps. */
    private final MockTime mockTime;
    /** Topology builder, recreated in {@code before(TestInfo)}. */
    private StreamsBuilder builder;
    /** Streams configuration, rebuilt in {@code before(TestInfo)} and used to purge local state after each test. */
    private Properties streamsConfiguration;
    /** Streams instance under test; closed in {@code whenShuttingDown()} when non-null. */
    private KafkaStreams kafkaStreams;
    /** Name of the topic that {@link #stream} reads from (presumably assigned by {@code createTopics} - confirm). */
    private String streamOneInput;
    /** Not referenced in the visible part of this class; presumably a second input topic name - confirm. */
    private String userSessionsStream;
    /** Name of the topic every visible test writes its results to. */
    private String outputTopic;
    /** {@link #stream} re-grouped in {@code before(TestInfo)} with String keys and String values. */
    private KGroupedStream<String, String> groupedStream;
    /** Joins two values with a {@code ":"} separator. */
    private Reducer<String> reducer;
    /** Supplies the initial aggregate {@code 0}. */
    private Initializer<Integer> initializer;
    /** Adds the length of each value to the running aggregate. */
    private Aggregator<String, String, Integer> aggregator;
    /** Source stream over {@link #streamOneInput} with Integer keys and String values. */
    private KStream<Integer, String> stream;

    /**
     * Creates the test instance and takes its clock from the shared {@link #CLUSTER}.
     */
    public KStreamAggregationIntegrationTest() {
        this.mockTime = KStreamAggregationIntegrationTest.CLUSTER.time;
    }

    /**
     * Starts the shared embedded cluster once, before any test in this class runs.
     *
     * @throws IOException propagated from {@code CLUSTER.start()}
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /**
     * Stops the shared embedded cluster once, after all tests in this class have run.
     */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Prepares the fixtures for the test about to run: a fresh topology builder,
     * the topics, the Streams configuration, the source and grouped streams, and
     * the reducer/initializer/aggregator used by the aggregation tests.
     *
     * @param testInfo JUnit metadata from which a unique, safe test name is derived
     * @throws InterruptedException if setup is interrupted (presumably during topic creation)
     */
    @BeforeEach
    public void before(TestInfo testInfo) throws InterruptedException {
        this.builder = new StreamsBuilder();
        String uniqueName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.createTopics(uniqueName);

        // The field is assigned before it is populated, so it is never left null
        // if one of the following statements throws.
        Properties config = new Properties();
        this.streamsConfiguration = config;
        config.put("application.id", "app-" + uniqueName);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("auto.offset.reset", "earliest");
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("statestore.cache.max.bytes", Integer.valueOf(0));
        config.put("commit.interval.ms", Long.valueOf(100L));
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());

        // Presumably selects the record value as the new grouping key - confirm against MockMapper.
        KeyValueMapper regroupingMapper = MockMapper.selectValueMapper();
        this.stream = this.builder.stream(
            this.streamOneInput,
            Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.groupedStream = this.stream.groupBy(
            regroupingMapper,
            Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String()));

        this.reducer = (left, right) -> left + ":" + right;
        this.initializer = () -> 0;
        this.aggregator = (key, text, runningLength) -> runningLength + text.length();
    }

    /**
     * Closes the Streams instance started by the test (if any) and removes the
     * local state kept under the configured state directory.
     *
     * <p>The purge runs in a {@code finally} block so that a failure while closing
     * the Streams instance does not leave the temporary state directory behind.
     *
     * @throws IOException if purging the local Streams state fails
     */
    @AfterEach
    public void whenShuttingDown() throws IOException {
        try {
            if (this.kafkaStreams != null) {
                this.kafkaStreams.close();
            }
        } finally {
            IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
        }
    }

    /**
     * Reduces the grouped stream with the {@code ":"}-joining reducer into the
     * {@code reduce-by-key} store and verifies the ten records read back from the
     * output topic: for each key A..E one record with the single value and one
     * with the value joined to itself.
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldReduce(TestInfo testInfo) throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream
            .reduce(this.reducer, Materialized.as("reduce-by-key"))
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());

        List results = this.receiveMessages(
            (Deserializer)new StringDeserializer(),
            (Deserializer)new StringDeserializer(),
            10,
            testInfo);
        results.sort(KStreamAggregationIntegrationTest::compare);

        // The clock is read once per expected record, in list order.
        String[] keys = {"A", "B", "C", "D", "E"};
        KeyValueTimestamp[] expected = new KeyValueTimestamp[2 * keys.length];
        int slot = 0;
        for (String key : keys) {
            expected[slot++] = new KeyValueTimestamp<String, String>(key, key, this.mockTime.milliseconds());
            expected[slot++] = new KeyValueTimestamp<String, String>(key, key + ":" + key, this.mockTime.milliseconds());
        }
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(expected)));
    }

    /**
     * Orders two records by key, then by value, then by timestamp.
     *
     * @param o1 the first record
     * @param o2 the second record
     * @return a negative, zero or positive number as {@code o1} sorts before,
     *         equal to, or after {@code o2}
     */
    private static <K extends Comparable, V extends Comparable> int compare(KeyValueTimestamp<K, V> o1, KeyValueTimestamp<K, V> o2) {
        int byKey = ((Comparable)o1.key()).compareTo(o2.key());
        if (byKey != 0) {
            return byKey;
        }
        int byValue = ((Comparable)o1.value()).compareTo(o2.value());
        if (byValue != 0) {
            return byValue;
        }
        // Keys and values tie: fall back to the timestamp.
        return Long.compare(o1.timestamp(), o2.timestamp());
    }

    /**
     * Reduces the grouped stream in 500 ms time windows and verifies the fifteen
     * windowed records, both through the regular consumer helper and through the
     * console-consumer helper.
     *
     * <p>One batch is produced with the timestamp read before the mock clock is
     * advanced by 1000 ms, and two batches with the timestamp read afterwards.
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldReduceWindowed(TestInfo testInfo) throws Exception {
        long firstBatchTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        this.produceMessages(firstBatchTimestamp);
        long secondBatchTimestamp = this.mockTime.milliseconds();
        this.produceMessages(secondBatchTimestamp);
        this.produceMessages(secondBatchTimestamp);

        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
        this.groupedStream
            .windowedBy((Windows)TimeWindows.of(Duration.ofMillis(500L)))
            .reduce(this.reducer)
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)Serdes.String()));
        this.startStreams();

        List windowedOutput = this.receiveMessages(
            (Deserializer)new TimeWindowedDeserializer(),
            (Deserializer)new StringDeserializer(),
            String.class,
            15,
            testInfo);
        String resultFromConsoleConsumer = this.readWindowedKeyedMessagesViaConsoleConsumer(
            (Deserializer)new TimeWindowedDeserializer(),
            (Deserializer)new StringDeserializer(),
            String.class,
            15,
            true);
        Comparator<KeyValueTimestamp> comparator = Comparator
            .comparing(entry -> (String)((Windowed)entry.key()).key())
            .thenComparing(KeyValueTimestamp::value);
        windowedOutput.sort(comparator);

        // Window bounds: each timestamp rounded down to a multiple of the window size.
        long firstBatchWindowStart = firstBatchTimestamp / 500L * 500L;
        long firstBatchWindowEnd = firstBatchWindowStart + 500L;
        long secondBatchWindowStart = secondBatchTimestamp / 500L * 500L;
        long secondBatchWindowEnd = secondBatchWindowStart + 500L;

        // Per key: one record in the first window, then two updates in the second window.
        String[] keys = {"A", "B", "C", "D", "E"};
        KeyValueTimestamp[] expected = new KeyValueTimestamp[3 * keys.length];
        int slot = 0;
        for (String key : keys) {
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), key, firstBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), key, secondBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), key + ":" + key, secondBatchTimestamp);
        }
        List<KeyValueTimestamp> expectResult = Arrays.asList(expected);
        MatcherAssert.assertThat(windowedOutput, (Matcher)Is.is(expectResult));

        // Every line printed by the console consumer must match one expected record.
        HashSet<String> expectResultString = new HashSet<String>(expectResult.size());
        for (KeyValueTimestamp eachRecord : expectResult) {
            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + (String)eachRecord.value());
        }
        String[] consoleLines = resultFromConsoleConsumer.split("\n");
        for (String consoleLine : consoleLines) {
            Assertions.assertTrue(expectResultString.contains(consoleLine));
        }
    }

    /**
     * Aggregates the grouped stream (summing value lengths from an initial 0)
     * into the {@code aggregate-by-selected-key} store and verifies the ten
     * records read back from the output topic: aggregates 1 and 2 for each key A..E.
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldAggregate(TestInfo testInfo) throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream
            .aggregate(this.initializer, this.aggregator, Materialized.as("aggregate-by-selected-key"))
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());

        List results = this.receiveMessages(
            (Deserializer)new StringDeserializer(),
            (Deserializer)new IntegerDeserializer(),
            10,
            testInfo);
        results.sort(KStreamAggregationIntegrationTest::compare);

        // The clock is read once per expected record, in list order.
        String[] keys = {"A", "B", "C", "D", "E"};
        KeyValueTimestamp[] expected = new KeyValueTimestamp[2 * keys.length];
        int slot = 0;
        for (String key : keys) {
            expected[slot++] = new KeyValueTimestamp<String, Integer>(key, 1, this.mockTime.milliseconds());
            expected[slot++] = new KeyValueTimestamp<String, Integer>(key, 2, this.mockTime.milliseconds());
        }
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(expected)));
    }

    /**
     * Aggregates the grouped stream in 500 ms time windows (summing value lengths
     * from an initial 0) and verifies the fifteen windowed records, both through
     * the timestamp-aware consumer helper and through the console-consumer helper.
     *
     * <p>One batch is produced with the timestamp read before the mock clock is
     * advanced by 1000 ms, and two batches with the timestamp read afterwards.
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldAggregateWindowed(TestInfo testInfo) throws Exception {
        long firstTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        this.produceMessages(firstTimestamp);
        long secondTimestamp = this.mockTime.milliseconds();
        this.produceMessages(secondTimestamp);
        this.produceMessages(secondTimestamp);

        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
        this.groupedStream
            .windowedBy((Windows)TimeWindows.of(Duration.ofMillis(500L)))
            .aggregate(this.initializer, this.aggregator, Materialized.with(null, (Serde)Serdes.Integer()))
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)Serdes.Integer()));
        this.startStreams();

        List windowedMessages = this.receiveMessagesWithTimestamp(
            (Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(500L)),
            (Deserializer)new IntegerDeserializer(),
            String.class,
            15,
            testInfo);
        String resultFromConsoleConsumer = this.readWindowedKeyedMessagesViaConsoleConsumer(
            (Deserializer)new TimeWindowedDeserializer(),
            (Deserializer)new IntegerDeserializer(),
            String.class,
            15,
            true);
        Comparator<KeyValueTimestamp> comparator = Comparator
            .comparing(entry -> (String)((Windowed)entry.key()).key())
            .thenComparingInt(KeyValueTimestamp::value);
        windowedMessages.sort(comparator);

        // Window bounds: each timestamp rounded down to a multiple of the window size.
        long firstWindowStart = firstTimestamp / 500L * 500L;
        long firstWindowEnd = firstWindowStart + 500L;
        long secondWindowStart = secondTimestamp / 500L * 500L;
        long secondWindowEnd = secondWindowStart + 500L;

        // Per key: aggregate 1 in the first window, then 1 and 2 in the second window.
        String[] keys = {"A", "B", "C", "D", "E"};
        KeyValueTimestamp[] expected = new KeyValueTimestamp[3 * keys.length];
        int slot = 0;
        for (String key : keys) {
            expected[slot++] = new KeyValueTimestamp<Windowed, Integer>(new Windowed((Object)key, (Window)new TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, Integer>(new Windowed((Object)key, (Window)new TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, Integer>(new Windowed((Object)key, (Window)new TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp);
        }
        List<KeyValueTimestamp> expectResult = Arrays.asList(expected);
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));

        // Every line printed by the console consumer must match one expected record.
        HashSet<String> expectResultString = new HashSet<String>(expectResult.size());
        for (KeyValueTimestamp eachRecord : expectResult) {
            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
        }
        String[] consoleLines = resultFromConsoleConsumer.split("\n");
        for (String consoleLine : consoleLines) {
            Assertions.assertTrue(expectResultString.contains(consoleLine));
        }
    }

    /**
     * Shared tail of the count tests: starts the topology, produces one more
     * batch, and verifies the ten records read back from the output topic
     * (counts 1 and 2 for each key A..E).
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    private void shouldCountHelper(TestInfo testInfo) throws Exception {
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());

        List results = this.receiveMessages(
            (Deserializer)new StringDeserializer(),
            (Deserializer)new LongDeserializer(),
            10,
            testInfo);
        results.sort(KStreamAggregationIntegrationTest::compare);

        // The clock is read once per expected record, in list order.
        String[] keys = {"A", "B", "C", "D", "E"};
        KeyValueTimestamp[] expected = new KeyValueTimestamp[2 * keys.length];
        int slot = 0;
        for (String key : keys) {
            expected[slot++] = new KeyValueTimestamp<String, Long>(key, 1L, this.mockTime.milliseconds());
            expected[slot++] = new KeyValueTimestamp<String, Long>(key, 2L, this.mockTime.milliseconds());
        }
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(expected)));
    }

    /**
     * Counts records per key into the named {@code count-by-key} store and
     * delegates the verification to {@code shouldCountHelper}.
     *
     * @param testInfo JUnit metadata forwarded to the helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldCount(TestInfo testInfo) throws Exception {
        long batchTimestamp = this.mockTime.milliseconds();
        this.produceMessages(batchTimestamp);
        this.groupedStream
            .count(Materialized.as("count-by-key"))
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.shouldCountHelper(testInfo);
    }

    /**
     * Counts records per key without naming a store and delegates the
     * verification to {@code shouldCountHelper}.
     *
     * @param testInfo JUnit metadata forwarded to the helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldCountWithInternalStore(TestInfo testInfo) throws Exception {
        long batchTimestamp = this.mockTime.milliseconds();
        this.produceMessages(batchTimestamp);
        this.groupedStream
            .count()
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.shouldCountHelper(testInfo);
    }

    /**
     * Groups the source stream by its existing Integer key, counts per 500 ms
     * time window, re-keys each result as {@code "<key>@<windowStart>"} and
     * verifies the ten records read back from the output topic (counts 1 and 2
     * for keys 1..5).
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldGroupByKey(TestInfo testInfo) throws Exception {
        long timestamp = this.mockTime.milliseconds();
        this.produceMessages(timestamp);
        this.produceMessages(timestamp);

        this.stream
            .groupByKey(Grouped.with((Serde)Serdes.Integer(), (Serde)Serdes.String()))
            .windowedBy((Windows)TimeWindows.of(Duration.ofMillis(500L)))
            .count()
            .toStream((winKey, count) -> winKey.key() + "@" + winKey.window().start())
            .to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.startStreams();

        List results = this.receiveMessages(
            (Deserializer)new StringDeserializer(),
            (Deserializer)new LongDeserializer(),
            10,
            testInfo);
        results.sort(KStreamAggregationIntegrationTest::compare);

        // Start of the window containing the timestamp shared by both batches.
        long window = timestamp / 500L * 500L;
        KeyValueTimestamp[] expected = new KeyValueTimestamp[10];
        int slot = 0;
        for (int id = 1; id <= 5; id++) {
            String expectedKey = id + "@" + window;
            expected[slot++] = new KeyValueTimestamp<String, Long>(expectedKey, 1L, timestamp);
            expected[slot++] = new KeyValueTimestamp<String, Long>(expectedKey, 2L, timestamp);
        }
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(expected)));
    }

    /**
     * Reduces the grouped stream over sliding windows with a 500 ms time
     * difference and a 2000 ms grace period, then verifies the thirty windowed
     * records, both through the regular consumer helper and through the
     * console-consumer helper.
     *
     * <p>Three batches are produced: at the current mock time, 250 ms later and
     * 400 ms later.
     *
     * @param testInfo JUnit metadata forwarded to the message-receiving helper
     * @throws Exception if producing, running the topology or consuming fails
     */
    @Test
    public void shouldReduceSlidingWindows(TestInfo testInfo) throws Exception {
        long firstBatchTimestamp = this.mockTime.milliseconds();
        long timeDifference = 500L;
        this.produceMessages(firstBatchTimestamp);
        long secondBatchTimestamp = firstBatchTimestamp + 250L;
        this.produceMessages(secondBatchTimestamp);
        long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
        this.produceMessages(thirdBatchTimestamp);

        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
        this.groupedStream
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(timeDifference), Duration.ofMillis(2000L)))
            .reduce(this.reducer)
            .toStream()
            .to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)Serdes.String()));
        this.startStreams();

        List windowedOutput = this.receiveMessages(
            (Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(timeDifference)),
            (Deserializer)new StringDeserializer(),
            String.class,
            30,
            testInfo);
        String resultFromConsoleConsumer = this.readWindowedKeyedMessagesViaConsoleConsumer(
            (Deserializer)new TimeWindowedDeserializer(),
            (Deserializer)new StringDeserializer(),
            String.class,
            30,
            true);
        Comparator<KeyValueTimestamp> comparator = Comparator
            .comparing(entry -> (String)((Windowed)entry.key()).key())
            .thenComparing(KeyValueTimestamp::value);
        windowedOutput.sort(comparator);

        // Left windows end at the record timestamp; right windows start 1 ms after it.
        long firstBatchLeftWindowStart = firstBatchTimestamp - timeDifference;
        long firstBatchLeftWindowEnd = firstBatchLeftWindowStart + timeDifference;
        long firstBatchRightWindowStart = firstBatchTimestamp + 1L;
        long firstBatchRightWindowEnd = firstBatchRightWindowStart + timeDifference;
        long secondBatchLeftWindowStart = secondBatchTimestamp - timeDifference;
        long secondBatchLeftWindowEnd = secondBatchLeftWindowStart + timeDifference;
        long secondBatchRightWindowStart = secondBatchTimestamp + 1L;
        long secondBatchRightWindowEnd = secondBatchRightWindowStart + timeDifference;
        long thirdBatchLeftWindowStart = thirdBatchTimestamp - timeDifference;
        long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart + timeDifference;

        // Six expected records per key, ordered by value ("K", "K:K", "K:K:K")
        // with the same relative order inside each value group as the original list.
        String[] keys = {"A", "B", "C", "D", "E"};
        KeyValueTimestamp[] expected = new KeyValueTimestamp[6 * keys.length];
        int slot = 0;
        for (String key : keys) {
            String once = key;
            String twice = key + ":" + key;
            String thrice = key + ":" + key + ":" + key;
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), once, firstBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), once, secondBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), once, thirdBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), twice, secondBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), twice, thirdBatchTimestamp);
            expected[slot++] = new KeyValueTimestamp<Windowed, String>(new Windowed((Object)key, (Window)new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), thrice, thirdBatchTimestamp);
        }
        List<KeyValueTimestamp> expectResult = Arrays.asList(expected);
        MatcherAssert.assertThat(windowedOutput, (Matcher)Is.is(expectResult));

        // Every line printed by the console consumer must match one expected record.
        HashSet<String> expectResultString = new HashSet<String>(expectResult.size());
        for (KeyValueTimestamp eachRecord : expectResult) {
            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + (String)eachRecord.value());
        }
        String[] consoleLines = resultFromConsoleConsumer.split("\n");
        for (String consoleLine : consoleLines) {
            Assertions.assertTrue(expectResultString.contains(consoleLine));
        }
    }

    /**
     * Runs a sliding-window aggregation over three batches of input and checks the
     * records written to the output topic, both through a plain consumer and through
     * the console consumer.
     *
     * <p>The batches are produced at {@code t}, {@code t + 250} and {@code t + 400}
     * milliseconds with a 500 ms window time difference. For each of the keys
     * {@code A}..{@code E} the same six (window, value, timestamp) results are expected.
     *
     * @param testInfo JUnit test metadata, forwarded to the consumer helper
     * @throws Exception if producing, running or consuming fails
     */
    @Test
    public void shouldAggregateSlidingWindows(TestInfo testInfo) throws Exception {
        final long timeDifference = 500L;
        final int expectedRecordCount = 30;

        // Produce the three input batches.
        final long firstBatchTimestamp = this.mockTime.milliseconds();
        this.produceMessages(firstBatchTimestamp);
        final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2L;
        this.produceMessages(secondBatchTimestamp);
        final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
        this.produceMessages(thirdBatchTimestamp);

        // Build and start the topology under test.
        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
        this.groupedStream
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(timeDifference), Duration.ofMinutes(5L)))
            .aggregate(this.initializer, this.aggregator, Materialized.with(null, Serdes.Integer()))
            .toStream()
            .to(this.outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
        this.startStreams();

        // Read the output twice: once as typed records, once as console-consumer text.
        final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = this.receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<String>(), new IntegerDeserializer(), String.class, expectedRecordCount, testInfo);
        final String consoleOutput = this.readWindowedKeyedMessagesViaConsoleConsumer(
            new TimeWindowedDeserializer<String>(), new IntegerDeserializer(), String.class, expectedRecordCount, true);

        // Order by the un-windowed key first, then by aggregated value.
        windowedMessages.sort(
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> message) -> message.key().key())
                .thenComparingInt(KeyValueTimestamp::value));

        // Every expected window spans [start, start + timeDifference].
        final long firstLeftStart = firstBatchTimestamp - timeDifference;
        final long firstRightStart = firstBatchTimestamp + 1L;
        final long secondLeftStart = secondBatchTimestamp - timeDifference;
        final long secondRightStart = secondBatchTimestamp + 1L;
        final long thirdLeftStart = thirdBatchTimestamp - timeDifference;

        // Per-key expectation rows: {window start, aggregated value, record timestamp}.
        final long[][] perKeyPattern = {
            {firstLeftStart, 1L, firstBatchTimestamp},
            {firstRightStart, 1L, secondBatchTimestamp},
            {secondRightStart, 1L, thirdBatchTimestamp},
            {secondLeftStart, 2L, secondBatchTimestamp},
            {firstRightStart, 2L, thirdBatchTimestamp},
            {thirdLeftStart, 3L, thirdBatchTimestamp},
        };
        final String[] keys = {"A", "B", "C", "D", "E"};
        final KeyValueTimestamp[] expectedRecords = new KeyValueTimestamp[keys.length * perKeyPattern.length];
        int next = 0;
        for (final String key : keys) {
            for (final long[] row : perKeyPattern) {
                final TimeWindow window = new TimeWindow(row[0], row[0] + timeDifference);
                expectedRecords[next++] = new KeyValueTimestamp<Windowed<String>, Integer>(
                    new Windowed<String>(key, window), (int) row[1], row[2]);
            }
        }
        final List<KeyValueTimestamp> expectResult = Arrays.asList(expectedRecords);
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));

        // Each console line must match the textual rendering of one expected record.
        final HashSet<String> expectResultString = new HashSet<String>(expectResult.size());
        for (final KeyValueTimestamp eachRecord : expectResult) {
            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
        }
        for (final String consoleLine : consoleOutput.split("\n")) {
            Assertions.assertTrue(expectResultString.contains(consoleLine));
        }
    }

    /**
     * Counts records per key in session windows with a five minute inactivity gap and
     * verifies, for each expected session, the count and the record timestamp seen
     * downstream of the aggregation.
     *
     * <p>Records are produced at five points in time: {@code t1}, {@code t2} (half a gap
     * after {@code t1}), {@code t3} (one millisecond more than a full gap after
     * {@code t1}), {@code t4} (half a gap after {@code t3}) and {@code t5} (one
     * millisecond before {@code t4}, produced last).
     *
     * @throws Exception if producing records or waiting for results fails
     */
    @Test
    public void shouldCountSessionWindows() throws Exception {
        final long sessionGap = 300000L;
        final long halfGap = sessionGap / 2L;
        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());

        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
            new KeyValue<>("bob", "start"),
            new KeyValue<>("penny", "start"),
            new KeyValue<>("jo", "pause"),
            new KeyValue<>("emily", "pause"));
        final long t1 = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, producerConfig, t1);

        final long t2 = t1 + halfGap;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("emily", "resume")),
            producerConfig,
            t2);

        final long t3 = t1 + sessionGap + 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(new KeyValue<>("bob", "pause"), new KeyValue<>("penny", "stop")),
            producerConfig,
            t3);

        final long t4 = t3 + halfGap;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(new KeyValue<>("bob", "resume"), new KeyValue<>("jo", "resume")),
            producerConfig,
            t4);

        final long t5 = t4 - 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("jo", "late")),
            producerConfig,
            t5);

        // Latest (count, record timestamp) observed for every windowed key.
        final HashMap<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
        // NOTE(review): 13 presumably covers 10 session results plus 3 tombstones emitted
        // when sessions are merged, with record caching disabled - confirm against setup.
        final CountDownLatch latch = new CountDownLatch(13);

        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SessionWindows.with(Duration.ofMillis(sessionGap)))
            .count()
            .toStream()
            .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<Object, Object> transform(Windowed<String> key, Long value) {
                    results.put(key, KeyValue.pair(value, this.context.timestamp()));
                    latch.countDown();
                    return null;
                }

                @Override
                public void close() {
                }
            });
        this.startStreams();

        // Fail fast on timeout. Only a successful await guarantees that the writes made
        // to the unsynchronized results map before each countDown() are visible here.
        Assertions.assertTrue(latch.await(30L, TimeUnit.SECONDS), "Timed out waiting for all session window updates");

        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair(1L, t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair(1L, t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair(1L, t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), CoreMatchers.equalTo(KeyValue.pair(2L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), CoreMatchers.equalTo(KeyValue.pair(2L, t2)));
        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), CoreMatchers.equalTo(KeyValue.pair(2L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), CoreMatchers.equalTo(KeyValue.pair(1L, t3)));
    }

    /**
     * Reduces the values of each key within session windows (one second inactivity gap)
     * by joining them with {@code ":"}, then verifies both the updates seen downstream
     * of the aggregation and the contents of the materialized session store.
     *
     * <p>Records are produced at five points in time: {@code t1}, {@code t2} (half a gap
     * after {@code t1}), {@code t3} (one millisecond more than a full gap after
     * {@code t1}), {@code t4} (half a gap after {@code t3}) and {@code t5} (one
     * millisecond before {@code t4}, produced last).
     *
     * @throws Exception if producing records, waiting for results or querying the store fails
     */
    @Test
    public void shouldReduceSessionWindows() throws Exception {
        final long sessionGap = 1000L;
        final long halfGap = sessionGap / 2L;
        final String userSessionsStore = "UserSessionsStore";
        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());

        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
            new KeyValue<>("bob", "start"),
            new KeyValue<>("penny", "start"),
            new KeyValue<>("jo", "pause"),
            new KeyValue<>("emily", "pause"));
        final long t1 = this.mockTime.milliseconds();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, producerConfig, t1);

        final long t2 = t1 + halfGap;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("emily", "resume")),
            producerConfig,
            t2);

        final long t3 = t1 + sessionGap + 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(new KeyValue<>("bob", "pause"), new KeyValue<>("penny", "stop")),
            producerConfig,
            t3);

        final long t4 = t3 + halfGap;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(new KeyValue<>("bob", "resume"), new KeyValue<>("jo", "resume")),
            producerConfig,
            t4);

        final long t5 = t4 - 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("jo", "late")),
            producerConfig,
            t5);

        // Latest (reduced value, record timestamp) observed for every windowed key.
        final HashMap<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
        // NOTE(review): 13 presumably covers 10 session results plus 3 tombstones emitted
        // when sessions are merged, with record caching disabled - confirm against setup.
        final CountDownLatch latch = new CountDownLatch(13);

        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SessionWindows.with(Duration.ofMillis(sessionGap)))
            .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
            .toStream()
            .transform(() -> new Transformer<Windowed<String>, String, KeyValue<Object, Object>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<Object, Object> transform(Windowed<String> key, String value) {
                    results.put(key, KeyValue.pair(value, this.context.timestamp()));
                    latch.countDown();
                    return null;
                }

                @Override
                public void close() {
                }
            });
        this.startStreams();

        // Fail fast on timeout. Only a successful await guarantees that the writes made
        // to the unsynchronized results map before each countDown() are visible here.
        Assertions.assertTrue(latch.await(30L, TimeUnit.SECONDS), "Timed out waiting for all session window updates");

        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair("start", t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair("start", t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair("pause", t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), CoreMatchers.equalTo(KeyValue.pair("resume:late", t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), CoreMatchers.equalTo(KeyValue.pair("pause:resume", t2)));
        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), CoreMatchers.equalTo(KeyValue.pair("pause:resume", t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), CoreMatchers.equalTo(KeyValue.pair("stop", t3)));

        // The materialized store must hold exactly bob's two sessions, in this order.
        final ReadOnlySessionStore<String, String> sessionStore =
            IntegrationTestUtils.getStore(userSessionsStore, this.kafkaStreams, QueryableStoreTypes.sessionStore());
        try (KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob")) {
            MatcherAssert.assertThat(bob.next(), CoreMatchers.equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
            MatcherAssert.assertThat(bob.next(), CoreMatchers.equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
            Assertions.assertFalse(bob.hasNext());
        }
    }

    /**
     * Counts records per key in a single unlimited window and verifies the final count
     * and record timestamp observed downstream for each key.
     *
     * <p>The window start is derived from the mock clock as "one hour ago plus one
     * millisecond"; the first batch is stamped "one hour ago" and each following batch
     * is stamped one day after the previous one.
     *
     * @throws Exception if producing records or waiting for results fails
     */
    @Test
    public void shouldCountUnlimitedWindows() throws Exception {
        final long oneHourMs = TimeUnit.HOURS.toMillis(1L);
        // The clock is read twice on purpose, in this order, exactly as before.
        final long startTime = this.mockTime.milliseconds() - oneHourMs + 1L;
        final long incrementTime = Duration.ofDays(1L).toMillis();
        final long t1 = this.mockTime.milliseconds() - oneHourMs;
        final long t2 = t1 + incrementTime;
        final long t3 = t2 + incrementTime;
        final long t4 = t3 + incrementTime;

        final List<KeyValue<String, String>> firstBatch = Arrays.asList(
            KeyValue.pair("bob", "start"),
            KeyValue.pair("penny", "start"),
            KeyValue.pair("jo", "pause"),
            KeyValue.pair("emily", "pause"));
        final List<KeyValue<String, String>> secondBatch = Collections.singletonList(KeyValue.pair("emily", "resume"));
        final List<KeyValue<String, String>> thirdBatch = Arrays.asList(KeyValue.pair("bob", "pause"), KeyValue.pair("penny", "stop"));
        final List<KeyValue<String, String>> fourthBatch = Arrays.asList(KeyValue.pair("bob", "resume"), KeyValue.pair("jo", "resume"));

        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, firstBatch, producerConfig, t1);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, secondBatch, producerConfig, t2);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, thirdBatch, producerConfig, t3);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, fourthBatch, producerConfig, t4);

        // Latest (count, record timestamp) observed for every windowed key.
        final HashMap<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
        final CountDownLatch latch = new CountDownLatch(5);

        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(UnlimitedWindows.of().startOn(Instant.ofEpochMilli(startTime)))
            .count()
            .toStream()
            .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
                private ProcessorContext context;

                public void init(ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                public KeyValue<Object, Object> transform(Windowed<String> windowedKey, Long count) {
                    results.put(windowedKey, KeyValue.pair(count, this.context.timestamp()));
                    latch.countDown();
                    return null;
                }

                public void close() {
                }
            });
        this.startStreams();

        final boolean allUpdatesSeen = latch.await(30L, TimeUnit.SECONDS);
        Assertions.assertTrue(allUpdatesSeen);

        final UnlimitedWindow window = new UnlimitedWindow(startTime);
        MatcherAssert.assertThat(results.get(new Windowed<>("bob", window)), CoreMatchers.equalTo(KeyValue.pair(2L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", window)), CoreMatchers.equalTo(KeyValue.pair(1L, t3)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", window)), CoreMatchers.equalTo(KeyValue.pair(1L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("emily", window)), CoreMatchers.equalTo(KeyValue.pair(1L, t2)));
    }

    /**
     * Synchronously produces the five records {@code (1,"A")} .. {@code (5,"E")} to the
     * {@code streamOneInput} topic, all stamped with the given timestamp.
     *
     * @param timestamp record timestamp applied to every produced record
     * @throws Exception if producing fails
     */
    private void produceMessages(long timestamp) throws Exception {
        final List<KeyValue<Integer, String>> batch = Arrays.asList(
            KeyValue.pair(1, "A"),
            KeyValue.pair(2, "B"),
            KeyValue.pair(3, "C"),
            KeyValue.pair(4, "D"),
            KeyValue.pair(5, "E"));
        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.streamOneInput, batch, producerConfig, timestamp);
    }

    /**
     * Derives the per-test topic names from {@code safeTestName}, stores them in the
     * corresponding fields and creates the topics on the shared cluster.
     *
     * @param safeTestName suffix appended to every topic name
     * @throws InterruptedException if topic creation is interrupted
     */
    private void createTopics(String safeTestName) throws InterruptedException {
        final String inputTopic = "stream-one-" + safeTestName;
        final String sinkTopic = "output-" + safeTestName;
        final String sessionsTopic = "user-sessions-" + safeTestName;

        this.streamOneInput = inputTopic;
        this.outputTopic = sinkTopic;
        this.userSessionsStream = sessionsTopic;

        // NOTE(review): the arguments 3 and 1 are presumably partition count and
        // replication factor - confirm against EmbeddedKafkaCluster#createTopic.
        CLUSTER.createTopic(inputTopic, 3, 1);
        CLUSTER.createTopics(sessionsTopic, sinkTopic);
    }

    /**
     * Builds the topology accumulated in {@code builder}, stores the resulting
     * {@link KafkaStreams} instance in the {@code kafkaStreams} field and starts it.
     */
    private void startStreams() {
        final KafkaStreams streams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams = streams;
        streams.start();
    }

    /**
     * Convenience overload of {@code receiveMessages} for keys that need no inner
     * class: forwards to the five-argument overload with a {@code null} inner class.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param numMessages       minimum number of records to wait for
     * @param testInfo          JUnit test metadata used to derive the consumer group id
     * @return the records received from the output topic
     * @throws Exception if consuming fails
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int numMessages, TestInfo testInfo) throws Exception {
        final Class noInnerClass = null;
        return this.receiveMessages(keyDeserializer, valueDeserializer, noInnerClass, numMessages, testInfo);
    }

    /**
     * Consumes at least {@code numMessages} records, with timestamps, from the output
     * topic using a consumer configured from the given deserializers.
     *
     * <p>The consumer reads from the earliest offset, uses a group id derived from the
     * test name and always sets {@code window.size.ms} to 500. When the key deserializer
     * is a time- or session-windowed deserializer, {@code windowed.inner.class.serde} is
     * set to the serde class resolved from {@code innerClass}.
     *
     * @param keyDeserializer   deserializer for record keys; only its class name is used
     * @param valueDeserializer deserializer for record values; only its class name is used
     * @param innerClass        inner key class, read only for windowed key deserializers
     * @param numMessages       minimum number of records to wait for
     * @param testInfo          JUnit test metadata used to derive the consumer group id
     * @return the records received from the output topic
     * @throws Exception if consuming fails
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Class innerClass, int numMessages, TestInfo testInfo) throws Exception {
        final String groupId = "group-" + IntegrationTestUtils.safeUniqueTestName(testInfo);
        final boolean hasWindowedKey = keyDeserializer instanceof TimeWindowedDeserializer
            || keyDeserializer instanceof SessionWindowedDeserializer;

        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("group.id", groupId);
        config.setProperty("auto.offset.reset", "earliest");
        config.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        config.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        config.put("window.size.ms", (Object)500L);
        if (hasWindowedKey) {
            final String innerSerdeClassName = Serdes.serdeFrom((Class)innerClass).getClass().getName();
            config.setProperty("windowed.inner.class.serde", innerSerdeClassName);
        }

        // NOTE(review): 60000L is presumably the wait timeout in milliseconds - confirm.
        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(config, this.outputTopic, numMessages, 60000L);
    }

    /**
     * Consumes at least {@code numMessages} records, with timestamps, from the output
     * topic.
     *
     * <p>This method previously carried a line-for-line copy of the five-argument
     * {@code receiveMessages} overload; it now delegates to it so the consumer
     * configuration lives in one place.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param innerClass        inner key class, used for windowed key deserializers
     * @param numMessages       minimum number of records to wait for
     * @param testInfo          JUnit test metadata used to derive the consumer group id
     * @return the records received from the output topic
     * @throws Exception if consuming fails
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Class innerClass, int numMessages, TestInfo testInfo) throws Exception {
        return this.receiveMessages(keyDeserializer, valueDeserializer, innerClass, numMessages, testInfo);
    }

    /**
     * Runs the console consumer against the output topic and returns everything it
     * printed to {@code System.out}.
     *
     * <p>{@code System.out} is temporarily redirected into an in-memory buffer. The
     * original stream is restored in a {@code finally} block, so a failure inside the
     * console consumer can no longer leave {@code System.out} pointing at a closed
     * stream for the rest of the JVM.
     *
     * @param keyDeserializer   key deserializer; only its class name is passed on
     * @param valueDeserializer value deserializer; only its class name is passed on
     * @param innerClass        inner key class used to resolve the windowed inner serde
     * @param numMessages       value passed as {@code --max-messages}
     * @param printTimestamp    value passed as the {@code print.timestamp} property
     * @return the captured console output
     */
    private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Class innerClass, int numMessages, boolean printTimestamp) {
        final ByteArrayOutputStream capturedConsole = new ByteArrayOutputStream();
        final PrintStream originalStream = System.out;
        try (PrintStream captureStream = new PrintStream(capturedConsole)) {
            System.setOut(captureStream);
            try {
                final String keySeparator = ", ";
                final String[] args = new String[]{
                    "--bootstrap-server", CLUSTER.bootstrapServers(),
                    "--from-beginning",
                    "--property", "print.key=true",
                    "--property", "print.timestamp=" + printTimestamp,
                    "--topic", this.outputTopic,
                    "--max-messages", String.valueOf(numMessages),
                    "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
                    "--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
                    "--property", "key.separator=" + keySeparator,
                    "--property", "key.deserializer.windowed.inner.class.serde=" + Serdes.serdeFrom((Class)innerClass).getClass().getName(),
                    "--property", "key.deserializer.window.size.ms=500"
                };
                // Reset the consumer's static message counter before running it.
                ConsoleConsumer.messageCount_$eq(0);
                ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
                captureStream.flush();
            } finally {
                // Restore before the capture stream is closed, on success and on failure.
                System.setOut(originalStream);
            }
            return capturedConsole.toString();
        }
    }
}

