/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.restore;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation;
import org.apache.flink.runtime.state.restore.KeyGroup;
import org.apache.flink.runtime.state.restore.KeyGroupEntry;
import org.apache.flink.runtime.state.restore.SavepointRestoreResult;
import org.apache.flink.runtime.state.restore.ThrowingIterator;
import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager;
import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.restore.ForStHandle;
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreResult;
import org.apache.flink.util.StateMigrationException;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.DBOptions;
import org.forstdb.RocksDBException;

/**
 * Restore operation that reads a full snapshot and splits its content between two targets:
 * entries belonging to priority-queue states are deserialized into heap priority queues, all
 * other entries are written into column families of the ForSt database owned by
 * {@link ForStHandle}.
 *
 * @param <K> type of the key handled by the wrapped {@link FullSnapshotRestoreOperation}.
 */
public class ForStHeapTimersFullRestoreOperation<K> implements ForStRestoreOperation {

    /** Produces the {@link SavepointRestoreResult}s that are applied by {@link #restore()}. */
    private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;

    /** Priority-queue states by state name; restored queues are registered here if absent. */
    private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
            registeredPQStates;

    /** Factory used to create the heap priority queue of a state that is not registered yet. */
    private final HeapPriorityQueueSetFactory priorityQueueFactory;

    private final int numberOfKeyGroups;

    /** Reusable deserializer; its buffer is re-pointed at every restored queue entry key. */
    private final DataInputDeserializer deserializer = new DataInputDeserializer();

    /** Write batch size handed to every {@link ForStDBWriteBatchWrapper} created on restore. */
    private final long writeBatchSize;

    private final ForStHandle rocksHandle;
    private final KeyGroupRange keyGroupRange;

    /** Number of leading bytes skipped in an entry key before a queue element is read. */
    private final int keyGroupPrefixBytes;

    /** Registry in which cancellable resources are registered while the restore is running. */
    private final ICloseableRegistry cancelStreamRegistryForRestore;

