/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.CachingSessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.Segments;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingSessionStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 600;
    private final StateSerdes<String, Long> serdes = new StateSerdes("name", Serdes.String(), Serdes.Long());
    private MockProcessorContext context;
    private RocksDBSegmentedBytesStore underlying;
    private CachingSessionStore<String, Long> cachingStore;
    private ThreadCache cache;
    private static final Long DEFAULT_TIMESTAMP = 10L;

    @Before
    public void setUp() throws Exception {
        SessionKeySchema schema = new SessionKeySchema();
        schema.init("topic");
        int retention = 60000;
        int numSegments = 3;
        this.underlying = new RocksDBSegmentedBytesStore("test", 60000L, 3, (SegmentedBytesStore.KeySchema)schema);
        RocksDBSessionStore sessionStore = new RocksDBSessionStore((SegmentedBytesStore)this.underlying, Serdes.Bytes(), Serdes.ByteArray());
        this.cachingStore = new CachingSessionStore((SessionStore)sessionStore, Serdes.String(), Serdes.Long(), Segments.segmentInterval((long)60000L, (int)3));
        this.cache = new ThreadCache("testCache", 600L, (StreamsMetrics)new MockStreamsMetrics(new Metrics()));
        this.context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector)null, this.cache);
        this.context.setRecordContext((RecordContext)new ProcessorRecordContext(DEFAULT_TIMESTAMP.longValue(), 0L, 0, "topic"));
        this.cachingStore.init((ProcessorContext)this.context, this.cachingStore);
    }

    @After
    public void close() {
        this.context.close();
        this.cachingStore.close();
    }

    @Test
    public void shouldPutFetchFromCache() throws Exception {
        this.cachingStore.put(new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        this.cachingStore.put(new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        this.cachingStore.put(new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        Assert.assertEquals((long)3L, (long)this.cache.size());
        KeyValueIterator a = this.cachingStore.findSessions((Object)"a", 0L, 0L);
        KeyValueIterator b = this.cachingStore.findSessions((Object)"b", 0L, 0L);
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)a.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)b.next());
        Assert.assertFalse((boolean)a.hasNext());
        Assert.assertFalse((boolean)b.hasNext());
    }

    @Test
    public void shouldPutFetchAllKeysFromCache() throws Exception {
        this.cachingStore.put(new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        this.cachingStore.put(new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        this.cachingStore.put(new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        Assert.assertEquals((long)3L, (long)this.cache.size());
        KeyValueIterator all = this.cachingStore.findSessions((Object)"a", (Object)"b", 0L, 0L);
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)all.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)all.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)all.next());
        Assert.assertFalse((boolean)all.hasNext());
    }

    @Test
    public void shouldPutFetchRangeFromCache() throws Exception {
        this.cachingStore.put(new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        this.cachingStore.put(new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        this.cachingStore.put(new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)1L);
        Assert.assertEquals((long)3L, (long)this.cache.size());
        KeyValueIterator some = this.cachingStore.findSessions((Object)"aa", (Object)"b", 0L, 0L);
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)some.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)some.next());
        Assert.assertFalse((boolean)some.hasNext());
    }

    @Test
    public void shouldFetchAllSessionsWithSameRecordKey() throws Exception {
        List<KeyValue> expected = Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(10L, 10L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(100L, 100L)), (Object)3L), KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(1000L, 1000L)), (Object)4L));
        for (KeyValue kv : expected) {
            this.cachingStore.put((Windowed)kv.key, kv.value);
        }
        this.cachingStore.put(new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L)), (Object)5L);
        List<KeyValue<Windowed<String>, Long>> results = RocksDBSessionStoreTest.toList((KeyValueIterator<Windowed<String>, Long>)this.cachingStore.fetch((Object)"a"));
        Assert.assertEquals(expected, results);
    }

    @Test
    public void shouldFlushItemsToStoreOnEviction() throws Exception {
        List<KeyValue<Windowed<String>, Long>> added = this.addSessionsUntilOverflow("a", "b", "c", "d");
        Assert.assertEquals((long)(added.size() - 1), (long)this.cache.size());
        KeyValueIterator iterator = this.underlying.fetch(Bytes.wrap((byte[])((String)((Windowed)added.get((int)0).key).key()).getBytes()), 0L, 0L);
        KeyValue next = (KeyValue)iterator.next();
        Assert.assertEquals((Object)added.get((int)0).key, (Object)SessionKeySerde.from((byte[])((Bytes)next.key).get(), (Deserializer)Serdes.String().deserializer(), (String)"dummy"));
        Assert.assertArrayEquals((byte[])this.serdes.rawValue(added.get((int)0).value), (byte[])((byte[])next.value));
    }

    @Test
    public void shouldQueryItemsInCacheAndStore() throws Exception {
        List<KeyValue<Windowed<String>, Long>> added = this.addSessionsUntilOverflow("a");
        KeyValueIterator iterator = this.cachingStore.findSessions((Object)"a", 0L, (long)(added.size() * 10));
        List<KeyValue<Windowed<String>, Long>> actual = RocksDBSessionStoreTest.toList((KeyValueIterator<Windowed<String>, Long>)iterator);
        Assert.assertEquals(added, actual);
    }

    @Test
    public void shouldRemove() throws Exception {
        Windowed a = new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L));
        Windowed b = new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L));
        this.cachingStore.put(a, (Object)2L);
        this.cachingStore.put(b, (Object)2L);
        this.cachingStore.flush();
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        KeyValueIterator rangeIter = this.cachingStore.findSessions((Object)"a", 0L, 0L);
        Assert.assertFalse((boolean)rangeIter.hasNext());
    }

    @Test
    public void shouldFetchCorrectlyAcrossSegments() throws Exception {
        Windowed a1 = new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L));
        Windowed a2 = new Windowed((Object)"a", (Window)new SessionWindow(60000L, 60000L));
        Windowed a3 = new Windowed((Object)"a", (Window)new SessionWindow(120000L, 120000L));
        this.cachingStore.put(a1, (Object)1L);
        this.cachingStore.put(a2, (Object)2L);
        this.cachingStore.put(a3, (Object)3L);
        this.cachingStore.flush();
        KeyValueIterator results = this.cachingStore.findSessions((Object)"a", 0L, 120000L);
        Assert.assertEquals((Object)a1, (Object)((KeyValue)results.next()).key);
        Assert.assertEquals((Object)a2, (Object)((KeyValue)results.next()).key);
        Assert.assertEquals((Object)a3, (Object)((KeyValue)results.next()).key);
        Assert.assertFalse((boolean)results.hasNext());
    }

    @Test
    public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception {
        Windowed a1 = new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L));
        Windowed aa1 = new Windowed((Object)"aa", (Window)new SessionWindow(0L, 0L));
        Windowed a2 = new Windowed((Object)"a", (Window)new SessionWindow(60000L, 60000L));
        Windowed a3 = new Windowed((Object)"a", (Window)new SessionWindow(120000L, 120000L));
        Windowed aa3 = new Windowed((Object)"aa", (Window)new SessionWindow(120000L, 120000L));
        this.cachingStore.put(a1, (Object)1L);
        this.cachingStore.put(aa1, (Object)1L);
        this.cachingStore.put(a2, (Object)2L);
        this.cachingStore.put(a3, (Object)3L);
        this.cachingStore.put(aa3, (Object)3L);
        this.cachingStore.flush();
        KeyValueIterator rangeResults = this.cachingStore.findSessions((Object)"a", (Object)"aa", 0L, 120000L);
        Assert.assertEquals((Object)a1, (Object)((KeyValue)rangeResults.next()).key);
        Assert.assertEquals((Object)aa1, (Object)((KeyValue)rangeResults.next()).key);
        Assert.assertEquals((Object)a2, (Object)((KeyValue)rangeResults.next()).key);
        Assert.assertEquals((Object)a3, (Object)((KeyValue)rangeResults.next()).key);
        Assert.assertEquals((Object)aa3, (Object)((KeyValue)rangeResults.next()).key);
        Assert.assertFalse((boolean)rangeResults.hasNext());
    }

    @Test
    public void shouldForwardChangedValuesDuringFlush() throws Exception {
        Windowed a = new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L));
        final ArrayList flushed = new ArrayList();
        this.cachingStore.setFlushListener((CacheFlushListener)new CacheFlushListener<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long newValue, Long oldValue) {
                flushed.add(KeyValue.pair(key, (Object)new Change((Object)newValue, (Object)oldValue)));
            }
        });
        this.cachingStore.put(a, (Object)1L);
        this.cachingStore.flush();
        this.cachingStore.put(a, (Object)2L);
        this.cachingStore.flush();
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(flushed, Arrays.asList(KeyValue.pair((Object)a, (Object)new Change((Object)1L, null)), KeyValue.pair((Object)a, (Object)new Change((Object)2L, (Object)1L)), KeyValue.pair((Object)a, (Object)new Change(null, (Object)2L))));
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() throws Exception {
        Windowed a1 = new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L));
        this.cachingStore.put(a1, (Object)1L);
        Assert.assertEquals((long)1L, (long)this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals((long)0L, (long)this.cache.size());
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.fetch((Object)"a");
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.findSessions((Object)"a", 0L, Long.MAX_VALUE);
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.remove(new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.put(new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L);
    }

    private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(String ... sessionIds) {
        Random random = new Random();
        ArrayList<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        while (this.cache.size() == (long)results.size()) {
            String sessionId = sessionIds[random.nextInt(sessionIds.length)];
            this.addSingleSession(sessionId, results);
        }
        return results;
    }

    private void addSingleSession(String sessionId, List<KeyValue<Windowed<String>, Long>> allSessions) {
        int timestamp = allSessions.size() * 10;
        Windowed key = new Windowed((Object)sessionId, (Window)new SessionWindow((long)timestamp, (long)timestamp));
        Long value = 1L;
        this.cachingStore.put(key, (Object)value);
        allSessions.add((KeyValue<Windowed<String>, Long>)KeyValue.pair((Object)key, (Object)value));
    }
}

