/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachingKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheTest;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingWindowStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    private static final long DEFAULT_TIMESTAMP = 10L;
    private static final Long WINDOW_SIZE = 10L;
    private static final long SEGMENT_INTERVAL = 100L;
    private static final String TOPIC = "topic";
    private static final String CACHE_NAMESPACE = "0_0-store-name";
    private InternalMockProcessorContext context;
    private RocksDBSegmentedBytesStore bytesStore;
    private WindowStore<Bytes, byte[]> underlyingStore;
    private CachingWindowStore cachingStore;
    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
    private ThreadCache cache;
    private WindowKeySchema keySchema;

    @Before
    public void setUp() {
        this.keySchema = new WindowKeySchema();
        this.bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, 100L, (SegmentedBytesStore.KeySchema)this.keySchema);
        this.underlyingStore = new RocksDBWindowStore((SegmentedBytesStore)this.bytesStore, false, WINDOW_SIZE.longValue());
        TimeWindowedDeserializer keyDeserializer = new TimeWindowedDeserializer((Deserializer)new StringDeserializer(), WINDOW_SIZE.longValue());
        keyDeserializer.setIsChangelogTopic(true);
        this.cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub(keyDeserializer, new StringDeserializer());
        this.cachingStore = new CachingWindowStore(this.underlyingStore, WINDOW_SIZE.longValue(), 100L);
        this.cachingStore.setFlushListener(this.cacheListener, false);
        this.cache = new ThreadCache(new LogContext("testCache "), 150L, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, TOPIC, null));
        this.cachingStore.init((ProcessorContext)this.context, (StateStore)this.cachingStore);
    }

    @After
    public void closeStore() {
        this.cachingStore.close();
    }

    @Test
    public void shouldNotReturnDuplicatesInRanges() {
        int i;
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder storeBuilder = Stores.windowStoreBuilder((WindowBytesStoreSupplier)Stores.persistentWindowStore((String)"store-name", (Duration)Duration.ofHours(1L), (Duration)Duration.ofMinutes(1L), (boolean)false), (Serde)Serdes.String(), (Serde)Serdes.String()).withCachingEnabled();
        builder.addStateStore(storeBuilder);
        builder.stream(TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).transform(() -> new Transformer<String, String, KeyValue<String, String>>(){
            private int numRecordsProcessed;
            private ProcessorContext context;
            private WindowStore store;

            public void init(ProcessorContext processorContext) {
                this.context = processorContext;
                this.store = (WindowStore)processorContext.getStateStore("store-name");
                int count = 0;
                KeyValueIterator all = this.store.all();
                while (all.hasNext()) {
                    ++count;
                    all.next();
                }
                MatcherAssert.assertThat((Object)count, (Matcher)CoreMatchers.equalTo((Object)0));
            }

            public KeyValue<String, String> transform(String key, String value) {
                int count = 0;
                KeyValueIterator all = this.store.all();
                while (all.hasNext()) {
                    ++count;
                    all.next();
                }
                MatcherAssert.assertThat((Object)count, (Matcher)CoreMatchers.equalTo((Object)this.numRecordsProcessed));
                this.store.put((Object)value, (Object)value, this.context.timestamp());
                ++this.numRecordsProcessed;
                return new KeyValue((Object)key, (Object)value);
            }

            public void close() {
            }
        }, new String[]{"store-name"});
        String bootstrapServers = "localhost:9092";
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("application.id", "test-app");
        streamsConfiguration.put("bootstrap.servers", "localhost:9092");
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("commit.interval.ms", (Object)10000);
        Instant initialWallClockTime = Instant.ofEpochMilli(0L);
        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
        TestInputTopic inputTopic = driver.createInputTopic(TOPIC, Serdes.String().serializer(), Serdes.String().serializer(), initialWallClockTime, Duration.ZERO);
        for (i = 0; i < 5; ++i) {
            inputTopic.pipeInput((Object)UUID.randomUUID().toString(), (Object)UUID.randomUUID().toString());
        }
        driver.advanceWallClockTime(Duration.ofSeconds(10L));
        inputTopic.advanceTime(Duration.ofSeconds(10L));
        for (i = 0; i < 5; ++i) {
            inputTopic.pipeInput((Object)UUID.randomUUID().toString(), (Object)UUID.randomUUID().toString());
        }
        driver.advanceWallClockTime(Duration.ofSeconds(10L));
        inputTopic.advanceTime(Duration.ofSeconds(10L));
        for (i = 0; i < 5; ++i) {
            inputTopic.pipeInput((Object)UUID.randomUUID().toString(), (Object)UUID.randomUUID().toString());
        }
        driver.advanceWallClockTime(Duration.ofSeconds(10L));
        inputTopic.advanceTime(Duration.ofSeconds(10L));
        for (i = 0; i < 5; ++i) {
            inputTopic.pipeInput((Object)UUID.randomUUID().toString(), (Object)UUID.randomUUID().toString());
        }
    }

    @Test
    public void shouldPutFetchFromCache() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("b"), CachingWindowStoreTest.bytesValue("b"));
        MatcherAssert.assertThat((Object)this.cachingStore.fetch(CachingWindowStoreTest.bytesKey("a"), 10L), (Matcher)CoreMatchers.equalTo((Object)CachingWindowStoreTest.bytesValue("a")));
        MatcherAssert.assertThat((Object)this.cachingStore.fetch(CachingWindowStoreTest.bytesKey("b"), 10L), (Matcher)CoreMatchers.equalTo((Object)CachingWindowStoreTest.bytesValue("b")));
        MatcherAssert.assertThat((Object)this.cachingStore.fetch(CachingWindowStoreTest.bytesKey("c"), 10L), (Matcher)CoreMatchers.equalTo(null));
        MatcherAssert.assertThat((Object)this.cachingStore.fetch(CachingWindowStoreTest.bytesKey("a"), 0L), (Matcher)CoreMatchers.equalTo(null));
        WindowStoreIterator a = this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L));
        WindowStoreIterator b = this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("b"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L));
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)a.next()), 10L, "a");
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)b.next()), 10L, "b");
        Assert.assertFalse((boolean)a.hasNext());
        Assert.assertFalse((boolean)b.hasNext());
        Assert.assertEquals((long)2L, (long)this.cache.size());
    }

    private void verifyKeyValue(KeyValue<Long, byte[]> next, long expectedKey, String expectedValue) {
        MatcherAssert.assertThat((Object)next.key, (Matcher)CoreMatchers.equalTo((Object)expectedKey));
        MatcherAssert.assertThat((Object)next.value, (Matcher)CoreMatchers.equalTo((Object)CachingWindowStoreTest.bytesValue(expectedValue)));
    }

    private static byte[] bytesValue(String value) {
        return value.getBytes();
    }

    private static Bytes bytesKey(String key) {
        return Bytes.wrap((byte[])key.getBytes());
    }

    private String stringFrom(byte[] from) {
        return (String)Serdes.String().deserializer().deserialize("", from);
    }

    @Test
    public void shouldPutFetchRangeFromCache() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("b"), CachingWindowStoreTest.bytesValue("b"));
        KeyValueIterator iterator = this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), (Object)CachingWindowStoreTest.bytesKey("b"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L));
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)iterator.next()), (Windowed<Bytes>)new Windowed((Object)CachingWindowStoreTest.bytesKey("a"), (Window)new TimeWindow(10L, 10L + WINDOW_SIZE)), "a");
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)iterator.next()), (Windowed<Bytes>)new Windowed((Object)CachingWindowStoreTest.bytesKey("b"), (Window)new TimeWindow(10L, 10L + WINDOW_SIZE)), "b");
        Assert.assertFalse((boolean)iterator.hasNext());
        Assert.assertEquals((long)2L, (long)this.cache.size());
    }

    @Test
    public void shouldGetAllFromCache() {
        String[] array;
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("b"), CachingWindowStoreTest.bytesValue("b"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("c"), CachingWindowStoreTest.bytesValue("c"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("d"), CachingWindowStoreTest.bytesValue("d"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("e"), CachingWindowStoreTest.bytesValue("e"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("f"), CachingWindowStoreTest.bytesValue("f"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("g"), CachingWindowStoreTest.bytesValue("g"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("h"), CachingWindowStoreTest.bytesValue("h"));
        KeyValueIterator iterator = this.cachingStore.all();
        for (String s : array = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"}) {
            StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)iterator.next()), (Windowed<Bytes>)new Windowed((Object)CachingWindowStoreTest.bytesKey(s), (Window)new TimeWindow(10L, 10L + WINDOW_SIZE)), s);
        }
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldFetchAllWithinTimestampRange() {
        String[] array = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"};
        for (int i = 0; i < array.length; ++i) {
            this.context.setTime(i);
            this.cachingStore.put(CachingWindowStoreTest.bytesKey(array[i]), CachingWindowStoreTest.bytesValue(array[i]));
        }
        KeyValueIterator iterator = this.cachingStore.fetchAll(Instant.ofEpochMilli(0L), Instant.ofEpochMilli(7L));
        for (int i = 0; i < array.length; ++i) {
            String str = array[i];
            StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)iterator.next()), (Windowed<Bytes>)new Windowed((Object)CachingWindowStoreTest.bytesKey(str), (Window)new TimeWindow((long)i, (long)i + WINDOW_SIZE)), str);
        }
        Assert.assertFalse((boolean)iterator.hasNext());
        KeyValueIterator iterator1 = this.cachingStore.fetchAll(Instant.ofEpochMilli(2L), Instant.ofEpochMilli(4L));
        for (int i = 2; i <= 4; ++i) {
            String str = array[i];
            StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)iterator1.next()), (Windowed<Bytes>)new Windowed((Object)CachingWindowStoreTest.bytesKey(str), (Window)new TimeWindow((long)i, (long)i + WINDOW_SIZE)), str);
        }
        Assert.assertFalse((boolean)iterator1.hasNext());
        KeyValueIterator iterator2 = this.cachingStore.fetchAll(Instant.ofEpochMilli(5L), Instant.ofEpochMilli(7L));
        for (int i = 5; i <= 7; ++i) {
            String str = array[i];
            StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)iterator2.next()), (Windowed<Bytes>)new Windowed((Object)CachingWindowStoreTest.bytesKey(str), (Window)new TimeWindow((long)i, (long)i + WINDOW_SIZE)), str);
        }
        Assert.assertFalse((boolean)iterator2.hasNext());
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
        int added = this.addItemsToCache();
        KeyValueIterator iter = this.bytesStore.fetch(Bytes.wrap((byte[])"0".getBytes(StandardCharsets.UTF_8)), 10L, 10L);
        KeyValue next = (KeyValue)iter.next();
        Assert.assertEquals((long)10L, (long)this.keySchema.segmentTimestamp((Bytes)next.key));
        Assert.assertArrayEquals((byte[])"0".getBytes(), (byte[])((byte[])next.value));
        Assert.assertFalse((boolean)iter.hasNext());
        Assert.assertEquals((long)(added - 1), (long)this.cache.size());
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() {
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(10L, 10L + WINDOW_SIZE));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.flush();
        Assert.assertEquals((Object)"a", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
    }

    @Test
    public void shouldSetFlushListener() {
        Assert.assertTrue((boolean)this.cachingStore.setFlushListener(null, true));
        Assert.assertTrue((boolean)this.cachingStore.setFlushListener(null, false));
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() {
        this.cachingStore.setFlushListener(this.cacheListener, true);
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(10L, 10L + WINDOW_SIZE));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("b"));
        this.cachingStore.flush();
        Assert.assertEquals((Object)"b", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
        this.cacheListener.forwarded.clear();
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("c"));
        this.cachingStore.flush();
        Assert.assertEquals((Object)"c", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertEquals((Object)"b", (Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), null);
        this.cachingStore.flush();
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertEquals((Object)"c", (Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
        this.cacheListener.forwarded.clear();
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("b"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), null);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(windowedKey));
        this.cacheListener.forwarded.clear();
    }

    @Test
    public void shouldForwardOldValuesWhenDisabled() {
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(10L, 10L + WINDOW_SIZE));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("b"));
        this.cachingStore.flush();
        Assert.assertEquals((Object)"b", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("c"));
        this.cachingStore.flush();
        Assert.assertEquals((Object)"c", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), null);
        this.cachingStore.flush();
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
        this.cacheListener.forwarded.clear();
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("b"));
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), null);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(windowedKey));
        this.cacheListener.forwarded.clear();
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() {
        int numRecords = this.addItemsToCache();
        Assert.assertEquals((long)numRecords, (long)this.cacheListener.forwarded.size());
    }

    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"), 10L);
        this.cachingStore.flush();
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("b"), 10L);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("1"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L));
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)fetch.next()), 10L, "b");
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateAcrossWindows() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("a"), 10L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("1"), CachingWindowStoreTest.bytesValue("b"), 10L + WINDOW_SIZE);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("1"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L + WINDOW_SIZE));
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)fetch.next()), 10L, "a");
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)fetch.next()), 10L + WINDOW_SIZE, "b");
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateCacheAndStore() {
        Bytes key = Bytes.wrap((byte[])"1".getBytes());
        this.bytesStore.put(WindowKeySchema.toStoreKeyBinary((Bytes)key, (long)10L, (int)0), "a".getBytes());
        this.cachingStore.put(key, CachingWindowStoreTest.bytesValue("b"), 10L + WINDOW_SIZE);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("1"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L + WINDOW_SIZE));
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)fetch.next()), 10L, "a");
        this.verifyKeyValue((KeyValue<Long, byte[]>)((KeyValue)fetch.next()), 10L + WINDOW_SIZE, "b");
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateCacheAndStoreKeyRange() {
        Bytes key = Bytes.wrap((byte[])"1".getBytes());
        this.bytesStore.put(WindowKeySchema.toStoreKeyBinary((Bytes)key, (long)10L, (int)0), "a".getBytes());
        this.cachingStore.put(key, CachingWindowStoreTest.bytesValue("b"), 10L + WINDOW_SIZE);
        KeyValueIterator fetchRange = this.cachingStore.fetch((Object)key, (Object)CachingWindowStoreTest.bytesKey("2"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(10L + WINDOW_SIZE));
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)fetchRange.next()), (Windowed<Bytes>)new Windowed((Object)key, (Window)new TimeWindow(10L, 10L + WINDOW_SIZE)), "a");
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue<Windowed<Bytes>, byte[]>)((KeyValue)fetchRange.next()), (Windowed<Bytes>)new Windowed((Object)key, (Window)new TimeWindow(10L + WINDOW_SIZE, 10L + WINDOW_SIZE + WINDOW_SIZE)), "b");
        Assert.assertFalse((boolean)fetchRange.hasNext());
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("a"));
        Assert.assertEquals((long)1L, (long)this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals((long)0L, (long)this.cache.size());
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), (Object)CachingWindowStoreTest.bytesKey("b"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("a"));
    }

    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0001"), 0L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesValue("0002"), 0L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0003"), 1L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesValue("0004"), 1L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0005"), 100L);
        List expected = Arrays.asList(KeyValue.pair((Object)0L, (Object)CachingWindowStoreTest.bytesValue("0001")), KeyValue.pair((Object)1L, (Object)CachingWindowStoreTest.bytesValue("0003")), KeyValue.pair((Object)100L, (Object)CachingWindowStoreTest.bytesValue("0005")));
        List actual = StreamsTestUtils.toList(this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE)));
        StreamsTestUtils.verifyKeyValueList(expected, actual);
    }

    @Test
    public void shouldFetchAndIterateOverKeyRange() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0001"), 0L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesValue("0002"), 0L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0003"), 1L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesValue("0004"), 1L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0005"), 100L);
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(CachingWindowStoreTest.windowedPair("a", "0001", 0L), CachingWindowStoreTest.windowedPair("a", "0003", 1L), CachingWindowStoreTest.windowedPair("a", "0005", 100L)), StreamsTestUtils.toList(this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), (Object)CachingWindowStoreTest.bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(CachingWindowStoreTest.windowedPair("aa", "0002", 0L), CachingWindowStoreTest.windowedPair("aa", "0004", 1L)), StreamsTestUtils.toList(this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("aa"), (Object)CachingWindowStoreTest.bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(CachingWindowStoreTest.windowedPair("a", "0001", 0L), CachingWindowStoreTest.windowedPair("a", "0003", 1L), CachingWindowStoreTest.windowedPair("aa", "0002", 0L), CachingWindowStoreTest.windowedPair("aa", "0004", 1L), CachingWindowStoreTest.windowedPair("a", "0005", 100L)), StreamsTestUtils.toList(this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("a"), (Object)CachingWindowStoreTest.bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
    }

    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), CachingWindowStoreTest.bytesValue("0001"), 0L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesValue("0002"), 1L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesValue("0003"), 2L);
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("aaa"), CachingWindowStoreTest.bytesValue("0004"), 3L);
        WindowStoreIterator singleKeyIterator = this.cachingStore.fetch(CachingWindowStoreTest.bytesKey("aa"), 0L, 5L);
        KeyValueIterator keyRangeIterator = this.cachingStore.fetch(CachingWindowStoreTest.bytesKey("aa"), CachingWindowStoreTest.bytesKey("aa"), 0L, 5L);
        Assert.assertEquals((Object)this.stringFrom((byte[])((KeyValue)singleKeyIterator.next()).value), (Object)this.stringFrom((byte[])((KeyValue)keyRangeIterator.next()).value));
        Assert.assertEquals((Object)this.stringFrom((byte[])((KeyValue)singleKeyIterator.next()).value), (Object)this.stringFrom((byte[])((KeyValue)keyRangeIterator.next()).value));
        Assert.assertFalse((boolean)singleKeyIterator.hasNext());
        Assert.assertFalse((boolean)keyRangeIterator.hasNext());
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        this.cachingStore.put(null, CachingWindowStoreTest.bytesValue("anyValue"));
    }

    @Test
    public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
        this.cachingStore.put(CachingWindowStoreTest.bytesKey("a"), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        this.cachingStore.fetch(null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
        this.cachingStore.fetch(null, (Object)CachingWindowStoreTest.bytesKey("anyTo"), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
        this.cachingStore.fetch((Object)CachingWindowStoreTest.bytesKey("anyFrom"), null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
    }

    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        Bytes keyFrom = Bytes.wrap((byte[])Serdes.Integer().serializer().serialize("", (Object)-1));
        Bytes keyTo = Bytes.wrap((byte[])Serdes.Integer().serializer().serialize("", (Object)1));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingWindowStore.class);){
            KeyValueIterator iterator = this.cachingStore.fetch(keyFrom, keyTo, 0L, 10L);
            Assert.assertFalse((boolean)iterator.hasNext());
            List<String> messages = appender.getMessages();
            MatcherAssert.assertThat(messages, (Matcher)CoreMatchers.hasItem((Object)"Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers"));
        }
    }

    @Test
    public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() {
        this.setUpCloseTests();
        EasyMock.reset((Object[])new Object[]{this.cache});
        this.cache.flush(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("Simulating an error on flush"));
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.replay((Object[])new Object[]{this.cache});
        EasyMock.reset((Object[])new Object[]{this.underlyingStore});
        this.underlyingStore.close();
        EasyMock.replay((Object[])new Object[]{this.underlyingStore});
        Assert.assertThrows(RuntimeException.class, () -> ((CachingWindowStore)this.cachingStore).close());
        EasyMock.verify((Object[])new Object[]{this.cache, this.underlyingStore});
    }

    @Test
    public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
        this.setUpCloseTests();
        EasyMock.reset((Object[])new Object[]{this.cache});
        this.cache.flush(CACHE_NAMESPACE);
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("Simulating an error on close"));
        EasyMock.replay((Object[])new Object[]{this.cache});
        EasyMock.reset((Object[])new Object[]{this.underlyingStore});
        this.underlyingStore.close();
        EasyMock.replay((Object[])new Object[]{this.underlyingStore});
        Assert.assertThrows(RuntimeException.class, () -> ((CachingWindowStore)this.cachingStore).close());
        EasyMock.verify((Object[])new Object[]{this.cache, this.underlyingStore});
    }

    @Test
    public void shouldCloseCacheAfterErrorDuringStateStoreClose() {
        this.setUpCloseTests();
        EasyMock.reset((Object[])new Object[]{this.cache});
        this.cache.flush(CACHE_NAMESPACE);
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.replay((Object[])new Object[]{this.cache});
        EasyMock.reset((Object[])new Object[]{this.underlyingStore});
        this.underlyingStore.close();
        EasyMock.expectLastCall().andThrow((Throwable)new RuntimeException("Simulating an error on close"));
        EasyMock.replay((Object[])new Object[]{this.underlyingStore});
        Assert.assertThrows(RuntimeException.class, () -> ((CachingWindowStore)this.cachingStore).close());
        EasyMock.verify((Object[])new Object[]{this.cache, this.underlyingStore});
    }

    private void setUpCloseTests() {
        this.underlyingStore = (WindowStore)EasyMock.createNiceMock(WindowStore.class);
        EasyMock.expect((Object)this.underlyingStore.name()).andStubReturn((Object)"store-name");
        EasyMock.expect((Object)this.underlyingStore.isOpen()).andStubReturn((Object)true);
        EasyMock.replay((Object[])new Object[]{this.underlyingStore});
        this.cachingStore = new CachingWindowStore(this.underlyingStore, WINDOW_SIZE.longValue(), 100L);
        this.cache = (ThreadCache)EasyMock.createNiceMock(ThreadCache.class);
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, TOPIC, null));
        this.cachingStore.init((ProcessorContext)this.context, (StateStore)this.cachingStore);
    }

    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(String key, String value, long timestamp) {
        return KeyValue.pair((Object)new Windowed((Object)CachingWindowStoreTest.bytesKey(key), (Window)new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), (Object)CachingWindowStoreTest.bytesValue(value));
    }

    private int addItemsToCache() {
        String kv;
        int i = 0;
        for (int cachedSize = 0; cachedSize < 150; cachedSize += ThreadCacheTest.memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), TOPIC) + 8 + 4) {
            kv = String.valueOf(i++);
            this.cachingStore.put(CachingWindowStoreTest.bytesKey(kv), CachingWindowStoreTest.bytesValue(kv));
        }
        return i;
    }
}

