/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Category(value={IntegrationTest.class})
@RunWith(value=Parameterized.class)
public class TimeWindowedKStreamIntegrationTest {
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"log.retention.hours", (Object)"-1"), Utils.mkEntry((Object)"log.retention.bytes", (Object)"-1")})));
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String streamTwoInput;
    private String outputTopic;
    @Rule
    public TestName testName = new TestName();
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;
    @Parameterized.Parameter(value=1)
    public boolean withCache;
    @Parameterized.Parameter(value=2)
    public EmitStrategy emitStrategy;
    private boolean emitFinal;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Parameterized.Parameters(name="{0}_{1}")
    public static Collection<Object[]> getEmitStrategy() {
        return Arrays.asList({EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true, EmitStrategy.onWindowUpdate()}, {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false, EmitStrategy.onWindowUpdate()}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true, EmitStrategy.onWindowClose()}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false, EmitStrategy.onWindowClose()});
    }

    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfiguration.put("application.id", "app-" + safeTestName);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("statestore.cache.max.bytes", (Object)0);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100L);
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("__emit.interval.ms.kstreams.windowed.aggregation__", (Object)0);
        this.streamsConfiguration.put("windowstore.changelog.additional.retention.ms", (Object)Long.MAX_VALUE);
        this.emitFinal = this.emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE;
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
            this.kafkaStreams.cleanUp();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldAggregateWindowedWithNoGrace() throws Exception {
        this.produceMessages(this.streamOneInput, new KeyValueTimestamp<String, String>("A", "1", 0L), new KeyValueTimestamp<String, String>("A", "2", 5L), new KeyValueTimestamp<String, String>("A", "3", 10L), new KeyValueTimestamp<String, String>("B", "4", 6L), new KeyValueTimestamp<String, String>("B", "5", 11L), new KeyValueTimestamp<String, String>("B", "6", 15L), new KeyValueTimestamp<String, String>("C", "7", 25L));
        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, (long)10L);
        this.builder.stream(this.streamOneInput, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey().windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.getMaterialized()).toStream().to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)new Serdes.StringSerde()));
        this.startStreams();
        List windowedMessages = this.receiveMessagesWithTimestamp((Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(10L)), (Deserializer)new StringDeserializer(), 10L, String.class, this.emitFinal ? 4 : 12);
        List<KeyValueTimestamp> expectResult = this.emitFinal ? Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+2", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+2+3", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+4+5", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(15L, 25L)), "0+6", 15L)) : Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+2", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+2", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+2+3", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(10L, 20L)), "0+3", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+4", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+4+5", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+5", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+5+6", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(15L, 25L)), "0+6", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(20L, 30L)), "0+7", 25L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(25L, 35L)), "0+7", 25L));
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));
    }

    @Test
    public void shouldAggregateWindowedWithGrace() throws Exception {
        this.produceMessages(this.streamOneInput, new KeyValueTimestamp<String, String>("A", "1", 0L), new KeyValueTimestamp<String, String>("A", "2", 5L), new KeyValueTimestamp<String, String>("A", "3", 10L), new KeyValueTimestamp<String, String>("B", "4", 6L), new KeyValueTimestamp<String, String>("B", "5", 11L), new KeyValueTimestamp<String, String>("B", "6", 15L), new KeyValueTimestamp<String, String>("C", "7", 25L));
        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, (long)10L);
        this.builder.stream(this.streamOneInput, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey().windowedBy((Windows)TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.getMaterialized()).toStream().to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)new Serdes.StringSerde()));
        this.startStreams();
        List windowedMessages = this.receiveMessagesWithTimestamp((Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(10L)), (Deserializer)new StringDeserializer(), 10L, String.class, this.emitFinal ? 4 : 13);
        List<KeyValueTimestamp> expectResult = this.emitFinal ? Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+2", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+4", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(10L, 20L)), "0+3", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+5+6", 15L)) : Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+2", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+2", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+2+3", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(10L, 20L)), "0+3", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+4", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+4", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+4+5", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+5", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+5+6", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(15L, 25L)), "0+6", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(20L, 30L)), "0+7", 25L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(25L, 35L)), "0+7", 25L));
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));
    }

    @Test
    public void shouldRestoreAfterJoinRestart() throws Exception {
        this.produceMessages(this.streamOneInput, new KeyValueTimestamp<String, String>("A", "L1", 0L), new KeyValueTimestamp<String, String>("A", "L1", 5L), new KeyValueTimestamp<String, String>("B", "L2", 11L), new KeyValueTimestamp<String, String>("B", "L2", 15L), new KeyValueTimestamp<String, String>("C", "L3", 25L));
        this.produceMessages(this.streamTwoInput, new KeyValueTimestamp<String, String>("A", "R1", 0L), new KeyValueTimestamp<String, String>("A", "R1", 5L), new KeyValueTimestamp<String, String>("B", "R2", 11L), new KeyValueTimestamp<String, String>("B", "R2", 15L), new KeyValueTimestamp<String, String>("C", "R3", 25L));
        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, (long)10L);
        KStream streamOne = this.builder.stream(this.streamOneInput, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream streamTwo = this.builder.stream(this.streamTwoInput, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream joinedStream = streamOne.join(streamTwo, (v1, v2) -> v1 + "," + v2, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(2L)));
        joinedStream.groupByKey().windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.getMaterialized()).toStream().to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)new Serdes.StringSerde()));
        this.startStreams();
        List windowedMessages = this.receiveMessagesWithTimestamp((Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(10L)), (Deserializer)new StringDeserializer(), 10L, String.class, this.emitFinal ? 4 : 9);
        List<KeyValueTimestamp> expectResult = this.emitFinal ? Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+L1,R1+L1,R1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+L1,R1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+L2,R2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(15L, 25L)), "0+L2,R2", 15L)) : Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+L1,R1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+L1,R1+L1,R1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+L1,R1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+L2,R2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+L2,R2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+L2,R2+L2,R2", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(15L, 25L)), "0+L2,R2", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(20L, 30L)), "0+L3,R3", 25L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(25L, 35L)), "0+L3,R3", 25L));
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        this.produceMessages(this.streamOneInput, new KeyValueTimestamp<String, String>("C", "L3", 35L));
        this.produceMessages(this.streamTwoInput, new KeyValueTimestamp<String, String>("C", "R3", 35L));
        this.startStreams();
        if (this.emitFinal) {
            windowedMessages = this.receiveMessagesWithTimestamp((Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(10L)), (Deserializer)new StringDeserializer(), 10L, String.class, 1);
            expectResult = Collections.singletonList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(25L, 35L)), "0+L3,R3", 25L));
        } else {
            windowedMessages = this.receiveMessagesWithTimestamp((Deserializer)new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), Long.valueOf(10L)), (Deserializer)new StringDeserializer(), 10L, String.class, 2);
            expectResult = Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(30L, 40L)), "0+L3,R3", 35L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(35L, 45L)), "0+L3,R3", 35L));
        }
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));
    }

    @Test
    public void shouldThrowUnlimitedWindows() {
        TimeWindowedKStream windowedStream = this.builder.stream(this.streamOneInput, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey().windowedBy((Windows)UnlimitedWindows.of().startOn(Instant.ofEpochMilli(0L)));
        if (this.emitFinal) {
            Assert.assertThrows(IllegalArgumentException.class, () -> windowedStream.emitStrategy(this.emitStrategy));
        } else {
            windowedStream.emitStrategy(this.emitStrategy);
        }
    }

    private void produceMessages(String topic, KeyValueTimestamp<String, String> ... records) {
        IntegrationTestUtils.produceSynchronously(TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), false, topic, Optional.empty(), Arrays.asList(records));
    }

    private Materialized getMaterialized() {
        if (this.withCache) {
            return Materialized.with(null, (Serde)new Serdes.StringSerde()).withCachingEnabled();
        }
        return Materialized.with(null, (Serde)new Serdes.StringSerde()).withCachingDisabled();
    }

    private void createTopics() throws InterruptedException {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamOneInput = "stream-one-" + safeTestName;
        this.streamTwoInput = "stream-two-" + safeTestName;
        this.outputTopic = "output-" + safeTestName;
        CLUSTER.createTopic(this.streamOneInput, 1, 1);
        CLUSTER.createTopic(this.streamTwoInput, 1, 1);
        CLUSTER.createTopic(this.outputTopic);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, long windowSize, Class innerClass, int numMessages) throws Exception {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "group-" + safeTestName);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        consumerProperties.put("window.size.ms", (Object)windowSize);
        if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
            consumerProperties.setProperty("windowed.inner.class.serde", Serdes.serdeFrom((Class)innerClass).getClass().getName());
        }
        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(consumerProperties, this.outputTopic, numMessages, 60000L);
    }
}

