/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.SegmentedBytesStoreStub;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link ChangeLoggingSegmentedBytesStore}.
 *
 * <p>The store under test wraps a {@link SegmentedBytesStoreStub}, and the processor context is
 * wired with a record collector that captures every {@code send} call in an in-memory map, so
 * the tests can inspect both what reached the collector and which calls reached the stub.
 */
public class ChangeLoggingSegmentedBytesStoreTest {
    /** Key/value pairs handed to the collector's {@code send}, keyed by the record key. */
    private final Map<Object, Object> sent = new HashMap<Object, Object>();

    /** Collector that stores each sent key/value pair in {@link #sent} and does nothing else. */
    private final NoOpRecordCollector collector = new NoOpRecordCollector() {

        @Override
        public <K, V> void send(String topic, K key, V value, Integer partition, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            // Only key and value are recorded; topic, partition, timestamp and serializers are ignored.
            ChangeLoggingSegmentedBytesStoreTest.this.sent.put(key, value);
        }
    };

    /** Processor context backed by a temp directory and the capturing {@link #collector}. */
    private final MockProcessorContext context = new MockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            this.collector,
            new ThreadCache("testCache", 0L, new MockStreamsMetrics(new Metrics())));

    /** Stub inner store; exposes flags recording which operations were invoked on it. */
    private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub();

    /** Store under test, wrapping {@link #bytesStore}. */
    private final ChangeLoggingSegmentedBytesStore store = new ChangeLoggingSegmentedBytesStore(this.bytesStore);

    /** Sets the context time to 0 and initializes the store with itself as the root store. */
    @Before
    public void setUp() throws Exception {
        this.context.setTime(0L);
        this.store.init(this.context, this.store);
    }

    /** Closes the context and then the store after every test. */
    @After
    public void after() {
        this.context.close();
        this.store.close();
    }

    /** After two puts and a flush, the collector must have seen each key with its value. */
    @Test
    public void shouldLogPuts() throws Exception {
        byte[] value1 = new byte[]{0};
        byte[] value2 = new byte[]{1};
        // Each key wraps the same array that is used as its value.
        Bytes key1 = Bytes.wrap(value1);
        Bytes key2 = Bytes.wrap(value2);
        this.store.put(key1, value1);
        this.store.put(key2, value2);
        this.store.flush();
        Assert.assertArrayEquals(value1, (byte[]) this.sent.get(key1));
        Assert.assertArrayEquals(value2, (byte[]) this.sent.get(key2));
    }

    /** After two removes and a flush, the collector must have seen each key with a null value. */
    @Test
    public void shouldLogRemoves() throws Exception {
        Bytes key1 = Bytes.wrap(new byte[]{0});
        Bytes key2 = Bytes.wrap(new byte[]{1});
        this.store.remove(key1);
        this.store.remove(key2);
        this.store.flush();
        // containsKey distinguishes "sent with null value" from "never sent".
        Assert.assertTrue(this.sent.containsKey(key1));
        Assert.assertTrue(this.sent.containsKey(key2));
        Assert.assertNull(this.sent.get(key1));
        Assert.assertNull(this.sent.get(key2));
    }

    /** A fetch on the store must set the stub's {@code fetchCalled} flag. */
    @Test
    public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception {
        this.store.fetch(Bytes.wrap(new byte[0]), 1L, 1L);
        Assert.assertTrue(this.bytesStore.fetchCalled);
    }

    /** A flush on the store must set the stub's {@code flushed} flag. */
    @Test
    public void shouldFlushUnderlyingStore() throws Exception {
        this.store.flush();
        Assert.assertTrue(this.bytesStore.flushed);
    }

    /** A close on the store must set the stub's {@code closed} flag. */
    @Test
    public void shouldCloseUnderlyingStore() throws Exception {
        this.store.close();
        Assert.assertTrue(this.bytesStore.closed);
    }

    /** The {@code init} call made in {@link #setUp()} must set the stub's {@code initialized} flag. */
    @Test
    public void shouldInitUnderlyingStore() throws Exception {
        Assert.assertTrue(this.bytesStore.initialized);
    }
}

