/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb.snapshot;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;
import org.apache.flink.state.rocksdb.RocksDBStateUploader;
import org.apache.flink.state.rocksdb.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksIncrementalSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";
    @Nonnull
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedSstFiles;
    private long lastCompletedCheckpointId;
    private final RocksDBStateUploader stateUploader;

    public RocksIncrementalSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull File instanceBasePath, @Nonnull UUID backendUID, @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedStateHandles, @Nonnull RocksDBStateUploader rocksDBStateUploader, long lastCompletedCheckpointId) {
        super(DESCRIPTION, db, rocksDBResourceGuard, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, instanceBasePath, backendUID);
        this.uploadedSstFiles = new TreeMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>(uploadedStateHandles);
        this.stateUploader = rocksDBStateUploader;
        this.lastCompletedCheckpointId = lastCompletedCheckpointId;
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources snapshotResources, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot;
        if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)timestamp);
            }
            return registry -> SnapshotResult.empty();
        }
        SnapshotType.SharingFilesStrategy sharingFilesStrategy = checkpointOptions.getCheckpointType().getSharingFilesStrategy();
        switch (sharingFilesStrategy) {
            case FORWARD_BACKWARD: {
                previousSnapshot = snapshotResources.previousSnapshot;
                break;
            }
            case FORWARD: 
            case NO_SHARING: {
                previousSnapshot = EMPTY_PREVIOUS_SNAPSHOT;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported sharing files strategy: " + String.valueOf(sharingFilesStrategy));
            }
        }
        return new RocksDBIncrementalSnapshotOperation(checkpointId, checkpointStreamFactory, snapshotResources.snapshotDirectory, previousSnapshot, sharingFilesStrategy, snapshotResources.stateMetaInfoSnapshots);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long completedCheckpointId) {
        SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = this.uploadedSstFiles;
        synchronized (sortedMap) {
            if (completedCheckpointId > this.lastCompletedCheckpointId && this.uploadedSstFiles.containsKey(completedCheckpointId)) {
                this.uploadedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
                this.lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointAborted(long abortedCheckpointId) {
        SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = this.uploadedSstFiles;
        synchronized (sortedMap) {
            this.uploadedSstFiles.keySet().remove(abortedCheckpointId);
        }
    }

    @Override
    public void close() throws IOException {
        this.stateUploader.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected RocksDBSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        Collection confirmedSstFiles;
        long lastCompletedCheckpoint;
        SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = this.uploadedSstFiles;
        synchronized (sortedMap) {
            lastCompletedCheckpoint = this.lastCompletedCheckpointId;
            confirmedSstFiles = (Collection)this.uploadedSstFiles.get(lastCompletedCheckpoint);
            LOG.trace("Use confirmed SST files for checkpoint {}: {}", (Object)checkpointId, (Object)confirmedSstFiles);
        }
        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) confirmed files as base: {}.", new Object[]{checkpointId, lastCompletedCheckpoint, confirmedSstFiles});
        for (Map.Entry entry : this.kvStateInformation.entrySet()) {
            stateMetaInfoSnapshots.add(((RocksDBKeyedStateBackend.RocksDbKvStateInfo)entry.getValue()).metaInfo.snapshot());
        }
        return new RocksDBSnapshotStrategyBase.PreviousSnapshot(confirmedSstFiles);
    }

    private final class RocksDBIncrementalSnapshotOperation
    extends RocksDBSnapshotStrategyBase.RocksDBSnapshotOperation {
        @Nonnull
        private final RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot;
        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;

        private RocksDBIncrementalSnapshotOperation(@Nonnull long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull SnapshotDirectory localBackupDirectory, @Nonnull RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot, @Nonnull SnapshotType.SharingFilesStrategy sharingFilesStrategy, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            super(RocksIncrementalSnapshotStrategy.this, checkpointId, checkpointStreamFactory, localBackupDirectory, stateMetaInfoSnapshots);
            this.previousSnapshot = previousSnapshot;
            this.sharingFilesStrategy = sharingFilesStrategy;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
            boolean completed = false;
            SnapshotResult<StreamStateHandle> metaStateHandle = null;
            ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
            ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFiles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
            ArrayList<StreamStateHandle> reusedHandle = new ArrayList<StreamStateHandle>();
            try {
                metaStateHandle = RocksIncrementalSnapshotStrategy.this.materializeMetaData(snapshotCloseableRegistry, this.tmpResourcesRegistry, this.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                Preconditions.checkNotNull(metaStateHandle, (String)"Metadata was not properly created.");
                Preconditions.checkNotNull((Object)((StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot()), (String)"Metadata for job manager was not properly created.");
                long checkpointedSize = metaStateHandle.getStateSize();
                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize += this.uploadSnapshotFiles(sstFiles, miscFiles, snapshotCloseableRegistry, this.tmpResourcesRegistry, reusedHandle));
                Optional<KeyedStateHandle> localSnapshot = this.getLocalSnapshot((StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles);
                SnapshotResult snapshotResult = localSnapshot.map(keyedStateHandle -> SnapshotResult.withLocalState((StateObject)jmIncrementalKeyedStateHandle, (StateObject)keyedStateHandle)).orElseGet(() -> SnapshotResult.of((StateObject)jmIncrementalKeyedStateHandle));
                completed = true;
                SnapshotResult snapshotResult2 = snapshotResult;
                return snapshotResult2;
            }
            finally {
                if (!completed) {
                    RocksIncrementalSnapshotStrategy.this.cleanupIncompleteSnapshot(this.tmpResourcesRegistry, this.localBackupDirectory);
                } else {
                    this.checkpointStreamFactory.reusePreviousStateHandle(reusedHandle);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private long uploadSnapshotFiles(@Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles, @Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry, @Nonnull List<StreamStateHandle> reusedHandle) throws Exception {
            Preconditions.checkState((boolean)this.localBackupDirectory.exists());
            Path[] files = this.localBackupDirectory.listDirectory();
            long uploadedSize = 0L;
            if (files != null) {
                ArrayList<Path> sstFilePaths = new ArrayList<Path>(files.length);
                ArrayList<Path> miscFilePaths = new ArrayList<Path>(files.length);
                this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                CheckpointedStateScope stateScope = this.sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED;
                sstFiles.stream().map(IncrementalKeyedStateHandle.HandleAndLocalPath::getHandle).forEach(reusedHandle::add);
                List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFilesUploadResult = RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry);
                uploadedSize += sstFilesUploadResult.stream().mapToLong(IncrementalKeyedStateHandle.HandleAndLocalPath::getStateSize).sum();
                sstFiles.addAll(sstFilesUploadResult);
                List<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFilesUploadResult = RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry);
                uploadedSize += miscFilesUploadResult.stream().mapToLong(IncrementalKeyedStateHandle.HandleAndLocalPath::getStateSize).sum();
                miscFiles.addAll(miscFilesUploadResult);
                SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = RocksIncrementalSnapshotStrategy.this.uploadedSstFiles;
                synchronized (sortedMap) {
                    switch (this.sharingFilesStrategy) {
                        case FORWARD_BACKWARD: 
                        case FORWARD: {
                            RocksIncrementalSnapshotStrategy.this.uploadedSstFiles.put(this.checkpointId, Collections.unmodifiableList(sstFiles));
                            break;
                        }
                        case NO_SHARING: {
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException("Unsupported sharing files strategy: " + String.valueOf(this.sharingFilesStrategy));
                        }
                    }
                }
            }
            return uploadedSize;
        }

        private void createUploadFilePaths(Path[] files, List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles, List<Path> sstFilePaths, List<Path> miscFilePaths) {
            for (Path filePath : files) {
                String fileName = filePath.getFileName().toString();
                if (fileName.endsWith(".sst")) {
                    Optional<StreamStateHandle> uploaded = this.previousSnapshot.getUploaded(fileName);
                    if (uploaded.isPresent() && this.checkpointStreamFactory.couldReuseStateHandle(uploaded.get())) {
                        sstFiles.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)uploaded.get(), (String)fileName));
                        continue;
                    }
                    sstFilePaths.add(filePath);
                    continue;
                }
                miscFilePaths.add(filePath);
            }
        }
    }
}

