/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyValueStateIterator;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateUID;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

@Internal
final class HeapSnapshotResources<K>
implements FullSnapshotResources<K> {
    private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
    private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
    private final StreamCompressionDecorator streamCompressionDecorator;
    private final Map<StateUID, Integer> stateNamesToId;
    private final KeyGroupRange keyGroupRange;
    private final TypeSerializer<K> keySerializer;
    private final int totalKeyGroups;

    private HeapSnapshotResources(List<StateMetaInfoSnapshot> metaInfoSnapshots, Map<StateUID, StateSnapshot> cowStateStableSnapshots, StreamCompressionDecorator streamCompressionDecorator, Map<StateUID, Integer> stateNamesToId, KeyGroupRange keyGroupRange, TypeSerializer<K> keySerializer, int totalKeyGroups) {
        this.metaInfoSnapshots = metaInfoSnapshots;
        this.cowStateStableSnapshots = cowStateStableSnapshots;
        this.streamCompressionDecorator = streamCompressionDecorator;
        this.stateNamesToId = stateNamesToId;
        this.keyGroupRange = keyGroupRange;
        this.keySerializer = keySerializer;
        this.totalKeyGroups = totalKeyGroups;
    }

    public static <K> HeapSnapshotResources<K> create(Map<String, StateTable<K, ?, ?>> registeredKVStates, Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, StreamCompressionDecorator streamCompressionDecorator, KeyGroupRange keyGroupRange, TypeSerializer<K> keySerializer, int totalKeyGroups) {
        if (registeredKVStates.isEmpty() && registeredPQStates.isEmpty()) {
            return new HeapSnapshotResources<K>(Collections.emptyList(), Collections.emptyMap(), streamCompressionDecorator, Collections.emptyMap(), keyGroupRange, keySerializer, totalKeyGroups);
        }
        int numStates = registeredKVStates.size() + registeredPQStates.size();
        Preconditions.checkState(numStates <= Short.MAX_VALUE, "Too many states: " + numStates + ". Currently at most 32767 states are supported");
        ArrayList<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(numStates);
        HashMap<StateUID, Integer> stateNamesToId = CollectionUtil.newHashMapWithExpectedSize(numStates);
        HashMap<StateUID, StateSnapshot> cowStateStableSnapshots = CollectionUtil.newHashMapWithExpectedSize(numStates);
        HeapSnapshotResources.processSnapshotMetaInfoForAllStates(metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId, registeredKVStates, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
        HeapSnapshotResources.processSnapshotMetaInfoForAllStates(metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId, registeredPQStates, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
        return new HeapSnapshotResources<K>(metaInfoSnapshots, cowStateStableSnapshots, streamCompressionDecorator, stateNamesToId, keyGroupRange, keySerializer, totalKeyGroups);
    }

    private static void processSnapshotMetaInfoForAllStates(List<StateMetaInfoSnapshot> metaInfoSnapshots, Map<StateUID, StateSnapshot> cowStateStableSnapshots, Map<StateUID, Integer> stateNamesToId, Map<String, ? extends StateSnapshotRestore> registeredStates, StateMetaInfoSnapshot.BackendStateType stateType) {
        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState : registeredStates.entrySet()) {
            StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
            stateNamesToId.put(stateUid, stateNamesToId.size());
            StateSnapshotRestore state = kvState.getValue();
            if (null == state) continue;
            StateSnapshot stateSnapshot = state.stateSnapshot();
            metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
            cowStateStableSnapshots.put(stateUid, stateSnapshot);
        }
    }

    @Override
    public void release() {
        for (StateSnapshot stateSnapshot : this.cowStateStableSnapshots.values()) {
            stateSnapshot.release();
        }
    }

    @Override
    public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
        return this.metaInfoSnapshots;
    }

    @Override
    public KeyValueStateIterator createKVStateIterator() throws IOException {
        return new HeapKeyValueStateIterator(this.keyGroupRange, this.keySerializer, this.totalKeyGroups, this.stateNamesToId, this.cowStateStableSnapshots);
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    @Override
    public StreamCompressionDecorator getStreamCompressionDecorator() {
        return this.streamCompressionDecorator;
    }

    public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
        return this.cowStateStableSnapshots;
    }

    public Map<StateUID, Integer> getStateNamesToId() {
        return this.stateNamesToId;
    }
}

