/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Category(value={IntegrationTest.class})
@RunWith(value=Parameterized.class)
public class SlidingWindowedKStreamIntegrationTest {
    private static final int NUM_BROKERS = 1;

    /**
     * Embedded Kafka cluster shared by every test of this class. The broker properties set
     * both {@code log.retention.hours} and {@code log.retention.bytes} to {@code "-1"}.
     */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
        NUM_BROKERS,
        Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("log.retention.hours", "-1"),
            Utils.mkEntry("log.retention.bytes", "-1"))));

    /** JUnit timeout rule of 600 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600L);

    /** Name of the running test; used to derive unique topic, application and group names. */
    @Rule
    public TestName testName = new TestName();

    /** Parameter 0: the emit strategy type exercised by this run. */
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;

    /** Parameter 1: whether the aggregation store is materialized with caching enabled. */
    @Parameterized.Parameter(1)
    public boolean withCache;

    // Topology builder and Streams configuration, created afresh in before().
    private StreamsBuilder builder;
    private Properties streamsConfiguration;

    // Streams instance created by startStreams(); stays null if a test never starts one.
    private KafkaStreams kafkaStreams;

    // Topic names, assigned in createTopics().
    private String streamOneInput;
    private String streamTwoInput;
    private String outputTopic;

    // Derived from the `type` parameter in before().
    private EmitStrategy emitStrategy;
    private boolean emitFinal;

    /**
     * Starts the shared embedded Kafka cluster once, before any test of this class runs.
     *
     * @throws IOException declared by this method; presumably raised when the cluster cannot be started
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /**
     * Stops the shared embedded Kafka cluster once all tests of this class have run.
     */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Parameterized.Parameters(name="{0}_cache:{1}")
    public static Collection<Object[]> getEmitStrategy() {
        return Arrays.asList({EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true}, {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false});
    }

    /**
     * Prepares a fresh topology builder, the per-test topics and the Streams configuration,
     * then derives the emit strategy from the {@code type} parameter.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        createTopics();

        final Properties config = new Properties();
        streamsConfiguration = config;
        final String uniqueName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);

        // Identity and connectivity.
        config.put("application.id", "app-" + uniqueName);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("auto.offset.reset", "earliest");
        config.put("state.dir", TestUtils.tempDirectory().getPath());

        // Caching and commit behaviour.
        config.put("cache.max.bytes.buffering", 0);
        config.put("commit.interval.ms", 100L);

        // String serdes for keys and values unless an operator overrides them.
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.String().getClass());

        // NOTE(review): internal setting; an interval of 0 presumably removes any delay
        // between emissions of windowed aggregation results - confirm.
        config.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
        config.put("windowstore.changelog.additional.retention.ms", Long.MAX_VALUE);

        emitStrategy = EmitStrategy.StrategyType.forType(type);
        emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
    }

    /**
     * Closes and cleans up the Streams instance (if a test started one) and purges the local
     * state directory described by the Streams configuration.
     *
     * <p>The purge runs in a {@code finally} block so that a failure in {@code close()} or
     * {@code cleanUp()} does not leave local state behind. It is skipped when the configuration
     * was never created (for example when set-up failed early), so tear-down does not add a
     * second, unrelated failure on top of the original one.
     *
     * @throws IOException if purging the local state fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        try {
            if (kafkaStreams != null) {
                kafkaStreams.close();
                kafkaStreams.cleanUp();
            }
        } finally {
            if (streamsConfiguration != null) {
                IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
            }
        }
    }

    /**
     * Aggregates one input stream over sliding windows with a 10 ms time difference and no
     * grace period, and checks the windowed records written to the output topic.
     *
     * <p>With the on-window-close strategy ({@code emitFinal}) six records are expected; with
     * on-window-update ten are expected. The number of records to wait for is taken from the
     * expected list itself, so the wait cannot be satisfied by fewer records than the equality
     * assertion requires.
     *
     * @throws Exception if producing, consuming or the wait for output fails
     */
    @Test
    public void shouldAggregateWindowedWithNoGrace() throws Exception {
        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("A", "1", 0L),
            new KeyValueTimestamp<>("A", "2", 5L),
            new KeyValueTimestamp<>("A", "3", 10L),
            new KeyValueTimestamp<>("A", "4", 17L),
            // The next two records carry timestamps behind the A@17 record above;
            // neither value shows up in any expected window below.
            new KeyValueTimestamp<>("B", "5", 6L),
            new KeyValueTimestamp<>("B", "6", 11L),
            new KeyValueTimestamp<>("B", "7", 18L),
            new KeyValueTimestamp<>("C", "8", 25L));

        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        builder.stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10L)))
            .emitStrategy(emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, getMaterialized())
            .toStream()
            .to(outputTopic, Produced.with(windowedSerde, new Serdes.StringSerde()));

        startStreams();

        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
        if (emitFinal) {
            expectResult = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(7L, 17L)), "0+3+4", 17L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8L, 18L)), "0+7", 18L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+4", 17L));
        } else {
            expectResult = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+4", 17L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(7L, 17L)), "0+3+4", 17L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8L, 18L)), "0+7", 18L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(15L, 25L)), "0+8", 25L));
        }

        // Wait for exactly as many records as will be compared: a smaller minimum would let the
        // wait return before the last expected record has arrived.
        final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            expectResult.size());

        MatcherAssert.assertThat(windowedMessages, Is.is(expectResult));
    }

    /**
     * Aggregates one input stream over sliding windows with a 10 ms time difference and a
     * 5 ms grace period, and checks the windowed records written to the output topic.
     *
     * <p>Five records are awaited with the on-window-close strategy ({@code emitFinal}),
     * twenty with on-window-update.
     *
     * @throws Exception if producing, consuming or the wait for output fails
     */
    @Test
    public void shouldAggregateWindowedWithGrace() throws Exception {
        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("A", "1", 0L),
            new KeyValueTimestamp<>("A", "2", 5L),
            new KeyValueTimestamp<>("A", "3", 10L),
            new KeyValueTimestamp<>("A", "4", 6L),
            new KeyValueTimestamp<>("A", "5", 11L),
            new KeyValueTimestamp<>("A", "6", 16L),
            new KeyValueTimestamp<>("A", "7", 27L),
            // Value 8 does not appear in any expected window below.
            new KeyValueTimestamp<>("A", "8", 11L));

        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        builder.stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(5L)))
            .emitStrategy(emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, getMaterialized())
            .toStream()
            .to(outputTopic, Produced.with(windowedSerde, new Serdes.StringSerde()));

        startStreams();

        final int expectedCount = emitFinal ? 5 : 20;
        final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            expectedCount);

        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
        if (emitFinal) {
            expectResult = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2+3+4", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2+3+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3+4+5+6", 16L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(7L, 17L)), "0+3+5+6", 16L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+5+6", 16L));
        } else {
            expectResult = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2+3+4", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3+4", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(7L, 17L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2+3+4", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(7L, 17L)), "0+3+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+2+3+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+5+6", 16L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(7L, 17L)), "0+3+5+6", 16L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(6L, 16L)), "0+3+4+5+6", 16L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12L, 22L)), "0+6", 16L),
                // The [17, 27] window is expected twice with the same aggregate.
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(17L, 27L)), "0+7", 27L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(17L, 27L)), "0+7", 27L));
        }

        MatcherAssert.assertThat(windowedMessages, Is.is(expectResult));
    }

    /**
     * Joins the two input streams, aggregates the join result over sliding windows with a
     * 10 ms time difference and no grace period, and verifies the output. The Streams instance
     * is then closed and cleaned up, one more record per input is produced, the application is
     * started again and the additional output is verified.
     *
     * @throws Exception if producing, consuming or either wait for output fails
     */
    @Test
    public void shouldRestoreAfterJoinRestart() throws Exception {
        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("A", "L1", 0L),
            new KeyValueTimestamp<>("A", "L2", 5L),
            new KeyValueTimestamp<>("B", "L3", 11L),
            new KeyValueTimestamp<>("B", "L4", 15L),
            new KeyValueTimestamp<>("C", "L5", 25L));
        produceMessages(
            streamTwoInput,
            new KeyValueTimestamp<>("A", "R1", 0L),
            new KeyValueTimestamp<>("A", "R2", 5L),
            new KeyValueTimestamp<>("B", "R3", 11L),
            new KeyValueTimestamp<>("B", "R4", 15L),
            new KeyValueTimestamp<>("C", "R5", 25L));

        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        final KStream<String, String> streamOne =
            builder.stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> streamTwo =
            builder.stream(streamTwoInput, Consumed.with(Serdes.String(), Serdes.String()));

        // Left and right values are paired as "<left>,<right>" within a 2 ms join window.
        final KStream<String, String> joinedStream = streamOne.join(
            streamTwo,
            (v1, v2) -> v1 + "," + v2,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2L)));

        joinedStream
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10L)))
            .emitStrategy(emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, getMaterialized())
            .toStream()
            .to(outputTopic, Produced.with(windowedSerde, new Serdes.StringSerde()));

        startStreams();

        final int countBeforeRestart = emitFinal ? 5 : 7;
        final List<KeyValueTimestamp<Windowed<String>, String>> messagesBeforeRestart = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            countBeforeRestart);

        final List<KeyValueTimestamp<Windowed<String>, String>> expectedBeforeRestart;
        if (emitFinal) {
            expectedBeforeRestart = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1+L2,R2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+L2,R2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1L, 11L)), "0+L3,R3", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L3,R3+L4,R4", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12L, 22L)), "0+L4,R4", 15L));
        } else {
            expectedBeforeRestart = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(1L, 11L)), "0+L2,R2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1+L2,R2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1L, 11L)), "0+L3,R3", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12L, 22L)), "0+L4,R4", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L3,R3+L4,R4", 15L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(15L, 25L)), "0+L5,R5", 25L));
        }
        MatcherAssert.assertThat(messagesBeforeRestart, Is.is(expectedBeforeRestart));

        // Shut the application down and clean up its local state before restarting it.
        kafkaStreams.close();
        kafkaStreams.cleanUp();

        produceMessages(streamOneInput, new KeyValueTimestamp<>("C", "L6", 35L));
        produceMessages(streamTwoInput, new KeyValueTimestamp<>("C", "R6", 35L));

        startStreams();

        final int countAfterRestart = emitFinal ? 1 : 2;
        final List<KeyValueTimestamp<Windowed<String>, String>> messagesAfterRestart = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            countAfterRestart);

        final List<KeyValueTimestamp<Windowed<String>, String>> expectedAfterRestart;
        if (emitFinal) {
            expectedAfterRestart = Collections.singletonList(
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(15L, 25L)), "0+L5,R5", 25L));
        } else {
            expectedAfterRestart = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(26L, 36L)), "0+L6,R6", 35L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+L5,R5+L6,R6", 35L));
        }
        MatcherAssert.assertThat(messagesAfterRestart, Is.is(expectedAfterRestart));
    }

    /**
     * Synchronously produces the given records to {@code topic} on the shared cluster, using
     * string serializers for both key and value.
     *
     * @param topic   destination topic
     * @param records key/value/timestamp triples, passed on in the given order
     */
    private void produceMessages(String topic, KeyValueTimestamp<String, String> ... records) {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        final List<KeyValueTimestamp<String, String>> toSend = Arrays.asList(records);
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toSend);
    }

    /**
     * Builds the materialization used by the aggregations: a {@code null} key serde, a string
     * value serde, and caching enabled or disabled according to the {@code withCache} parameter.
     *
     * @return the configured {@link Materialized}
     */
    private Materialized getMaterialized() {
        // NOTE(review): the null key serde presumably falls back to the configured default - confirm.
        final Materialized materialized = Materialized.with(null, new Serdes.StringSerde());
        return withCache ? materialized.withCachingEnabled() : materialized.withCachingDisabled();
    }

    /**
     * Derives per-test topic names from the unique test name and creates the topics on the
     * shared cluster. Both input topics are created with the arguments {@code (1, 1)}; the
     * output topic is created with the single-argument overload.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    private void createTopics() throws InterruptedException {
        final String suffix = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);

        streamOneInput = "stream-one-" + suffix;
        streamTwoInput = "stream-two-" + suffix;
        outputTopic = "output-" + suffix;

        CLUSTER.createTopic(streamOneInput, 1, 1);
        CLUSTER.createTopic(streamTwoInput, 1, 1);
        CLUSTER.createTopic(outputTopic);
    }

    /**
     * Builds the topology from the current builder, creates a new {@link KafkaStreams} instance
     * with the per-test configuration, stores it in {@code kafkaStreams} and starts it.
     * Any previously stored instance is replaced, not closed, by this method.
     */
    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    /**
     * Consumes records from the output topic until at least {@code numMessages} have been
     * received, waiting with a limit of 60000 ms.
     *
     * <p>Only the classes of the given deserializers are used: their class names are placed in
     * the consumer configuration. For time- or session-windowed key deserializers the serde
     * class for {@code innerClass} is configured as the windowed inner serde.
     *
     * @param keyDeserializer   deserializer whose class is configured for record keys
     * @param valueDeserializer deserializer whose class is configured for record values
     * @param windowSize        value stored under {@code window.size.ms}
     * @param innerClass        key type inside the window, used to look up the inner serde
     * @param numMessages       minimum number of records to wait for
     * @return the received records with their timestamps
     * @throws Exception if waiting for or reading the records fails
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, long windowSize, Class innerClass, int numMessages) throws Exception {
        final String groupId = "group-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName);

        final Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", groupId);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerConfig.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        consumerConfig.put("window.size.ms", windowSize);

        final boolean windowedKey = keyDeserializer instanceof TimeWindowedDeserializer
            || keyDeserializer instanceof SessionWindowedDeserializer;
        if (windowedKey) {
            final String innerSerdeName = Serdes.serdeFrom(innerClass).getClass().getName();
            consumerConfig.setProperty("windowed.inner.class.serde", innerSerdeName);
        }

        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerConfig, outputTopic, numMessages, 60000L);
    }
}

