/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.AbstractKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.MonotonicProcessorRecordContext;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.powermock.api.easymock.PowerMock;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.Filter;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.TableFormatConfig;

public class RocksDBStoreTest
extends AbstractKeyValueStoreTest {
    private static boolean enableBloomFilters = false;
    static final String DB_NAME = "db-name";
    static final String METRICS_SCOPE = "metrics-scope";
    private File dir;
    private final Time time = new MockTime();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private final Deserializer<String> stringDeserializer = new StringDeserializer();
    private final RocksDBMetricsRecorder metricsRecorder = (RocksDBMetricsRecorder)EasyMock.mock(RocksDBMetricsRecorder.class);
    InternalMockProcessorContext context;
    RocksDBStore rocksDBStore;

    @Before
    public void setUp() {
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        this.rocksDBStore = this.getRocksDBStore();
    }

    @After
    public void tearDown() {
        this.rocksDBStore.close();
    }

    @Override
    protected <K, V> KeyValueStore<K, V> createKeyValueStore(StateStoreContext context) {
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"my-store"), (Serde)context.keySerde(), (Serde)context.valueSerde());
        KeyValueStore store = (KeyValueStore)storeBuilder.build();
        store.init(context, (StateStore)store);
        return store;
    }

    RocksDBStore getRocksDBStore() {
        return new RocksDBStore(DB_NAME, METRICS_SCOPE);
    }

    private RocksDBStore getRocksDBStoreWithRocksDBMetricsRecorder() {
        return new RocksDBStore(DB_NAME, METRICS_SCOPE, this.metricsRecorder);
    }

    private InternalMockProcessorContext getProcessorContext(Properties streamsProps) {
        return new InternalMockProcessorContext(TestUtils.tempDirectory(), new StreamsConfig((Map)streamsProps));
    }

    private InternalMockProcessorContext getProcessorContext(Sensor.RecordingLevel recordingLevel, Class<? extends RocksDBConfigSetter> rocksDBConfigSetterClass) {
        Properties streamsProps = StreamsTestUtils.getStreamsConfig();
        streamsProps.setProperty("metrics.recording.level", recordingLevel.name());
        streamsProps.put("rocksdb.config.setter", rocksDBConfigSetterClass);
        return this.getProcessorContext(streamsProps);
    }

    private InternalMockProcessorContext getProcessorContext(Sensor.RecordingLevel recordingLevel) {
        Properties streamsProps = StreamsTestUtils.getStreamsConfig();
        streamsProps.setProperty("metrics.recording.level", recordingLevel.name());
        return this.getProcessorContext(streamsProps);
    }

    @Test
    public void shouldAddValueProvidersWithoutStatisticsToInjectedMetricsRecorderWhenRecordingLevelInfo() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.INFO);
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
        this.metricsRecorder.addValueProviders((String)EasyMock.eq((Object)DB_NAME), (RocksDB)EasyMock.notNull(), (Cache)EasyMock.notNull(), (Statistics)EasyMock.isNull());
        PowerMock.replay((Object[])new Object[]{this.metricsRecorder});
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        PowerMock.verify((Object[])new Object[]{this.metricsRecorder});
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
    }

    @Test
    public void shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRecordingLevelDebug() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG);
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
        this.metricsRecorder.addValueProviders((String)EasyMock.eq((Object)DB_NAME), (RocksDB)EasyMock.notNull(), (Cache)EasyMock.notNull(), (Statistics)EasyMock.notNull());
        PowerMock.replay((Object[])new Object[]{this.metricsRecorder});
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        PowerMock.verify((Object[])new Object[]{this.metricsRecorder});
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
    }

    @Test
    public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        try {
            this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG);
            this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
            EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
            this.metricsRecorder.removeValueProviders(DB_NAME);
            PowerMock.replay((Object[])new Object[]{this.metricsRecorder});
        }
        finally {
            this.rocksDBStore.close();
        }
        PowerMock.verify((Object[])new Object[]{this.metricsRecorder});
    }

    @Test
    public void shouldNotSetStatisticsInValueProvidersWhenUserProvidesStatistics() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
        this.metricsRecorder.addValueProviders((String)EasyMock.eq((Object)DB_NAME), (RocksDB)EasyMock.notNull(), (Cache)EasyMock.notNull(), (Statistics)EasyMock.isNull());
        PowerMock.replay((Object[])new Object[]{this.metricsRecorder});
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        PowerMock.verify((Object[])new Object[]{this.metricsRecorder});
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
    }

    @Test
    public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        Statistics userStatistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics;
        Statistics statisticsHandle = this.getStatistics(this.rocksDBStore);
        this.rocksDBStore.close();
        Assert.assertFalse((boolean)userStatistics.isOwningHandle());
        Assert.assertFalse((boolean)statisticsHandle.isOwningHandle());
        Assert.assertNull((Object)this.getStatistics(this.rocksDBStore));
        Assert.assertNull((Object)RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics);
    }

    @Test
    public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG);
        this.metricsRecorder.addValueProviders((String)EasyMock.eq((Object)DB_NAME), (RocksDB)EasyMock.notNull(), (Cache)EasyMock.notNull(), (Statistics)EasyMock.notNull());
        PowerMock.replay((Object[])new Object[]{this.metricsRecorder});
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        PowerMock.verify((Object[])new Object[]{this.metricsRecorder});
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
    }

    @Test
    public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        Statistics statisticsHandle = this.getStatistics(this.rocksDBStore);
        this.rocksDBStore.close();
        Assert.assertFalse((boolean)statisticsHandle.isOwningHandle());
        Assert.assertNull((Object)this.getStatistics(this.rocksDBStore));
    }

    @Test
    public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig.class);
        Assert.assertThrows((String)"The used block-based table format configuration does not expose the block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to the RocksDB options.", ProcessorStateException.class, () -> this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir()));
    }

    @Test
    public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatConfig() {
        this.rocksDBStore = this.getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = this.getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig.class);
        this.metricsRecorder.addValueProviders((String)EasyMock.eq((Object)DB_NAME), (RocksDB)EasyMock.notNull(), (Cache)EasyMock.isNull(), (Statistics)EasyMock.notNull());
        PowerMock.replay((Object[])new Object[]{this.metricsRecorder});
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        PowerMock.verify((Object[])new Object[]{this.metricsRecorder});
        EasyMock.reset((Object[])new Object[]{this.metricsRecorder});
    }

    @Test
    public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.rocksDBStore.put(new Bytes("existingKey".getBytes(StandardCharsets.UTF_8)), "existingValue".getBytes(StandardCharsets.UTF_8));
        this.rocksDBStore.flush();
        ArrayList<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<KeyValue<byte[], byte[]>>();
        byte[] restoredKey = "restoredKey".getBytes(StandardCharsets.UTF_8);
        byte[] restoredValue = "restoredValue".getBytes(StandardCharsets.UTF_8);
        restoreBytes.add(KeyValue.pair((Object)restoredKey, (Object)restoredValue));
        this.context.restore(DB_NAME, restoreBytes);
        MatcherAssert.assertThat((Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"restoredKey")))), (Matcher)CoreMatchers.equalTo((Object)"restoredValue"));
    }

    @Test
    public void shouldCallRocksDbConfigSetter() {
        MockRocksDbConfigSetter.called = false;
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        Object param = new Object();
        props.put("abc.def", param);
        InternalMockProcessorContext context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        this.rocksDBStore.init(context, (StateStore)this.rocksDBStore);
        Assert.assertTrue((boolean)MockRocksDbConfigSetter.called);
        MatcherAssert.assertThat((Object)MockRocksDbConfigSetter.configMap.get("abc.def"), (Matcher)CoreMatchers.equalTo((Object)param));
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
        File tmpDir = TestUtils.tempDirectory();
        InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig()));
        Assert.assertTrue((boolean)tmpDir.setReadOnly());
        Assert.assertThrows(ProcessorStateException.class, () -> this.rocksDBStore.openDB(tmpContext.appConfigs(), tmpContext.stateDir()));
    }

    @Override
    @Test
    public void shouldPutAll() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"1")), (Object)this.stringSerializer.serialize(null, (Object)"a")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"2")), (Object)this.stringSerializer.serialize(null, (Object)"b")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"3")), (Object)this.stringSerializer.serialize(null, (Object)"c")));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.rocksDBStore.putAll(entries);
        this.rocksDBStore.flush();
        Assert.assertEquals((Object)"a", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
        Assert.assertEquals((Object)"b", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"2")))));
        Assert.assertEquals((Object)"c", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"3")))));
    }

    @Test
    public void shouldMatchPositionAfterPut() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", (Headers)new RecordHeaders()));
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize(null, (Object)"one")), this.stringSerializer.serialize(null, (Object)"A"));
        this.context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", (Headers)new RecordHeaders()));
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize(null, (Object)"two")), this.stringSerializer.serialize(null, (Object)"B"));
        this.context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", (Headers)new RecordHeaders()));
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize(null, (Object)"three")), this.stringSerializer.serialize(null, (Object)"C"));
        Position expected = Position.fromMap((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"", (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)0, (Object)3L)}))}));
        Position actual = this.rocksDBStore.getPosition();
        Assert.assertEquals((Object)expected, (Object)actual);
    }

    @Test
    public void shouldReturnKeysWithGivenPrefix() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"k1")), (Object)this.stringSerializer.serialize(null, (Object)"a")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"prefix_3")), (Object)this.stringSerializer.serialize(null, (Object)"b")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"k2")), (Object)this.stringSerializer.serialize(null, (Object)"c")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"prefix_2")), (Object)this.stringSerializer.serialize(null, (Object)"d")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"k3")), (Object)this.stringSerializer.serialize(null, (Object)"e")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"prefix_1")), (Object)this.stringSerializer.serialize(null, (Object)"f")));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.rocksDBStore.putAll(entries);
        this.rocksDBStore.flush();
        try (KeyValueIterator keysWithPrefix = this.rocksDBStore.prefixScan((Object)"prefix", this.stringSerializer);){
            ArrayList<String> valuesWithPrefix = new ArrayList<String>();
            int numberOfKeysReturned = 0;
            while (keysWithPrefix.hasNext()) {
                KeyValue next = (KeyValue)keysWithPrefix.next();
                valuesWithPrefix.add(new String((byte[])next.value));
                ++numberOfKeysReturned;
            }
            MatcherAssert.assertThat((Object)numberOfKeysReturned, (Matcher)CoreMatchers.is((Object)3));
            MatcherAssert.assertThat(valuesWithPrefix.get(0), (Matcher)CoreMatchers.is((Object)"f"));
            MatcherAssert.assertThat(valuesWithPrefix.get(1), (Matcher)CoreMatchers.is((Object)"d"));
            MatcherAssert.assertThat(valuesWithPrefix.get(2), (Matcher)CoreMatchers.is((Object)"b"));
        }
    }

    @Test
    public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"abc")), (Object)this.stringSerializer.serialize(null, (Object)"f")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"abcd")), (Object)this.stringSerializer.serialize(null, (Object)"f")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"abce")), (Object)this.stringSerializer.serialize(null, (Object)"f")));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.rocksDBStore.putAll(entries);
        this.rocksDBStore.flush();
        try (KeyValueIterator keysWithPrefixAsabcd = this.rocksDBStore.prefixScan((Object)"abcd", this.stringSerializer);){
            int numberOfKeysReturned = 0;
            while (keysWithPrefixAsabcd.hasNext()) {
                ((Bytes)((KeyValue)keysWithPrefixAsabcd.next()).key).get();
                ++numberOfKeysReturned;
            }
            MatcherAssert.assertThat((Object)numberOfKeysReturned, (Matcher)CoreMatchers.is((Object)1));
        }
    }

    @Test
    public void shouldReturnUUIDsWithStringPrefix() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        Serializer uuidSerializer = Serdes.UUID().serializer();
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String prefix = uuid1.toString().substring(0, 4);
        int numMatches = uuid2.toString().substring(0, 4).equals(prefix) ? 2 : 1;
        entries.add(new KeyValue((Object)new Bytes(uuidSerializer.serialize(null, (Object)uuid1)), (Object)this.stringSerializer.serialize(null, (Object)"a")));
        entries.add(new KeyValue((Object)new Bytes(uuidSerializer.serialize(null, (Object)uuid2)), (Object)this.stringSerializer.serialize(null, (Object)"b")));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.rocksDBStore.putAll(entries);
        this.rocksDBStore.flush();
        try (KeyValueIterator keysWithPrefix = this.rocksDBStore.prefixScan((Object)prefix, this.stringSerializer);){
            ArrayList<String> valuesWithPrefix = new ArrayList<String>();
            int numberOfKeysReturned = 0;
            while (keysWithPrefix.hasNext()) {
                KeyValue next = (KeyValue)keysWithPrefix.next();
                valuesWithPrefix.add(new String((byte[])next.value));
                ++numberOfKeysReturned;
            }
            MatcherAssert.assertThat((Object)numberOfKeysReturned, (Matcher)CoreMatchers.is((Object)numMatches));
            if (numMatches == 2) {
                MatcherAssert.assertThat(valuesWithPrefix.get(0), (Matcher)CoreMatchers.either((Matcher)CoreMatchers.is((Object)"a")).or(CoreMatchers.is((Object)"b")));
            } else {
                MatcherAssert.assertThat(valuesWithPrefix.get(0), (Matcher)CoreMatchers.is((Object)"a"));
            }
        }
    }

    @Test
    public void shouldReturnNoKeys() {
        ArrayList<KeyValue> entries = new ArrayList<KeyValue>();
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"a")), (Object)this.stringSerializer.serialize(null, (Object)"a")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"b")), (Object)this.stringSerializer.serialize(null, (Object)"c")));
        entries.add(new KeyValue((Object)new Bytes(this.stringSerializer.serialize(null, (Object)"c")), (Object)this.stringSerializer.serialize(null, (Object)"e")));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.rocksDBStore.putAll(entries);
        this.rocksDBStore.flush();
        try (KeyValueIterator keysWithPrefix = this.rocksDBStore.prefixScan((Object)"d", this.stringSerializer);){
            int numberOfKeysReturned = 0;
            while (keysWithPrefix.hasNext()) {
                keysWithPrefix.next();
                ++numberOfKeysReturned;
            }
            MatcherAssert.assertThat((Object)numberOfKeysReturned, (Matcher)CoreMatchers.is((Object)0));
        }
    }

    @Test
    public void shouldRestoreAll() {
        List<KeyValue<byte[], byte[]>> entries = this.getKeyValueEntries();
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), entries);
        Assert.assertEquals((Object)"a", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
        Assert.assertEquals((Object)"b", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"2")))));
        Assert.assertEquals((Object)"c", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"3")))));
    }

    @Test
    public void shouldPutOnlyIfAbsentValue() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        Bytes keyBytes = new Bytes(this.stringSerializer.serialize(null, (Object)"one"));
        byte[] valueBytes = this.stringSerializer.serialize(null, (Object)"A");
        byte[] valueBytesUpdate = this.stringSerializer.serialize(null, (Object)"B");
        this.rocksDBStore.putIfAbsent(keyBytes, valueBytes);
        this.rocksDBStore.putIfAbsent(keyBytes, valueBytesUpdate);
        String retrievedValue = (String)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(keyBytes));
        Assert.assertEquals((Object)"A", (Object)retrievedValue);
    }

    @Test
    public void shouldHandleDeletesOnRestoreAll() {
        List<KeyValue<byte[], byte[]>> entries = this.getKeyValueEntries();
        entries.add((KeyValue<byte[], byte[]>)new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), null));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), entries);
        try (KeyValueIterator iterator = this.rocksDBStore.all();){
            HashSet<Object> keys = new HashSet<Object>();
            while (iterator.hasNext()) {
                keys.add(this.stringDeserializer.deserialize(null, ((Bytes)((KeyValue)iterator.next()).key).get()));
            }
            MatcherAssert.assertThat(keys, (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new String[]{"2", "3"})));
        }
    }

    @Test
    public void shouldHandleDeletesAndPutBackOnRestoreAll() {
        ArrayList<KeyValue<byte[], byte[]>> entries = new ArrayList<KeyValue<byte[], byte[]>>();
        entries.add(new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), (Object)"a".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"2".getBytes(StandardCharsets.UTF_8), (Object)"b".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), null));
        entries.add(new KeyValue((Object)"3".getBytes(StandardCharsets.UTF_8), (Object)"c".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), (Object)"restored".getBytes(StandardCharsets.UTF_8)));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), entries);
        try (KeyValueIterator iterator = this.rocksDBStore.all();){
            HashSet<Object> keys = new HashSet<Object>();
            while (iterator.hasNext()) {
                keys.add(this.stringDeserializer.deserialize(null, ((Bytes)((KeyValue)iterator.next()).key).get()));
            }
            MatcherAssert.assertThat(keys, (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new String[]{"1", "2", "3"})));
            Assert.assertEquals((Object)"restored", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
            Assert.assertEquals((Object)"b", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"2")))));
            Assert.assertEquals((Object)"c", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"3")))));
        }
    }

    @Test
    public void shouldRestoreThenDeleteOnRestoreAll() {
        List<KeyValue<byte[], byte[]>> entries = this.getKeyValueEntries();
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), entries);
        Assert.assertEquals((Object)"a", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
        Assert.assertEquals((Object)"b", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"2")))));
        Assert.assertEquals((Object)"c", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"3")))));
        entries.clear();
        entries.add((KeyValue<byte[], byte[]>)new KeyValue((Object)"2".getBytes(StandardCharsets.UTF_8), (Object)"b".getBytes(StandardCharsets.UTF_8)));
        entries.add((KeyValue<byte[], byte[]>)new KeyValue((Object)"3".getBytes(StandardCharsets.UTF_8), (Object)"c".getBytes(StandardCharsets.UTF_8)));
        entries.add((KeyValue<byte[], byte[]>)new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), null));
        this.context.restore(this.rocksDBStore.name(), entries);
        try (KeyValueIterator iterator = this.rocksDBStore.all();){
            HashSet<Object> keys = new HashSet<Object>();
            while (iterator.hasNext()) {
                keys.add(this.stringDeserializer.deserialize(null, ((Bytes)((KeyValue)iterator.next()).key).get()));
            }
            MatcherAssert.assertThat(keys, (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new String[]{"2", "3"})));
        }
    }

    @Test
    public void shouldThrowNullPointerExceptionOnNullPut() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        Assert.assertThrows(NullPointerException.class, () -> this.rocksDBStore.put(null, this.stringSerializer.serialize(null, (Object)"someVal")));
    }

    @Test
    public void shouldThrowNullPointerExceptionOnNullPutAll() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        Assert.assertThrows(NullPointerException.class, () -> this.rocksDBStore.put(null, this.stringSerializer.serialize(null, (Object)"someVal")));
    }

    @Test
    public void shouldThrowNullPointerExceptionOnNullGet() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        Assert.assertThrows(NullPointerException.class, () -> this.rocksDBStore.get(null));
    }

    @Test
    public void shouldThrowNullPointerExceptionOnDelete() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        Assert.assertThrows(NullPointerException.class, () -> this.rocksDBStore.delete(null));
    }

    @Test
    public void shouldReturnValueOnRange() {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        KeyValue kv0 = new KeyValue((Object)"0", (Object)"zero");
        KeyValue kv1 = new KeyValue((Object)"1", (Object)"one");
        KeyValue kv2 = new KeyValue((Object)"2", (Object)"two");
        this.rocksDBStore.put(new Bytes(((String)kv0.key).getBytes(StandardCharsets.UTF_8)), ((String)kv0.value).getBytes(StandardCharsets.UTF_8));
        this.rocksDBStore.put(new Bytes(((String)kv1.key).getBytes(StandardCharsets.UTF_8)), ((String)kv1.value).getBytes(StandardCharsets.UTF_8));
        this.rocksDBStore.put(new Bytes(((String)kv2.key).getBytes(StandardCharsets.UTF_8)), ((String)kv2.value).getBytes(StandardCharsets.UTF_8));
        LinkedList<KeyValue> expectedContents = new LinkedList<KeyValue>();
        expectedContents.add(kv0);
        expectedContents.add(kv1);
        try (KeyValueIterator iterator = this.rocksDBStore.range(null, new Bytes(this.stringSerializer.serialize(null, (Object)"1")));){
            Assert.assertEquals(expectedContents, this.getDeserializedList((KeyValueIterator<Bytes, byte[]>)iterator));
        }
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        Utils.delete((File)this.dir);
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize(null, (Object)"anyKey")), this.stringSerializer.serialize(null, (Object)"anyValue"));
        Assert.assertThrows(ProcessorStateException.class, () -> this.rocksDBStore.flush());
    }

    @Test
    public void shouldHandleToggleOfEnablingBloomFilters() {
        byte[] valBytes;
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", TestingBloomFilterRocksDBConfigSetter.class);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        enableBloomFilters = false;
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        ArrayList<String> expectedValues = new ArrayList<String>();
        expectedValues.add("a");
        expectedValues.add("b");
        expectedValues.add("c");
        List<KeyValue<byte[], byte[]>> keyValues = this.getKeyValueEntries();
        for (KeyValue<byte[], byte[]> keyValue : keyValues) {
            this.rocksDBStore.put(new Bytes((byte[])keyValue.key), (byte[])keyValue.value);
        }
        int expectedIndex = 0;
        for (KeyValue<byte[], byte[]> keyValue : keyValues) {
            valBytes = this.rocksDBStore.get(new Bytes((byte[])keyValue.key));
            MatcherAssert.assertThat((Object)new String(valBytes, StandardCharsets.UTF_8), (Matcher)CoreMatchers.is(expectedValues.get(expectedIndex++)));
        }
        Assert.assertFalse((boolean)TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
        this.rocksDBStore.close();
        expectedIndex = 0;
        enableBloomFilters = true;
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        for (KeyValue<byte[], byte[]> keyValue : keyValues) {
            valBytes = this.rocksDBStore.get(new Bytes((byte[])keyValue.key));
            MatcherAssert.assertThat((Object)new String(valBytes, StandardCharsets.UTF_8), (Matcher)CoreMatchers.is(expectedValues.get(expectedIndex++)));
        }
        Assert.assertTrue((boolean)TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
    }

    @Test
    public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRocksDB() {
        TaskId taskId = new TaskId(0, 0);
        Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-application", "latest", this.time);
        this.context = (InternalMockProcessorContext)((Object)EasyMock.niceMock(InternalMockProcessorContext.class));
        EasyMock.expect((Object)this.context.metrics()).andStubReturn((Object)streamsMetrics);
        EasyMock.expect((Object)this.context.taskId()).andStubReturn((Object)taskId);
        EasyMock.expect((Object)this.context.appConfigs()).andStubReturn((Object)new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig()).originals());
        EasyMock.expect((Object)this.context.stateDir()).andStubReturn((Object)this.dir);
        MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0);
        EasyMock.expect((Object)this.context.recordMetadata()).andStubReturn(Optional.of(processorRecordContext));
        EasyMock.replay((Object[])new Object[]{this.context});
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        byte[] key = "hello".getBytes();
        byte[] value = "world".getBytes();
        this.rocksDBStore.put(Bytes.wrap((byte[])key), value);
        streamsMetrics.rocksDBMetricsRecordingTrigger().run();
        KafkaMetric bytesWrittenTotal = metrics.metric(new MetricName("bytes-written-total", "stream-state-metrics", "description is not verified", streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)));
        MatcherAssert.assertThat((Object)((Double)bytesWrittenTotal.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
        TaskId taskId = new TaskId(0, 0);
        Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-application", "latest", this.time);
        this.context = (InternalMockProcessorContext)((Object)EasyMock.niceMock(InternalMockProcessorContext.class));
        EasyMock.expect((Object)this.context.metrics()).andStubReturn((Object)streamsMetrics);
        EasyMock.expect((Object)this.context.taskId()).andStubReturn((Object)taskId);
        EasyMock.expect((Object)this.context.appConfigs()).andStubReturn((Object)new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig()).originals());
        EasyMock.expect((Object)this.context.stateDir()).andStubReturn((Object)this.dir);
        MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0);
        EasyMock.expect((Object)this.context.recordMetadata()).andStubReturn(Optional.of(processorRecordContext));
        EasyMock.replay((Object[])new Object[]{this.context});
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        byte[] key = "hello".getBytes();
        byte[] value = "world".getBytes();
        this.rocksDBStore.put(Bytes.wrap((byte[])key), value);
        KafkaMetric numberOfEntriesActiveMemTable = metrics.metric(new MetricName("num-entries-active-mem-table", "stream-state-metrics", "description is not verified", streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)));
        MatcherAssert.assertThat((Object)numberOfEntriesActiveMemTable, (Matcher)CoreMatchers.notNullValue());
        MatcherAssert.assertThat((Object)((BigInteger)numberOfEntriesActiveMemTable.metricValue()), (Matcher)Matchers.greaterThan((Comparable)BigInteger.valueOf(0L)));
    }

    @Test
    public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
        TaskId taskId = new TaskId(0, 0);
        Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO));
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-application", "latest", this.time);
        Properties props = StreamsTestUtils.getStreamsConfig();
        this.context = (InternalMockProcessorContext)((Object)EasyMock.niceMock(InternalMockProcessorContext.class));
        EasyMock.expect((Object)this.context.metrics()).andStubReturn((Object)streamsMetrics);
        EasyMock.expect((Object)this.context.taskId()).andStubReturn((Object)taskId);
        EasyMock.expect((Object)this.context.appConfigs()).andStubReturn((Object)new StreamsConfig((Map)props).originals());
        EasyMock.expect((Object)this.context.stateDir()).andStubReturn((Object)this.dir);
        EasyMock.replay((Object[])new Object[]{this.context});
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        List<String> propertyNames = Arrays.asList("num-entries-active-mem-table", "num-deletes-active-mem-table", "num-entries-imm-mem-tables", "num-deletes-imm-mem-tables", "num-immutable-mem-table", "cur-size-active-mem-table", "cur-size-all-mem-tables", "size-all-mem-tables", "mem-table-flush-pending", "num-running-flushes", "compaction-pending", "num-running-compactions", "estimate-pending-compaction-bytes", "total-sst-files-size", "live-sst-files-size", "num-live-versions", "block-cache-capacity", "block-cache-usage", "block-cache-pinned-usage", "estimate-num-keys", "estimate-table-readers-mem", "background-errors");
        for (String propertyname : propertyNames) {
            KafkaMetric metric = metrics.metric(new MetricName(propertyname, "stream-state-metrics", "description is not verified", streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)));
            MatcherAssert.assertThat((String)("Metric " + propertyname + " not found!"), (Object)metric, (Matcher)CoreMatchers.notNullValue());
            metric.metricValue();
        }
    }

    @Test
    public void shouldPerformRangeQueriesWithCachingDisabled() {
        this.context.setTime(1L);
        this.store.put((Object)1, (Object)"hi");
        this.store.put((Object)2, (Object)"goodbye");
        try (KeyValueIterator range = this.store.range((Object)1, (Object)2);){
            Assert.assertEquals((Object)"hi", (Object)((KeyValue)range.next()).value);
            Assert.assertEquals((Object)"goodbye", (Object)((KeyValue)range.next()).value);
            Assert.assertFalse((boolean)range.hasNext());
        }
    }

    @Test
    public void shouldPerformAllQueriesWithCachingDisabled() {
        this.context.setTime(1L);
        this.store.put((Object)1, (Object)"hi");
        this.store.put((Object)2, (Object)"goodbye");
        try (KeyValueIterator range = this.store.all();){
            Assert.assertEquals((Object)"hi", (Object)((KeyValue)range.next()).value);
            Assert.assertEquals((Object)"goodbye", (Object)((KeyValue)range.next()).value);
            Assert.assertFalse((boolean)range.hasNext());
        }
    }

    @Test
    public void shouldCloseOpenRangeIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() {
        this.context.setTime(1L);
        this.store.put((Object)1, (Object)"hi");
        this.store.put((Object)2, (Object)"goodbye");
        try (KeyValueIterator iteratorOne = this.store.range((Object)1, (Object)5);
             KeyValueIterator iteratorTwo = this.store.range((Object)1, (Object)4);){
            Assert.assertTrue((boolean)iteratorOne.hasNext());
            Assert.assertTrue((boolean)iteratorTwo.hasNext());
            this.store.close();
            Assertions.assertThrows(InvalidStateStoreException.class, () -> iteratorOne.hasNext());
            Assertions.assertThrows(InvalidStateStoreException.class, () -> {
                KeyValue cfr_ignored_0 = (KeyValue)iteratorOne.next();
            });
            Assertions.assertThrows(InvalidStateStoreException.class, () -> iteratorTwo.hasNext());
            Assertions.assertThrows(InvalidStateStoreException.class, () -> {
                KeyValue cfr_ignored_0 = (KeyValue)iteratorTwo.next();
            });
        }
    }

    @Test
    public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() {
        List<ConsumerRecord<byte[], byte[]>> entries = this.getChangelogRecords();
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        props.put("__iq.consistency.offset.vector.enabled__", (Object)true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restoreWithHeaders(this.rocksDBStore.name(), entries);
        Assert.assertEquals((Object)"a", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
        Assert.assertEquals((Object)"b", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"2")))));
        Assert.assertEquals((Object)"c", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"3")))));
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition(), (Matcher)Matchers.notNullValue());
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions(""), (Matcher)Matchers.notNullValue());
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions(""), (Matcher)Matchers.hasEntry((Object)0, (Object)3L));
    }

    @Test
    public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() {
        List<ConsumerRecord<byte[], byte[]>> entries = this.getChangelogRecordsMultipleTopics();
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        props.put("__iq.consistency.offset.vector.enabled__", (Object)true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restoreWithHeaders(this.rocksDBStore.name(), entries);
        Assert.assertEquals((Object)"a", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
        Assert.assertEquals((Object)"b", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"2")))));
        Assert.assertEquals((Object)"c", (Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"3")))));
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition(), (Matcher)Matchers.notNullValue());
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions("A"), (Matcher)Matchers.notNullValue());
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions("A"), (Matcher)Matchers.hasEntry((Object)0, (Object)3L));
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions("B"), (Matcher)Matchers.notNullValue());
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions("B"), (Matcher)Matchers.hasEntry((Object)0, (Object)2L));
    }

    @Test
    public void shouldHandleTombstoneRecords() {
        List<ConsumerRecord<byte[], byte[]>> entries = this.getChangelogRecordsWithTombstones();
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        props.put("__iq.consistency.offset.vector.enabled__", (Object)true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restoreWithHeaders(this.rocksDBStore.name(), entries);
        Assert.assertNull((Object)this.stringDeserializer.deserialize(null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize(null, (Object)"1")))));
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition(), (Matcher)Matchers.notNullValue());
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition().getPartitionPositions("A"), (Matcher)Matchers.hasEntry((Object)0, (Object)2L));
    }

    @Test
    public void shouldNotThrowWhenRestoringOnMissingHeaders() {
        List<KeyValue<byte[], byte[]>> entries = this.getChangelogRecordsWithoutHeaders();
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        props.put("__iq.consistency.offset.vector.enabled__", (Object)true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig((Map)props));
        this.rocksDBStore.init((StateStoreContext)this.context, (StateStore)this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), entries);
        MatcherAssert.assertThat((Object)this.rocksDBStore.getPosition(), (Matcher)CoreMatchers.is((Object)Position.emptyPosition()));
    }

    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
        ArrayList<ConsumerRecord<byte[], byte[]>> entries = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        entries.add(this.createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8), "", 0, 1L));
        entries.add(this.createChangelogRecord("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8), "", 0, 2L));
        entries.add(this.createChangelogRecord("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8), "", 0, 3L));
        return entries;
    }

    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsMultipleTopics() {
        ArrayList<ConsumerRecord<byte[], byte[]>> entries = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        entries.add(this.createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8), "A", 0, 1L));
        entries.add(this.createChangelogRecord("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8), "B", 0, 2L));
        entries.add(this.createChangelogRecord("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8), "A", 0, 3L));
        return entries;
    }

    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsWithTombstones() {
        ArrayList<ConsumerRecord<byte[], byte[]>> entries = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        entries.add(this.createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8), "A", 0, 1L));
        entries.add(this.createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), null, "A", 0, 2L));
        return entries;
    }

    private List<KeyValue<byte[], byte[]>> getChangelogRecordsWithoutHeaders() {
        ArrayList<KeyValue<byte[], byte[]>> entries = new ArrayList<KeyValue<byte[], byte[]>>();
        entries.add(new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), (Object)"a".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"2".getBytes(StandardCharsets.UTF_8), (Object)"b".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"3".getBytes(StandardCharsets.UTF_8), (Object)"c".getBytes(StandardCharsets.UTF_8)));
        return entries;
    }

    private ConsumerRecord<byte[], byte[]> createChangelogRecord(byte[] key, byte[] value, String topic, int partition, long offset) {
        RecordHeaders headers = new RecordHeaders();
        Position position = Position.emptyPosition();
        position = position.withComponent(topic, partition, offset);
        headers.add((Header)ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add((Header)new RecordHeader("c", PositionSerde.serialize((Position)position).array()));
        return new ConsumerRecord("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, (Object)key, (Object)value, (Headers)headers, Optional.empty());
    }

    private List<KeyValue<byte[], byte[]>> getKeyValueEntries() {
        ArrayList<KeyValue<byte[], byte[]>> entries = new ArrayList<KeyValue<byte[], byte[]>>();
        entries.add(new KeyValue((Object)"1".getBytes(StandardCharsets.UTF_8), (Object)"a".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"2".getBytes(StandardCharsets.UTF_8), (Object)"b".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue((Object)"3".getBytes(StandardCharsets.UTF_8), (Object)"c".getBytes(StandardCharsets.UTF_8)));
        return entries;
    }

    private List<KeyValue<String, String>> getDeserializedList(KeyValueIterator<Bytes, byte[]> iter) {
        List bytes = Utils.toList(iter);
        List<KeyValue<String, String>> result = bytes.stream().map(kv -> new KeyValue((Object)((Bytes)kv.key).toString(), this.stringDeserializer.deserialize(null, (byte[])kv.value))).collect(Collectors.toList());
        return result;
    }

    private Statistics getStatistics(RocksDBStore rocksDBStore) throws Exception {
        Field statisticsField = rocksDBStore.getClass().getDeclaredField("statistics");
        statisticsField.setAccessible(true);
        Statistics statistics = (Statistics)statisticsField.get(rocksDBStore);
        statisticsField.setAccessible(false);
        return statistics;
    }

    public static class TestingBloomFilterRocksDBConfigSetter
    implements RocksDBConfigSetter {
        static boolean bloomFiltersSet;
        static Filter filter;
        static Cache cache;

        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)options.tableFormatConfig();
            cache = new LRUCache(0x3200000L);
            tableConfig.setBlockCache(cache);
            tableConfig.setBlockSize(4096L);
            if (enableBloomFilters) {
                filter = new BloomFilter();
                tableConfig.setFilterPolicy(filter);
                options.optimizeFiltersForHits();
                bloomFiltersSet = true;
            } else {
                options.setOptimizeFiltersForHits(false);
                bloomFiltersSet = false;
            }
            options.setTableFormatConfig((TableFormatConfig)tableConfig);
        }

        public void close(String storeName, Options options) {
            if (filter != null) {
                filter.close();
            }
            cache.close();
        }
    }

    public static class RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig
    implements RocksDBConfigSetter {
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            options.setTableFormatConfig((TableFormatConfig)new PlainTableConfig());
        }

        public void close(String storeName, Options options) {
            options.statistics().close();
        }
    }

    public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig
    implements RocksDBConfigSetter {
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            options.setTableFormatConfig((TableFormatConfig)new BlockBasedTableConfig());
        }

        public void close(String storeName, Options options) {
            options.statistics().close();
        }
    }

    public static class RocksDBConfigSetterWithUserProvidedStatistics
    implements RocksDBConfigSetter {
        protected static Statistics lastStatistics = null;

        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            lastStatistics = new Statistics();
            options.setStatistics(lastStatistics);
        }

        public void close(String storeName, Options options) {
            Assert.assertTrue((boolean)lastStatistics.isOwningHandle());
            lastStatistics.close();
            lastStatistics = null;
        }
    }
}

