/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.KeyValueSegments;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.WindowBytesStoreTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the persistent window store built by {@link Stores#persistentWindowStore}.
 *
 * <p>The tests write records at increasing timestamps and then check two things: which
 * entries are listed in the store's directory, and which values are still returned by
 * {@code fetch}. The shared fixture ({@code windowStore}, {@code context}, {@code baseDir},
 * {@code recordCollector}, {@code setCurrentTime}) is inherited from {@link WindowBytesStoreTest}.
 */
public class RocksDBWindowStoreTest
extends WindowBytesStoreTest {
    private static final String STORE_NAME = "rocksDB window store";
    private static final String METRICS_SCOPE = "test-state-id";

    /** Window size passed to {@link #buildWindowStore} by the tests that rebuild the store. */
    private static final long WINDOW_SIZE_MS = 3L;
    /** Timestamp distance after which the tests below expect a new segment directory. */
    private static final long SEGMENT_INTERVAL_MS = 60_000L;
    /** Retention period passed to {@link #buildWindowStore} by the tests that rebuild the store. */
    private static final long RETENTION_PERIOD_MS = 2 * SEGMENT_INTERVAL_MS;
    /** Timestamp of the first record written by {@link #testRolling()} and {@link #testRestore()}. */
    private static final long START_TIME_MS = 2 * SEGMENT_INTERVAL_MS;
    /** Timestamp distance between consecutive records in those two tests. */
    private static final long INCREMENT_MS = SEGMENT_INTERVAL_MS / 2;

    // Only used to compute the expected segment names; presumably the last two arguments are the
    // retention period and the segment interval -- confirm against KeyValueSegments.
    private final KeyValueSegments segments =
        new KeyValueSegments(STORE_NAME, METRICS_SCOPE, RETENTION_PERIOD_MS, SEGMENT_INTERVAL_MS);

    /**
     * Builds (but does not initialize) a persistent window store named {@link #STORE_NAME}.
     *
     * @param retentionPeriod  retention period in milliseconds
     * @param windowSize       window size in milliseconds
     * @param retainDuplicates whether the store keeps duplicate keys
     * @param keySerde         serde for keys
     * @param valueSerde       serde for values
     * @return the newly built store
     */
    @Override
    <K, V> WindowStore<K, V> buildWindowStore(final long retentionPeriod, final long windowSize, final boolean retainDuplicates, final Serde<K> keySerde, final Serde<V> valueSerde) {
        final WindowBytesStoreSupplier supplier = Stores.persistentWindowStore(
            STORE_NAME,
            Duration.ofMillis(retentionPeriod),
            Duration.ofMillis(windowSize),
            retainDuplicates);
        return Stores.windowStoreBuilder(supplier, keySerde, valueSerde).build();
    }

    /**
     * Returns the metrics scope reported by a {@link RocksDbWindowBytesStoreSupplier}.
     * The supplier is created with placeholder arguments solely to read that value.
     */
    @Override
    String getMetricsScope() {
        return new RocksDbWindowBytesStoreSupplier(null, 0L, 0L, 0L, false, false).metricsScope();
    }

    /** Switches the logger of {@link AbstractRocksDBSegmentedBytesStore} to debug level. */
    @Override
    void setClassLoggerToDebug() {
        LogCaptureAppender.setClassLoggerToDebug(AbstractRocksDBSegmentedBytesStore.class);
    }

    /**
     * An iterator opened before further records are written must only return the entries
     * written at one and two segment intervals; the entry written at time 0 is not expected.
     */
    @Test
    public void shouldOnlyIterateOpenSegments() {
        long currentTime = 0L;
        this.setCurrentTime(currentTime);
        this.windowStore.put(1, "one");

        currentTime += SEGMENT_INTERVAL_MS;
        this.setCurrentTime(currentTime);
        this.windowStore.put(1, "two");

        currentTime += SEGMENT_INTERVAL_MS;
        this.setCurrentTime(currentTime);
        this.windowStore.put(1, "three");

        // The iterator deliberately stays open across the next put; it is closed once the
        // assertions are done instead of being leaked.
        try (WindowStoreIterator<String> iterator = this.windowStore.fetch(1, 0L, currentTime)) {
            currentTime += SEGMENT_INTERVAL_MS;
            this.setCurrentTime(currentTime);
            this.windowStore.put(1, "four");

            Assert.assertEquals(new KeyValue<>(SEGMENT_INTERVAL_MS, "two"), iterator.next());
            Assert.assertEquals(new KeyValue<>(2 * SEGMENT_INTERVAL_MS, "three"), iterator.next());
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Writes records at steps 0, 1, 2, 4, 5, 6, 7 and 8 (step 3 is skipped on purpose) and checks,
     * after each phase, the listed segment directories and the values still fetchable per key.
     */
    @Test
    public void testRolling() {
        this.putAtStep(0, "zero");
        this.assertSegmentDirs(2L);

        this.putAtStep(1, "one");
        this.assertSegmentDirs(2L);

        this.putAtStep(2, "two");
        this.assertSegmentDirs(2L, 3L);

        this.putAtStep(4, "four");
        this.assertSegmentDirs(2L, 3L, 4L);

        this.putAtStep(5, "five");
        this.assertSegmentDirs(2L, 3L, 4L);

        this.assertValuesAtStep(0, "zero");
        this.assertValuesAtStep(1, "one");
        this.assertValuesAtStep(2, "two");
        this.assertValuesAtStep(3);
        this.assertValuesAtStep(4, "four");
        this.assertValuesAtStep(5, "five");

        this.putAtStep(6, "six");
        this.assertSegmentDirs(3L, 4L, 5L);

        this.assertValuesAtStep(0);
        this.assertValuesAtStep(1);
        this.assertValuesAtStep(2, "two");
        this.assertValuesAtStep(3);
        this.assertValuesAtStep(4, "four");
        this.assertValuesAtStep(5, "five");
        this.assertValuesAtStep(6, "six");

        this.putAtStep(7, "seven");
        this.assertSegmentDirs(3L, 4L, 5L);

        this.assertValuesAtStep(0);
        this.assertValuesAtStep(1);
        this.assertValuesAtStep(2, "two");
        this.assertValuesAtStep(3);
        this.assertValuesAtStep(4, "four");
        this.assertValuesAtStep(5, "five");
        this.assertValuesAtStep(6, "six");
        this.assertValuesAtStep(7, "seven");

        this.putAtStep(8, "eight");
        this.assertSegmentDirs(4L, 5L, 6L);

        this.assertValuesAtStep(0);
        this.assertValuesAtStep(1);
        this.assertValuesAtStep(2);
        this.assertValuesAtStep(3);
        this.assertValuesAtStep(4, "four");
        this.assertValuesAtStep(5, "five");
        this.assertValuesAtStep(6, "six");
        this.assertValuesAtStep(7, "seven");
        this.assertValuesAtStep(8, "eight");

        // Flushing must not change the set of segment directories.
        this.windowStore.flush();
        this.assertSegmentDirs(4L, 5L, 6L);
    }

    /**
     * Uses a duplicate-retaining store and checks that the listed segment directories and the
     * number of fetchable records change as expected while timestamps advance.
     */
    @Test
    public void testSegmentMaintenance() {
        // Close the fixture's store before replacing it (as the other rebuilding tests do),
        // otherwise the reference is overwritten and the store is never closed.
        this.windowStore.close();
        this.windowStore = this.buildWindowStore(RETENTION_PERIOD_MS, WINDOW_SIZE_MS, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init((ProcessorContext) this.context, (StateStore) this.windowStore);

        this.context.setTime(0L);
        this.setCurrentTime(0L);
        this.windowStore.put(0, "v");
        this.assertSegmentDirs(0L);

        // Last timestamp before the next segment boundary: still a single directory.
        this.setCurrentTime(SEGMENT_INTERVAL_MS - 1);
        this.windowStore.put(0, "v");
        this.windowStore.put(0, "v");
        this.assertSegmentDirs(0L);

        this.setCurrentTime(SEGMENT_INTERVAL_MS);
        this.windowStore.put(0, "v");
        this.assertSegmentDirs(0L, 1L);

        Assert.assertEquals(4, this.countFetched(0, 0L, SEGMENT_INTERVAL_MS * 4));
        this.assertSegmentDirs(0L, 1L);

        this.setCurrentTime(SEGMENT_INTERVAL_MS * 3);
        this.windowStore.put(0, "v");
        Assert.assertEquals(2, this.countFetched(0, 0L, SEGMENT_INTERVAL_MS * 4));
        this.assertSegmentDirs(1L, 3L);

        this.setCurrentTime(SEGMENT_INTERVAL_MS * 5);
        this.windowStore.put(0, "v");
        Assert.assertEquals(1, this.countFetched(0, SEGMENT_INTERVAL_MS * 4, SEGMENT_INTERVAL_MS * 10));
        this.assertSegmentDirs(3L, 5L);
    }

    /**
     * Pre-creates seven segment directories on disk, re-opens the store, writes one record and
     * checks that only the directories for segments 4, 5 and 6 are listed afterwards.
     */
    @Test
    public void testInitialLoading() {
        final File storeDir = new File(this.baseDir, STORE_NAME);
        for (long segmentId = 0L; segmentId <= 6L; segmentId++) {
            new File(storeDir, this.segments.segmentName(segmentId)).mkdir();
        }

        this.windowStore.close();
        this.windowStore = this.buildWindowStore(RETENTION_PERIOD_MS, WINDOW_SIZE_MS, false, Serdes.Integer(), Serdes.String());
        this.windowStore.init((ProcessorContext) this.context, (StateStore) this.windowStore);

        this.windowStore.put(1, "v", SEGMENT_INTERVAL_MS * 6);

        final List<String> expected = Arrays.asList(
            this.segments.segmentName(4L),
            this.segments.segmentName(5L),
            this.segments.segmentName(6L));
        expected.sort(String::compareTo);
        final List<String> actual = Utils.toList(this.segmentDirs(this.baseDir).iterator());
        actual.sort(String::compareTo);
        Assert.assertEquals(expected, actual);

        // Drain a fetch over a wide time range; the directory listing must be unchanged afterwards.
        try (WindowStoreIterator<String> iter = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000000L))) {
            while (iter.hasNext()) {
                iter.next();
            }
        }
        this.assertSegmentDirs(4L, 5L, 6L);
    }

    /**
     * Writes nine records, deletes the state directory, re-opens the store (which must then be
     * empty) and replays the records captured by {@code recordCollector} through
     * {@code context.restore}. Afterwards only the values for steps 4 to 8 are expected.
     *
     * @throws Exception if deleting the state directory fails
     */
    @Test
    public void testRestore() throws Exception {
        final String[] values = {"zero", "one", "two", "three", "four", "five", "six", "seven", "eight"};
        for (int step = 0; step < values.length; step++) {
            this.putAtStep(step, values[step]);
        }
        this.windowStore.flush();
        this.windowStore.close();

        // Remove all local state so that the re-opened store starts out empty.
        Utils.delete(this.baseDir);

        this.windowStore = this.buildWindowStore(RETENTION_PERIOD_MS, WINDOW_SIZE_MS, false, Serdes.Integer(), Serdes.String());
        this.windowStore.init((ProcessorContext) this.context, (StateStore) this.windowStore);

        for (int step = 0; step < values.length; step++) {
            this.assertValuesAtStep(step);
        }

        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
        for (final ProducerRecord<Object, Object> record : this.recordCollector.collected()) {
            changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
        }
        this.context.restore(STORE_NAME, changeLog);

        for (int step = 0; step <= 3; step++) {
            this.assertValuesAtStep(step);
        }
        for (int step = 4; step < values.length; step++) {
            this.assertValuesAtStep(step, values[step]);
        }

        this.windowStore.flush();
        this.assertSegmentDirs(4L, 5L, 6L);
    }

    /** Sets the current time to {@code START_TIME_MS + step * INCREMENT_MS} and puts {@code value} under key {@code step}. */
    private void putAtStep(final int step, final String value) {
        this.setCurrentTime(START_TIME_MS + step * INCREMENT_MS);
        this.windowStore.put(step, value);
    }

    /**
     * Asserts that fetching key {@code step} within {@link #WINDOW_SIZE_MS} around that step's
     * timestamp yields exactly {@code expectedValues} (as a set); the iterator is closed afterwards.
     */
    private void assertValuesAtStep(final int step, final String... expectedValues) {
        final long timestamp = START_TIME_MS + step * INCREMENT_MS;
        try (WindowStoreIterator<String> iter = this.windowStore.fetch(
                step,
                Instant.ofEpochMilli(timestamp - WINDOW_SIZE_MS),
                Instant.ofEpochMilli(timestamp + WINDOW_SIZE_MS))) {
            Assert.assertEquals(new HashSet<>(Arrays.asList(expectedValues)), StreamsTestUtils.valuesToSet(iter));
        }
    }

    /** Counts the records returned for {@code key} in the given time range, closing the iterator afterwards. */
    private int countFetched(final int key, final long timeFromMs, final long timeToMs) {
        int fetchedCount = 0;
        try (WindowStoreIterator<String> iter = this.windowStore.fetch(key, Instant.ofEpochMilli(timeFromMs), Instant.ofEpochMilli(timeToMs))) {
            while (iter.hasNext()) {
                iter.next();
                ++fetchedCount;
            }
        }
        return fetchedCount;
    }

    /** Asserts that the store directory contains exactly the entries named after the given segment ids. */
    private void assertSegmentDirs(final long... segmentIds) {
        final Set<String> expected = new HashSet<>();
        for (final long segmentId : segmentIds) {
            expected.add(this.segments.segmentName(segmentId));
        }
        Assert.assertEquals(expected, this.segmentDirs(this.baseDir));
    }

    /**
     * Lists the entries of the current store's directory below {@code baseDir}.
     *
     * @param baseDir the state directory that contains the store's directory
     * @return the names of all entries in the store's directory
     * @throws NullPointerException if the directory cannot be listed
     */
    private Set<String> segmentDirs(final File baseDir) {
        final File windowDir = new File(baseDir, this.windowStore.name());
        final String[] entries = Objects.requireNonNull(windowDir.list(), "cannot list directory " + windowDir);
        return new HashSet<>(Arrays.asList(entries));
    }
}

