/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Spliterators;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.CopyOnWriteSkipListStateMapSnapshot;
import org.apache.flink.runtime.state.heap.LevelIndexHeader;
import org.apache.flink.runtime.state.heap.NodeStatus;
import org.apache.flink.runtime.state.heap.OnHeapLevelIndexHeader;
import org.apache.flink.runtime.state.heap.SkipListKeyComparator;
import org.apache.flink.runtime.state.heap.SkipListKeySerializer;
import org.apache.flink.runtime.state.heap.SkipListUtils;
import org.apache.flink.runtime.state.heap.SkipListValueSerializer;
import org.apache.flink.runtime.state.heap.StateMap;
import org.apache.flink.runtime.state.heap.StateMapSnapshot;
import org.apache.flink.runtime.state.heap.space.Allocator;
import org.apache.flink.runtime.state.heap.space.Chunk;
import org.apache.flink.runtime.state.heap.space.SpaceUtils;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CopyOnWriteSkipListStateMap<K, N, S>
extends StateMap<K, N, S>
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
    static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
    static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
    private final SkipListKeySerializer<K, N> skipListKeySerializer;
    private final SkipListValueSerializer<S> skipListValueSerializer;
    private final Allocator spaceAllocator;
    private final LevelIndexHeader levelIndexHeader;
    private int randomSeed;
    private volatile int stateMapVersion;
    private volatile int highestRequiredSnapshotVersionPlusOne;
    private volatile int highestFinishedSnapshotVersion;
    private final TreeSet<Integer> snapshotVersions;
    private int totalSize;
    private int requestCount;
    private final Set<Long> logicallyRemovedNodes;
    private int numKeysToDeleteOneTime;
    private float logicalRemovedKeysRatio;
    private final Set<Long> pruningValueNodes;
    private final AtomicBoolean closed;
    private final ResourceGuard resourceGuard;

    public CopyOnWriteSkipListStateMap(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull TypeSerializer<S> stateSerializer, @Nonnull Allocator spaceAllocator, int numKeysToDeleteOneTime, float logicalRemovedKeysRatio) {
        this.skipListKeySerializer = new SkipListKeySerializer<K, N>(keySerializer, namespaceSerializer);
        this.skipListValueSerializer = new SkipListValueSerializer<S>(stateSerializer);
        this.spaceAllocator = spaceAllocator;
        Preconditions.checkArgument((numKeysToDeleteOneTime >= 0 ? 1 : 0) != 0, (Object)("numKeysToDeleteOneTime should be non-negative, but is " + numKeysToDeleteOneTime));
        this.numKeysToDeleteOneTime = numKeysToDeleteOneTime;
        Preconditions.checkArgument((logicalRemovedKeysRatio >= 0.0f && logicalRemovedKeysRatio <= 1.0f ? 1 : 0) != 0, (Object)("logicalRemovedKeysRatio should be in [0, 1], but is " + logicalRemovedKeysRatio));
        this.logicalRemovedKeysRatio = logicalRemovedKeysRatio;
        this.levelIndexHeader = new OnHeapLevelIndexHeader();
        this.randomSeed = ThreadLocalRandom.current().nextInt() | 0x100;
        this.stateMapVersion = 0;
        this.highestRequiredSnapshotVersionPlusOne = 0;
        this.highestFinishedSnapshotVersion = 0;
        this.snapshotVersions = new TreeSet();
        this.totalSize = 0;
        this.requestCount = 0;
        this.logicallyRemovedNodes = new HashSet<Long>();
        this.pruningValueNodes = ConcurrentHashMap.newKeySet();
        this.closed = new AtomicBoolean(false);
        this.resourceGuard = new ResourceGuard();
    }

    public int size() {
        return this.totalSize - this.logicallyRemovedNodes.size();
    }

    int totalSize() {
        return this.totalSize;
    }

    public int getRequestCount() {
        return this.requestCount;
    }

    public S get(K key, N namespace) {
        this.updateStat();
        return this.getNodeInternal(key, namespace);
    }

    public boolean containsKey(K key, N namespace) {
        this.updateStat();
        S node = this.getNodeInternal(key, namespace);
        return node != null;
    }

    public void put(K key, N namespace, S state) {
        this.updateStat();
        MemorySegment keySegment = this.getKeySegment(key, namespace);
        int keyLen = keySegment.size();
        byte[] value = this.skipListValueSerializer.serialize(state);
        this.putValue(keySegment, 0, keyLen, value, false);
    }

    public S putAndGetOld(K key, N namespace, S state) {
        this.updateStat();
        MemorySegment keySegment = this.getKeySegment(key, namespace);
        int keyLen = keySegment.size();
        byte[] value = this.skipListValueSerializer.serialize(state);
        return this.putValue(keySegment, 0, keyLen, value, true);
    }

    public void remove(K key, N namespace) {
        this.updateStat();
        MemorySegment keySegment = this.getKeySegment(key, namespace);
        int keyLen = keySegment.size();
        this.removeNode(keySegment, 0, keyLen, false);
    }

    public S removeAndGetOld(K key, N namespace) {
        this.updateStat();
        MemorySegment keySegment = this.getKeySegment(key, namespace);
        int keyLen = keySegment.size();
        return this.removeNode(keySegment, 0, keyLen, true);
    }

    public <T> void transform(K key, N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        this.updateStat();
        MemorySegment keySegment = this.getKeySegment(key, namespace);
        int keyLen = keySegment.size();
        S oldState = this.getNode(keySegment, 0, keyLen);
        Object newState = transformation.apply(oldState, value);
        byte[] stateBytes = this.skipListValueSerializer.serialize(newState);
        this.putValue(keySegment, 0, keyLen, stateBytes, false);
    }

    @Nullable
    @VisibleForTesting
    S getNode(MemorySegment keySegment, int keyOffset, int keyLen) {
        SkipListIterateAndProcessResult result = this.iterateAndProcess(keySegment, keyOffset, keyLen, (pointers, isRemoved) -> {
            long currentNode = pointers.currentNode;
            return isRemoved != false ? null : this.getNodeStateHelper(currentNode);
        });
        return result.isKeyFound ? (S)result.state : null;
    }

    @VisibleForTesting
    S putValue(MemorySegment keySegment, int keyOffset, int keyLen, byte[] value, boolean returnOldState) {
        SkipListIterateAndProcessResult result = this.iterateAndProcess(keySegment, keyOffset, keyLen, (pointers, isLogicallyRemoved) -> this.putValue(pointers.currentNode, value, returnOldState));
        if (result.isKeyFound) {
            return result.state;
        }
        long prevNode = result.prevNode;
        long currentNode = result.currentNode;
        int level = this.getRandomIndexLevel();
        this.levelIndexHeader.updateLevel(level);
        int totalMetaKeyLen = SkipListUtils.getKeyMetaLen(level) + keyLen;
        long node = this.allocateSpace(totalMetaKeyLen);
        int totalValueLen = SkipListUtils.getValueMetaLen() + value.length;
        long valuePointer = this.allocateSpace(totalValueLen);
        this.doWriteKey(node, level, keySegment, keyOffset, keyLen, valuePointer, currentNode);
        this.doWriteValue(valuePointer, value, this.stateMapVersion, node, -1L);
        this.helpSetNextNode(prevNode, node, 0);
        if (level > 0) {
            SkipListUtils.buildLevelIndex(node, level, keySegment, keyOffset, this.levelIndexHeader, this.spaceAllocator);
        }
        ++this.totalSize;
        return null;
    }

    private S putValue(long currentNode, byte[] value, boolean returnOldState) {
        int version = SkipListUtils.helpGetNodeLatestVersion(currentNode, this.spaceAllocator);
        boolean needCopyOnWrite = version < this.highestRequiredSnapshotVersionPlusOne;
        long oldValuePointer = needCopyOnWrite ? this.updateValueWithCopyOnWrite(currentNode, value) : this.updateValueWithReplace(currentNode, value);
        NodeStatus oldStatus = this.helpSetNodeStatus(currentNode, NodeStatus.PUT);
        if (oldStatus == NodeStatus.REMOVE) {
            this.logicallyRemovedNodes.remove(currentNode);
        }
        S oldState = null;
        if (returnOldState) {
            oldState = this.helpGetState(oldValuePointer);
        }
        if (!needCopyOnWrite) {
            this.spaceAllocator.free(oldValuePointer);
        }
        return oldState;
    }

    private S removeNode(MemorySegment keySegment, int keyOffset, int keyLen, boolean returnOldState) {
        SkipListIterateAndProcessResult result = this.iterateAndProcess(keySegment, keyOffset, keyLen, (pointers, isLogicallyRemoved) -> this.removeNode((SkipListNodePointers)pointers, (Boolean)isLogicallyRemoved, returnOldState));
        return result.isKeyFound ? (S)result.state : null;
    }

    private S removeNode(SkipListNodePointers pointers, Boolean isLogicallyRemoved, boolean returnOldState) {
        boolean oldValueNeedFree;
        long oldValuePointer;
        long prevNode = pointers.prevNode;
        long currentNode = pointers.currentNode;
        long nextNode = pointers.nextNode;
        if (isLogicallyRemoved.booleanValue() && this.highestRequiredSnapshotVersionPlusOne != 0) {
            return null;
        }
        if (this.highestRequiredSnapshotVersionPlusOne == 0) {
            oldValuePointer = this.doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode);
            if (isLogicallyRemoved.booleanValue()) {
                this.logicallyRemovedNodes.remove(currentNode);
            }
            oldValueNeedFree = true;
        } else {
            int version = SkipListUtils.helpGetNodeLatestVersion(currentNode, this.spaceAllocator);
            if (version < this.highestRequiredSnapshotVersionPlusOne) {
                oldValuePointer = this.updateValueWithCopyOnWrite(currentNode, null);
                oldValueNeedFree = false;
            } else {
                oldValuePointer = this.updateValueWithReplace(currentNode, null);
                oldValueNeedFree = true;
            }
            this.helpSetNodeStatus(currentNode, NodeStatus.REMOVE);
            this.logicallyRemovedNodes.add(currentNode);
        }
        S oldState = null;
        if (returnOldState) {
            oldState = this.helpGetState(oldValuePointer);
        }
        if (oldValueNeedFree) {
            this.spaceAllocator.free(oldValuePointer);
        }
        return oldState;
    }

    private long allocateSpace(int size) {
        try {
            return this.spaceAllocator.allocate(size);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to allocate space in CopyOnWriteSkipListStateMap", (Throwable)e);
        }
    }

    private SkipListIterateAndProcessResult iterateAndProcess(MemorySegment keySegment, int keyOffset, int keyLen, BiFunction<SkipListNodePointers, Boolean, S> function) {
        int deleteCount = 0;
        long prevNode = this.findPredecessor(keySegment, keyOffset, 1);
        long currentNode = this.helpGetNextNode(prevNode, 0);
        while (currentNode != -1L) {
            long nextNode = this.helpGetNextNode(currentNode, 0);
            boolean isRemoved = this.isNodeRemoved(currentNode);
            if (isRemoved && this.highestRequiredSnapshotVersionPlusOne == 0 && deleteCount < this.numKeysToDeleteOneTime) {
                this.doPhysicalRemove(currentNode, prevNode, nextNode);
                this.logicallyRemovedNodes.remove(currentNode);
                currentNode = nextNode;
                ++deleteCount;
                continue;
            }
            int c = this.compareSegmentAndNode(keySegment, keyOffset, keyLen, currentNode);
            if (c < 0) break;
            if (c > 0) {
                prevNode = currentNode;
                currentNode = nextNode;
                continue;
            }
            S state = function.apply(new SkipListNodePointers(prevNode, currentNode, nextNode), isRemoved);
            return new SkipListIterateAndProcessResult(prevNode, currentNode, true, state);
        }
        return new SkipListIterateAndProcessResult(prevNode, currentNode, false, null);
    }

    private long findPredecessor(MemorySegment keySegment, int keyOffset, int level) {
        return SkipListUtils.findPredecessor(keySegment, keyOffset, level, this.levelIndexHeader, this.spaceAllocator);
    }

    private int compareSegmentAndNode(MemorySegment keySegment, int keyOffset, int keyLen, long targetNode) {
        return SkipListUtils.compareSegmentAndNode(keySegment, keyOffset, targetNode, this.spaceAllocator);
    }

    private int compareNamespaceAndNode(MemorySegment namespaceSegment, int namespaceOffset, int namespaceLen, long targetNode) {
        Node nodeStorage = this.getNodeSegmentAndOffset(targetNode);
        MemorySegment targetSegment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        int level = SkipListUtils.getLevel(targetSegment, offsetInSegment);
        int targetKeyOffset = offsetInSegment + SkipListUtils.getKeyDataOffset(level);
        return SkipListKeyComparator.compareNamespaceAndNode(namespaceSegment, namespaceOffset, namespaceLen, targetSegment, targetKeyOffset);
    }

    private long updateValueWithCopyOnWrite(long node, byte[] value) {
        int valueSize = value == null ? 0 : value.length;
        int totalValueLen = SkipListUtils.getValueMetaLen() + valueSize;
        long valuePointer = this.allocateSpace(totalValueLen);
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment nodeSegment = nodeStorage.nodeSegment;
        int offsetInNodeSegment = nodeStorage.nodeOffset;
        long oldValuePointer = SkipListUtils.getValuePointer(nodeSegment, offsetInNodeSegment);
        this.doWriteValue(valuePointer, value, this.stateMapVersion, node, oldValuePointer);
        SkipListUtils.putValuePointer(nodeSegment, offsetInNodeSegment, valuePointer);
        return oldValuePointer;
    }

    private long updateValueWithReplace(long node, byte[] value) {
        int valueSize = value == null ? 0 : value.length;
        int totalValueLen = SkipListUtils.getValueMetaLen() + valueSize;
        long valuePointer = this.allocateSpace(totalValueLen);
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment nodeSegment = nodeStorage.nodeSegment;
        int offsetInNodeSegment = nodeStorage.nodeOffset;
        long oldValuePointer = SkipListUtils.getValuePointer(nodeSegment, offsetInNodeSegment);
        long nextValuePointer = SkipListUtils.helpGetNextValuePointer(oldValuePointer, this.spaceAllocator);
        this.doWriteValue(valuePointer, value, this.stateMapVersion, node, nextValuePointer);
        SkipListUtils.putValuePointer(nodeSegment, offsetInNodeSegment, valuePointer);
        return oldValuePointer;
    }

    private void doPhysicalRemove(long node, long prevNode, long nextNode) {
        long valuePointer = this.deleteNodeMeta(node, prevNode, nextNode);
        SkipListUtils.removeAllValues(valuePointer, this.spaceAllocator);
    }

    private long doPhysicalRemoveAndGetValue(long node, long prevNode, long nextNode) {
        long valuePointer = this.deleteNodeMeta(node, prevNode, nextNode);
        long nextValuePointer = SkipListUtils.helpGetNextValuePointer(valuePointer, this.spaceAllocator);
        SkipListUtils.removeAllValues(nextValuePointer, this.spaceAllocator);
        return valuePointer;
    }

    private long deleteNodeMeta(long node, long prevNode, long nextNode) {
        this.helpSetNextNode(prevNode, nextNode, 0);
        SkipListUtils.removeLevelIndex(node, this.spaceAllocator, this.levelIndexHeader);
        long valuePointer = SkipListUtils.helpGetValuePointer(node, this.spaceAllocator);
        this.spaceAllocator.free(node);
        --this.totalSize;
        return valuePointer;
    }

    private int getRandomIndexLevel() {
        int x = this.randomSeed;
        x ^= x << 13;
        x ^= x >>> 17;
        x ^= x << 5;
        this.randomSeed = x;
        if ((x & 0x8001) != 0) {
            return 0;
        }
        int level = 1;
        int curMax = this.levelIndexHeader.getLevel();
        x >>>= 1;
        while ((x & 1) != 0) {
            x >>>= 1;
            if (++level <= curMax) continue;
            break;
        }
        return level;
    }

    private void doWriteKey(long node, int level, MemorySegment keySegment, int keyOffset, int keyLen, long valuePointer, long nextNode) {
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment segment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        SkipListUtils.putLevelAndNodeStatus(segment, offsetInSegment, level, NodeStatus.PUT);
        SkipListUtils.putKeyLen(segment, offsetInSegment, keyLen);
        SkipListUtils.putValuePointer(segment, offsetInSegment, valuePointer);
        SkipListUtils.putNextKeyPointer(segment, offsetInSegment, nextNode);
        SkipListUtils.putKeyData(segment, offsetInSegment, keySegment, keyOffset, keyLen, level);
    }

    private void doWriteValue(long valuePointer, byte[] value, int version, long keyPointer, long nextValuePointer) {
        Node node = this.getNodeSegmentAndOffset(valuePointer);
        MemorySegment segment = node.nodeSegment;
        int offsetInSegment = node.nodeOffset;
        SkipListUtils.putValueVersion(segment, offsetInSegment, version);
        SkipListUtils.putKeyPointer(segment, offsetInSegment, keyPointer);
        SkipListUtils.putNextValuePointer(segment, offsetInSegment, nextValuePointer);
        SkipListUtils.putValueLen(segment, offsetInSegment, value == null ? 0 : value.length);
        if (value != null) {
            SkipListUtils.putValueData(segment, offsetInSegment, value);
        }
    }

    private long getFirstNodeWithNamespace(MemorySegment namespaceSegment, int namespaceOffset, int namespaceLen) {
        int c;
        int currentLevel = this.levelIndexHeader.getLevel();
        long prevNode = -2L;
        long currentNode = this.helpGetNextNode(prevNode, currentLevel);
        while (true) {
            if (currentNode != -1L && (c = this.compareNamespaceAndNode(namespaceSegment, namespaceOffset, namespaceLen, currentNode)) > 0) {
                prevNode = currentNode;
                currentNode = this.helpGetNextNode(prevNode, currentLevel);
                continue;
            }
            if (--currentLevel < 0) break;
            currentNode = this.helpGetNextNode(prevNode, currentLevel);
        }
        while (currentNode != -1L) {
            if (this.isNodeRemoved(currentNode)) {
                currentNode = this.helpGetNextNode(currentNode, 0);
                continue;
            }
            c = this.compareNamespaceAndNode(namespaceSegment, namespaceOffset, namespaceLen, currentNode);
            if (c == 0) {
                return currentNode;
            }
            if (c >= 0) continue;
            break;
        }
        return -1L;
    }

    private void tryToDeleteNodesPhysically() {
        if (this.highestRequiredSnapshotVersionPlusOne != 0) {
            return;
        }
        int threshold = (int)((float)this.totalSize * this.logicalRemovedKeysRatio);
        int size = this.logicallyRemovedNodes.size();
        if (size > threshold) {
            this.deleteLogicallyRemovedNodes(size - threshold);
        }
    }

    private void deleteLogicallyRemovedNodes(int maxNodes) {
        Iterator<Long> nodeIterator = this.logicallyRemovedNodes.iterator();
        for (int count = 0; count < maxNodes && nodeIterator.hasNext(); ++count) {
            this.deleteNode(nodeIterator.next());
            nodeIterator.remove();
        }
    }

    private void deleteNode(long node) {
        long prevNode = SkipListUtils.findPredecessor(node, 1, this.levelIndexHeader, this.spaceAllocator);
        long currentNode = this.helpGetNextNode(prevNode, 0);
        while (currentNode != node) {
            prevNode = currentNode;
            currentNode = this.helpGetNextNode(prevNode, 0);
        }
        long nextNode = this.helpGetNextNode(currentNode, 0);
        this.doPhysicalRemove(currentNode, prevNode, nextNode);
    }

    private void releaseAllResource() {
        long node = this.levelIndexHeader.getNextNode(0);
        while (node != -1L) {
            long nextNode = this.helpGetNextNode(node, 0);
            long valuePointer = SkipListUtils.helpGetValuePointer(node, this.spaceAllocator);
            this.spaceAllocator.free(node);
            SkipListUtils.removeAllValues(valuePointer, this.spaceAllocator);
            node = nextNode;
        }
        this.totalSize = 0;
        this.logicallyRemovedNodes.clear();
    }

    long getValueForSnapshot(long node, int snapshotVersion) {
        long snapshotValuePointer = -1L;
        ValueVersionIterator versionIterator = new ValueVersionIterator(node);
        while (versionIterator.hasNext()) {
            long valuePointer = versionIterator.getValuePointer();
            int version = versionIterator.next();
            if (version >= snapshotVersion) continue;
            snapshotValuePointer = valuePointer;
            break;
        }
        return snapshotValuePointer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    long getAndPruneValueForSnapshot(long node, int snapshotVersion) {
        boolean isPruning = this.pruningValueNodes.add(node);
        try {
            long snapshotValuePointer = -1L;
            ValueVersionIterator versionIterator = new ValueVersionIterator(node);
            while (versionIterator.hasNext()) {
                long valuePointer = versionIterator.getValuePointer();
                int version = versionIterator.next();
                if (snapshotValuePointer == -1L && version < snapshotVersion) {
                    snapshotValuePointer = valuePointer;
                    if (!isPruning) break;
                }
                if (this.highestFinishedSnapshotVersion < version) continue;
                long nextValuePointer = SkipListUtils.helpGetNextValuePointer(valuePointer, this.spaceAllocator);
                if (nextValuePointer == -1L) break;
                SkipListUtils.helpSetNextValuePointer(valuePointer, -1L, this.spaceAllocator);
                SkipListUtils.removeAllValues(nextValuePointer, this.spaceAllocator);
                break;
            }
            long l = snapshotValuePointer;
            return l;
        }
        finally {
            if (isPruning) {
                this.pruningValueNodes.remove(node);
            }
        }
    }

    private void updateStat() {
        ++this.requestCount;
    }

    private S getNodeInternal(K key, N namespace) {
        MemorySegment keySegment = this.getKeySegment(key, namespace);
        int keyLen = keySegment.size();
        return this.getNode(keySegment, 0, keyLen);
    }

    private MemorySegment getKeySegment(K key, N namespace) {
        return this.skipListKeySerializer.serializeToSegment(key, namespace);
    }

    private boolean isNodeRemoved(long node) {
        return SkipListUtils.isNodeRemoved(node, this.spaceAllocator);
    }

    private void helpSetNextNode(long node, long nextNode, int level) {
        SkipListUtils.helpSetNextNode(node, nextNode, level, this.levelIndexHeader, this.spaceAllocator);
    }

    long helpGetNextNode(long node, int level) {
        return SkipListUtils.helpGetNextNode(node, level, this.levelIndexHeader, this.spaceAllocator);
    }

    int helpGetValueLen(long valuePointer) {
        return SkipListUtils.helpGetValueLen(valuePointer, this.spaceAllocator);
    }

    private NodeStatus helpSetNodeStatus(long node, NodeStatus newStatus) {
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment segment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        NodeStatus oldStatus = SkipListUtils.getNodeStatus(segment, offsetInSegment);
        if (oldStatus != newStatus) {
            int level = SkipListUtils.getLevel(segment, offsetInSegment);
            SkipListUtils.putLevelAndNodeStatus(segment, offsetInSegment, level, newStatus);
        }
        return oldStatus;
    }

    private S getNodeStateHelper(long node) {
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment segment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        long valuePointer = SkipListUtils.getValuePointer(segment, offsetInSegment);
        return this.helpGetState(valuePointer);
    }

    Tuple2<byte[], byte[]> helpGetBytesForKeyAndNamespace(long node) {
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment segment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        int level = SkipListUtils.getLevel(segment, offsetInSegment);
        int keyDataOffset = offsetInSegment + SkipListUtils.getKeyDataOffset(level);
        return this.skipListKeySerializer.getSerializedKeyAndNamespace(segment, keyDataOffset);
    }

    byte[] helpGetBytesForState(long valuePointer) {
        Node node = this.getNodeSegmentAndOffset(valuePointer);
        MemorySegment segment = node.nodeSegment;
        int offsetInSegment = node.nodeOffset;
        int valueLen = SkipListUtils.getValueLen(segment, offsetInSegment);
        MemorySegment valueSegment = MemorySegmentFactory.allocateUnpooledSegment((int)valueLen);
        segment.copyTo(offsetInSegment + SkipListUtils.getValueMetaLen(), valueSegment, 0, valueLen);
        return valueSegment.getArray();
    }

    private K helpGetKey(long node) {
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment segment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        int level = SkipListUtils.getLevel(segment, offsetInSegment);
        int keyDataLen = SkipListUtils.getKeyLen(segment, offsetInSegment);
        int keyDataOffset = offsetInSegment + SkipListUtils.getKeyDataOffset(level);
        return this.skipListKeySerializer.deserializeKey(segment, keyDataOffset, keyDataLen);
    }

    S helpGetState(long valuePointer, SkipListValueSerializer<S> serializer) {
        if (valuePointer == -1L) {
            return null;
        }
        Node node = this.getNodeSegmentAndOffset(valuePointer);
        MemorySegment segment = node.nodeSegment;
        int offsetInSegment = node.nodeOffset;
        int valueLen = SkipListUtils.getValueLen(segment, offsetInSegment);
        if (valueLen == 0) {
            return null;
        }
        return serializer.deserializeState(segment, offsetInSegment + SkipListUtils.getValueMetaLen(), valueLen);
    }

    S helpGetState(long valuePointer) {
        return this.helpGetState(valuePointer, this.skipListValueSerializer);
    }

    private StateEntry<K, N, S> helpGetStateEntry(long node) {
        Node nodeStorage = this.getNodeSegmentAndOffset(node);
        MemorySegment segment = nodeStorage.nodeSegment;
        int offsetInSegment = nodeStorage.nodeOffset;
        int level = SkipListUtils.getLevel(segment, offsetInSegment);
        int keyDataLen = SkipListUtils.getKeyLen(segment, offsetInSegment);
        int keyDataOffset = offsetInSegment + SkipListUtils.getKeyDataOffset(level);
        K key = this.skipListKeySerializer.deserializeKey(segment, keyDataOffset, keyDataLen);
        N namespace = this.skipListKeySerializer.deserializeNamespace(segment, keyDataOffset, keyDataLen);
        long valuePointer = SkipListUtils.getValuePointer(segment, offsetInSegment);
        S state = this.helpGetState(valuePointer);
        return new StateEntry.SimpleStateEntry(key, namespace, state);
    }

    public Stream<K> getKeys(N namespace) {
        this.updateStat();
        MemorySegment namespaceSegment = this.skipListKeySerializer.serializeNamespaceToSegment(namespace);
        NamespaceNodeIterator nodeIter = new NamespaceNodeIterator(namespaceSegment, 0, namespaceSegment.size());
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(nodeIter, 0), false).map(this::helpGetKey);
    }

    public int sizeOfNamespace(Object namespace) {
        this.updateStat();
        MemorySegment namespaceSegment = this.skipListKeySerializer.serializeNamespaceToSegment(namespace);
        NamespaceNodeIterator nodeIter = new NamespaceNodeIterator(namespaceSegment, 0, namespaceSegment.size());
        int size = 0;
        while (nodeIter.hasNext()) {
            nodeIter.next();
            ++size;
        }
        return size;
    }

    @Nonnull
    public Iterator<StateEntry<K, N, S>> iterator() {
        this.updateStat();
        final NodeIterator nodeIter = new NodeIterator();
        return new Iterator<StateEntry<K, N, S>>(){

            @Override
            public boolean hasNext() {
                return nodeIter.hasNext();
            }

            @Override
            public StateEntry<K, N, S> next() {
                return CopyOnWriteSkipListStateMap.this.helpGetStateEntry((Long)nodeIter.next());
            }
        };
    }

    public InternalKvState.StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
        return new StateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nonnull
    public CopyOnWriteSkipListStateMapSnapshot<K, N, S> stateSnapshot() {
        ResourceGuard.Lease lease;
        this.tryToDeleteNodesPhysically();
        try {
            lease = this.resourceGuard.acquireResource();
        }
        catch (Exception e) {
            throw new RuntimeException("Acquire resource failed, and can't make snapshot of state map", e);
        }
        TreeSet<Integer> treeSet = this.snapshotVersions;
        synchronized (treeSet) {
            if (++this.stateMapVersion < 0) {
                throw new IllegalStateException("Version count overflow. Enforcing restart.");
            }
            this.highestRequiredSnapshotVersionPlusOne = this.stateMapVersion;
            this.snapshotVersions.add(this.highestRequiredSnapshotVersionPlusOne);
        }
        return new CopyOnWriteSkipListStateMapSnapshot(this, lease);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseSnapshot(StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> snapshotToRelease) {
        CopyOnWriteSkipListStateMapSnapshot snapshot = (CopyOnWriteSkipListStateMapSnapshot)snapshotToRelease;
        int snapshotVersion = snapshot.getSnapshotVersion();
        Preconditions.checkArgument((boolean)snapshot.isOwner(this), (Object)"Cannot release snapshot which is owned by a different state map.");
        TreeSet<Integer> treeSet = this.snapshotVersions;
        synchronized (treeSet) {
            Preconditions.checkState((boolean)this.snapshotVersions.remove(snapshotVersion), (Object)"Attempt to release unknown snapshot version");
            this.highestRequiredSnapshotVersionPlusOne = this.snapshotVersions.isEmpty() ? 0 : this.snapshotVersions.last();
            this.highestFinishedSnapshotVersion = this.snapshotVersions.isEmpty() ? this.stateMapVersion : this.snapshotVersions.first() - 1;
        }
    }

    LevelIndexHeader getLevelIndexHeader() {
        return this.levelIndexHeader;
    }

    int getStateMapVersion() {
        return this.stateMapVersion;
    }

    @VisibleForTesting
    int getHighestRequiredSnapshotVersionPlusOne() {
        return this.highestRequiredSnapshotVersionPlusOne;
    }

    @VisibleForTesting
    int getHighestFinishedSnapshotVersion() {
        return this.highestFinishedSnapshotVersion;
    }

    @VisibleForTesting
    Set<Integer> getSnapshotVersions() {
        return this.snapshotVersions;
    }

    @VisibleForTesting
    Set<Long> getLogicallyRemovedNodes() {
        return this.logicallyRemovedNodes;
    }

    @VisibleForTesting
    Set<Long> getPruningValueNodes() {
        return this.pruningValueNodes;
    }

    @VisibleForTesting
    ResourceGuard getResourceGuard() {
        return this.resourceGuard;
    }

    boolean isClosed() {
        return this.closed.get();
    }

    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            LOG.warn("State map has been closed");
            return;
        }
        this.resourceGuard.close();
        this.releaseAllResource();
    }

    private Node getNodeSegmentAndOffset(long node) {
        Chunk nodeChunk = this.spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(node));
        int offsetInNodeChunk = SpaceUtils.getChunkOffsetByAddress(node);
        MemorySegment nodeSegment = nodeChunk.getMemorySegment(offsetInNodeChunk);
        int offsetInNodeSegment = nodeChunk.getOffsetInSegment(offsetInNodeChunk);
        return new Node(nodeSegment, offsetInNodeSegment);
    }

    private class Node {
        MemorySegment nodeSegment;
        int nodeOffset;

        Node(MemorySegment nodeSegment, int nodeOffset) {
            this.nodeSegment = nodeSegment;
            this.nodeOffset = nodeOffset;
        }
    }

    private class SkipListNodePointers {
        long prevNode;
        long currentNode;
        long nextNode;

        SkipListNodePointers(long prevNode, long currentNode, long nextNode) {
            this.prevNode = prevNode;
            this.currentNode = currentNode;
            this.nextNode = nextNode;
        }
    }

    private class SkipListIterateAndProcessResult {
        long prevNode;
        long currentNode;
        boolean isKeyFound;
        S state;

        SkipListIterateAndProcessResult(long prevNode, long currentNode, boolean isKeyFound, S state) {
            this.prevNode = prevNode;
            this.currentNode = currentNode;
            this.isKeyFound = isKeyFound;
            this.state = state;
        }
    }

    private class ValueVersionIterator
    implements Iterator<Integer> {
        private long valuePointer;

        ValueVersionIterator(long node) {
            this.valuePointer = SkipListUtils.helpGetValuePointer(node, CopyOnWriteSkipListStateMap.this.spaceAllocator);
        }

        @Override
        public boolean hasNext() {
            return this.valuePointer != -1L;
        }

        @Override
        public Integer next() {
            int version = SkipListUtils.helpGetValueVersion(this.valuePointer, CopyOnWriteSkipListStateMap.this.spaceAllocator);
            this.valuePointer = SkipListUtils.helpGetNextValuePointer(this.valuePointer, CopyOnWriteSkipListStateMap.this.spaceAllocator);
            return version;
        }

        long getValuePointer() {
            return this.valuePointer;
        }
    }

    class StateIncrementalVisitor
    implements InternalKvState.StateIncrementalVisitor<K, N, S> {
        private final int recommendedMaxNumberOfReturnedRecords;
        private MemorySegment nextKeySegment;
        private int nextKeyOffset;
        private final Collection<StateEntry<K, N, S>> entryToReturn = new ArrayList(5);

        StateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
            this.recommendedMaxNumberOfReturnedRecords = recommendedMaxNumberOfReturnedRecords;
            this.init();
        }

        private void init() {
            long node = this.getNextNode(-2L);
            if (node != -1L) {
                this.setKeySegment(node);
            }
        }

        private long findNextNode(MemorySegment segment, int offset) {
            long node = CopyOnWriteSkipListStateMap.this.findPredecessor(segment, offset, 0);
            return this.getNextNode(node);
        }

        private long getNextNode(long node) {
            long n = CopyOnWriteSkipListStateMap.this.helpGetNextNode(node, 0);
            while (n != -1L && CopyOnWriteSkipListStateMap.this.isNodeRemoved(n)) {
                n = CopyOnWriteSkipListStateMap.this.helpGetNextNode(n, 0);
            }
            return n;
        }

        private void updateNextKeySegment(long node) {
            if (node != -1L && (node = this.getNextNode(node)) != -1L) {
                this.setKeySegment(node);
                return;
            }
            this.nextKeySegment = null;
        }

        private void setKeySegment(long node) {
            Node nodeStorage = CopyOnWriteSkipListStateMap.this.getNodeSegmentAndOffset(node);
            MemorySegment segment = nodeStorage.nodeSegment;
            int offsetInSegment = nodeStorage.nodeOffset;
            int level = SkipListUtils.getLevel(segment, offsetInSegment);
            int keyLen = SkipListUtils.getKeyLen(segment, offsetInSegment);
            int keyDataOffset = offsetInSegment + SkipListUtils.getKeyDataOffset(level);
            MemorySegment nextKeySegment = MemorySegmentFactory.allocateUnpooledSegment((int)keyLen);
            segment.copyTo(keyDataOffset, nextKeySegment, 0, keyLen);
            this.nextKeySegment = nextKeySegment;
            this.nextKeyOffset = 0;
        }

        public boolean hasNext() {
            return !CopyOnWriteSkipListStateMap.this.isClosed() && this.nextKeySegment != null && this.findNextNode(this.nextKeySegment, this.nextKeyOffset) != -1L;
        }

        public Collection<StateEntry<K, N, S>> nextEntries() {
            if (this.nextKeySegment == null) {
                return Collections.emptyList();
            }
            long node = this.findNextNode(this.nextKeySegment, this.nextKeyOffset);
            if (node == -1L) {
                this.nextKeySegment = null;
                return Collections.emptyList();
            }
            this.entryToReturn.clear();
            this.entryToReturn.add(CopyOnWriteSkipListStateMap.this.helpGetStateEntry(node));
            for (int n = 1; n < this.recommendedMaxNumberOfReturnedRecords && (node = this.getNextNode(node)) != -1L; ++n) {
                this.entryToReturn.add(CopyOnWriteSkipListStateMap.this.helpGetStateEntry(node));
            }
            this.updateNextKeySegment(node);
            return this.entryToReturn;
        }

        public void remove(StateEntry<K, N, S> stateEntry) {
            CopyOnWriteSkipListStateMap.this.remove(stateEntry.getKey(), stateEntry.getNamespace());
        }

        public void update(StateEntry<K, N, S> stateEntry, S newValue) {
            CopyOnWriteSkipListStateMap.this.put(stateEntry.getKey(), stateEntry.getNamespace(), newValue);
        }
    }

    class NamespaceNodeIterator
    implements Iterator<Long> {
        private final MemorySegment namespaceSegment;
        private final int namespaceOffset;
        private final int namespaceLen;
        private long nextNode;

        NamespaceNodeIterator(MemorySegment namespaceSegment, int namespaceOffset, int namespaceLen) {
            this.namespaceSegment = namespaceSegment;
            this.namespaceOffset = namespaceOffset;
            this.namespaceLen = namespaceLen;
            this.nextNode = CopyOnWriteSkipListStateMap.this.getFirstNodeWithNamespace(namespaceSegment, namespaceOffset, namespaceLen);
        }

        private long getNextNode(long node) {
            long n = CopyOnWriteSkipListStateMap.this.helpGetNextNode(node, 0);
            while (n != -1L && CopyOnWriteSkipListStateMap.this.isNodeRemoved(n)) {
                n = CopyOnWriteSkipListStateMap.this.helpGetNextNode(n, 0);
            }
            if (n != -1L && CopyOnWriteSkipListStateMap.this.compareNamespaceAndNode(this.namespaceSegment, this.namespaceOffset, this.namespaceLen, n) == 0) {
                return n;
            }
            return -1L;
        }

        @Override
        public boolean hasNext() {
            return this.nextNode != -1L;
        }

        @Override
        public Long next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            long node = this.nextNode;
            this.nextNode = this.getNextNode(node);
            return node;
        }
    }

    class NodeIterator
    implements Iterator<Long> {
        private long nextNode = this.getNextNode(-2L);

        NodeIterator() {
        }

        private long getNextNode(long node) {
            long n = CopyOnWriteSkipListStateMap.this.helpGetNextNode(node, 0);
            while (n != -1L && CopyOnWriteSkipListStateMap.this.isNodeRemoved(n)) {
                n = CopyOnWriteSkipListStateMap.this.helpGetNextNode(n, 0);
            }
            return n;
        }

        @Override
        public boolean hasNext() {
            return this.nextNode != -1L;
        }

        @Override
        public Long next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            long node = this.nextNode;
            this.nextNode = this.getNextNode(node);
            return node;
        }
    }
}

