/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachingKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheTest;
import org.apache.kafka.streams.state.internals.WindowStoreKeySchema;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingWindowStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    private static final Long WINDOW_SIZE = 10000L;
    private RocksDBSegmentedBytesStore underlying;
    private CachingWindowStore<String, String> cachingStore;
    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
    private ThreadCache cache;
    private String topic;
    private static final long DEFAULT_TIMESTAMP = 10L;
    private WindowStoreKeySchema keySchema;
    private RocksDBWindowStore<Bytes, byte[]> windowStore;

    @Before
    public void setUp() throws Exception {
        this.keySchema = new WindowStoreKeySchema();
        this.underlying = new RocksDBSegmentedBytesStore("test", 30000L, 3, (SegmentedBytesStore.KeySchema)this.keySchema);
        this.windowStore = new RocksDBWindowStore((SegmentedBytesStore)this.underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
        this.cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub();
        this.cachingStore = new CachingWindowStore(this.windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE.longValue());
        this.cachingStore.setFlushListener(this.cacheListener);
        this.cache = new ThreadCache("testCache", 150L, (StreamsMetrics)new MockStreamsMetrics(new Metrics()));
        this.topic = "topic";
        MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector)null, this.cache);
        context.setRecordContext((RecordContext)new ProcessorRecordContext(10L, 0L, 0, this.topic));
        this.cachingStore.init((ProcessorContext)context, this.cachingStore);
    }

    @After
    public void closeStore() {
        this.cachingStore.close();
    }

    @Test
    public void shouldPutFetchFromCache() throws Exception {
        this.cachingStore.put((Object)"a", (Object)"a");
        this.cachingStore.put((Object)"b", (Object)"b");
        WindowStoreIterator a = this.cachingStore.fetch((Object)"a", 10L, 10L);
        WindowStoreIterator b = this.cachingStore.fetch((Object)"b", 10L, 10L);
        Assert.assertEquals((Object)KeyValue.pair((Object)10L, (Object)"a"), (Object)a.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)10L, (Object)"b"), (Object)b.next());
        Assert.assertFalse((boolean)a.hasNext());
        Assert.assertFalse((boolean)b.hasNext());
        Assert.assertEquals((long)2L, (long)this.cache.size());
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
        int added = this.addItemsToCache();
        KeyValueIterator iter = this.underlying.fetch(Bytes.wrap((byte[])"0".getBytes()), 10L, 10L);
        KeyValue next = (KeyValue)iter.next();
        Assert.assertEquals((long)10L, (long)this.keySchema.segmentTimestamp((Bytes)next.key));
        Assert.assertArrayEquals((byte[])"0".getBytes(), (byte[])((byte[])next.value));
        Assert.assertFalse((boolean)iter.hasNext());
        Assert.assertEquals((long)(added - 1), (long)this.cache.size());
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(10L, 10L + WINDOW_SIZE));
        this.cachingStore.put((Object)"1", (Object)"a");
        this.cachingStore.flush();
        Assert.assertEquals((Object)"a", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() throws Exception {
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(10L, 10L + WINDOW_SIZE));
        this.cachingStore.put((Object)"1", (Object)"a");
        this.cachingStore.flush();
        this.cachingStore.put((Object)"1", (Object)"b");
        this.cachingStore.flush();
        Assert.assertEquals((Object)"b", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertEquals((Object)"a", (Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
        int numRecords = this.addItemsToCache();
        Assert.assertEquals((long)numRecords, (long)this.cacheListener.forwarded.size());
    }

    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() throws Exception {
        this.cachingStore.put((Object)"1", (Object)"a", 10L);
        this.cachingStore.flush();
        this.cachingStore.put((Object)"1", (Object)"b", 10L);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)"1", 10L, 10L);
        Assert.assertEquals((Object)KeyValue.pair((Object)10L, (Object)"b"), (Object)fetch.next());
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateAcrossWindows() throws Exception {
        this.cachingStore.put((Object)"1", (Object)"a", 10L);
        this.cachingStore.put((Object)"1", (Object)"b", 10L + WINDOW_SIZE);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)"1", 10L, 10L + WINDOW_SIZE);
        Assert.assertEquals((Object)KeyValue.pair((Object)10L, (Object)"a"), (Object)fetch.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)(10L + WINDOW_SIZE), (Object)"b"), (Object)fetch.next());
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateCacheAndStore() throws Exception {
        Bytes key = Bytes.wrap((byte[])"1".getBytes());
        this.underlying.put(Bytes.wrap((byte[])WindowStoreUtils.toBinaryKey((Object)key, (long)10L, (int)0, (StateSerdes)WindowStoreUtils.getInnerStateSerde((String)"topic"))), "a".getBytes());
        this.cachingStore.put((Object)"1", (Object)"b", 10L + WINDOW_SIZE);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)"1", 10L, 10L + WINDOW_SIZE);
        Assert.assertEquals((Object)KeyValue.pair((Object)10L, (Object)"a"), (Object)fetch.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)(10L + WINDOW_SIZE), (Object)"b"), (Object)fetch.next());
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() throws Exception {
        this.cachingStore.put((Object)"a", (Object)"a");
        Assert.assertEquals((long)1L, (long)this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals((long)0L, (long)this.cache.size());
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.fetch((Object)"a", 0L, 10L);
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.put((Object)"a", (Object)"a");
    }

    @Test
    public void shouldFetchAndIterateOverExactKeys() throws Exception {
        this.cachingStore.put((Object)"a", (Object)"0001", 0L);
        this.cachingStore.put((Object)"aa", (Object)"0002", 0L);
        this.cachingStore.put((Object)"a", (Object)"0003", 1L);
        this.cachingStore.put((Object)"aa", (Object)"0004", 1L);
        this.cachingStore.put((Object)"a", (Object)"0005", 60000L);
        List expected = Utils.mkList((Object[])new KeyValue[]{KeyValue.pair((Object)0L, (Object)"0001"), KeyValue.pair((Object)1L, (Object)"0003"), KeyValue.pair((Object)60000L, (Object)"0005")});
        MatcherAssert.assertThat(StreamsTestUtils.toList(this.cachingStore.fetch((Object)"a", 0L, Long.MAX_VALUE)), (Matcher)CoreMatchers.equalTo((Object)expected));
    }

    private int addItemsToCache() throws IOException {
        String kv;
        int i = 0;
        for (int cachedSize = 0; cachedSize < 150; cachedSize += ThreadCacheTest.memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), this.topic) + 8 + 4) {
            kv = String.valueOf(i++);
            this.cachingStore.put((Object)kv, (Object)kv);
        }
        return i;
    }
}

