/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.restore;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager;
import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.apache.flink.state.forst.ForStIncrementalCheckpointUtils;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.restore.ForStHandle;
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreResult;
import org.apache.flink.state.forst.sync.ForStIteratorWrapper;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.RunnableWithException;
import org.forstdb.Checkpoint;
import org.forstdb.ColumnFamilyDescriptor;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.DBOptions;
import org.forstdb.ExportImportFilesMetaData;
import org.forstdb.ReadOptions;
import org.forstdb.RocksDB;
import org.forstdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStIncrementalRestoreOperation<K>
implements ForStRestoreOperation {
    private static final Logger logger = LoggerFactory.getLogger(ForStIncrementalRestoreOperation.class);
    private final String operatorIdentifier;
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> restoredSstFiles;
    private final ForStHandle forstHandle;
    private final List<IncrementalRemoteKeyedStateHandle> restoreStateHandles;
    private final CloseableRegistry cancelStreamRegistry;
    private final KeyGroupRange keyGroupRange;
    private final int keyGroupPrefixBytes;
    private final ForStResourceContainer optionsContainer;
    private final Path forstBasePath;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final ClassLoader userCodeClassLoader;
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    private long lastCompletedCheckpointId;
    private final long writeBatchSize;
    private final double overlapFractionThreshold;
    private final boolean useIngestDbRestoreMode;
    private final boolean useDeleteFilesInRange;
    private UUID backendUID;
    private boolean isKeySerializerCompatibilityChecked;
    private final RecoveryClaimMode recoveryClaimMode;

    public ForStIncrementalRestoreOperation(String operatorIdentifier, KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, ClassLoader userCodeClassLoader, Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, StateSerializerProvider<K> keySerializerProvider, ForStResourceContainer optionsContainer, Path forstBasePath, Path instanceRocksDBPath, DBOptions dbOptions, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, @Nonnull ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity, StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<IncrementalRemoteKeyedStateHandle> restoreStateHandles, double overlapFractionThreshold, boolean useIngestDbRestoreMode, boolean useDeleteFilesInRange, RecoveryClaimMode recoveryClaimMode) {
        this.forstHandle = new ForStHandle(kvStateInformation, instanceRocksDBPath, dbOptions, columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, ttlCompactFiltersManager, writeBufferManagerCapacity);
        this.operatorIdentifier = operatorIdentifier;
        this.restoredSstFiles = new TreeMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>();
        this.lastCompletedCheckpointId = -1L;
        this.backendUID = UUID.randomUUID();
        this.customInitializationMetrics = customInitializationMetrics;
        this.restoreStateHandles = restoreStateHandles.stream().collect(Collectors.toList());
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.optionsContainer = optionsContainer;
        this.forstBasePath = forstBasePath;
        this.keySerializerProvider = keySerializerProvider;
        this.userCodeClassLoader = userCodeClassLoader;
        this.writeBatchSize = writeBatchSize;
        this.overlapFractionThreshold = overlapFractionThreshold;
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
        this.useDeleteFilesInRange = useDeleteFilesInRange;
        this.recoveryClaimMode = recoveryClaimMode;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ForStRestoreResult restore() throws Exception {
        if (this.restoreStateHandles == null || this.restoreStateHandles.isEmpty()) {
            return null;
        }
        logger.info("Starting RocksDB incremental recovery in operator {} target key-group range {}. Use IngestDB={}, State Handles={}", new Object[]{this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval(), this.useIngestDbRestoreMode, this.restoreStateHandles});
        ArrayList<StateHandleTransferSpec> toTransferSpecs = new ArrayList<StateHandleTransferSpec>();
        int bestStateHandleForInit = -1;
        if (!this.useIngestDbRestoreMode || this.restoreStateHandles.size() == 1) {
            bestStateHandleForInit = ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial(this.restoreStateHandles, this.keyGroupRange, this.overlapFractionThreshold);
            toTransferSpecs.add(new StateHandleTransferSpec(this.restoreStateHandles.get(bestStateHandleForInit), new Path(this.forstBasePath, "db")));
        }
        for (int i = 0; i < this.restoreStateHandles.size(); ++i) {
            if (i == bestStateHandleForInit) continue;
            IncrementalRemoteKeyedStateHandle handle = this.restoreStateHandles.get(i);
            toTransferSpecs.add(new StateHandleTransferSpec(handle, new Path(this.forstBasePath, UUID.randomUUID().toString())));
        }
        try {
            this.runAndReportDuration(() -> this.transferAllStateHandles(toTransferSpecs), "DownloadStateDurationMs");
            this.runAndReportDuration(() -> this.innerRestore(toTransferSpecs), "RestoreStateDurationMs");
            ForStRestoreResult forStRestoreResult = new ForStRestoreResult(this.forstHandle.getDb(), this.forstHandle.getDefaultColumnFamilyHandle(), this.forstHandle.getNativeMetricMonitor(), this.lastCompletedCheckpointId, this.backendUID, this.restoredSstFiles);
            return forStRestoreResult;
        }
        finally {
            if (!this.useIngestDbRestoreMode || this.restoreStateHandles.size() == 1) {
                toTransferSpecs.remove(0);
            }
            toTransferSpecs.stream().map(StateHandleTransferSpec::getTransferDestination).forEach(dir -> {
                try {
                    FileSystem fs = this.getFileSystem((Path)dir);
                    fs.delete(dir, true);
                }
                catch (IOException ignored) {
                    logger.warn("Failed to delete transfer destination {}", dir);
                }
            });
        }
    }

    private void transferAllStateHandles(List<StateHandleTransferSpec> specs) throws Exception {
        try (ForStStateDataTransfer transfer = new ForStStateDataTransfer(4, this.optionsContainer.getFileSystem());){
            transfer.transferAllStateDataToDirectory(this.optionsContainer.getPathContainer(), specs, this.cancelStreamRegistry, this.recoveryClaimMode);
        }
    }

    private void innerRestore(List<StateHandleTransferSpec> stateHandles) throws Exception {
        if (stateHandles.size() == 1) {
            this.initBaseDBFromSingleStateHandle(stateHandles.get(0));
        } else {
            this.restoreFromMultipleStateHandles(stateHandles);
        }
    }

    private void initBaseDBFromSingleStateHandle(StateHandleTransferSpec stateHandleSpec) throws Exception {
        IncrementalRemoteKeyedStateHandle stateHandle = stateHandleSpec.getStateHandle();
        logger.info("Starting opening base ForSt instance in operator {} with target key-group range {} from state handle {}.", new Object[]{this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval(), stateHandleSpec});
        this.restoreBaseDBFromMainHandle(stateHandleSpec);
        KeyGroupRange stateHandleKeyGroupRange = stateHandle.getKeyGroupRange();
        if (Objects.equals(stateHandleKeyGroupRange, this.keyGroupRange)) {
            this.restorePreviousIncrementalFilesStatus((IncrementalKeyedStateHandle)stateHandle);
        } else {
            try {
                ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.forstHandle.getDb(), this.forstHandle.getColumnFamilyHandles(), this.keyGroupRange, stateHandleKeyGroupRange, this.keyGroupPrefixBytes, this.useDeleteFilesInRange);
            }
            catch (RocksDBException e) {
                String errMsg = "Failed to clip DB after initialization.";
                logger.error(errMsg, (Throwable)e);
                throw new BackendBuildingException(errMsg, (Throwable)e);
            }
        }
        logger.info("Finished opening base ForSt instance in operator {} with target key-group range {}.", (Object)this.operatorIdentifier, (Object)this.keyGroupRange.prettyPrintInterval());
    }

    private void restoreFromMultipleStateHandles(List<StateHandleTransferSpec> stateHandles) throws Exception {
        logger.info("Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, stateHandles, this.useIngestDbRestoreMode});
        byte[] startKeyGroupPrefixBytes = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)this.keyGroupRange.getStartKeyGroup(), (byte[])startKeyGroupPrefixBytes);
        byte[] stopKeyGroupPrefixBytes = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)(this.keyGroupRange.getEndKeyGroup() + 1), (byte[])stopKeyGroupPrefixBytes);
        if (this.useIngestDbRestoreMode) {
            this.mergeStateHandlesWithClipAndIngest(stateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
        } else {
            StateHandleTransferSpec baseSpec = stateHandles.remove(0);
            this.mergeStateHandlesWithCopyFromTemporaryInstance(baseSpec, stateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
        }
        logger.info("Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, this.useIngestDbRestoreMode});
    }

    private void restoreBaseDBFromMainHandle(StateHandleTransferSpec handleSpec) throws Exception {
        KeyedBackendSerializationProxy<K> serializationProxy = this.readMetaData(handleSpec.getStateHandle().getMetaDataStateHandle());
        List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
        this.forstHandle.openDB(this.createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), stateMetaInfoSnapshots, (ICloseableRegistry)this.cancelStreamRegistry);
    }

    private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle incrementalHandle) {
        this.backendUID = incrementalHandle.getBackendIdentifier();
        this.restoredSstFiles.put(incrementalHandle.getCheckpointId(), incrementalHandle.getSharedStateHandles());
        this.lastCompletedCheckpointId = incrementalHandle.getCheckpointId();
        logger.info("Restored previous incremental files status in backend with range {} in operator {}: backend uuid {}, last checkpoint id {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, this.backendUID, this.lastCompletedCheckpointId});
    }

    private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(stateMetaInfoSnapshots.size());
        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
            RegisteredStateMetaInfoBase metaInfoBase = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)stateMetaInfoSnapshot);
            ColumnFamilyDescriptor columnFamilyDescriptor = ForStOperationUtils.createColumnFamilyDescriptor(metaInfoBase, this.forstHandle.getColumnFamilyOptionsFactory(), registerTtlCompactFilter ? this.forstHandle.getTtlCompactFiltersManager() : null, this.forstHandle.getWriteBufferManagerCapacity());
            columnFamilyDescriptors.add(columnFamilyDescriptor);
        }
        return columnFamilyDescriptors;
    }

    private void runAndReportDuration(RunnableWithException runnable, String metricName) throws Exception {
        SystemClock clock = SystemClock.getInstance();
        long startTime = clock.relativeTimeMillis();
        runnable.run();
        this.customInitializationMetrics.addMetric(metricName, clock.relativeTimeMillis() - startTime);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
        FSDataInputStream inputStream = null;
        try {
            inputStream = metaStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((AutoCloseable)inputStream);
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)inputStream);
            KeyedBackendSerializationProxy<K> keyedBackendSerializationProxy = this.readMetaData((DataInputView)in);
            return keyedBackendSerializationProxy;
        }
        finally {
            if (this.cancelStreamRegistry.unregisterCloseable((AutoCloseable)inputStream)) {
                inputStream.close();
            }
        }
    }

    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) throws IOException, StateMigrationException {
        KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
        serializationProxy.read(dataInputView);
        if (!this.isKeySerializerCompatibilityChecked) {
            TypeSerializer currentSerializer = this.keySerializerProvider.currentSchemaSerializer();
            TypeSerializerSchemaCompatibility keySerializerSchemaCompat = this.keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(serializationProxy.getKeySerializerSnapshot());
            if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
                throw new StateMigrationException("The new key serializer (" + currentSerializer + ") must be compatible with the previous key serializer (" + this.keySerializerProvider.previousSchemaSerializer() + ").");
            }
            this.isKeySerializerCompatibilityChecked = true;
        }
        return serializationProxy;
    }

    @Override
    public void close() throws Exception {
        this.forstHandle.close();
    }

    private void copyTempDbIntoBaseDb(RestoredDBInstance tmpRestoreDBInfo, ForStDBWriteBatchWrapper writeBatchWrapper, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        logger.debug("Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", new Object[]{tmpRestoreDBInfo.srcStateHandle, this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier});
        List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
        List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
        for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
            ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);
            ColumnFamilyHandle targetColumnFamilyHandle = this.forstHandle.getOrRegisterStateColumnFamilyHandle(null, (StateMetaInfoSnapshot)tmpRestoreDBInfo.stateMetaInfoSnapshots.get((int)descIdx), (ICloseableRegistry)this.cancelStreamRegistry).columnFamilyHandle;
            try (ForStIteratorWrapper iterator = ForStOperationUtils.getForStIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions);){
                iterator.seek(startKeyGroupPrefixBytes);
                while (iterator.isValid() && ForStIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                    writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                    iterator.next();
                }
                continue;
            }
        }
        logger.debug("Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", new Object[]{tmpRestoreDBInfo.srcStateHandle, this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier});
    }

    private void cleanUpPathQuietly(@Nonnull Path path) {
        try {
            this.getFileSystem(this.forstBasePath).delete(path, true);
        }
        catch (IOException ex) {
            logger.warn("Failed to clean up path " + path, (Throwable)ex);
        }
    }

    private void copyToBaseDBUsingTempDBs(List<StateHandleTransferSpec> toImportSpecs, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        if (toImportSpecs.isEmpty()) {
            return;
        }
        logger.info("Starting to copy state handles for backend with range {} in operator {} using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
        try (ForStDBWriteBatchWrapper writeBatchWrapper = new ForStDBWriteBatchWrapper(this.forstHandle.getDb(), this.writeBatchSize);
             Closeable ignored = this.cancelStreamRegistry.registerCloseableTemporarily(writeBatchWrapper.getCancelCloseable());){
            for (StateHandleTransferSpec handleToCopy : toImportSpecs) {
                RestoredDBInstance restoredDBInstance = this.restoreTempDBInstance(handleToCopy);
                try {
                    this.copyTempDbIntoBaseDb(restoredDBInstance, writeBatchWrapper, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
                }
                finally {
                    if (restoredDBInstance == null) continue;
                    restoredDBInstance.close();
                }
            }
        }
        logger.info("Competed copying state handles for backend with range {} in operator {} using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
    }

    private void mergeStateHandlesWithCopyFromTemporaryInstance(StateHandleTransferSpec baseSpec, List<StateHandleTransferSpec> keyedStateHandles, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        logger.info("Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
        this.initBaseDBFromSingleStateHandle(baseSpec);
        this.copyToBaseDBUsingTempDBs(keyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
        logger.info("Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void mergeStateHandlesWithClipAndIngest(List<StateHandleTransferSpec> keyedStateHandles, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        Path exportCfBasePath = new Path(this.forstBasePath, "export-cfs");
        this.getFileSystem(this.forstBasePath).mkdirs(exportCfBasePath);
        HashMap<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> exportedColumnFamilyMetaData = new HashMap<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>>(keyedStateHandles.size());
        ArrayList<StateHandleTransferSpec> notImportableHandles = new ArrayList<StateHandleTransferSpec>(keyedStateHandles.size());
        try {
            KeyGroupRange exportedSstKeyGroupsRange = this.exportColumnFamiliesWithSstDataInKeyGroupsRange(exportCfBasePath, keyedStateHandles, exportedColumnFamilyMetaData, notImportableHandles);
            if (exportedColumnFamilyMetaData.isEmpty()) {
                int bestStateHandleForInit = ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial(this.restoreStateHandles, this.keyGroupRange, this.overlapFractionThreshold);
                notImportableHandles.remove(bestStateHandleForInit);
                StateHandleTransferSpec baseSpec = new StateHandleTransferSpec(this.restoreStateHandles.get(bestStateHandleForInit), new Path(this.forstBasePath, "db"));
                this.transferAllStateHandles(Collections.singletonList(baseSpec));
                this.mergeStateHandlesWithCopyFromTemporaryInstance(baseSpec, notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
            } else {
                this.initBaseDBFromColumnFamilyImports(exportedColumnFamilyMetaData, exportedSstKeyGroupsRange);
                this.copyToBaseDBUsingTempDBs(notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
            }
        }
        finally {
            exportedColumnFamilyMetaData.values().forEach(IOUtils::closeAllQuietly);
            this.cleanUpPathQuietly(exportCfBasePath);
        }
    }

    public void exportColumnFamilies(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases, Path exportBasePath, Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> resultOutput) throws RocksDBException, IOException {
        Preconditions.checkArgument((columnFamilyHandles.size() == registeredStateMetaInfoBases.size() ? 1 : 0) != 0, (Object)"Lists are aligned by index and must be of the same size!");
        try (Checkpoint checkpoint = Checkpoint.create((RocksDB)db);){
            for (int i = 0; i < columnFamilyHandles.size(); ++i) {
                RegisteredStateMetaInfoBase.Key stateMetaInfoAsKey = registeredStateMetaInfoBases.get(i).asMapKey();
                String uuid = UUID.randomUUID().toString();
                String subPathStr = this.optionsContainer.getPathContainer().getRemoteBasePath() != null ? exportBasePath.getName() + "/" + uuid : exportBasePath.toString() + "/" + uuid;
                ExportImportFilesMetaData exportedColumnFamilyMetaData = checkpoint.exportColumnFamily(columnFamilyHandles.get(i), subPathStr);
                FileStatus[] exportedSstFiles = this.getFileSystem(exportBasePath).listStatus(new Path(exportBasePath, uuid));
                if (exportedSstFiles != null) {
                    int sstFileCount = 0;
                    for (FileStatus exportedSstFile : exportedSstFiles) {
                        if (!exportedSstFile.getPath().getName().endsWith(".sst")) continue;
                        ++sstFileCount;
                    }
                    if (sstFileCount <= 0) continue;
                    resultOutput.computeIfAbsent(stateMetaInfoAsKey, key -> new ArrayList()).add(exportedColumnFamilyMetaData);
                    continue;
                }
                IOUtils.closeQuietly((AutoCloseable)exportedColumnFamilyMetaData);
            }
        }
    }

    private void initBaseDBFromColumnFamilyImports(Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> exportedColumnFamilyMetaData, KeyGroupRange exportKeyGroupRange) throws Exception {
        logger.info("Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB with exported range {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, exportKeyGroupRange.prettyPrintInterval()});
        this.forstHandle.openDB();
        for (Map.Entry<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> entry : exportedColumnFamilyMetaData.entrySet()) {
            this.forstHandle.registerStateColumnFamilyHandleWithImport(entry.getKey(), entry.getValue(), (ICloseableRegistry)this.cancelStreamRegistry);
        }
        ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.forstHandle.getDb(), this.forstHandle.getColumnFamilyHandles(), this.keyGroupRange, exportKeyGroupRange, this.keyGroupPrefixBytes, this.useDeleteFilesInRange);
        logger.info("Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
    }

    private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(Path exportCfBasePath, List<StateHandleTransferSpec> stateHandleSpecs, Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> exportedColumnFamiliesOut, List<StateHandleTransferSpec> skipped) throws Exception {
        logger.info("Starting restore export for backend with range {} in operator {}.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
        int minExportKeyGroup = Integer.MAX_VALUE;
        int maxExportKeyGroup = Integer.MIN_VALUE;
        int index = 0;
        for (StateHandleTransferSpec stateHandleSpec : stateHandleSpecs) {
            IncrementalRemoteKeyedStateHandle stateHandle = stateHandleSpec.getStateHandle();
            String logLineSuffix = " for state handle at index " + index + " with proclaimed key-group range " + stateHandle.getKeyGroupRange().prettyPrintInterval() + " for backend with range " + this.keyGroupRange.prettyPrintInterval() + " in operator " + this.operatorIdentifier + ".";
            logger.debug("Opening temporary database" + logLineSuffix);
            try (RestoredDBInstance tmpRestoreDBInfo = this.restoreTempDBInstance(stateHandleSpec);){
                List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
                logger.debug("Checking actual keys of sst files" + logLineSuffix);
                ForStIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = ForStIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(tmpRestoreDBInfo.db, this.keyGroupPrefixBytes, stateHandle.getKeyGroupRange());
                logger.info("{}" + logLineSuffix, (Object)rangeCheckResult);
                if (rangeCheckResult.allInRange()) {
                    logger.debug("Start exporting" + logLineSuffix);
                    List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = tmpRestoreDBInfo.stateMetaInfoSnapshots.stream().map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot).collect(Collectors.toList());
                    this.exportColumnFamilies(tmpRestoreDBInfo.db, tmpColumnFamilyHandles, registeredStateMetaInfoBases, exportCfBasePath, exportedColumnFamiliesOut);
                    minExportKeyGroup = Math.min(minExportKeyGroup, stateHandle.getKeyGroupRange().getStartKeyGroup());
                    maxExportKeyGroup = Math.max(maxExportKeyGroup, stateHandle.getKeyGroupRange().getEndKeyGroup());
                    logger.debug("Done exporting" + logLineSuffix);
                } else {
                    skipped.add(stateHandleSpec);
                    logger.debug("Skipped export" + logLineSuffix);
                }
            }
            ++index;
        }
        KeyGroupRange exportedKeyGroupsRange = minExportKeyGroup <= maxExportKeyGroup ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup) : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        logger.info("Completed restore export for backend with range {} in operator {}. {} exported handles with overall exported range {}. {} Skipped handles: {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, stateHandleSpecs.size() - skipped.size(), exportedKeyGroupsRange.prettyPrintInterval(), skipped.size(), skipped});
        return exportedKeyGroupsRange;
    }

    private RestoredDBInstance restoreTempDBInstance(StateHandleTransferSpec stateHandleSpec) throws Exception {
        KeyedBackendSerializationProxy<K> serializationProxy = this.readMetaData(stateHandleSpec.getStateHandle().getMetaDataStateHandle());
        List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = this.createColumnFamilyDescriptors(stateMetaInfoSnapshots, false);
        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(stateMetaInfoSnapshots.size() + 1);
        Object dbName = this.optionsContainer.getPathContainer().getRemoteBasePath() != null ? "/" + stateHandleSpec.getTransferDestination().getName() : stateHandleSpec.getTransferDestination().toString();
        DBOptions dbOptions = new DBOptions(this.forstHandle.getDbOptions());
        dbOptions.setDbLogDir("");
        RocksDB restoreDb = ForStOperationUtils.openDB((String)dbName, columnFamilyDescriptors, columnFamilyHandles, ForStOperationUtils.createColumnFamilyOptions(this.forstHandle.getColumnFamilyOptionsFactory(), "default"), dbOptions);
        return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots, stateHandleSpec.getStateHandle(), stateHandleSpec.getTransferDestination().toString());
    }

    private FileSystem getFileSystem(Path path) throws IOException {
        if (this.optionsContainer.getFileSystem() != null) {
            return this.optionsContainer.getFileSystem();
        }
        return path.getFileSystem();
    }

    public static class RestoredDBInstance
    implements AutoCloseable {
        @Nonnull
        final RocksDB db;
        @Nonnull
        final ColumnFamilyHandle defaultColumnFamilyHandle;
        @Nonnull
        final List<ColumnFamilyHandle> columnFamilyHandles;
        @Nonnull
        final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
        @Nonnull
        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        final ReadOptions readOptions;
        final IncrementalRemoteKeyedStateHandle srcStateHandle;
        final String instancePath;

        RestoredDBInstance(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, @Nonnull IncrementalRemoteKeyedStateHandle srcStateHandle, @Nonnull String instancePath) {
            this.db = db;
            this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
            this.columnFamilyHandles = columnFamilyHandles;
            this.columnFamilyDescriptors = columnFamilyDescriptors;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            this.readOptions = new ReadOptions();
            this.srcStateHandle = srcStateHandle;
            this.instancePath = instancePath;
        }

        @Override
        public void close() {
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<ColumnFamilyOptions>(this.columnFamilyDescriptors.size() + 1);
            this.columnFamilyDescriptors.forEach(cfd -> columnFamilyOptions.add(cfd.getOptions()));
            ForStOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, this.defaultColumnFamilyHandle);
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamilyHandle);
            IOUtils.closeAllQuietly(this.columnFamilyHandles);
            IOUtils.closeQuietly((AutoCloseable)this.db);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly((AutoCloseable)this.readOptions);
        }
    }
}

