/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.AbstractKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheTest;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingKeyValueStoreTest
extends AbstractKeyValueStoreTest {
    private final int maxCacheSizeBytes = 150;
    private InternalMockProcessorContext context;
    private CachingKeyValueStore<String, String> store;
    private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
    private ThreadCache cache;
    private CacheFlushListenerStub<String, String> cacheFlushListener;
    private String topic;

    @Before
    public void setUp() {
        String storeName = "store";
        this.underlyingStore = new InMemoryKeyValueStore("store", Serdes.Bytes(), Serdes.ByteArray());
        this.cacheFlushListener = new CacheFlushListenerStub();
        this.store = new CachingKeyValueStore(this.underlyingStore, Serdes.String(), Serdes.String());
        this.store.setFlushListener(this.cacheFlushListener, false);
        this.cache = new ThreadCache(new LogContext("testCache "), 150L, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(null, null, null, null, this.cache);
        this.topic = "topic";
        this.context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, this.topic, null));
        this.store.init((ProcessorContext)this.context, null);
    }

    @Override
    @After
    public void after() {
        super.after();
    }

    @Override
    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context) {
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"cache-store"), (Serde)context.keySerde(), (Serde)context.valueSerde()).withCachingEnabled();
        KeyValueStore store = (KeyValueStore)storeBuilder.build();
        CacheFlushListenerStub cacheFlushListener = new CacheFlushListenerStub();
        CachedStateStore inner = (CachedStateStore)((WrappedStateStore)store).wrapped();
        inner.setFlushListener(cacheFlushListener, false);
        store.init(context, (StateStore)store);
        return store;
    }

    @Test
    public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() {
        int added = this.addItemsToCache();
        Assert.assertEquals((long)added, (long)this.underlyingStore.approximateNumEntries());
        Assert.assertEquals((long)added, (long)this.cacheFlushListener.forwarded.size());
        this.store.put(this.bytesKey("key"), this.bytesValue("value"));
        Assert.assertEquals((long)added, (long)this.underlyingStore.approximateNumEntries());
        Assert.assertEquals((long)added, (long)this.cacheFlushListener.forwarded.size());
        this.store.put(this.bytesKey("key"), null);
        this.store.flush();
        Assert.assertEquals((long)added, (long)this.underlyingStore.approximateNumEntries());
        Assert.assertEquals((long)added, (long)this.cacheFlushListener.forwarded.size());
    }

    @Test
    public void shouldCloseAfterErrorWithFlush() {
        try {
            this.cache = (ThreadCache)EasyMock.niceMock(ThreadCache.class);
            this.context = new InternalMockProcessorContext(null, null, null, null, this.cache);
            this.context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, this.topic, null));
            this.store.init((ProcessorContext)this.context, null);
            this.cache.flush("0_0-store");
            EasyMock.expectLastCall().andThrow((Throwable)new NullPointerException("Simulating an error on flush"));
            EasyMock.replay((Object[])new Object[]{this.cache});
            this.store.close();
        }
        catch (NullPointerException npe) {
            Assert.assertFalse((boolean)this.underlyingStore.isOpen());
        }
    }

    @Test
    public void shouldPutGetToFromCache() {
        this.store.put(this.bytesKey("key"), this.bytesValue("value"));
        this.store.put(this.bytesKey("key2"), this.bytesValue("value2"));
        Assert.assertThat((Object)this.store.get(this.bytesKey("key")), (Matcher)CoreMatchers.equalTo((Object)this.bytesValue("value")));
        Assert.assertThat((Object)this.store.get(this.bytesKey("key2")), (Matcher)CoreMatchers.equalTo((Object)this.bytesValue("value2")));
        Assert.assertEquals((long)2L, (long)this.cache.size());
        Assert.assertEquals((long)0L, (long)this.underlyingStore.approximateNumEntries());
    }

    private byte[] bytesValue(String value) {
        return value.getBytes();
    }

    private Bytes bytesKey(String key) {
        return Bytes.wrap((byte[])key.getBytes());
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
        int added = this.addItemsToCache();
        Assert.assertEquals((long)added, (long)this.underlyingStore.approximateNumEntries());
        Assert.assertEquals((long)added, (long)this.store.approximateNumEntries());
        Assert.assertNotNull((Object)this.underlyingStore.get((Object)Bytes.wrap((byte[])"0".getBytes())));
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() {
        int numRecords = this.addItemsToCache();
        Assert.assertEquals((long)numRecords, (long)this.cacheFlushListener.forwarded.size());
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() {
        this.store.put(this.bytesKey("1"), this.bytesValue("a"));
        this.store.flush();
        Assert.assertEquals((Object)"a", (Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() {
        this.store.setFlushListener(this.cacheFlushListener, true);
        this.store.put(this.bytesKey("1"), this.bytesValue("a"));
        this.store.flush();
        Assert.assertEquals((Object)"a", (Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
        this.store.put(this.bytesKey("1"), this.bytesValue("b"));
        this.store.put(this.bytesKey("1"), this.bytesValue("c"));
        this.store.flush();
        Assert.assertEquals((Object)"c", (Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertEquals((Object)"a", (Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
        this.store.put(this.bytesKey("1"), null);
        this.store.flush();
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertEquals((Object)"c", (Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
        this.cacheFlushListener.forwarded.clear();
        this.store.put(this.bytesKey("1"), this.bytesValue("a"));
        this.store.put(this.bytesKey("1"), this.bytesValue("b"));
        this.store.put(this.bytesKey("1"), null);
        this.store.flush();
        Assert.assertNull(this.cacheFlushListener.forwarded.get("1"));
        this.cacheFlushListener.forwarded.clear();
    }

    @Test
    public void shouldNotForwardOldValuesWhenDisabled() {
        this.store.put(this.bytesKey("1"), this.bytesValue("a"));
        this.store.flush();
        Assert.assertEquals((Object)"a", (Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
        this.store.put(this.bytesKey("1"), this.bytesValue("b"));
        this.store.flush();
        Assert.assertEquals((Object)"b", (Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
        this.store.put(this.bytesKey("1"), null);
        this.store.flush();
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").newValue);
        Assert.assertNull((Object)this.cacheFlushListener.forwarded.get((Object)"1").oldValue);
        this.cacheFlushListener.forwarded.clear();
        this.store.put(this.bytesKey("1"), this.bytesValue("a"));
        this.store.put(this.bytesKey("1"), this.bytesValue("b"));
        this.store.put(this.bytesKey("1"), null);
        this.store.flush();
        Assert.assertNull(this.cacheFlushListener.forwarded.get("1"));
        this.cacheFlushListener.forwarded.clear();
    }

    @Test
    public void shouldIterateAllStoredItems() {
        int items = this.addItemsToCache();
        KeyValueIterator all = this.store.all();
        ArrayList<Object> results = new ArrayList<Object>();
        while (all.hasNext()) {
            results.add(((KeyValue)all.next()).key);
        }
        Assert.assertEquals((long)items, (long)results.size());
    }

    @Test
    public void shouldIterateOverRange() {
        int items = this.addItemsToCache();
        KeyValueIterator range = this.store.range(this.bytesKey(String.valueOf(0)), this.bytesKey(String.valueOf(items)));
        ArrayList<Object> results = new ArrayList<Object>();
        while (range.hasNext()) {
            results.add(((KeyValue)range.next()).key);
        }
        Assert.assertEquals((long)items, (long)results.size());
    }

    @Test
    public void shouldDeleteItemsFromCache() {
        this.store.put(this.bytesKey("a"), this.bytesValue("a"));
        this.store.delete(this.bytesKey("a"));
        Assert.assertNull((Object)this.store.get(this.bytesKey("a")));
        Assert.assertFalse((boolean)this.store.range(this.bytesKey("a"), this.bytesKey("b")).hasNext());
        Assert.assertFalse((boolean)this.store.all().hasNext());
    }

    @Test
    public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() {
        this.store.put(this.bytesKey("a"), this.bytesValue("a"));
        this.store.flush();
        this.store.delete(this.bytesKey("a"));
        Assert.assertNull((Object)this.store.get(this.bytesKey("a")));
        Assert.assertFalse((boolean)this.store.range(this.bytesKey("a"), this.bytesKey("b")).hasNext());
        Assert.assertFalse((boolean)this.store.all().hasNext());
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() {
        this.store.put(this.bytesKey("a"), this.bytesValue("a"));
        Assert.assertEquals((long)1L, (long)this.cache.size());
        this.store.close();
        Assert.assertEquals((long)0L, (long)this.cache.size());
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToGetFromClosedCachingStore() {
        this.store.close();
        this.store.get(this.bytesKey("a"));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
        this.store.close();
        this.store.put(this.bytesKey("a"), this.bytesValue("a"));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() {
        this.store.close();
        this.store.range(this.bytesKey("a"), this.bytesKey("b"));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoAllQueryOnClosedCachingStore() {
        this.store.close();
        this.store.all();
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoGetApproxSizeOnClosedCachingStore() {
        this.store.close();
        this.store.approximateNumEntries();
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoPutAllClosedCachingStore() {
        this.store.close();
        this.store.putAll(Collections.singletonList(KeyValue.pair((Object)this.bytesKey("a"), (Object)this.bytesValue("a"))));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() {
        this.store.close();
        this.store.putIfAbsent(this.bytesKey("b"), this.bytesValue("c"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutWithNullKey() {
        this.store.put(null, this.bytesValue("c"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() {
        this.store.putIfAbsent(null, this.bytesValue("c"));
    }

    @Test
    public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        entries.add(new KeyValue(null, (Object)this.bytesValue("a")));
        try {
            this.store.putAll(entries);
            Assert.fail((String)"Should have thrown NullPointerException while putAll null key");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void shouldPutIfAbsent() {
        this.store.putIfAbsent(this.bytesKey("b"), this.bytesValue("2"));
        Assert.assertThat((Object)this.store.get(this.bytesKey("b")), (Matcher)CoreMatchers.equalTo((Object)this.bytesValue("2")));
        this.store.putIfAbsent(this.bytesKey("b"), this.bytesValue("3"));
        Assert.assertThat((Object)this.store.get(this.bytesKey("b")), (Matcher)CoreMatchers.equalTo((Object)this.bytesValue("2")));
    }

    @Override
    @Test
    public void shouldPutAll() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        entries.add(new KeyValue((Object)this.bytesKey("a"), (Object)this.bytesValue("1")));
        entries.add(new KeyValue((Object)this.bytesKey("b"), (Object)this.bytesValue("2")));
        this.store.putAll(entries);
        Assert.assertThat((Object)this.store.get(this.bytesKey("a")), (Matcher)CoreMatchers.equalTo((Object)this.bytesValue("1")));
        Assert.assertThat((Object)this.store.get(this.bytesKey("b")), (Matcher)CoreMatchers.equalTo((Object)this.bytesValue("2")));
    }

    @Test
    public void shouldReturnUnderlying() {
        Assert.assertEquals((Object)this.store.wrapped(), this.underlyingStore);
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToDeleteFromClosedCachingStore() {
        this.store.close();
        this.store.delete(this.bytesKey("key"));
    }

    private int addItemsToCache() {
        String kv;
        int i = 0;
        for (int cachedSize = 0; cachedSize < 150; cachedSize += ThreadCacheTest.memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), this.topic)) {
            kv = String.valueOf(i++);
            this.store.put(this.bytesKey(kv), this.bytesValue(kv));
        }
        return i;
    }

    public static class CacheFlushListenerStub<K, V>
    implements CacheFlushListener<K, V> {
        final Map<K, Change<V>> forwarded = new HashMap<K, Change<V>>();

        public void apply(K key, V newValue, V oldValue, long timestamp) {
            this.forwarded.put(key, new Change(newValue, oldValue));
        }
    }
}

