/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.RocksDB;
import org.forstdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStNativeFullSnapshotStrategy<K>
extends ForStSnapshotStrategyBase<K, ForStSnapshotStrategyBase.ForStNativeSnapshotResources> {
    private static final Logger LOG = LoggerFactory.getLogger(ForStNativeFullSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous full ForStDB snapshot";
    protected final ForStStateDataTransfer stateTransfer;

    public ForStNativeFullSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard resourceGuard, @Nonnull ForStResourceContainer resourceContainer, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, @Nonnull UUID backendUID, @Nonnull ForStStateDataTransfer stateTransfer) {
        this(DESCRIPTION, db, resourceGuard, resourceContainer, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, backendUID, stateTransfer);
    }

    public ForStNativeFullSnapshotStrategy(@Nonnull String description, @Nonnull RocksDB db, @Nonnull ResourceGuard resourceGuard, @Nonnull ForStResourceContainer resourceContainer, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, @Nonnull UUID backendUID, @Nonnull ForStStateDataTransfer stateTransfer) {
        super(description, db, resourceGuard, resourceContainer, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, backendUID);
        this.stateTransfer = stateTransfer;
    }

    @Override
    protected ForStSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        for (Map.Entry stateMetaInfoEntry : this.kvStateInformation.entrySet()) {
            stateMetaInfoSnapshots.add(((ForStOperationUtils.ForStKvStateInfo)stateMetaInfoEntry.getValue()).metaInfo.snapshot());
        }
        return EMPTY_PREVIOUS_SNAPSHOT;
    }

    @Override
    public void close() {
        this.stateTransfer.close();
    }

    public void notifyCheckpointAborted(long abortedCheckpointId) {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    public ForStSnapshotStrategyBase.ForStNativeSnapshotResources syncPrepareResources(long checkpointId) throws Exception {
        ArrayList<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(this.kvStateInformation.size());
        ForStSnapshotStrategyBase.PreviousSnapshot previousSnapshot = this.snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
        ResourceGuard.Lease lease = this.resourceGuard.acquireResource();
        this.db.disableFileDeletions();
        try {
            RocksDB.LiveFiles liveFiles = this.db.getLiveFiles(true);
            List<Path> liveFilesPath = liveFiles.files.stream().map(file -> new Path(this.resourceContainer.getDbPath(), file)).filter(file -> !file.getName().equals("CURRENT")).collect(Collectors.toList());
            Path manifestFile = liveFilesPath.stream().filter(file -> file.getName().startsWith("MANIFEST-")).findAny().get();
            this.logLiveFiles(checkpointId, liveFiles.manifestFileSize, liveFilesPath);
            return new ForStSnapshotStrategyBase.ForStNativeSnapshotResources(stateMetaInfoSnapshots, liveFiles.manifestFileSize, liveFilesPath, manifestFile, previousSnapshot, () -> {
                try {
                    this.db.enableFileDeletions(false);
                    lease.close();
                    LOG.info("Release one file deletion lock with ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.", (Object)this.backendUID, (Object)checkpointId);
                }
                catch (RocksDBException e) {
                    LOG.error("Enable file deletion failed, backendUID:{}, checkpointId:{}.", new Object[]{this.backendUID, checkpointId, e});
                }
            });
        }
        catch (Exception e) {
            LOG.error("Exception thrown when prepare snapshot resources, enable file deletion and rethrow the exception, backendUID:{}, checkpointId:{}", (Object)this.backendUID, (Object)checkpointId);
            this.db.enableFileDeletions(false);
            lease.close();
            throw e;
        }
    }

    private void logLiveFiles(long checkpointId, long manifestFileSize, List<Path> liveFilesPath) {
        if (LOG.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder("    manifestFileSize:").append(manifestFileSize).append("\n");
            liveFilesPath.forEach(e -> sb.append("    file : ").append(e).append("\n"));
            LOG.debug("Backend:{} live files for checkpoint:{} : \n{}", new Object[]{this.backendUID, checkpointId, sb});
        }
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(ForStSnapshotStrategyBase.ForStNativeSnapshotResources syncPartResource, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (syncPartResource.stateMetaInfoSnapshots.isEmpty()) {
            return registry -> SnapshotResult.empty();
        }
        return new ForStNativeFullSnapshotOperation(checkpointOptions.getCheckpointType().getSharingFilesStrategy(), checkpointId, streamFactory, syncPartResource);
    }

    private final class ForStNativeFullSnapshotOperation
    extends ForStSnapshotStrategyBase.ForStSnapshotOperation {
        @Nonnull
        private final ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources;
        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;

        private ForStNativeFullSnapshotOperation(SnapshotType.SharingFilesStrategy sharingFilesStrategy, @Nonnull long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory, ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources) {
            super(checkpointId, checkpointStreamFactory);
            this.snapshotResources = snapshotResources;
            this.sharingFilesStrategy = sharingFilesStrategy;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
            boolean completed = false;
            try {
                SnapshotResult<StreamStateHandle> metaStateHandle = ForStNativeFullSnapshotStrategy.this.materializeMetaData(snapshotCloseableRegistry, this.tmpResourcesRegistry, this.snapshotResources.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> privateFiles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
                long checkpointedSize = metaStateHandle.getStateSize();
                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(ForStNativeFullSnapshotStrategy.this.backendUID, ForStNativeFullSnapshotStrategy.this.keyGroupRange, this.checkpointId, Collections.emptyList(), privateFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize += this.uploadSnapshotFiles(privateFiles, snapshotCloseableRegistry, this.tmpResourcesRegistry));
                completed = true;
                SnapshotResult snapshotResult = SnapshotResult.of((StateObject)jmIncrementalKeyedStateHandle);
                return snapshotResult;
            }
            finally {
                this.snapshotResources.release();
                if (!completed) {
                    try {
                        this.tmpResourcesRegistry.close();
                    }
                    catch (Exception e) {
                        LOG.warn("Could not properly clean tmp resources.", (Throwable)e);
                    }
                }
            }
        }

        private long uploadSnapshotFiles(@Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> privateFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception {
            long uploadedSize = 0L;
            if (this.snapshotResources.liveFiles.size() > 0) {
                List<IncrementalKeyedStateHandle.HandleAndLocalPath> uploadedFiles = ForStNativeFullSnapshotStrategy.this.stateTransfer.transferFilesToCheckpointFs(this.sharingFilesStrategy, this.snapshotResources.liveFiles.stream().filter(file -> !file.getName().endsWith("CURRENT") && !file.getName().startsWith("MANIFEST-")).collect(Collectors.toList()), this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, snapshotCloseableRegistry, tmpResourcesRegistry, true);
                uploadedSize += uploadedFiles.stream().mapToLong(e -> e.getStateSize()).sum();
                privateFiles.addAll(uploadedFiles);
                IncrementalKeyedStateHandle.HandleAndLocalPath manifestFileTransferResult = ForStNativeFullSnapshotStrategy.this.stateTransfer.transferFileToCheckpointFs(SnapshotType.SharingFilesStrategy.NO_SHARING, this.snapshotResources.manifestFilePath, this.snapshotResources.manifestFileSize, this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, snapshotCloseableRegistry, tmpResourcesRegistry, true);
                privateFiles.add(manifestFileTransferResult);
                uploadedSize += manifestFileTransferResult.getStateSize();
                IncrementalKeyedStateHandle.HandleAndLocalPath currentFileWriteResult = ForStNativeFullSnapshotStrategy.this.stateTransfer.writeFileToCheckpointFs("CURRENT", this.snapshotResources.getCurrentFileContent(), this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, snapshotCloseableRegistry, tmpResourcesRegistry);
                privateFiles.add(currentFileWriteResult);
                uploadedSize += currentFileWriteResult.getStateSize();
            }
            return uploadedSize;
        }
    }
}

