/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.restore;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.restore.RocksDBHandle;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation;
import org.apache.flink.runtime.state.restore.KeyGroup;
import org.apache.flink.runtime.state.restore.KeyGroupEntry;
import org.apache.flink.runtime.state.restore.SavepointRestoreResult;
import org.apache.flink.runtime.state.restore.ThrowingIterator;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDBException;

/**
 * Restore operation that rebuilds a RocksDB instance from full snapshots by replaying every
 * restored key/value entry into the database through a {@link RocksDBWriteBatchWrapper}.
 *
 * <p>Reading the snapshot is delegated to a {@link FullSnapshotRestoreOperation}; this class
 * only opens the database, registers the column families for the restored state meta
 * information and writes the entries.
 *
 * @param <K> type of the key handled by the key serializer provider
 */
public class RocksDBFullRestoreOperation<K>
implements RocksDBRestoreOperation {
    /** Produces the restored meta information and key-group data, one result at a time. */
    private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
    /** Passed unchanged to every {@link RocksDBWriteBatchWrapper} created during restore. */
    private final long writeBatchSize;
    /** Owns the RocksDB instance that is opened and populated by {@link #restore()}. */
    private final RocksDBHandle rocksHandle;

    /**
     * Creates the restore operation. The arguments are only forwarded to the
     * {@link RocksDBHandle} and the {@link FullSnapshotRestoreOperation}; nothing is opened
     * or read until {@link #restore()} is called.
     *
     * @param keyGroupRange forwarded to the snapshot restore operation
     * @param userCodeClassLoader forwarded to the snapshot restore operation
     * @param kvStateInformation forwarded to the RocksDB handle
     * @param keySerializerProvider forwarded to the snapshot restore operation
     * @param instanceRocksDBPath forwarded to the RocksDB handle
     * @param dbOptions forwarded to the RocksDB handle
     * @param columnFamilyOptionsFactory forwarded to the RocksDB handle
     * @param nativeMetricOptions forwarded to the RocksDB handle
     * @param metricGroup forwarded to the RocksDB handle
     * @param restoreStateHandles state handles to restore from
     * @param ttlCompactFiltersManager forwarded to the RocksDB handle
     * @param writeBatchSize batch size used for the write batch wrapper
     * @param writeBufferManagerCapacity forwarded to the RocksDB handle
     */
    public RocksDBFullRestoreOperation(
            KeyGroupRange keyGroupRange,
            ClassLoader userCodeClassLoader,
            Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
            StateSerializerProvider<K> keySerializerProvider,
            File instanceRocksDBPath,
            DBOptions dbOptions,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            RocksDBNativeMetricOptions nativeMetricOptions,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
            @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
            @Nonnegative long writeBatchSize,
            Long writeBufferManagerCapacity) {
        this.writeBatchSize = writeBatchSize;
        this.rocksHandle =
                new RocksDBHandle(
                        kvStateInformation,
                        instanceRocksDBPath,
                        dbOptions,
                        columnFamilyOptionsFactory,
                        nativeMetricOptions,
                        metricGroup,
                        ttlCompactFiltersManager,
                        writeBufferManagerCapacity);
        this.savepointRestoreOperation =
                new FullSnapshotRestoreOperation<>(
                        keyGroupRange,
                        userCodeClassLoader,
                        restoreStateHandles,
                        keySerializerProvider);
    }

    /**
     * Opens the database and applies every result produced by the snapshot restore operation.
     *
     * @return a result carrying the database, its default column family handle and the native
     *     metric monitor
     * @throws IOException if reading the snapshot data fails
     * @throws StateMigrationException propagated from the snapshot restore or state registration
     * @throws RocksDBException if writing to RocksDB fails
     */
    @Override
    public RocksDBRestoreResult restore()
            throws IOException, StateMigrationException, RocksDBException {
        this.rocksHandle.openDB();
        try (ThrowingIterator<SavepointRestoreResult> restore =
                this.savepointRestoreOperation.restore()) {
            while (restore.hasNext()) {
                this.applyRestoreResult(restore.next());
            }
        }
        // NOTE(review): the trailing -1L/null/null are passed through as-is; presumably they
        // are fields that only matter for other restore modes - confirm against
        // RocksDBRestoreResult.
        return new RocksDBRestoreResult(
                this.rocksHandle.getDb(),
                this.rocksHandle.getDefaultColumnFamilyHandle(),
                this.rocksHandle.getNativeMetricMonitor(),
                -1L,
                null,
                null);
    }

    /**
     * Registers a column family for each restored state meta info snapshot and then writes the
     * key groups of the given result into the database.
     */
    private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
            throws IOException, RocksDBException, StateMigrationException {
        List<StateMetaInfoSnapshot> restoredMetaInfos =
                savepointRestoreResult.getStateMetaInfoSnapshots();
        // Keyed by the snapshot's position in the list; restoreKVStateData looks handles up
        // by the kv-state id carried on each entry.
        Map<Integer, ColumnFamilyHandle> columnFamilyHandles = new HashMap<>();
        for (int i = 0; i < restoredMetaInfos.size(); ++i) {
            StateMetaInfoSnapshot restoredMetaInfo = restoredMetaInfos.get(i);
            RocksDBKeyedStateBackend.RocksDbKvStateInfo registeredStateCFHandle =
                    this.rocksHandle.getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
            columnFamilyHandles.put(i, registeredStateCFHandle.columnFamilyHandle);
        }
        try (ThrowingIterator<KeyGroup> keyGroups =
                savepointRestoreResult.getRestoredKeyGroups()) {
            this.restoreKVStateData(keyGroups, columnFamilyHandles);
        }
    }

    /**
     * Writes every entry of every key group into the column family selected by the entry's
     * kv-state id, using a single write batch wrapper for the whole iteration.
     *
     * @param keyGroups key groups to write; not closed by this method
     * @param columnFamilies column family handles keyed by kv-state id
     */
    private void restoreKVStateData(
            ThrowingIterator<KeyGroup> keyGroups, Map<Integer, ColumnFamilyHandle> columnFamilies)
            throws IOException, RocksDBException, StateMigrationException {
        try (RocksDBWriteBatchWrapper writeBatchWrapper =
                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), this.writeBatchSize)) {
            ColumnFamilyHandle handle = null;
            while (keyGroups.hasNext()) {
                KeyGroup keyGroup = keyGroups.next();
                // try-with-resources closes the entry iterator and lets any read/write failure
                // propagate to the caller; a finally block that leaves via `continue` would
                // discard the pending exception and silently skip to the next key group.
                try (ThrowingIterator<KeyGroupEntry> groupEntries =
                        keyGroup.getKeyGroupEntries()) {
                    int oldKvStateId = -1;
                    while (groupEntries.hasNext()) {
                        KeyGroupEntry groupEntry = groupEntries.next();
                        int kvStateId = groupEntry.getKvStateId();
                        // Only look the handle up again when the state id changes.
                        if (kvStateId != oldKvStateId) {
                            oldKvStateId = kvStateId;
                            handle = columnFamilies.get(kvStateId);
                        }
                        writeBatchWrapper.put(handle, groupEntry.getKey(), groupEntry.getValue());
                    }
                }
            }
        }
    }

    /** Closes the underlying {@link RocksDBHandle}. */
    @Override
    public void close() throws Exception {
        this.rocksHandle.close();
    }
}

