/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.util.Preconditions;

/**
 * A {@link StateTable} backed by nested hash maps.
 *
 * <p>The table keeps one slot per key group. Each slot holds a map from namespace to a map from
 * key to state value, i.e. {@code keyGroup -> namespace -> key -> state}. Slots and inner maps
 * are created lazily on the first write and a namespace entry is dropped again once its last
 * key has been removed.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <S> type of the state value
 */
@Internal
public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {

    /**
     * Per-key-group storage. The array position of a key group is its index minus
     * {@link #keyGroupOffset}; a {@code null} element means nothing was written for that group.
     */
    private final Map<N, Map<K, S>>[] state;

    /** Start key group of the key context's range; maps key-group indexes to array positions. */
    private final int keyGroupOffset;

    /**
     * Creates an empty table with one (initially {@code null}) slot for every key group in the
     * key group range of the given key context.
     *
     * @param keyContext the key context providing the current key and the key group range
     * @param metaInfo the meta information for the state held in this table
     */
    public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
        super(keyContext, metaInfo);
        this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();

        // Generic arrays cannot be created directly; the raw array is only ever filled with
        // Map<N, Map<K, S>> instances by this class, so the cast is safe.
        @SuppressWarnings("unchecked")
        Map<N, Map<K, S>>[] slots =
                (Map<N, Map<K, S>>[]) new Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
        this.state = slots;
    }

    // ------------------------------------------------------------------------
    //  access to maps
    // ------------------------------------------------------------------------

    /**
     * Returns the internal per-key-group array itself (not a copy).
     *
     * @return the backing array of namespace maps
     */
    @VisibleForTesting
    public Map<N, Map<K, S>>[] getState() {
        return this.state;
    }

    /**
     * Looks up the namespace map of a key group.
     *
     * @param keyGroupIndex the key-group index (not the array position)
     * @return the namespace map of that key group, or {@code null} if nothing is stored for it
     *     or the index lies outside of this table's range
     */
    @VisibleForTesting
    Map<N, Map<K, S>> getMapForKeyGroup(int keyGroupIndex) {
        final int pos = indexToOffset(keyGroupIndex);
        if (pos >= 0 && pos < this.state.length) {
            return this.state[pos];
        }
        return null;
    }

    /**
     * Stores the namespace map for a key group.
     *
     * @param keyGroupId the key-group index (not the array position)
     * @param map the namespace map to store
     * @throws IllegalArgumentException if the key group is outside of this table's range
     */
    private void setMapForKeyGroup(int keyGroupId, Map<N, Map<K, S>> map) {
        final int pos = indexToOffset(keyGroupId);
        // Validate explicitly instead of catching ArrayIndexOutOfBoundsException.
        if (pos < 0 || pos >= this.state.length) {
            throw new IllegalArgumentException("Key group index out of range of key group range [" + this.keyGroupOffset + ", " + (this.keyGroupOffset + this.state.length) + ").");
        }
        this.state[pos] = map;
    }

    /**
     * Translates a key-group index into a position in {@link #state}.
     *
     * @param index the key-group index
     * @return the (unchecked, possibly out-of-bounds) array position
     */
    private int indexToOffset(int index) {
        return index - this.keyGroupOffset;
    }

    /**
     * Returns the key map for the given key group and namespace, creating the namespace map
     * and/or the key map first if they do not exist yet.
     *
     * @param keyGroupIndex the key-group index
     * @param namespace the namespace
     * @return the existing or freshly registered key map, never {@code null}
     * @throws IllegalArgumentException if a namespace map must be created for a key group that
     *     is outside of this table's range
     */
    private Map<K, S> getOrCreateKeyedMap(int keyGroupIndex, N namespace) {
        Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
        if (namespaceMap == null) {
            namespaceMap = new HashMap<>();
            setMapForKeyGroup(keyGroupIndex, namespaceMap);
        }

        Map<K, S> keyedMap = namespaceMap.get(namespace);
        if (keyedMap == null) {
            keyedMap = new HashMap<>();
            namespaceMap.put(namespace, keyedMap);
        }
        return keyedMap;
    }

    // ------------------------------------------------------------------------
    //  StateTable operations for the current key of the key context
    // ------------------------------------------------------------------------

    /**
     * Counts all key/state mappings over all key groups and namespaces.
     *
     * @return the total number of entries in this table
     */
    @Override
    public int size() {
        int count = 0;
        for (Map<N, Map<K, S>> namespaceMap : this.state) {
            if (null == namespaceMap) {
                continue;
            }
            for (Map<K, S> keyMap : namespaceMap.values()) {
                if (null == keyMap) {
                    continue;
                }
                count += keyMap.size();
            }
        }
        return count;
    }

    /** Returns the state for the key context's current key and the given namespace, or {@code null}. */
    @Override
    public S get(N namespace) {
        return get(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), namespace);
    }

    /** Tells whether a mapping exists for the key context's current key and the given namespace. */
    @Override
    public boolean containsKey(N namespace) {
        return containsKey(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), namespace);
    }

    /** Stores the state for the key context's current key and the given namespace. */
    @Override
    public void put(N namespace, S state) {
        put(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), namespace, state);
    }

    /**
     * Stores the state for the key context's current key and the given namespace.
     *
     * @return the value previously mapped, or {@code null} if there was none
     */
    @Override
    public S putAndGetOld(N namespace, S state) {
        return putAndGetOld(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), namespace, state);
    }

    /** Removes the mapping for the key context's current key and the given namespace, if any. */
    @Override
    public void remove(N namespace) {
        remove(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), namespace);
    }

    /**
     * Removes the mapping for the key context's current key and the given namespace.
     *
     * @return the removed value, or {@code null} if there was none
     */
    @Override
    public S removeAndGetOld(N namespace) {
        return removeAndGetOld(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), namespace);
    }

    /**
     * Returns the state for an explicit key; the key group is derived from the key via
     * {@link KeyGroupRangeAssignment#assignToKeyGroup} and the key context's number of key groups.
     *
     * @param key the key to look up
     * @param namespace the namespace to look up
     * @return the stored state, or {@code null} if there is none
     */
    @Override
    public S get(K key, N namespace) {
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, this.keyContext.getNumberOfKeyGroups());
        return get(key, keyGroup, namespace);
    }

    // ------------------------------------------------------------------------
    //  operations with explicit key and key group
    // ------------------------------------------------------------------------

    /**
     * Tells whether a mapping exists for the given key, key group and namespace.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is {@code null}
     */
    private boolean containsKey(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);

        final Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
        if (namespaceMap == null) {
            return false;
        }

        final Map<K, S> keyedMap = namespaceMap.get(namespace);
        return keyedMap != null && keyedMap.containsKey(key);
    }

    /**
     * Returns the state for the given key, key group and namespace.
     *
     * @return the stored state, or {@code null} if no mapping exists
     * @throws NullPointerException if {@code key} or {@code namespace} is {@code null}
     */
    S get(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);

        final Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
        if (namespaceMap == null) {
            return null;
        }

        final Map<K, S> keyedMap = namespaceMap.get(namespace);
        if (keyedMap == null) {
            return null;
        }
        return keyedMap.get(key);
    }

    /**
     * Stores the value for the given key, key group and namespace, discarding any old value.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is {@code null}
     * @throws IllegalArgumentException if a namespace map must be created for a key group that
     *     is outside of this table's range
     */
    @Override
    public void put(K key, int keyGroupIndex, N namespace, S value) {
        putAndGetOld(key, keyGroupIndex, namespace, value);
    }

    /**
     * Stores the value for the given key, key group and namespace.
     *
     * @return the value previously mapped, or {@code null} if there was none
     * @throws NullPointerException if {@code key} or {@code namespace} is {@code null}
     * @throws IllegalArgumentException if a namespace map must be created for a key group that
     *     is outside of this table's range
     */
    private S putAndGetOld(K key, int keyGroupIndex, N namespace, S value) {
        checkKeyNamespacePreconditions(key, namespace);
        return getOrCreateKeyedMap(keyGroupIndex, namespace).put(key, value);
    }

    /** Removes the mapping for the given key, key group and namespace, discarding the old value. */
    private void remove(K key, int keyGroupIndex, N namespace) {
        removeAndGetOld(key, keyGroupIndex, namespace);
    }

    /**
     * Removes the mapping for the given key, key group and namespace. When the key map of the
     * namespace becomes empty, the namespace entry itself is removed as well.
     *
     * @return the removed value, or {@code null} if no mapping existed
     * @throws NullPointerException if {@code key} or {@code namespace} is {@code null}
     */
    private S removeAndGetOld(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);

        final Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
        if (namespaceMap == null) {
            return null;
        }

        final Map<K, S> keyedMap = namespaceMap.get(namespace);
        if (keyedMap == null) {
            return null;
        }

        final S removed = keyedMap.remove(key);
        if (keyedMap.isEmpty()) {
            // Do not keep empty key maps around.
            namespaceMap.remove(namespace);
        }
        return removed;
    }

    /**
     * Verifies that neither key nor namespace is {@code null}.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is {@code null}
     */
    private void checkKeyNamespacePreconditions(K key, N namespace) {
        Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
        Preconditions.checkNotNull(namespace, "Provided namespace is null.");
    }

    /**
     * Counts the entries stored under the given namespace across all key groups.
     *
     * @param namespace the namespace to count entries for
     * @return the number of key/state mappings in that namespace
     */
    @Override
    public int sizeOfNamespace(Object namespace) {
        int count = 0;
        for (Map<N, Map<K, S>> namespaceMap : this.state) {
            if (null == namespaceMap) {
                continue;
            }
            final Map<K, S> keyMap = namespaceMap.get(namespace);
            count += keyMap != null ? keyMap.size() : 0;
        }
        return count;
    }

    /**
     * Replaces the state of the key context's current key in the given namespace with the result
     * of applying {@code transformation} to the currently stored state (possibly {@code null})
     * and {@code value}. Missing inner maps are created before the transformation is invoked.
     *
     * @param namespace the namespace of the state to transform
     * @param value the value handed to the transformation as second argument
     * @param transformation the function computing the new state
     * @param <T> type of the value handed to the transformation
     * @throws Exception if the transformation throws
     */
    @Override
    public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        final K key = this.keyContext.getCurrentKey();
        checkKeyNamespacePreconditions(key, namespace);
        final int keyGroupIndex = this.keyContext.getCurrentKeyGroupIndex();

        final Map<K, S> keyedMap = getOrCreateKeyedMap(keyGroupIndex, namespace);
        keyedMap.put(key, transformation.apply(keyedMap.get(key), value));
    }

    // ------------------------------------------------------------------------
    //  snapshots
    // ------------------------------------------------------------------------

    /**
     * Sums the sizes of all key maps in one key group's namespace map.
     *
     * @param keyGroupMap the namespace map of a single key group, must not be {@code null}
     * @return the number of key/state mappings in that key group
     */
    private static <K, N, S> int countMappingsInKeyGroup(Map<N, Map<K, S>> keyGroupMap) {
        int count = 0;
        for (Map<K, S> namespaceMap : keyGroupMap.values()) {
            count += namespaceMap.size();
        }
        return count;
    }

    /** Creates a snapshot object that writes out the contents of this table. */
    @Override
    public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
        return new NestedMapsStateTableSnapshot<>(this);
    }

    /**
     * Snapshot of a {@link NestedMapsStateTable}. It reads from the owning table's maps at the
     * time a key group is written rather than from a copy.
     *
     * @param <K> type of the key
     * @param <N> type of the namespace
     * @param <S> type of the state value
     */
    static class NestedMapsStateTableSnapshot<K, N, S>
            extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>> {

        NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> owningTable) {
            super(owningTable);
        }

        /**
         * Writes all mappings of one key group: first the number of entries as an int, then for
         * every entry the namespace, the key and the state value, each with its serializer.
         * A key group without data is written as a single {@code 0}.
         *
         * @param dov the output view to write to
         * @param keyGroupId the key-group index to write
         * @throws IOException if writing to the output view or serialization fails
         */
        @Override
        public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
            final Map<N, Map<K, S>> keyGroupMap = this.owningStateTable.getMapForKeyGroup(keyGroupId);
            if (null == keyGroupMap) {
                dov.writeInt(0);
                return;
            }

            final TypeSerializer<K> keySerializer = this.owningStateTable.keyContext.getKeySerializer();
            final TypeSerializer<N> namespaceSerializer = this.owningStateTable.metaInfo.getNamespaceSerializer();
            final TypeSerializer<S> stateSerializer = this.owningStateTable.metaInfo.getStateSerializer();

            dov.writeInt(countMappingsInKeyGroup(keyGroupMap));
            for (Map.Entry<N, Map<K, S>> namespaceEntry : keyGroupMap.entrySet()) {
                final N namespace = namespaceEntry.getKey();
                final Map<K, S> namespaceMap = namespaceEntry.getValue();
                for (Map.Entry<K, S> keyEntry : namespaceMap.entrySet()) {
                    // The namespace is repeated for every entry of the flat record stream.
                    namespaceSerializer.serialize(namespace, dov);
                    keySerializer.serialize(keyEntry.getKey(), dov);
                    stateSerializer.serialize(keyEntry.getValue(), dov);
                }
            }
        }
    }
}

