/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.state.rocksdb.AbstractRocksDBState;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;
import org.apache.flink.state.rocksdb.RocksDBOperationUtils;
import org.apache.flink.state.rocksdb.RocksDBWriteBatchWrapper;
import org.apache.flink.state.rocksdb.RocksIteratorWrapper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RocksDBMapState<K, N, UK, UV>
extends AbstractRocksDBState<K, N, Map<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
    private TypeSerializer<UK> userKeySerializer;
    private TypeSerializer<UV> userValueSerializer;

    private RocksDBMapState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<Map<UK, UV>> valueSerializer, Map<UK, UV> defaultValue, RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        Preconditions.checkState((boolean)(valueSerializer instanceof MapSerializer), (Object)"Unexpected serializer type.");
        MapSerializer castedMapSerializer = (MapSerializer)valueSerializer;
        this.userKeySerializer = castedMapSerializer.getKeySerializer();
        this.userValueSerializer = castedMapSerializer.getValueSerializer();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        return this.valueSerializer;
    }

    public UV get(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, this.userKeySerializer);
        byte[] rawValueBytes = this.backend.db.get(this.columnFamily, rawKeyBytes);
        return rawValueBytes == null ? null : (UV)RocksDBMapState.deserializeUserValue(this.dataInputView, rawValueBytes, this.userValueSerializer);
    }

    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, this.userKeySerializer);
        byte[] rawValueBytes = this.serializeValueNullSensitive(userValue, this.userValueSerializer);
        this.backend.db.put(this.columnFamily, this.writeOptions, rawKeyBytes, rawValueBytes);
    }

    public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
        if (map == null) {
            return;
        }
        try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.backend.db, this.writeOptions, this.backend.getWriteBatchSize());){
            for (Map.Entry<UK, UV> entry : map.entrySet()) {
                byte[] rawKeyBytes = this.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(), this.userKeySerializer);
                byte[] rawValueBytes = this.serializeValueNullSensitive(entry.getValue(), this.userValueSerializer);
                writeBatchWrapper.put(this.columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    public void remove(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, this.userKeySerializer);
        this.backend.db.delete(this.columnFamily, this.writeOptions, rawKeyBytes);
    }

    public boolean contains(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, this.userKeySerializer);
        byte[] rawValueBytes = this.backend.db.get(this.columnFamily, rawKeyBytes);
        return rawValueBytes != null;
    }

    public Iterable<Map.Entry<UK, UV>> entries() {
        return this::iterator;
    }

    public Iterable<UK> keys() {
        byte[] prefixBytes = this.serializeCurrentKeyWithGroupAndNamespace();
        return () -> new RocksDBMapIterator<UK>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer, this.dataInputView){

            @Override
            @Nullable
            public UK next() {
                RocksDBMapEntry entry = this.nextEntry();
                return entry == null ? null : (Object)entry.getKey();
            }
        };
    }

    public Iterable<UV> values() {
        byte[] prefixBytes = this.serializeCurrentKeyWithGroupAndNamespace();
        return () -> new RocksDBMapIterator<UV>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer, this.dataInputView){

            @Override
            public UV next() {
                RocksDBMapEntry entry = this.nextEntry();
                return entry == null ? null : (Object)entry.getValue();
            }
        };
    }

    @Override
    public void migrateSerializedValue(DataInputDeserializer serializedOldValueInput, DataOutputSerializer serializedMigratedValueOutput, TypeSerializer<Map<UK, UV>> priorSerializer, TypeSerializer<Map<UK, UV>> newSerializer) throws StateMigrationException {
        Preconditions.checkArgument((boolean)(priorSerializer instanceof MapSerializer));
        Preconditions.checkArgument((boolean)(newSerializer instanceof MapSerializer));
        TypeSerializer priorMapValueSerializer = ((MapSerializer)priorSerializer).getValueSerializer();
        TypeSerializer newMapValueSerializer = ((MapSerializer)newSerializer).getValueSerializer();
        try {
            boolean isNull = serializedOldValueInput.readBoolean();
            Object mapUserValue = null;
            if (!isNull) {
                mapUserValue = priorMapValueSerializer.deserialize((DataInputView)serializedOldValueInput);
            }
            serializedMigratedValueOutput.writeBoolean(mapUserValue == null);
            newMapValueSerializer.serialize(mapUserValue, (DataOutputView)serializedMigratedValueOutput);
        }
        catch (Exception e) {
            throw new StateMigrationException("Error while trying to migrate RocksDB map state.", (Throwable)e);
        }
    }

    public Iterator<Map.Entry<UK, UV>> iterator() {
        byte[] prefixBytes = this.serializeCurrentKeyWithGroupAndNamespace();
        return new RocksDBMapIterator<Map.Entry<UK, UV>>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer, this.dataInputView){

            @Override
            public Map.Entry<UK, UV> next() {
                return this.nextEntry();
            }
        };
    }

    public boolean isEmpty() {
        byte[] prefixBytes = this.serializeCurrentKeyWithGroupAndNamespace();
        try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(this.backend.db, this.columnFamily, this.backend.getReadOptions());){
            iterator.seek(prefixBytes);
            boolean bl = !iterator.isValid() || !this.startWithKeyPrefix(prefixBytes, iterator.key());
            return bl;
        }
    }

    @Override
    public void clear() {
        try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(this.backend.db, this.columnFamily, this.backend.getReadOptions());
             RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(this.backend.db, this.backend.getWriteOptions(), this.backend.getWriteBatchSize());){
            byte[] keyBytes;
            byte[] keyPrefixBytes = this.serializeCurrentKeyWithGroupAndNamespace();
            iterator.seek(keyPrefixBytes);
            while (iterator.isValid() && this.startWithKeyPrefix(keyPrefixBytes, keyBytes = iterator.key())) {
                rocksDBWriteBatchWrapper.remove(this.columnFamily, keyBytes);
                iterator.next();
            }
        }
        catch (RocksDBException e) {
            throw new FlinkRuntimeException("Error while cleaning the state in RocksDB.", (Throwable)e);
        }
    }

    protected RocksDBMapState<K, N, UK, UV> setValueSerializer(TypeSerializer<Map<UK, UV>> valueSerializer) {
        super.setValueSerializer(valueSerializer);
        MapSerializer castedMapSerializer = (MapSerializer)valueSerializer;
        this.userKeySerializer = castedMapSerializer.getKeySerializer();
        this.userValueSerializer = castedMapSerializer.getValueSerializer();
        return this;
    }

    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
        Preconditions.checkNotNull((Object)serializedKeyAndNamespace);
        Preconditions.checkNotNull(safeKeySerializer);
        Preconditions.checkNotNull(safeNamespaceSerializer);
        Preconditions.checkNotNull(safeValueSerializer);
        Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace((byte[])serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup((Object)keyAndNamespace.f0, (int)this.backend.getNumberOfKeyGroups());
        SerializedCompositeKeyBuilder keyBuilder = new SerializedCompositeKeyBuilder(safeKeySerializer, this.backend.getKeyGroupPrefixBytes(), 32);
        keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
        byte[] keyPrefixBytes = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, this.namespaceSerializer);
        MapSerializer serializer = (MapSerializer)safeValueSerializer;
        TypeSerializer dupUserKeySerializer = serializer.getKeySerializer();
        TypeSerializer dupUserValueSerializer = serializer.getValueSerializer();
        DataInputDeserializer inputView = new DataInputDeserializer();
        RocksDBMapIterator iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(this.backend.db, keyPrefixBytes, dupUserKeySerializer, dupUserValueSerializer, inputView){

            @Override
            public Map.Entry<UK, UV> next() {
                return this.nextEntry();
            }
        };
        if (!iterator.hasNext()) {
            return null;
        }
        return KvStateSerializer.serializeMap(() -> iterator, (TypeSerializer)dupUserKeySerializer, (TypeSerializer)dupUserValueSerializer);
    }

    private static <UK> UK deserializeUserKey(DataInputDeserializer dataInputView, int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
        dataInputView.setBuffer(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
        return (UK)keySerializer.deserialize((DataInputView)dataInputView);
    }

    private static <UV> UV deserializeUserValue(DataInputDeserializer dataInputView, byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
        dataInputView.setBuffer(rawValueBytes);
        boolean isNull = dataInputView.readBoolean();
        return (UV)(isNull ? null : valueSerializer.deserialize((DataInputView)dataInputView));
    }

    private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
        if (rawKeyBytes.length < keyPrefixBytes.length) {
            return false;
        }
        int i = keyPrefixBytes.length;
        while (--i >= this.backend.getKeyGroupPrefixBytes()) {
            if (rawKeyBytes[i] == keyPrefixBytes[i]) continue;
            return false;
        }
        return true;
    }

    static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult, RocksDBKeyedStateBackend<K> backend) {
        return (IS)new RocksDBMapState<K, N, UK, UV>((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getStateSerializer(), (Map)stateDesc.getDefaultValue(), backend);
    }

    static <UK, UV, K, N, SV, S extends State, IS extends S> IS update(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult, IS existingState) {
        return (IS)((RocksDBMapState)existingState).setNamespaceSerializer(((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getNamespaceSerializer()).setValueSerializer(((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getStateSerializer()).setDefaultValue((Map)stateDesc.getDefaultValue());
    }

    static class StateSnapshotTransformerWrapper
    implements StateSnapshotTransformer<byte[]> {
        private static final byte[] NULL_VALUE;
        private static final byte NON_NULL_VALUE_PREFIX;
        private final StateSnapshotTransformer<byte[]> elementTransformer;
        private final DataInputDeserializer div;

        StateSnapshotTransformerWrapper(StateSnapshotTransformer<byte[]> originalTransformer) {
            this.elementTransformer = originalTransformer;
            this.div = new DataInputDeserializer();
        }

        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] value) {
            if (value == null || this.isNull(value)) {
                return NULL_VALUE;
            }
            byte[] woNullByte = Arrays.copyOfRange(value, 1, value.length);
            byte[] filteredValue = (byte[])this.elementTransformer.filterOrTransform((Object)woNullByte);
            filteredValue = filteredValue == null ? NULL_VALUE : (filteredValue != woNullByte ? StateSnapshotTransformerWrapper.prependWithNonNullByte(filteredValue, value) : value);
            return filteredValue;
        }

        private boolean isNull(byte[] value) {
            try {
                this.div.setBuffer(value, 0, 1);
                return this.div.readBoolean();
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to deserialize boolean flag of map user null value", (Throwable)e);
            }
        }

        private static byte[] prependWithNonNullByte(byte[] value, byte[] reuse) {
            int len = 1 + value.length;
            byte[] result = reuse.length == len ? reuse : new byte[len];
            result[0] = NON_NULL_VALUE_PREFIX;
            System.arraycopy(value, 0, result, 1, value.length);
            return result;
        }

        static {
            DataOutputSerializer dov = new DataOutputSerializer(1);
            try {
                dov.writeBoolean(true);
                NULL_VALUE = dov.getCopyOfBuffer();
                dov.clear();
                dov.writeBoolean(false);
                NON_NULL_VALUE_PREFIX = dov.getSharedBuffer()[0];
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to serialize boolean flag of map user null value", (Throwable)e);
            }
        }
    }

    private abstract class RocksDBMapIterator<T>
    implements Iterator<T> {
        private static final int CACHE_SIZE_LIMIT = 128;
        private final RocksDB db;
        @Nonnull
        private final byte[] keyPrefixBytes;
        private boolean expired = false;
        private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList();
        private RocksDBMapEntry currentEntry;
        private int cacheIndex = 0;
        private final TypeSerializer<UK> keySerializer;
        private final TypeSerializer<UV> valueSerializer;
        private final DataInputDeserializer dataInputView;

        RocksDBMapIterator(RocksDB db, byte[] keyPrefixBytes, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer, DataInputDeserializer dataInputView) {
            this.db = db;
            this.keyPrefixBytes = keyPrefixBytes;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.dataInputView = dataInputView;
        }

        @Override
        public boolean hasNext() {
            this.loadCache();
            return this.cacheIndex < this.cacheEntries.size();
        }

        @Override
        public void remove() {
            if (this.currentEntry == null || this.currentEntry.deleted) {
                throw new IllegalStateException("The remove operation must be called after a valid next operation.");
            }
            this.currentEntry.remove();
        }

        final RocksDBMapEntry nextEntry() {
            this.loadCache();
            if (this.cacheIndex == this.cacheEntries.size()) {
                if (!this.expired) {
                    throw new IllegalStateException();
                }
                return null;
            }
            this.currentEntry = this.cacheEntries.get(this.cacheIndex);
            ++this.cacheIndex;
            return this.currentEntry;
        }

        private void loadCache() {
            if (this.cacheIndex > this.cacheEntries.size()) {
                throw new IllegalStateException();
            }
            if (this.cacheIndex < this.cacheEntries.size() || this.expired) {
                return;
            }
            try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(this.db, RocksDBMapState.this.columnFamily, RocksDBMapState.this.backend.getReadOptions());){
                byte[] startBytes = this.currentEntry == null ? this.keyPrefixBytes : this.currentEntry.rawKeyBytes;
                this.cacheEntries.clear();
                this.cacheIndex = 0;
                iterator.seek(startBytes);
                if (this.currentEntry != null && !this.currentEntry.deleted) {
                    iterator.next();
                }
                while (true) {
                    if (!iterator.isValid() || !RocksDBMapState.this.startWithKeyPrefix(this.keyPrefixBytes, iterator.key())) {
                        this.expired = true;
                        break;
                    }
                    if (this.cacheEntries.size() >= 128) {
                        break;
                    }
                    RocksDBMapEntry entry = new RocksDBMapEntry(this.db, this.keyPrefixBytes.length, iterator.key(), iterator.value(), this.keySerializer, this.valueSerializer, this.dataInputView);
                    this.cacheEntries.add(entry);
                    iterator.next();
                }
            }
        }
    }

    private class RocksDBMapEntry
    implements Map.Entry<UK, UV> {
        private final RocksDB db;
        private final byte[] rawKeyBytes;
        private byte[] rawValueBytes;
        private boolean deleted;
        private UK userKey;
        private UV userValue;
        private final int userKeyOffset;
        private final TypeSerializer<UK> keySerializer;
        private final TypeSerializer<UV> valueSerializer;
        private final DataInputDeserializer dataInputView;

        RocksDBMapEntry(@Nonnegative RocksDB db, @Nonnull int userKeyOffset, @Nonnull byte[] rawKeyBytes, @Nonnull byte[] rawValueBytes, @Nonnull TypeSerializer<UK> keySerializer, @Nonnull TypeSerializer<UV> valueSerializer, DataInputDeserializer dataInputView) {
            this.db = db;
            this.userKeyOffset = userKeyOffset;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.rawKeyBytes = rawKeyBytes;
            this.rawValueBytes = rawValueBytes;
            this.deleted = false;
            this.dataInputView = dataInputView;
        }

        public void remove() {
            this.deleted = true;
            this.rawValueBytes = null;
            try {
                this.db.delete(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, this.rawKeyBytes);
            }
            catch (RocksDBException e) {
                throw new FlinkRuntimeException("Error while removing data from RocksDB.", (Throwable)e);
            }
        }

        @Override
        public UK getKey() {
            if (this.userKey == null) {
                try {
                    this.userKey = RocksDBMapState.deserializeUserKey(this.dataInputView, this.userKeyOffset, this.rawKeyBytes, this.keySerializer);
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException("Error while deserializing the user key.", (Throwable)e);
                }
            }
            return this.userKey;
        }

        @Override
        public UV getValue() {
            if (this.deleted) {
                return null;
            }
            if (this.userValue == null) {
                try {
                    this.userValue = RocksDBMapState.deserializeUserValue(this.dataInputView, this.rawValueBytes, this.valueSerializer);
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException("Error while deserializing the user value.", (Throwable)e);
                }
            }
            return this.userValue;
        }

        @Override
        public UV setValue(UV value) {
            if (this.deleted) {
                throw new IllegalStateException("The value has already been deleted.");
            }
            Object oldValue = this.getValue();
            try {
                this.userValue = value;
                this.rawValueBytes = RocksDBMapState.this.serializeValueNullSensitive(value, this.valueSerializer);
                this.db.put(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, this.rawKeyBytes, this.rawValueBytes);
            }
            catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while putting data into RocksDB.", e);
            }
            return oldValue;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Map.Entry)) {
                return false;
            }
            Map.Entry e = (Map.Entry)o;
            return Objects.equals(this.getKey(), e.getKey()) && Objects.equals(this.getValue(), e.getValue());
        }
    }
}

