/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class WindowedChangelogRetentionIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final Duration DEFAULT_RETENTION = Duration.ofDays(1L);
    private static final String STREAM_ONE_INPUT = "stream-one";
    private static final String STREAM_TWO_INPUT = "stream-two";
    private static final String OUTPUT_TOPIC = "output";
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private KGroupedStream<String, String> groupedStream;
    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(STREAM_ONE_INPUT, 3, 1);
        CLUSTER.createTopic(STREAM_TWO_INPUT, 3, 1);
        CLUSTER.createTopics(OUTPUT_TOPIC);
    }

    @Before
    public void before() {
        this.builder = new StreamsBuilder();
        this.streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfiguration.put("application.id", "app-" + safeTestName);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100);
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        KeyValueMapper mapper = MockMapper.selectValueMapper();
        KStream stream = this.builder.stream(STREAM_ONE_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.groupedStream = stream.groupBy(mapper, Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String()));
    }

    @After
    public void whenShuttingDown() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void timeWindowedChangelogShouldHaveRetentionOfWindowSizeIfWindowSizeLargerThanDefaultRetention() throws Exception {
        Duration windowSize;
        Duration expectedRetention = windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1L));
        this.runAndVerifyTimeWindows((Windows<TimeWindow>)TimeWindows.of((Duration)windowSize), null, expectedRetention);
    }

    @Test
    public void timeWindowedChangelogShouldHaveDefaultRetentionIfWindowSizeLessThanDefaultRetention() throws Exception {
        Duration windowSize = DEFAULT_RETENTION.minus(Duration.ofHours(1L));
        Duration expectedRetention = DEFAULT_RETENTION;
        this.runAndVerifyTimeWindows((Windows<TimeWindow>)TimeWindows.of((Duration)windowSize), null, expectedRetention);
    }

    @Test
    public void timeWindowedChangelogShouldHaveRetentionOfWindowSizePlusGraceIfWindowSizePlusGraceLargerThanDefaultRetention() throws Exception {
        Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1L));
        Duration grace = Duration.ofHours(12L);
        Duration expectedRetention = windowSize.plus(grace);
        this.runAndVerifyTimeWindows((Windows<TimeWindow>)TimeWindows.of((Duration)windowSize).grace(grace), null, expectedRetention);
    }

    @Test
    public void timeWindowedChangelogShouldHaveDefaultRetentionIfWindowSizePlusGraceLessThanDefaultRetention() throws Exception {
        Duration grace = Duration.ofMillis(1000L);
        Duration windowSize = DEFAULT_RETENTION.minus(grace).minus(Duration.ofMillis(500L));
        Duration expectedRetention = DEFAULT_RETENTION;
        this.runAndVerifyTimeWindows((Windows<TimeWindow>)TimeWindows.of((Duration)windowSize).grace(grace), null, expectedRetention);
    }

    @Test
    public void timeWindowedChangelogShouldHaveUserSpecifiedRetentionIfUserSpecifiedRetentionEvenIfLessThanDefaultRetention() throws Exception {
        Duration userSpecifiedRetention;
        Duration grace = Duration.ofHours(6L);
        Duration windowSize = DEFAULT_RETENTION.minus(grace).minus(Duration.ofHours(1L));
        Duration expectedRetention = userSpecifiedRetention = windowSize.plus(grace).plus(Duration.ofHours(1L));
        this.runAndVerifyTimeWindows((Windows<TimeWindow>)TimeWindows.of((Duration)windowSize).grace(grace), userSpecifiedRetention, expectedRetention);
    }

    private void runAndVerifyTimeWindows(Windows<TimeWindow> window, Duration userSpecifiedRetention, Duration expectedRetention) throws Exception {
        String storeName = "windowed-store";
        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, (long)window.size());
        this.groupedStream.windowedBy(window).count(userSpecifiedRetention != null ? Materialized.as((String)"windowed-store").withRetention(userSpecifiedRetention) : Materialized.as((String)"windowed-store")).toStream().to(OUTPUT_TOPIC, Produced.with((Serde)windowedSerde, (Serde)Serdes.Long()));
        this.startStreams();
        this.verifyChangelogRetentionOfWindowedStore("windowed-store", expectedRetention);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveRetentionOfGapIfGapLargerThanDefaultRetention() throws Exception {
        Duration gap;
        Duration expectedRetention = gap = DEFAULT_RETENTION.plus(Duration.ofHours(1L));
        this.runAndVerifySessionWindows(SessionWindows.with((Duration)gap), null, expectedRetention);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveDefaultRetentionIfGapLessThanDefaultRetention() throws Exception {
        Duration gap = DEFAULT_RETENTION.minus(Duration.ofHours(1L));
        Duration expectedRetention = DEFAULT_RETENTION;
        this.runAndVerifySessionWindows(SessionWindows.with((Duration)gap), null, expectedRetention);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveDefaultRetentionIfGapPlusGraceLessThanDefaultRetention() throws Exception {
        Duration grace = Duration.ofHours(1L);
        Duration gap = DEFAULT_RETENTION.minus(grace).minus(Duration.ofHours(1L));
        Duration expectedRetention = DEFAULT_RETENTION;
        this.runAndVerifySessionWindows(SessionWindows.with((Duration)gap).grace(grace), null, expectedRetention);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveUserSpecifiedRetentionIfUserSpecifiedRetentionEvenIfLessThanDefaultRetention() throws Exception {
        Duration userSpecifiedRetention;
        Duration grace = Duration.ofHours(6L);
        Duration gap = DEFAULT_RETENTION.minus(grace).minus(Duration.ofHours(1L));
        Duration expectedRetention = userSpecifiedRetention = gap.plus(grace).plus(Duration.ofHours(1L));
        this.runAndVerifySessionWindows(SessionWindows.with((Duration)gap).grace(grace), userSpecifiedRetention, expectedRetention);
    }

    private void runAndVerifySessionWindows(SessionWindows window, Duration userSpecifiedRetention, Duration expectedRetention) throws Exception {
        String storeName = "windowed-store";
        Serde windowedSerde = WindowedSerdes.sessionWindowedSerdeFrom(String.class);
        this.groupedStream.windowedBy(window).count(userSpecifiedRetention != null ? Materialized.as((String)"windowed-store").withRetention(userSpecifiedRetention) : Materialized.as((String)"windowed-store")).toStream().to(OUTPUT_TOPIC, Produced.with((Serde)windowedSerde, (Serde)Serdes.Long()));
        this.startStreams();
        this.verifyChangelogRetentionOfWindowedStore("windowed-store", expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfDoubleWindowSizeIfWindowSizeLargerThanDefaultRetention() throws Exception {
        Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1L));
        Duration expectedRetention = windowSize.multipliedBy(2L);
        this.runAndVerifyJoinWindows(JoinWindows.of((Duration)windowSize), expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveDefaultRetentionIfDoubleWindowSizeLessThanDefaultRetention() throws Exception {
        Duration windowSize = DEFAULT_RETENTION.dividedBy(2L).minus(Duration.ofHours(1L));
        Duration expectedRetention = DEFAULT_RETENTION;
        this.runAndVerifyJoinWindows(JoinWindows.of((Duration)windowSize), expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfDoubleWindowSizePlusGraceIfDoubleWindowSizePlusGraceLessThanDefaultRetention() throws Exception {
        Duration grace = Duration.ofHours(3L);
        Duration windowSize = DEFAULT_RETENTION.dividedBy(2L).minus(grace).minus(Duration.ofHours(1L));
        Duration expectedRetention = windowSize.multipliedBy(2L).plus(grace);
        this.runAndVerifyJoinWindows(JoinWindows.of((Duration)windowSize).grace(grace), expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfDoubleWindowSizePlusGraceIfDoubleWindowSizePlusGraceGreaterThanDefaultRetention() throws Exception {
        Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1L));
        Duration grace = Duration.ofHours(3L);
        Duration expectedRetention = windowSize.multipliedBy(2L).plus(grace);
        this.runAndVerifyJoinWindows(JoinWindows.of((Duration)windowSize).grace(grace), expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfBeforePlusAfterPlusGraceIfBeforePlusAfterPlusGraceGreaterThanDefaultRetention() throws Exception {
        Duration before = DEFAULT_RETENTION.plus(Duration.ofHours(1L));
        Duration after = DEFAULT_RETENTION.minus(Duration.ofHours(1L));
        Duration grace = Duration.ofHours(3L);
        Duration expectedRetention = before.plus(after).plus(grace);
        this.runAndVerifyJoinWindows(JoinWindows.of((Duration)before).after(after).grace(grace), expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfBeforePlusAfterPlusGraceIfBeforePlusAfterePlusGraceLessThanDefaultRetention() throws Exception {
        Duration grace = Duration.ofHours(3L);
        Duration before = DEFAULT_RETENTION.dividedBy(2L).minus(grace).minus(Duration.ofHours(1L));
        Duration after = DEFAULT_RETENTION.dividedBy(2L).minus(grace).minus(Duration.ofHours(4L));
        Duration expectedRetention = before.plus(after).plus(grace);
        this.runAndVerifyJoinWindows(JoinWindows.of((Duration)before).after(after).grace(grace), expectedRetention);
    }

    private void runAndVerifyJoinWindows(JoinWindows window, Duration expectedRetention) throws Exception {
        String joinName = "testjoin";
        String thisStoreName = "testjoin-this-join-store";
        String otherStoreName = "testjoin-other-join-store";
        KStream stream1 = this.builder.stream(STREAM_ONE_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        KStream stream2 = this.builder.stream(STREAM_TWO_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        stream1.join(stream2, (left, right) -> left, window, StreamJoined.as((String)"testjoin")).to(OUTPUT_TOPIC, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.startStreams();
        this.verifyChangelogRetentionOfWindowedStore("testjoin-this-join-store", expectedRetention);
        this.verifyChangelogRetentionOfWindowedStore("testjoin-other-join-store", expectedRetention);
    }

    private void startStreams() throws Exception {
        Topology topology = this.builder.build();
        System.out.println(topology.describe().toString());
        this.kafkaStreams = new KafkaStreams(topology, this.streamsConfiguration);
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.RUNNING, Duration.ofSeconds(30L));
    }

    private void verifyChangelogRetentionOfWindowedStore(String storeName, Duration retention) {
        Duration windowStoreChangelogAdditionalRetention = Duration.ofDays(1L);
        Properties logConfig = CLUSTER.getLogConfig(this.streamsConfiguration.getProperty("application.id") + "-" + storeName + "-changelog");
        MatcherAssert.assertThat((Object)Long.parseLong(logConfig.getProperty("retention.ms")), (Matcher)Is.is((Object)(retention.toMillis() + windowStoreChangelogAdditionalRetention.toMillis())));
    }
}

