/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheTest;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link CachingKeyValueStore} layered over an
 * {@link InMemoryKeyValueStore} and backed by a {@link ThreadCache} limited to
 * {@code maxCacheSizeBytes} bytes.
 *
 * <p>Every test starts from a freshly initialised store (see {@link #setUp()}).
 * Iterators obtained from the store are always closed by the helper methods at
 * the bottom of this class.
 */
public class CachingKeyValueStoreTest {
    private final int maxCacheSizeBytes = 150;
    private MockProcessorContext context;
    private CachingKeyValueStore<String, String> store;
    private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
    private ThreadCache cache;
    private CacheFlushListenerStub<String> cacheFlushListener;
    private String topic;

    /**
     * Builds the store under test, registers the recording flush listener and
     * initialises the store with a mock processor context that owns the cache.
     */
    @Before
    public void setUp() throws Exception {
        final String storeName = "store";
        underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
        cacheFlushListener = new CacheFlushListenerStub<>();
        store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
        store.setFlushListener(cacheFlushListener);
        cache = new ThreadCache("testCache", maxCacheSizeBytes, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        // The explicit cast on null selects the intended MockProcessorContext constructor overload.
        context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
        topic = "topic";
        context.setRecordContext((RecordContext) new ProcessorRecordContext(10L, 0L, 0, topic));
        store.init((ProcessorContext) context, null);
    }

    /** Closes the mock processor context created in {@link #setUp()}. */
    @After
    public void after() {
        context.close();
    }

    /** Two puts must be readable through the store, counted in the cache and absent from the underlying store. */
    @Test
    public void shouldPutGetToFromCache() throws Exception {
        store.put("key", "value");
        store.put("key2", "value2");
        Assert.assertEquals("value", store.get("key"));
        Assert.assertEquals("value2", store.get("key2"));
        Assert.assertEquals(2L, cache.size());
        Assert.assertEquals(0L, underlyingStore.approximateNumEntries());
    }

    /** After filling the cache up to its size limit, every added entry is expected in the underlying store. */
    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
        final int added = addItemsToCache();
        Assert.assertEquals(added, underlyingStore.approximateNumEntries());
        Assert.assertEquals(added, store.approximateNumEntries());
        Assert.assertNotNull(underlyingStore.get(Bytes.wrap("0".getBytes(StandardCharsets.UTF_8))));
    }

    /** After filling the cache up to its size limit, the listener is expected to have seen every added key. */
    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
        final int numRecords = addItemsToCache();
        Assert.assertEquals(numRecords, cacheFlushListener.forwarded.size());
    }

    /** A flush after a single put must forward the new value with a {@code null} old value. */
    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
        store.put("1", "a");
        store.flush();
        Assert.assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
        Assert.assertNull(cacheFlushListener.forwarded.get("1").oldValue);
    }

    /** A second put-and-flush of the same key must forward the previously flushed value as the old value. */
    @Test
    public void shouldForwardOldValuesWhenEnabled() throws Exception {
        store.put("1", "a");
        store.flush();
        store.put("1", "b");
        store.flush();
        Assert.assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
        Assert.assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue);
    }

    /** {@code all()} must yield exactly as many entries as were added. */
    @Test
    public void shouldIterateAllStoredItems() throws Exception {
        final int items = addItemsToCache();
        final List<String> results = drainKeys(store.all());
        Assert.assertEquals(items, results.size());
    }

    /** {@code range("0", String.valueOf(items))} must yield exactly as many entries as were added. */
    @Test
    public void shouldIterateOverRange() throws Exception {
        final int items = addItemsToCache();
        final List<String> results = drainKeys(store.range(String.valueOf(0), String.valueOf(items)));
        Assert.assertEquals(items, results.size());
    }

    /** A key that is put and then deleted must not be visible via get, range or all. */
    @Test
    public void shouldDeleteItemsFromCache() throws Exception {
        store.put("a", "a");
        store.delete("a");
        Assert.assertNull(store.get("a"));
        assertHasNoEntries(store.range("a", "b"));
        assertHasNoEntries(store.all());
    }

    /** A key that is put, flushed and then deleted must not be visible via get, range or all. */
    @Test
    public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() throws Exception {
        store.put("a", "a");
        store.flush();
        store.delete("a");
        Assert.assertNull(store.get("a"));
        assertHasNoEntries(store.range("a", "b"));
        assertHasNoEntries(store.all());
    }

    /** Closing the store must bring the cache size back to zero. */
    @Test
    public void shouldClearNamespaceCacheOnClose() throws Exception {
        store.put("a", "a");
        Assert.assertEquals(1L, cache.size());
        store.close();
        Assert.assertEquals(0L, cache.size());
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToGetFromClosedCachingStore() throws Exception {
        store.close();
        store.get("a");
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
        store.close();
        store.put("a", "a");
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() throws Exception {
        store.close();
        store.range("a", "b");
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoAllQueryOnClosedCachingStore() throws Exception {
        store.close();
        store.all();
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoGetApproxSizeOnClosedCachingStore() throws Exception {
        store.close();
        store.approximateNumEntries();
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoPutAllClosedCachingStore() throws Exception {
        store.close();
        store.putAll(Collections.singletonList(KeyValue.pair("a", "a")));
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() throws Exception {
        store.close();
        store.putIfAbsent("b", "c");
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDeleteFromClosedCachingStore() throws Exception {
        store.close();
        store.delete("key");
    }

    /** Looking up a {@code null} key must return {@code null}. */
    @Test
    public void shouldReturnNullIfKeyIsNull() throws Exception {
        Assert.assertNull(store.get(null));
    }

    /**
     * Puts entries {@code "0" -> "0"}, {@code "1" -> "1"}, ... into the store
     * until the summed size estimate reported by
     * {@link ThreadCacheTest#memoryCacheEntrySize} reaches
     * {@code maxCacheSizeBytes}.
     *
     * @return the number of entries that were put
     */
    private int addItemsToCache() throws IOException {
        int cachedSize = 0;
        int i = 0;
        while (cachedSize < maxCacheSizeBytes) {
            final String kv = String.valueOf(i++);
            store.put(kv, kv);
            final byte[] raw = kv.getBytes(StandardCharsets.UTF_8);
            cachedSize += ThreadCacheTest.memoryCacheEntrySize(raw, raw, topic);
        }
        return i;
    }

    /**
     * Reads every remaining key from {@code iterator} and closes it, even if
     * iteration fails.
     *
     * @return the keys in iteration order
     */
    private static List<String> drainKeys(final KeyValueIterator<String, String> iterator) {
        final List<String> keys = new ArrayList<>();
        try {
            while (iterator.hasNext()) {
                keys.add(iterator.next().key);
            }
        } finally {
            iterator.close();
        }
        return keys;
    }

    /** Asserts that {@code iterator} has no entries and closes it, even if the assertion fails. */
    private static void assertHasNoEntries(final KeyValueIterator<String, String> iterator) {
        try {
            Assert.assertFalse(iterator.hasNext());
        } finally {
            iterator.close();
        }
    }

    /**
     * Flush listener that records the most recent {@link Change} forwarded for
     * each key so that tests can inspect it.
     *
     * @param <K> key type
     */
    public static class CacheFlushListenerStub<K>
    implements CacheFlushListener<K, String> {
        final Map<K, Change<String>> forwarded = new HashMap<>();

        /** Stores the forwarded new/old value pair under {@code key}, replacing any earlier record. */
        @Override
        public void apply(K key, String newValue, String oldValue) {
            forwarded.put(key, new Change<>(newValue, oldValue));
        }
    }
}

