/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.Assert;
import org.junit.Test;

public class ThreadCacheTest {
    /**
     * Inserts five dirty entries into a cache whose size is five times the estimated size of one
     * entry, reads each of them back, and then checks the get/put/evict/flush counters.
     */
    @Test
    public void basicPutGet() throws IOException {
        List<KeyValue> toInsert = Arrays.asList(
                new KeyValue("K1", "V1"),
                new KeyValue("K2", "V2"),
                new KeyValue("K3", "V3"),
                new KeyValue("K4", "V4"),
                new KeyValue("K5", "V5"));
        KeyValue first = toInsert.get(0);
        String name = "name";

        // All keys and values have the same length, so the first pair is representative for sizing.
        int entrySize = memoryCacheEntrySize(((String) first.key).getBytes(), ((String) first.value).getBytes(), "");
        long cacheSize = toInsert.size() * entrySize;
        ThreadCache cache = new ThreadCache("testCache", cacheSize, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));

        for (KeyValue pair : toInsert) {
            byte[] keyBytes = ((String) pair.key).getBytes();
            byte[] valueBytes = ((String) pair.value).getBytes();
            cache.put(name, keyBytes, new LRUCacheEntry(valueBytes, true, 1L, 1L, 1, ""));
        }

        for (KeyValue pair : toInsert) {
            byte[] keyBytes = ((String) pair.key).getBytes();
            LRUCacheEntry entry = cache.get(name, keyBytes);
            Assert.assertEquals((Object) entry.isDirty, (Object) true);
            Assert.assertEquals((Object) new String(entry.value), pair.value);
        }

