/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.KeyValueSegments;
import org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SerdeThatDoesntHandleNull;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RocksDBWindowStoreTest {
    // Size handed to the test ThreadCache: 0x100000 bytes = 1 MiB.
    private static final long DEFAULT_CACHE_SIZE_BYTES = 0x100000L;
    private final int numSegments = 3;
    // Window size and retention period are passed to the store as milliseconds (see createWindowStore).
    private final long windowSize = 3L;
    // Tests advance the record timestamp in steps of this interval to roll to a new segment.
    private final long segmentInterval = 60000L;
    private final long retentionPeriod = 120000L;
    private final String windowName = "window";
    // Built from the same name/retention/interval as the store so tests can compute segment names.
    private final KeyValueSegments segments = new KeyValueSegments(windowName, retentionPeriod, segmentInterval);
    private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
    // Every record handed to the record collector is captured here as serialized key/value bytes.
    private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
    private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
    private final Producer<byte[], byte[]> producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
    private final RecordCollector recordCollector = new RecordCollectorImpl("RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) {

        /**
         * Records the serialized key and value in {@code changeLog}; nothing is forwarded
         * to the superclass, so no record is actually produced.
         */
        @Override
        public <K1, V1> void send(String topic, K1 key, V1 value, Headers headers, Integer partition, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
            RocksDBWindowStoreTest.this.changeLog.add(new KeyValue<>(
                keySerializer.serialize(topic, headers, key),
                valueSerializer.serialize(topic, headers, value)));
        }
    };
    private final File baseDir = TestUtils.tempDirectory("test");
    private final InternalMockProcessorContext context = new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
    // Store under test; created per test method and closed in closeStore().
    private WindowStore<Integer, String> windowStore;

    /**
     * Builds a persistent window store with Integer keys and String values, using this
     * test's window name, retention period and window size, and initialises it.
     *
     * @param context          processor context the store is initialised against
     * @param retainDuplicates forwarded unchanged to {@code Stores.persistentWindowStore}
     * @return the initialised store
     */
    private WindowStore<Integer, String> createWindowStore(ProcessorContext context, boolean retainDuplicates) {
        WindowBytesStoreSupplier supplier = Stores.persistentWindowStore(
            windowName,
            Duration.ofMillis(retentionPeriod),
            Duration.ofMillis(windowSize),
            retainDuplicates);
        WindowStore<Integer, String> store = Stores.windowStoreBuilder(supplier, Serdes.Integer(), Serdes.String()).build();
        // The store acts as its own root store.
        store.init(context, store);
        return store;
    }

    /** Initialises the record collector with the mock producer before each test. */
    @Before
    public void initRecordCollector() {
        recordCollector.init(producer);
    }

    /** Closes the store after each test, if the test created one. */
    @After
    public void closeStore() {
        if (windowStore == null) {
            // The test never created a store; nothing to release.
            return;
        }
        windowStore.close();
    }

    /**
     * Opens a fetch iterator, then writes a record one further segment interval ahead,
     * and expects the iterator to return only "two" and "three" - the entry written at
     * time 0 must no longer be visible.
     */
    @Test
    public void shouldOnlyIterateOpenSegments() {
        windowStore = createWindowStore(context, false);

        long currentTime = 0L;
        setCurrentTime(currentTime);
        windowStore.put(1, "one");

        currentTime += segmentInterval;
        setCurrentTime(currentTime);
        windowStore.put(1, "two");

        currentTime += segmentInterval;
        setCurrentTime(currentTime);
        windowStore.put(1, "three");

        // The iterator is created BEFORE the next put; try-with-resources guarantees it is closed.
        try (WindowStoreIterator<String> iterator = windowStore.fetch(1, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(currentTime))) {
            // Advance one more segment interval while the iterator is still open.
            currentTime += segmentInterval;
            setCurrentTime(currentTime);
            windowStore.put(1, "four");

            Assert.assertEquals(new KeyValue<>(segmentInterval, "two"), iterator.next());
            Assert.assertEquals(new KeyValue<>(2 * segmentInterval, "three"), iterator.next());
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Installs a fresh record context carrying {@code currentTime} as its timestamp.
     *
     * @param currentTime timestamp for the new record context
     */
    private void setCurrentTime(long currentTime) {
        ProcessorRecordContext recordContext = createRecordContext(currentTime);
        context.setRecordContext(recordContext);
    }

    /**
     * Creates a record context for the given timestamp; the remaining constructor
     * arguments are fixed to {@code 0L}, {@code 0}, {@code "topic"} and {@code null}.
     *
     * @param time first constructor argument of the record context (the record timestamp)
     * @return the new record context
     */
    private ProcessorRecordContext createRecordContext(long time) {
        long fixedLongArg = 0L;
        int fixedIntArg = 0;
        String topic = "topic";
        return new ProcessorRecordContext(time, fixedLongArg, fixedIntArg, topic, null);
    }

    /**
     * Exercises single-point fetches and seven-millisecond range fetches around
     * {@code startTime}, then checks what was captured in the change log.
     */
    @Test
    public void testRangeAndSinglePointFetch() {
        windowStore = createWindowStore(context, false);
        // 59996 is four milliseconds below segmentInterval (60000).
        long startTime = 59996L;
        putFirstBatch(windowStore, startTime, context);

        // Single-point fetches: key k is expected at startTime + k.
        Assert.assertEquals("zero", windowStore.fetch(0, 59996L));
        Assert.assertEquals("one", windowStore.fetch(1, 59997L));
        Assert.assertEquals("two", windowStore.fetch(2, 59998L));
        Assert.assertEquals("four", windowStore.fetch(4, 60000L));
        Assert.assertEquals("five", windowStore.fetch(5, 60001L));
        Assert.assertEquals(Collections.singletonList("zero"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));

        putSecondBatch(windowStore, startTime, context);

        // After the second batch key 2 is expected to hold two+1..two+6 at 59999..60004.
        Assert.assertEquals("two+1", windowStore.fetch(2, 59999L));
        Assert.assertEquals("two+2", windowStore.fetch(2, 60000L));
        Assert.assertEquals("two+3", windowStore.fetch(2, 60001L));
        Assert.assertEquals("two+4", windowStore.fetch(2, 60002L));
        Assert.assertEquals("two+5", windowStore.fetch(2, 60003L));
        Assert.assertEquals("two+6", windowStore.fetch(2, 60004L));

        // Slide a [from, from + 6] range over key 2, one millisecond at a time.
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(59991L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(Collections.singletonList("two"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59992L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(Arrays.asList("two", "two+1"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(Arrays.asList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(Arrays.asList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(Arrays.asList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(Arrays.asList("two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60009L))));
        Assert.assertEquals(Collections.singletonList("two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60010L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60011L))));

        // Flush, then verify the captured change log (entries look like value@offset - presumably
        // the offset from startTime; confirm against entriesByKey).
        windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    /** Expects {@code all()} to return every entry of the first batch, ordered by key. */
    @Test
    public void shouldGetAll() {
        windowStore = createWindowStore(context, false);
        long startTime = 59996L;
        putFirstBatch(windowStore, startTime, context);

        // Key k is expected at startTime + k (59996, 59997, 59998, 60000, 60001).
        List<KeyValue<Windowed<Integer>, String>> expected = Arrays.asList(
            windowedPair(0, "zero", startTime),
            windowedPair(1, "one", startTime + 1L),
            windowedPair(2, "two", startTime + 2L),
            windowedPair(4, "four", startTime + 4L),
            windowedPair(5, "five", startTime + 5L));

        Assert.assertEquals(expected, StreamsTestUtils.toList(windowStore.all()));
    }

    /** Expects {@code fetchAll(from, to)} to return the entries whose timestamp lies in the inclusive range. */
    @Test
    public void shouldFetchAllInTimeRange() {
        windowStore = createWindowStore(context, false);
        long startTime = 59996L;
        putFirstBatch(windowStore, startTime, context);

        // Key k is expected at startTime + k.
        KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime);
        KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1L);
        KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2L);
        KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4L);
        KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5L);

        // [59997, 60000]
        Assert.assertEquals(
            Arrays.asList(one, two, four),
            StreamsTestUtils.toList(windowStore.fetchAll(Instant.ofEpochMilli(startTime + 1L), Instant.ofEpochMilli(startTime + 4L))));
        // [59996, 59999]
        Assert.assertEquals(
            Arrays.asList(zero, one, two),
            StreamsTestUtils.toList(windowStore.fetchAll(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(startTime + 3L))));
        // [59997, 60001]
        Assert.assertEquals(
            Arrays.asList(one, two, four, five),
            StreamsTestUtils.toList(windowStore.fetchAll(Instant.ofEpochMilli(startTime + 1L), Instant.ofEpochMilli(startTime + 5L))));
    }

    /** Exercises key-range fetches combined with different time ranges over the first batch. */
    @Test
    public void testFetchRange() {
        windowStore = createWindowStore(context, false);
        long startTime = 59996L;
        putFirstBatch(windowStore, startTime, context);

        // Key k is expected at startTime + k.
        KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime);
        KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1L);
        KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2L);
        KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4L);
        KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5L);

        // Time bounds used below; Instant is immutable, so sharing the instances is safe.
        Instant t59993 = Instant.ofEpochMilli(startTime - windowSize);
        Instant t59998 = Instant.ofEpochMilli(startTime + 2L);
        Instant t59999 = Instant.ofEpochMilli(startTime + windowSize);
        Instant t60004 = Instant.ofEpochMilli(startTime + 8L);

        Assert.assertEquals(Arrays.asList(zero, one), StreamsTestUtils.toList(windowStore.fetch(0, 1, t59993, t59999)));
        Assert.assertEquals(Collections.singletonList(one), StreamsTestUtils.toList(windowStore.fetch(1, 1, t59993, t59999)));
        Assert.assertEquals(Arrays.asList(one, two), StreamsTestUtils.toList(windowStore.fetch(1, 3, t59993, t59999)));
        Assert.assertEquals(Arrays.asList(zero, one, two), StreamsTestUtils.toList(windowStore.fetch(0, 5, t59993, t59999)));
        Assert.assertEquals(Arrays.asList(zero, one, two, four, five), StreamsTestUtils.toList(windowStore.fetch(0, 5, t59993, t60004)));
        Assert.assertEquals(Arrays.asList(two, four, five), StreamsTestUtils.toList(windowStore.fetch(0, 5, t59998, t60004)));
        Assert.assertEquals(Collections.emptyList(), StreamsTestUtils.toList(windowStore.fetch(4, 5, t59998, t59999)));
        Assert.assertEquals(Collections.emptyList(), StreamsTestUtils.toList(windowStore.fetch(0, 3, t59999, t60004)));
    }

    /**
     * Fetches four-millisecond ranges per key after each batch of puts and then checks
     * what was captured in the change log.
     */
    @Test
    public void testPutAndFetchBefore() {
        windowStore = createWindowStore(context, false);
        // 59996 is four milliseconds below segmentInterval (60000).
        long startTime = 59996L;
        putFirstBatch(windowStore, startTime, context);

        // First batch: for key k the range ends at startTime + k.
        Assert.assertEquals(Collections.singletonList("zero"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59996L))));
        Assert.assertEquals(Collections.singletonList("one"), toList(windowStore.fetch(1, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(Collections.singletonList("two"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(3, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Collections.singletonList("four"), toList(windowStore.fetch(4, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(Collections.singletonList("five"), toList(windowStore.fetch(5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));

        putSecondBatch(windowStore, startTime, context);

        // Slide a [from, from + 3] range over key 2, one millisecond at a time.
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(59992L), Instant.ofEpochMilli(59995L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59996L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(Collections.singletonList("two"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(Arrays.asList("two", "two+1"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Arrays.asList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(Arrays.asList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(Arrays.asList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(Arrays.asList("two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(Collections.singletonList("two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60006L), Instant.ofEpochMilli(60009L))));

        // Flush, then verify the captured change log (entries look like value@offset - presumably
        // the offset from startTime; confirm against entriesByKey).
        windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    /**
     * Fetches four-millisecond ranges per key after each batch of puts and then checks
     * what was captured in the change log.
     */
    @Test
    public void testPutAndFetchAfter() {
        windowStore = createWindowStore(context, false);
        // 59996 is four milliseconds below segmentInterval (60000).
        long startTime = 59996L;
        putFirstBatch(windowStore, startTime, context);

        // First batch: for key k the range starts at startTime + k.
        Assert.assertEquals(Collections.singletonList("zero"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Collections.singletonList("one"), toList(windowStore.fetch(1, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(Collections.singletonList("two"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(3, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(Collections.singletonList("four"), toList(windowStore.fetch(4, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(Collections.singletonList("five"), toList(windowStore.fetch(5, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));

        putSecondBatch(windowStore, startTime, context);

        // Slide a [from, from + 3] range over key 2, one millisecond at a time.
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(Collections.singletonList("two"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(Arrays.asList("two", "two+1"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(Arrays.asList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Arrays.asList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(Arrays.asList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(Arrays.asList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(Arrays.asList("two+5", "two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(Collections.singletonList("two+6"), toList(windowStore.fetch(2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60006L), Instant.ofEpochMilli(60009L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60007L), Instant.ofEpochMilli(60010L))));
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(2, Instant.ofEpochMilli(60008L), Instant.ofEpochMilli(60011L))));

        // Flush, then verify the captured change log (entries look like value@offset - presumably
        // the offset from startTime; confirm against entriesByKey).
        windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    /**
     * With {@code retainDuplicates} enabled, puts the same key four times at one timestamp
     * and expects every value to be returned, in insertion order.
     */
    @Test
    public void testPutSameKeyTimestamp() {
        windowStore = createWindowStore(context, true);
        long startTime = 59996L;
        setCurrentTime(startTime);

        windowStore.put(0, "zero");
        Assert.assertEquals(Collections.singletonList("zero"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));

        windowStore.put(0, "zero");
        windowStore.put(0, "zero+");
        windowStore.put(0, "zero++");

        // All four values stay visible while the range still covers 59996.
        Assert.assertEquals(Arrays.asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(Arrays.asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Arrays.asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(60002L))));
        // The range now starts after 59996, so nothing is returned.
        Assert.assertEquals(Collections.emptyList(), toList(windowStore.fetch(0, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60003L))));

        windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
        // The expected value is a set, so the repeated "zero@0" collapses to a single element.
        Assert.assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
    }

    @Test
    public void testRolling() {
        this.windowStore = this.createWindowStore((ProcessorContext)this.context, false);
        long startTime = 120000L;
        long increment = 30000L;
        this.setCurrentTime(120000L);
        this.windowStore.put((Object)0, (Object)"zero");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(2L)}), this.segmentDirs(this.baseDir));
        this.setCurrentTime(150000L);
        this.windowStore.put((Object)1, (Object)"one");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(2L)}), this.segmentDirs(this.baseDir));
        this.setCurrentTime(180000L);
        this.windowStore.put((Object)2, (Object)"two");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(2L), this.segments.segmentName(3L)}), this.segmentDirs(this.baseDir));
        this.setCurrentTime(240000L);
        this.windowStore.put((Object)4, (Object)"four");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(2L), this.segments.segmentName(3L), this.segments.segmentName(4L)}), this.segmentDirs(this.baseDir));
        this.setCurrentTime(270000L);
        this.windowStore.put((Object)5, (Object)"five");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(2L), this.segments.segmentName(3L), this.segments.segmentName(4L)}), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Collections.singletonList("zero"), this.toList(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(Collections.singletonList("one"), this.toList(this.windowStore.fetch((Object)1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(Collections.singletonList("two"), this.toList(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(Collections.singletonList("four"), this.toList(this.windowStore.fetch((Object)4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(Collections.singletonList("five"), this.toList(this.windowStore.fetch((Object)5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        this.setCurrentTime(300000L);
        this.windowStore.put((Object)6, (Object)"six");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(3L), this.segments.segmentName(4L), this.segments.segmentName(5L)}), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(Collections.singletonList("two"), this.toList(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(Collections.singletonList("four"), this.toList(this.windowStore.fetch((Object)4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(Collections.singletonList("five"), this.toList(this.windowStore.fetch((Object)5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(Collections.singletonList("six"), this.toList(this.windowStore.fetch((Object)6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        this.setCurrentTime(330000L);
        this.windowStore.put((Object)7, (Object)"seven");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(3L), this.segments.segmentName(4L), this.segments.segmentName(5L)}), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(Collections.singletonList("two"), this.toList(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(Collections.singletonList("four"), this.toList(this.windowStore.fetch((Object)4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(Collections.singletonList("five"), this.toList(this.windowStore.fetch((Object)5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(Collections.singletonList("six"), this.toList(this.windowStore.fetch((Object)6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assert.assertEquals(Collections.singletonList("seven"), this.toList(this.windowStore.fetch((Object)7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        this.setCurrentTime(360000L);
        this.windowStore.put((Object)8, (Object)"eight");
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)}), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(Collections.emptyList(), this.toList(this.windowStore.fetch((Object)3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(Collections.singletonList("four"), this.toList(this.windowStore.fetch((Object)4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(Collections.singletonList("five"), this.toList(this.windowStore.fetch((Object)5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(Collections.singletonList("six"), this.toList(this.windowStore.fetch((Object)6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assert.assertEquals(Collections.singletonList("seven"), this.toList(this.windowStore.fetch((Object)7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assert.assertEquals(Collections.singletonList("eight"), this.toList(this.windowStore.fetch((Object)8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        this.windowStore.flush();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)}), this.segmentDirs(this.baseDir));
    }

    /**
     * Checks that a window store whose local directory was deleted can be rebuilt from its changelog.
     *
     * <p>Nine records (keys 0..8) are written {@code increment} ms apart starting at {@code startTime};
     * the store is then flushed and closed and {@code baseDir} is deleted. A freshly created store must
     * return nothing for every key. After {@code context.restore("window", changeLog)} keys 4..8 must be
     * readable again while keys 0..3 stay absent, and only segments 4, 5 and 6 may exist on disk.
     *
     * @throws Exception propagated from any of the calls below (e.g. deleting the store directory)
     */
    @Test
    public void testRestore() throws Exception {
        final long startTime = 120000L;
        final long increment = 30000L;
        // Every lookup probes the range [timestamp - fetchMargin, timestamp + fetchMargin].
        final long fetchMargin = 3L;
        // values[key] is the payload written for that key.
        final String[] values = {"zero", "one", "two", "three", "four", "five", "six", "seven", "eight"};
        // Keys below this one are expected to remain absent after the restore.
        final int firstRestoredKey = 4;

        this.windowStore = this.createWindowStore(this.context, false);
        for (int key = 0; key < values.length; key++) {
            this.setCurrentTime(startTime + key * increment);
            this.windowStore.put(key, values[key]);
        }
        this.windowStore.flush();
        this.windowStore.close();

        // Remove the local store image so that the new store instance starts out empty.
        Utils.delete(this.baseDir);

        this.windowStore = this.createWindowStore(this.context, false);
        for (int key = 0; key < values.length; key++) {
            final long timestamp = startTime + key * increment;
            Assert.assertEquals(
                Collections.emptyList(),
                this.toList(this.windowStore.fetch(
                    key,
                    Instant.ofEpochMilli(timestamp - fetchMargin),
                    Instant.ofEpochMilli(timestamp + fetchMargin))));
        }

        this.context.restore("window", this.changeLog);

        for (int key = 0; key < values.length; key++) {
            final long timestamp = startTime + key * increment;
            final List<String> expected = key < firstRestoredKey
                ? Collections.<String>emptyList()
                : Collections.singletonList(values[key]);
            Assert.assertEquals(
                expected,
                this.toList(this.windowStore.fetch(
                    key,
                    Instant.ofEpochMilli(timestamp - fetchMargin),
                    Instant.ofEpochMilli(timestamp + fetchMargin))));
        }

        this.windowStore.flush();
        Assert.assertEquals(
            Utils.mkSet(
                this.segments.segmentName(4L),
                this.segments.segmentName(5L),
                this.segments.segmentName(6L)),
            this.segmentDirs(this.baseDir));
    }

    /**
     * Checks which segment directories exist on disk as the record time advances.
     *
     * <p>Writes key 0 at times 0, 59999 (twice), 60000, 180000 and 300000 and asserts after each step
     * both the set of segment directories and the number of records returned by a range fetch.
     * Every iterator obtained from the store is closed as soon as it has been counted.
     */
    @Test
    public void testSegmentMaintenance() {
        // NOTE(review): the 'true' flag presumably enables duplicate retention (four records are
        // later fetched for the same key) - confirm against createWindowStore.
        this.windowStore = this.createWindowStore(this.context, true);
        this.context.setTime(0L);

        this.setCurrentTime(0L);
        this.windowStore.put(0, "v");
        Assert.assertEquals(
            Utils.mkSet(this.segments.segmentName(0L)),
            this.segmentDirs(this.baseDir));

        this.setCurrentTime(59999L);
        this.windowStore.put(0, "v");
        this.windowStore.put(0, "v");
        Assert.assertEquals(
            Utils.mkSet(this.segments.segmentName(0L)),
            this.segmentDirs(this.baseDir));

        this.setCurrentTime(60000L);
        this.windowStore.put(0, "v");
        Assert.assertEquals(
            Utils.mkSet(this.segments.segmentName(0L), this.segments.segmentName(1L)),
            this.segmentDirs(this.baseDir));

        Assert.assertEquals(
            4,
            countAndClose(this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(240000L))));
        Assert.assertEquals(
            Utils.mkSet(this.segments.segmentName(0L), this.segments.segmentName(1L)),
            this.segmentDirs(this.baseDir));

        this.setCurrentTime(180000L);
        this.windowStore.put(0, "v");
        Assert.assertEquals(
            2,
            countAndClose(this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(240000L))));
        Assert.assertEquals(
            Utils.mkSet(this.segments.segmentName(1L), this.segments.segmentName(3L)),
            this.segmentDirs(this.baseDir));

        this.setCurrentTime(300000L);
        this.windowStore.put(0, "v");
        Assert.assertEquals(
            1,
            countAndClose(this.windowStore.fetch(0, Instant.ofEpochMilli(240000L), Instant.ofEpochMilli(600000L))));
        Assert.assertEquals(
            Utils.mkSet(this.segments.segmentName(3L), this.segments.segmentName(5L)),
            this.segmentDirs(this.baseDir));
    }

    /**
     * Drains the given iterator, closes it, and reports how many entries it produced.
     *
     * @param iterator iterator to exhaust; closed before this method returns
     * @return number of entries the iterator yielded
     */
    private static int countAndClose(final WindowStoreIterator<?> iterator) {
        try (WindowStoreIterator<?> entries = iterator) {
            int count = 0;
            while (entries.hasNext()) {
                entries.next();
                count++;
            }
            return count;
        }
    }

    /**
     * Checks the segment directories left on disk after a store is reopened over pre-existing ones.
     *
     * <p>Directories for segments 0..6 are created by hand under the "window" store directory, the
     * store is closed and created again, and a record is written at time 360000. Only segments
     * 4, 5 and 6 are expected to remain, both right after the write and after a full range fetch.
     */
    @Test
    public void testInitialLoading() {
        final File storeDir = new File(this.baseDir, "window");
        this.windowStore = this.createWindowStore(this.context, false);

        // Fake the on-disk presence of segments 0 through 6.
        for (long segmentId = 0L; segmentId <= 6L; segmentId++) {
            new File(storeDir, this.segments.segmentName(segmentId)).mkdir();
        }
        this.windowStore.close();

        this.windowStore = this.createWindowStore(this.context, false);
        this.windowStore.put(1, "v", 360000L);

        // Compare as sorted lists first ...
        final List<String> expectedNames = Arrays.asList(
            this.segments.segmentName(4L),
            this.segments.segmentName(5L),
            this.segments.segmentName(6L));
        Collections.sort(expectedNames);
        final List<String> actualNames = Utils.toList(this.segmentDirs(this.baseDir).iterator());
        Collections.sort(actualNames);
        Assert.assertEquals(expectedNames, actualNames);

        // ... then walk a wide range and make sure the directory set is unchanged afterwards.
        try (WindowStoreIterator<String> records =
                 this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000000L))) {
            while (records.hasNext()) {
                records.next();
            }
        }
        Assert.assertEquals(
            Utils.mkSet(
                this.segments.segmentName(4L),
                this.segments.segmentName(5L),
                this.segments.segmentName(6L)),
            this.segmentDirs(this.baseDir));
    }

    /**
     * Checks that an iterator obtained before the store is closed reports no further elements
     * afterwards: {@code hasNext()} is {@code true} before {@code close()} and {@code false} after it.
     */
    @Test
    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
        this.windowStore = this.createWindowStore(this.context, false);
        this.setCurrentTime(0L);

        // Three records for key 1 at timestamps 1, 2 and 3.
        final String[] values = {"one", "two", "three"};
        for (int i = 0; i < values.length; i++) {
            this.windowStore.put(1, values[i], i + 1L);
        }

        final Instant from = Instant.ofEpochMilli(1L);
        final Instant to = Instant.ofEpochMilli(3L);
        final WindowStoreIterator<String> openIterator = this.windowStore.fetch(1, from, to);

        Assert.assertTrue(openIterator.hasNext());
        this.windowStore.close();
        Assert.assertFalse(openIterator.hasNext());
    }

    /**
     * Checks that fetches for key "a" and key "aa" each return only that key's records, using a
     * very large window size / retention period and a timestamp just below that size.
     *
     * <p>The store is built locally rather than assigned to the {@code windowStore} field, so it is
     * closed here once the assertions have run.
     */
    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        final long windowSize = 0x7A00000000000000L;
        final long retentionPeriod = 0x7A00000000000000L;
        final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                "window",
                Duration.ofMillis(retentionPeriod),
                Duration.ofMillis(windowSize),
                true),
            Serdes.String(),
            Serdes.String()).build();
        windowStore.init(this.context, windowStore);

        try {
            windowStore.put("a", "0001", 0L);
            windowStore.put("aa", "0002", 0L);
            windowStore.put("a", "0003", 1L);
            windowStore.put("aa", "0004", 1L);
            windowStore.put("a", "0005", windowSize - 1L);

            final Instant from = Instant.ofEpochMilli(0L);
            final Instant to = Instant.ofEpochMilli(Long.MAX_VALUE);

            // Single-key fetch: only the values stored under "a".
            final List<String> expected = Arrays.asList("0001", "0003", "0005");
            MatcherAssert.assertThat(this.toList(windowStore.fetch("a", from, to)), CoreMatchers.equalTo(expected));

            // Range fetch with identical bounds: windowed pairs for "a" only ...
            List<KeyValue<Windowed<String>, String>> list =
                StreamsTestUtils.toList(windowStore.fetch("a", "a", from, to));
            MatcherAssert.assertThat(list, CoreMatchers.equalTo(Arrays.asList(
                RocksDBWindowStoreTest.windowedPair("a", "0001", 0L, windowSize),
                RocksDBWindowStoreTest.windowedPair("a", "0003", 1L, windowSize),
                RocksDBWindowStoreTest.windowedPair("a", "0005", windowSize - 1L, windowSize))));

            // ... and for "aa" only.
            list = StreamsTestUtils.toList(windowStore.fetch("aa", "aa", from, to));
            MatcherAssert.assertThat(list, CoreMatchers.equalTo(Arrays.asList(
                RocksDBWindowStoreTest.windowedPair("aa", "0002", 0L, windowSize),
                RocksDBWindowStoreTest.windowedPair("aa", "0004", 1L, windowSize))));
        } finally {
            // This store is local to the test, so nothing else will release it.
            windowStore.close();
        }
    }

    /**
     * Expects a {@link NullPointerException} when a record is written with a {@code null} key.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        this.windowStore = this.createWindowStore(this.context, false);
        final String anyValue = "anyValue";
        this.windowStore.put(null, anyValue);
    }

    /**
     * Writing a {@code null} value for a non-null key must complete without throwing.
     */
    @Test
    public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
        this.windowStore = this.createWindowStore(this.context, false);
        final String noValue = null;
        this.windowStore.put(1, noValue);
    }

    /**
     * Expects a {@link NullPointerException} when a single-key time-range fetch is given a
     * {@code null} key.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnGetNullKey() {
        this.windowStore = this.createWindowStore(this.context, false);
        final Instant from = Instant.ofEpochMilli(1L);
        final Instant to = Instant.ofEpochMilli(2L);
        this.windowStore.fetch(null, from, to);
    }

    /**
     * Expects a {@link NullPointerException} when a key-range fetch is given a {@code null}
     * lower key bound.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
        this.windowStore = this.createWindowStore(this.context, false);
        final Instant from = Instant.ofEpochMilli(1L);
        final Instant to = Instant.ofEpochMilli(2L);
        this.windowStore.fetch(null, 2, from, to);
    }

    /**
     * Expects a {@link NullPointerException} when a key-range fetch is given a {@code null}
     * upper key bound.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
        this.windowStore = this.createWindowStore(this.context, false);
        final Instant from = Instant.ofEpochMilli(1L);
        final Instant to = Instant.ofEpochMilli(2L);
        this.windowStore.fetch(1, null, from, to);
    }

    /**
     * Builds a window store directly on a {@code RocksDBSegmentedBytesStore} with a value serde of
     * type {@code SerdeThatDoesntHandleNull} and checks that a point fetch for a key that was never
     * written yields {@code null}.
     */
    @Test
    public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
        final SegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(
            "window",
            "metrics-scope",
            120000L,
            60000L,
            new WindowKeySchema());
        this.windowStore = new RocksDBWindowStore<>(
            bytesStore,
            Serdes.Integer(),
            new SerdeThatDoesntHandleNull(),
            false,
            3L);
        this.windowStore.init(this.context, this.windowStore);

        Assert.assertNull(this.windowStore.fetch(1, 0L));
    }

    /**
     * Checks that three binary keys which are prefixes of one another ({@code {0}}, {@code {0, 0}},
     * {@code {0, 0, 0}}) are kept apart: a fetch for each key returns exactly the values written
     * under that key, in the expected order.
     *
     * <p>The store is built locally rather than assigned to the {@code windowStore} field, so it is
     * closed here once the assertions have run.
     */
    @Test
    public void shouldFetchAndIterateOverExactBinaryKeys() {
        final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                "window",
                Duration.ofMillis(60000L),
                Duration.ofMillis(60000L),
                true),
            Serdes.Bytes(),
            Serdes.String()).build();
        windowStore.init(this.context, windowStore);

        try {
            final Bytes key1 = Bytes.wrap(new byte[]{0});
            final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
            final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});

            windowStore.put(key1, "1", 0L);
            windowStore.put(key2, "2", 0L);
            windowStore.put(key3, "3", 0L);
            windowStore.put(key1, "4", 1L);
            windowStore.put(key2, "5", 1L);
            windowStore.put(key3, "6", 59999L);
            windowStore.put(key1, "7", 59999L);
            windowStore.put(key2, "8", 59999L);
            windowStore.put(key3, "9", 59999L);

            final Instant from = Instant.ofEpochMilli(0L);
            final Instant to = Instant.ofEpochMilli(Long.MAX_VALUE);

            final List<String> expectedKey1 = Arrays.asList("1", "4", "7");
            MatcherAssert.assertThat(this.toList(windowStore.fetch(key1, from, to)), CoreMatchers.equalTo(expectedKey1));

            final List<String> expectedKey2 = Arrays.asList("2", "5", "8");
            MatcherAssert.assertThat(this.toList(windowStore.fetch(key2, from, to)), CoreMatchers.equalTo(expectedKey2));

            final List<String> expectedKey3 = Arrays.asList("3", "6", "9");
            MatcherAssert.assertThat(this.toList(windowStore.fetch(key3, from, to)), CoreMatchers.equalTo(expectedKey3));
        } finally {
            // This store is local to the test, so nothing else will release it.
            windowStore.close();
        }
    }

    /**
     * Writes the first fixture batch: keys 0, 1, 2, 4 and 5 with values "zero", "one", "two",
     * "four" and "five". Before each write the record context is replaced by one created for
     * {@code startTime + key}.
     *
     * @param store     store that receives the records
     * @param startTime base value passed (plus the per-record offset) to {@code createRecordContext}
     * @param context   processor context whose record context is set before every write
     */
    private void putFirstBatch(WindowStore<Integer, String> store, long startTime, InternalMockProcessorContext context) {
        // Key 3 is intentionally not part of this batch; the time offset of each record equals its key.
        final int[] keys = {0, 1, 2, 4, 5};
        final String[] values = {"zero", "one", "two", "four", "five"};
        for (int i = 0; i < keys.length; i++) {
            context.setRecordContext(this.createRecordContext(startTime + keys[i]));
            store.put(keys[i], values[i]);
        }
    }

    /**
     * Writes the second fixture batch: six records for key 2 with values "two+1" .. "two+6".
     * Before each write the record context is replaced by one created for {@code startTime + 3}
     * through {@code startTime + 8} respectively.
     *
     * @param store     store that receives the records
     * @param startTime base value passed (plus the per-record offset) to {@code createRecordContext}
     * @param context   processor context whose record context is set before every write
     */
    private void putSecondBatch(WindowStore<Integer, String> store, long startTime, InternalMockProcessorContext context) {
        final int key = 2;
        for (int n = 1; n <= 6; n++) {
            // The n-th record is stamped at startTime + 2 + n, i.e. offsets 3..8.
            context.setRecordContext(this.createRecordContext(startTime + key + n));
            store.put(key, "two+" + n);
        }
    }

    /**
     * Collects the values of every remaining entry of the iterator, in iteration order, and then
     * closes the iterator.
     *
     * @param iterator iterator to drain; it is closed before this method returns
     * @param <E>      value type of the iterator
     * @return the values in the order the iterator produced them
     */
    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
        final List<E> values = new ArrayList<>();
        // try-with-resources so the iterator is released even if iteration throws.
        try (WindowStoreIterator<E> entries = iterator) {
            while (entries.hasNext()) {
                values.add(entries.next().value);
            }
        }
        return values;
    }

    /**
     * Returns the names of all entries directly inside the {@code "window"} sub-directory of
     * {@code baseDir}.
     *
     * @param baseDir directory that contains the {@code "window"} store directory
     * @return the entry names as a set
     * @throws NullPointerException if the {@code "window"} directory cannot be listed
     *                              ({@link File#list()} returned {@code null})
     */
    private Set<String> segmentDirs(File baseDir) {
        final File windowDir = new File(baseDir, "window");
        final String[] names = Objects.requireNonNull(
            windowDir.list(),
            "Cannot list segment directories under " + windowDir);
        return new HashSet<>(Arrays.asList(names));
    }

    /**
     * Groups changelog records by their store key.
     *
     * <p>Each record contributes one string of the form {@code value@offset}, where {@code offset}
     * is the record's store timestamp minus {@code startTime} and {@code value} is the deserialized
     * value (or {@code null} when the record carries no value).
     *
     * @param changeLog raw key/value records to decode
     * @param startTime timestamp subtracted from every record's store timestamp
     * @return for every key, the set of {@code value@offset} strings seen for it
     */
    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
        final Map<Integer, Set<String>> grouped = new HashMap<>();
        for (final KeyValue<byte[], byte[]> record : changeLog) {
            final byte[] rawKey = record.key;
            final long offset = WindowKeySchema.extractStoreTimestamp(rawKey) - startTime;
            final Integer storeKey = (Integer) WindowKeySchema.extractStoreKey(rawKey, this.serdes);

            String decoded = null;
            if (record.value != null) {
                decoded = (String) this.serdes.valueFrom(record.value);
            }

            grouped.computeIfAbsent(storeKey, unused -> new HashSet<>()).add(decoded + "@" + offset);
        }
        return grouped;
    }

    /**
     * Convenience overload that builds a windowed key/value pair with a window size of 3.
     *
     * @param key       record key
     * @param value     record value
     * @param timestamp timestamp handed to the four-argument overload
     * @return the pair produced by the four-argument overload
     */
    private <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
        final long defaultWindowSize = 3L;
        return windowedPair(key, value, timestamp, defaultWindowSize);
    }

    /**
     * Builds a key/value pair whose key is {@code key} wrapped in a {@link Windowed} together with
     * the window returned by {@code WindowKeySchema.timeWindowForSize(timestamp, windowSize)}.
     *
     * @param key        record key
     * @param value      record value
     * @param timestamp  first argument passed to {@code timeWindowForSize}
     * @param windowSize second argument passed to {@code timeWindowForSize}
     * @return the windowed key paired with {@code value}
     */
    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
        final Window window = WindowKeySchema.timeWindowForSize(timestamp, windowSize);
        // The diamond keeps the key typed as Windowed<K> so the pair matches the declared return type.
        return KeyValue.pair(new Windowed<>(key, window), value);
    }
}