    /**
     * Creates the restore operation.
     *
     * @param keyGroupRange key-group range passed to the snapshot reader and to newly created
     *     priority-queue wrappers.
     * @param numberOfKeyGroups number of key groups; also determines {@code keyGroupPrefixBytes}.
     * @param userCodeClassLoader class loader passed to the snapshot reader.
     * @param kvStateInformation key/value state registry passed to the {@link ForStHandle}.
     * @param registeredPQStates registry of priority-queue states, extended during restore.
     * @param priorityQueueFactory factory for heap priority queues of unregistered states.
     * @param keySerializerProvider key serializer provider passed to the snapshot reader.
     * @param instanceRocksDBPath database path passed to the {@link ForStHandle}.
     * @param dbOptions database options passed to the {@link ForStHandle}.
     * @param columnFamilyOptionsFactory column family options factory passed to the handle.
     * @param nativeMetricOptions native metric options passed to the handle.
     * @param metricGroup metric group passed to the handle.
     * @param ttlCompactFiltersManager TTL compaction filter manager passed to the handle.
     * @param writeBatchSize write batch size used for the write batches created on restore.
     * @param writeBufferManagerCapacity write buffer manager capacity passed to the handle.
     * @param restoreStateHandles state handles read by the snapshot reader.
     * @param cancelStreamRegistryForRestore registry for resources closed on cancellation.
     */
    public ForStHeapTimersFullRestoreOperation(
            KeyGroupRange keyGroupRange,
            int numberOfKeyGroups,
            ClassLoader userCodeClassLoader,
            Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            HeapPriorityQueueSetFactory priorityQueueFactory,
            StateSerializerProvider<K> keySerializerProvider,
            Path instanceRocksDBPath,
            DBOptions dbOptions,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            ForStNativeMetricOptions nativeMetricOptions,
            MetricGroup metricGroup,
            @Nonnull ForStDBTtlCompactFiltersManager ttlCompactFiltersManager,
            @Nonnegative long writeBatchSize,
            Long writeBufferManagerCapacity,
            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
            ICloseableRegistry cancelStreamRegistryForRestore) {
        // Keep the configured batch size so that restoreKVStateData() can honour it.
        this.writeBatchSize = writeBatchSize;
        this.rocksHandle =
                new ForStHandle(
                        kvStateInformation,
                        instanceRocksDBPath,
                        dbOptions,
                        columnFamilyOptionsFactory,
                        nativeMetricOptions,
                        metricGroup,
                        ttlCompactFiltersManager,
                        writeBufferManagerCapacity);
        this.savepointRestoreOperation =
                new FullSnapshotRestoreOperation<>(
                        keyGroupRange,
                        userCodeClassLoader,
                        restoreStateHandles,
                        keySerializerProvider);
        this.registeredPQStates = registeredPQStates;
        this.priorityQueueFactory = priorityQueueFactory;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes =
                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
                        numberOfKeyGroups);
        this.cancelStreamRegistryForRestore = cancelStreamRegistryForRestore;
    }

    /**
     * Opens the database and applies every {@link SavepointRestoreResult} produced by the
     * wrapped snapshot reader.
     *
     * @return a result carrying the database, its default column family handle and the native
     *     metric monitor. The remaining arguments are passed as {@code -1L}/{@code null};
     *     NOTE(review): their meaning is defined by {@link ForStRestoreResult} and is not
     *     visible here.
     * @throws IOException if reading the snapshot fails.
     * @throws StateMigrationException declared by the column family registration.
     * @throws RocksDBException if a database operation fails.
     */
    @Override
    public ForStRestoreResult restore()
            throws IOException, StateMigrationException, RocksDBException {
        this.rocksHandle.openDB();
        try (ThrowingIterator<SavepointRestoreResult> restore =
                this.savepointRestoreOperation.restore()) {
            while (restore.hasNext()) {
                this.applyRestoreResult(restore.next());
            }
        }
        return new ForStRestoreResult(
                this.rocksHandle.getDb(),
                this.rocksHandle.getDefaultColumnFamilyHandle(),
                this.rocksHandle.getNativeMetricMonitor(),
                -1L,
                null,
                null);
    }

    /**
     * Resolves the target of every state in the given result, then restores its data.
     *
     * <p>The position of a meta info snapshot in the list is used as the map key under which
     * its target (priority queue or column family) is stored; {@link #restoreKVStateData} looks
     * targets up by the state id of each entry.
     */
    private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
            throws IOException, RocksDBException, StateMigrationException {
        List<StateMetaInfoSnapshot> restoredMetaInfos =
                savepointRestoreResult.getStateMetaInfoSnapshots();
        Map<Integer, ColumnFamilyHandle> columnFamilyHandles = new HashMap<>();
        Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates =
                new HashMap<>();
        for (int i = 0; i < restoredMetaInfos.size(); i++) {
            StateMetaInfoSnapshot restoredMetaInfo = restoredMetaInfos.get(i);
            if (restoredMetaInfo.getBackendStateType()
                    == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE) {
                String stateName = restoredMetaInfo.getName();
                // Reuse an already registered queue, otherwise create and register a new one.
                HeapPriorityQueueSnapshotRestoreWrapper<?> queueWrapper =
                        this.registeredPQStates.computeIfAbsent(
                                stateName,
                                key ->
                                        this.createInternal(
                                                new RegisteredPriorityQueueStateBackendMetaInfo<>(
                                                        restoredMetaInfo)));
                restoredPQStates.put(i, queueWrapper);
            } else {
                ForStOperationUtils.ForStKvStateInfo registeredStateCFHandle =
                        this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
                                null, restoredMetaInfo, this.cancelStreamRegistryForRestore);
                columnFamilyHandles.put(i, registeredStateCFHandle.columnFamilyHandle);
            }
        }

        try (ThrowingIterator<KeyGroup> keyGroups =
                savepointRestoreResult.getRestoredKeyGroups()) {
            this.restoreKVStateData(keyGroups, columnFamilyHandles, restoredPQStates);
        }
    }

    /**
     * Iterates over all entries of all key groups and routes each entry either into its heap
     * priority queue or, through a write batch, into its column family.
     *
     * @throws IllegalStateException if an entry's state id maps to neither a priority queue
     *     nor a column family.
     */
    private void restoreKVStateData(
            ThrowingIterator<KeyGroup> keyGroups,
            Map<Integer, ColumnFamilyHandle> columnFamilies,
            Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
            throws IOException, RocksDBException, StateMigrationException {
        // The write batch's cancel hook stays registered only for the duration of this restore.
        try (ForStDBWriteBatchWrapper writeBatchWrapper =
                        new ForStDBWriteBatchWrapper(
                                this.rocksHandle.getDb(), this.writeBatchSize);
                Closeable ignored =
                        this.cancelStreamRegistryForRestore.registerCloseableTemporarily(
                                writeBatchWrapper.getCancelCloseable())) {
            HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> restoredPQ = null;
            ColumnFamilyHandle handle = null;
            while (keyGroups.hasNext()) {
                KeyGroup keyGroup = keyGroups.next();
                // try-with-resources closes the entry iterator without masking a failure
                // raised while the entries are being processed.
                try (ThrowingIterator<KeyGroupEntry> groupEntries =
                        keyGroup.getKeyGroupEntries()) {
                    int oldKvStateId = -1;
                    while (groupEntries.hasNext()) {
                        KeyGroupEntry groupEntry = groupEntries.next();
                        int kvStateId = groupEntry.getKvStateId();
                        // Only look the targets up again when the state id changes.
                        if (kvStateId != oldKvStateId) {
                            oldKvStateId = kvStateId;
                            handle = columnFamilies.get(kvStateId);
                            restoredPQ = this.getRestoredPQ(restoredPQStates, kvStateId);
                        }
                        if (restoredPQ != null) {
                            this.restoreQueueElement(restoredPQ, groupEntry);
                        } else if (handle != null) {
                            writeBatchWrapper.put(
                                    handle, groupEntry.getKey(), groupEntry.getValue());
                        } else {
                            throw new IllegalStateException("Unknown state id: " + kvStateId);
                        }
                    }
                }
            }
        }
    }

    /**
     * Deserializes one queue element from the entry's key, after skipping the key-group prefix,
     * and adds it to the priority queue of the given wrapper.
     */
    private void restoreQueueElement(
            HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> restoredPQ,
            KeyGroupEntry groupEntry)
            throws IOException {
        this.deserializer.setBuffer(groupEntry.getKey());
        this.deserializer.skipBytesToRead(this.keyGroupPrefixBytes);
        HeapPriorityQueueElement queueElement =
                restoredPQ.getMetaInfo().getElementSerializer().deserialize(this.deserializer);
        restoredPQ.getPriorityQueue().add(queueElement);
    }

    /**
     * Returns the priority-queue wrapper stored under the given state id, or {@code null} if
     * that id does not belong to a priority-queue state.
     */
    // The element type of the stored wrappers is erased to a wildcard; callers only ever treat
    // the elements as HeapPriorityQueueElement, which is the upper bound of every wrapper.
    @SuppressWarnings("unchecked")
    private HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> getRestoredPQ(
            Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates,
            int kvStateId) {
        return (HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>)
                restoredPQStates.get(kvStateId);
    }

    /**
     * Creates a heap priority queue for the state described by {@code metaInfo} and wraps it
     * together with the meta info, this operation's key-group range and number of key groups.
     */
    // The meta info is rebuilt from a snapshot, so its element type is not statically known.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            HeapPriorityQueueSnapshotRestoreWrapper<T> createInternal(
                    RegisteredPriorityQueueStateBackendMetaInfo metaInfo) {
        final String stateName = metaInfo.getName();
        final HeapPriorityQueueSet<T> priorityQueue =
                this.priorityQueueFactory.create(stateName, metaInfo.getElementSerializer());
        return new HeapPriorityQueueSnapshotRestoreWrapper<>(
                priorityQueue,
                metaInfo,
                KeyExtractorFunction.forKeyedObjects(),
                this.keyGroupRange,
                this.numberOfKeyGroups);
    }

    /** Closes the underlying {@link ForStHandle}. */
    @Override
    public void close() throws Exception {
        this.rocksHandle.close();
    }
}