        Assert.assertEquals(cache.gets(), 5L);
        Assert.assertEquals(cache.puts(), 5L);
        Assert.assertEquals(cache.evicts(), 0L);
        Assert.assertEquals(cache.flushes(), 0L);
    }

    /**
     * Fills a cache created with {@code desiredCacheSize} with dirty entries and checks two bounds:
     * the cache's self-reported size must not exceed {@code desiredCacheSize} by more than the
     * fraction {@code entryFactor}, and the heap growth measured around the fill must not exceed
     * {@code systemFactor} times the self-reported size.
     *
     * @param entryFactor      allowed fractional overshoot of {@code cache.sizeBytes()} over {@code desiredCacheSize}
     * @param systemFactor     multiplier applied to {@code cache.sizeBytes()} to bound the measured heap usage
     * @param desiredCacheSize size passed to the {@link ThreadCache} constructor
     * @param keySizeBytes     key length used only when estimating how many entries to insert; the keys
     *                         actually inserted are the bytes of {@code "K" + index}
     * @param valueSizeBytes   length of every inserted value
     */
    private void checkOverheads(double entryFactor, double systemFactor, long desiredCacheSize, int keySizeBytes, int valueSizeBytes) {
        Runtime runtime = Runtime.getRuntime();
        byte[] key = new byte[keySizeBytes];
        byte[] value = new byte[valueSizeBytes];
        String name = "name";
        long numElements = desiredCacheSize / (long) memoryCacheEntrySize(key, value, "");

        // Heap baseline: taken right after a GC request and before the cache is created.
        System.gc();
        long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory();

        ThreadCache cache = new ThreadCache("testCache", desiredCacheSize, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        Assert.assertEquals(cache.sizeBytes(), 0L);

        for (int i = 0; (long) i < numElements; ++i) {
            key = ("K" + i).getBytes();
            value = new byte[valueSizeBytes];
            cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
        }

        // Second GC request so the measurement below is not dominated by garbage from the fill loop.
        System.gc();
        double ceiling = desiredCacheSize + desiredCacheSize * entryFactor;
        long usedRuntimeMemory = runtime.totalMemory() - runtime.freeMemory() - prevRuntimeMemory;

        Assert.assertTrue((double) cache.sizeBytes() <= ceiling);

        double systemBound = cache.sizeBytes() * systemFactor;
        Assert.assertTrue("Used memory size " + usedRuntimeMemory + " greater than expected " + systemBound, systemBound >= (double) usedRuntimeMemory);
    }

    /**
     * Runs the overhead check with 8-byte keys and 100-byte values, allowing a 5% overshoot of the
     * reported cache size and heap usage of up to 3x the reported size.
     */
    @Test
    public void cacheOverheadsSmallValues() {
        long hundredMiB = 100L * 1024L * 1024L;
        // Never ask for more than the JVM is allowed to use.
        long desiredCacheSize = Math.min(hundredMiB, Runtime.getRuntime().maxMemory());
        checkOverheads(0.05, 3.0, desiredCacheSize, 8, 100);
    }

    /**
     * Runs the overhead check with 8-byte keys and 1000-byte values, allowing a 5% overshoot of the
     * reported cache size and heap usage of up to 2x the reported size.
     */
    @Test
    public void cacheOverheadsLargeValues() {
        long hundredMiB = 100L * 1024L * 1024L;
        // Never ask for more than the JVM is allowed to use.
        long desiredCacheSize = Math.min(hundredMiB, Runtime.getRuntime().maxMemory());
        checkOverheads(0.05, 2.0, desiredCacheSize, 8, 1000);
    }

    /**
     * Estimates the number of bytes one cache entry is expected to occupy.
     *
     * @param key   entry key; its length is counted twice
     * @param value entry value
     * @param topic topic name; counted by {@code String.length()}, i.e. in chars
     * @return {@code 2 * key.length + value.length + topic.length() + 45}
     */
    static int memoryCacheEntrySize(byte[] key, byte[] value, String topic) {
        // NOTE(review): the fixed terms 1 + 8 + 8 + 4 look like they mirror the boolean, two longs and
        // int taken by the LRUCacheEntry constructor - confirm against LRUCacheEntry.
        int entryBytes = value.length + 1 + 8 + 8 + 4 + topic.length();
        // NOTE(review): second key copy plus three 8-byte terms, presumably per-node bookkeeping - confirm.
        int bookkeepingBytes = key.length + 8 + 8 + 8;
        return key.length + entryBytes + bookkeepingBytes;
    }

    /**
     * Uses a cache sized for a single entry, inserts five dirty entries, and checks that the first
     * record handed to the flush listener is K1/V1 and that four evictions were counted.
     */
    @Test
    public void evict() throws IOException {
        final List<KeyValue> received = new ArrayList<KeyValue>();
        List<KeyValue> expected = Arrays.asList(new KeyValue("K1", "V1"));
        List<KeyValue> toInsert = Arrays.asList(
                new KeyValue("K1", "V1"),
                new KeyValue("K2", "V2"),
                new KeyValue("K3", "V3"),
                new KeyValue("K4", "V4"),
                new KeyValue("K5", "V5"));
        KeyValue first = toInsert.get(0);
        String namespace = "kafka";

        int singleEntrySize = memoryCacheEntrySize(((String) first.key).getBytes(), ((String) first.value).getBytes(), "");
        ThreadCache cache = new ThreadCache("testCache", (long) singleEntrySize, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));

        // Record every flushed dirty entry as a String key/value pair.
        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
                    String flushedKey = dirtyEntry.key().toString();
                    String flushedValue = new String(dirtyEntry.newValue());
                    received.add(new KeyValue(flushedKey, flushedValue));
                }
            }
        });

        for (KeyValue pair : toInsert) {
            byte[] keyBytes = ((String) pair.key).getBytes();
            byte[] valueBytes = ((String) pair.value).getBytes();
            cache.put(namespace, keyBytes, new LRUCacheEntry(valueBytes, true, 1L, 1L, 1, ""));
        }

        // Only the leading expected.size() received records are compared.
        for (int idx = 0; idx < expected.size(); ++idx) {
            Assert.assertEquals((Object) expected.get(idx), (Object) received.get(idx));
        }
        Assert.assertEquals(cache.evicts(), 4L);
    }

    /**
     * Puts one dirty entry, deletes it, and checks that the deleted entry carries the stored value
     * and that a subsequent lookup returns {@code null}.
     */
    @Test
    public void shouldDelete() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] key = new byte[]{0};
        cache.put("name", key, dirtyEntry(key));
        // Compare array contents: assertEquals on two arrays only passes for the very same array object.
        Assert.assertArrayEquals(key, cache.delete("name", key).value);
        Assert.assertNull(cache.get("name", key));
    }

    /**
     * Puts a dirty entry, deletes it, then flushes the namespace and checks that the flush listener
     * received nothing while the flush itself was still counted.
     */
    @Test
    public void shouldNotFlushAfterDelete() throws Exception {
        byte[] key = new byte[]{0};
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        final List<ThreadCache.DirtyEntry> received = new ArrayList<ThreadCache.DirtyEntry>();
        String namespace = "namespace";
        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                received.addAll(dirty);
            }
        });
        cache.put(namespace, key, dirtyEntry(key));
        // Compare array contents: assertEquals on two arrays only passes for the very same array object.
        Assert.assertArrayEquals(key, cache.delete(namespace, key).value);
        cache.flush(namespace);
        Assert.assertEquals(0L, (long) received.size());
        Assert.assertEquals(cache.flushes(), 1L);
    }

    /** Deleting a key that was never inserted into a populated namespace must return {@code null}. */
    @Test
    public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] presentKey = new byte[]{0};
        byte[] absentKey = new byte[]{1};
        cache.put("name", presentKey, dirtyEntry(presentKey));
        Assert.assertNull(cache.delete("name", absentKey));
    }

    /** Deleting from a namespace that nothing was ever put into must return {@code null}. */
    @Test
    public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] anyKey = new byte[]{1};
        Assert.assertNull(cache.delete("name", anyKey));
    }

    /**
     * Stores the same key under the namespaces "name" and "name1" with different values and checks
     * that each namespace returns its own value.
     */
    @Test
    public void shouldNotClashWithOverlappingNames() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] sharedKey = new byte[]{0};
        byte[] valueForName = sharedKey;
        byte[] valueForName1 = new byte[]{1};

        cache.put("name", sharedKey, dirtyEntry(valueForName));
        cache.put("name1", sharedKey, dirtyEntry(valueForName1));

        Assert.assertArrayEquals(valueForName, cache.get("name", sharedKey).value);
        Assert.assertArrayEquals(valueForName1, cache.get("name1", sharedKey).value);
    }

    /** Peeking the next key twice in a row must return the same key both times. */
    @Test
    public void shouldPeekNextKey() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] theByte = new byte[]{0};
        String namespace = "streams";
        cache.put(namespace, theByte, dirtyEntry(theByte));

        ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, new byte[]{1});
        for (int attempt = 0; attempt < 2; ++attempt) {
            Assert.assertEquals((Object) Bytes.wrap(theByte), (Object) iterator.peekNextKey());
        }
    }

    /** The key returned by {@code peekNextKey()} must equal the key of the entry returned by {@code next()}. */
    @Test
    public void shouldGetSameKeyAsPeekNext() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] theByte = new byte[]{0};
        String namespace = "streams";
        cache.put(namespace, theByte, dirtyEntry(theByte));

        ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, new byte[]{1});
        // Peek first, then advance - same evaluation order as a single assertEquals(peek, next) call.
        Object peekedKey = iterator.peekNextKey();
        Object nextKey = iterator.next().key;
        Assert.assertEquals(peekedKey, nextKey);
    }

    /** Peeking on a range iterator over a cache that holds no entries must throw {@link NoSuchElementException}. */
    @Test(expected=NoSuchElementException.class)
    public void shouldThrowIfNoPeekNextKey() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] from = new byte[]{0};
        byte[] to = new byte[]{1};
        ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", from, to);
        iterator.peekNextKey();
    }

    /** A range iterator over a cache that holds no entries must report {@code hasNext() == false}. */
    @Test
    public void shouldReturnFalseIfNoNextKey() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] from = new byte[]{0};
        byte[] to = new byte[]{1};
        ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", from, to);
        Assert.assertFalse(iterator.hasNext());
    }

    /**
     * Stores the single-byte keys 0..10, iterates the range from key 1 to key 4, and checks that
     * each step yields the expected key both from {@code peekNextKey()} and from {@code next()},
     * with exactly four entries visited.
     */
    @Test
    public void shouldPeekAndIterateOverRange() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 10000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
        String namespace = "streams";
        for (int idx = 0; idx < bytes.length; ++idx) {
            cache.put(namespace, bytes[idx], dirtyEntry(bytes[idx]));
        }

        ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, new byte[]{1}, new byte[]{4});
        int bytesIndex;
        for (bytesIndex = 1; iterator.hasNext(); ++bytesIndex) {
            Bytes peekedKey = iterator.peekNextKey();
            KeyValue next = iterator.next();
            byte[] expectedKey = bytes[bytesIndex];
            Assert.assertArrayEquals(expectedKey, peekedKey.get());
            Assert.assertArrayEquals(expectedKey, ((Bytes) next.key).get());
        }
        // Started at 1 and advanced once per visited entry, so 5 means keys 1..4 were seen.
        Assert.assertEquals(5L, (long) bytesIndex);
    }

    /**
     * Fills a cache sized for five one-byte entries, opens a range iterator over them, then puts a
     * sixth entry and checks that the iterator's next key is {@code {1}}.
     */
    @Test
    public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() throws Exception {
        String namespace = "streams";
        int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
        long cacheSize = entrySize * 5;
        ThreadCache cache = new ThreadCache("testCache", cacheSize, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        // A listener is registered but deliberately ignores whatever is flushed.
        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
            }
        });

        byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
        int initialEntries = 5;
        for (int idx = 0; idx < initialEntries; ++idx) {
            byte[] current = bytes[idx];
            cache.put(namespace, current, dirtyEntry(current));
        }
        Assert.assertEquals(5L, cache.size());

        // The iterator must exist before the extra put below.
        // NOTE(review): presumably that put evicts key {0}, which the iterator then skips - confirm against ThreadCache.
        ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, new byte[]{0}, new byte[]{5});
        byte[] extraKey = new byte[]{6};
        cache.put(namespace, extraKey, dirtyEntry(new byte[]{6}));

        Assert.assertEquals((Object) Bytes.wrap(new byte[]{1}), (Object) range.peekNextKey());
    }

    @Test
    public void shouldFlushDirtyEntriesForNamespace() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 100000L, (StreamsMetrics)new MockStreamsMetrics(new Metrics()));
        final ArrayList received = new ArrayList();
        cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener(){

            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
                    received.add(dirtyEntry.key().get());
                }
            }
        });
        List<byte[]> expected = Arrays.asList({0}, {1}, {2});
        for (byte[] bytes : expected) {
            cache.put("1", bytes, this.dirtyEntry(bytes));
        }
        cache.put("2", new byte[]{4}, this.dirtyEntry(new byte[]{4}));
        cache.flush("1");
        Assert.assertEquals(expected, received);
    }

    @Test
    public void shouldNotFlushCleanEntriesForNamespace() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 100000L, (StreamsMetrics)new MockStreamsMetrics(new Metrics()));
        final ArrayList received = new ArrayList();
        cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener(){

            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
                    received.add(dirtyEntry.key().get());
                }
            }
        });
        List<byte[]> toInsert = Arrays.asList({0}, {1}, {2});
        for (byte[] bytes : toInsert) {
            cache.put("1", bytes, this.cleanEntry(bytes));
        }
        cache.put("2", new byte[]{4}, this.cleanEntry(new byte[]{4}));
        cache.flush("1");
        Assert.assertEquals((Object)Collections.EMPTY_LIST, received);
    }

    /**
     * Shared body for the zero-size and very-small-size tests: puts one dirty entry into the given
     * cache and expects the flush listener to have received exactly one entry right after the put,
     * and still exactly one after an explicit flush.
     *
     * @param cache the cache under test, created by the calling test
     */
    private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(ThreadCache cache) {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<ThreadCache.DirtyEntry>();
        String namespace = "namespace";
        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                received.addAll(dirty);
            }
        });

        byte[] entryKey = new byte[]{0};
        cache.put(namespace, entryKey, dirtyEntry(new byte[]{0}));
        Assert.assertEquals(1L, (long) received.size());

        // The explicit flush must not deliver the entry a second time.
        cache.flush(namespace);
        Assert.assertEquals(1L, (long) received.size());
    }

    /** Runs the immediate-eviction check against a cache created with a size of 1. */
    @Test
    public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() throws Exception {
        long verySmallSize = 1L;
        shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(
                new ThreadCache("testCache", verySmallSize, (StreamsMetrics) new MockStreamsMetrics(new Metrics())));
    }

    /** Runs the immediate-eviction check against a cache created with a size of 0. */
    @Test
    public void shouldEvictImmediatelyIfCacheSizeIsZero() throws Exception {
        long zeroSize = 0L;
        shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(
                new ThreadCache("testCache", zeroSize, (StreamsMetrics) new MockStreamsMetrics(new Metrics())));
    }

    /**
     * Puts two dirty entries via {@code putAll} into a cache created with a size of 1 and checks
     * that two evictions were counted.
     */
    @Test
    public void shouldEvictAfterPutAll() throws Exception {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<ThreadCache.DirtyEntry>();
        String namespace = "namespace";
        ThreadCache cache = new ThreadCache("testCache", 1L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                received.addAll(dirty);
            }
        });

        cache.putAll(namespace, Arrays.asList(
                KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
                KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));

        Assert.assertEquals(cache.evicts(), 2L);
    }

    /** Puts two entries via {@code putAll} and checks that each key returns the value stored for it. */
    @Test
    public void shouldPutAll() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 100000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));

        cache.putAll("name", Arrays.asList(
                KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
                KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));

        Assert.assertArrayEquals(new byte[]{5}, cache.get("name", new byte[]{0}).value);
        Assert.assertArrayEquals(new byte[]{6}, cache.get("name", new byte[]{1}).value);
    }

    /**
     * Puts a clean entry into a cache created with a size of 0 and checks that the flush listener
     * received nothing.
     */
    @Test
    public void shouldNotForwardCleanEntryOnEviction() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 0L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        final List<ThreadCache.DirtyEntry> received = new ArrayList<ThreadCache.DirtyEntry>();
        cache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                received.addAll(dirty);
            }
        });

        byte[] entryKey = new byte[]{1};
        cache.put("name", entryKey, cleanEntry(new byte[]{0}));

        Assert.assertEquals(0L, (long) received.size());
    }

    /**
     * Checks {@code putIfAbsent}: the first call for a key returns {@code null}, a second call with
     * a different value returns the entry holding the original value, and a lookup still returns
     * the original value.
     */
    @Test
    public void shouldPutIfAbsent() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 100000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] key = new byte[]{10};
        byte[] value = new byte[]{30};

        LRUCacheEntry firstResult = cache.putIfAbsent("n", key, dirtyEntry(value));
        Assert.assertNull(firstResult);

        LRUCacheEntry secondResult = cache.putIfAbsent("n", key, dirtyEntry(new byte[]{8}));
        Assert.assertArrayEquals(value, secondResult.value);

        Assert.assertArrayEquals(value, cache.get("n", key).value);
    }

    /**
     * Calls {@code putIfAbsent} three times (keys {0}, {1}, {1}) on a cache created with a size of 1
     * and checks that three evictions were counted.
     */
    @Test
    public void shouldEvictAfterPutIfAbsent() throws Exception {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<ThreadCache.DirtyEntry>();
        String namespace = "namespace";
        ThreadCache cache = new ThreadCache("testCache", 1L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                received.addAll(dirty);
            }
        });

        cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
        // The same key/value is offered twice on purpose; the expected count below is still three.
        for (int attempt = 0; attempt < 2; ++attempt) {
            cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
        }

        Assert.assertEquals(cache.evicts(), 3L);
    }

    /**
     * Regression-style test with no assertions: it passes as long as the final oversized put
     * returns. The listener for "name" puts into namespace "other" while it is being invoked, and
     * the last put is larger than the space left in the cache.
     */
    @Test
    public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() throws Exception {
        int maxCacheSizeInBytes = 100;
        final ThreadCache threadCache = new ThreadCache("testCache", (long) maxCacheSizeInBytes, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));

        // Flushing "name" re-enters the cache by writing into "other".
        threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
                threadCache.put("other", new byte[]{0}, ThreadCacheTest.this.dirtyEntry(new byte[2]));
            }
        });
        // The remaining namespaces get listeners that ignore whatever is flushed.
        threadCache.addDirtyEntryFlushListener("other", new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
            }
        });
        threadCache.addDirtyEntryFlushListener("another", new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> dirty) {
            }
        });

        threadCache.put("another", new byte[]{1}, dirtyEntry(new byte[1]));
        threadCache.put("name", new byte[]{1}, dirtyEntry(new byte[1]));

        // Size the last value so it exceeds the free space by a full cache size.
        int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes());
        byte[] oversizedValue = new byte[remaining + maxCacheSizeInBytes];
        threadCache.put("name", new byte[]{2}, dirtyEntry(oversizedValue));
    }

    /**
     * Puts one clean entry into each of the namespaces "one" and "two", closes "two", and checks
     * that the total size drops from 2 to 1 and that the entry in "two" can no longer be read.
     */
    @Test
    public void shouldCleanupNamedCacheOnClose() throws Exception {
        ThreadCache cache = new ThreadCache("testCache", 100000L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        for (String namespace : new String[]{"one", "two"}) {
            cache.put(namespace, new byte[]{1}, cleanEntry(new byte[]{1}));
        }
        Assert.assertEquals(cache.size(), 2L);

        cache.close("two");

        Assert.assertEquals(cache.size(), 1L);
        Assert.assertNull(cache.get("two", new byte[]{1}));
    }

    /** Looking up a {@code null} key in a populated namespace must return {@code null}. */
    @Test
    public void shouldReturnNullIfKeyIsNull() throws Exception {
        ThreadCache threadCache = new ThreadCache("testCache", 10L, (StreamsMetrics) new MockStreamsMetrics(new Metrics()));
        byte[] storedKey = new byte[]{1};
        threadCache.put("one", storedKey, cleanEntry(new byte[]{1}));

        LRUCacheEntry found = threadCache.get("one", null);
        Assert.assertNull(found);
    }

    /**
     * Builds an {@link LRUCacheEntry} flagged {@code true} in its second constructor argument, with
     * {@code -1} for the three numeric arguments and an empty string for the last one.
     *
     * @param key bytes passed as the entry's first constructor argument; despite the parameter name,
     *            callers in this class read them back through the entry's {@code value} field
     * @return the newly created entry
     */
    private LRUCacheEntry dirtyEntry(byte[] key) {
        boolean dirty = true;
        LRUCacheEntry entry = new LRUCacheEntry(key, dirty, -1L, -1L, -1, "");
        return entry;
    }

    /**
     * Builds an {@link LRUCacheEntry} through its single-argument constructor.
     *
     * @param key bytes passed as the entry's only constructor argument
     * @return the newly created entry
     */
    private LRUCacheEntry cleanEntry(byte[] key) {
        LRUCacheEntry entry = new LRUCacheEntry(key);
        return entry;
    }
}

