/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.RocksDB;

public class RocksNativeFullSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
    private final RocksDBStateUploader stateUploader;

    public RocksNativeFullSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull File instanceBasePath, @Nonnull UUID backendUID, @Nonnull RocksDBStateUploader rocksDBStateUploader) {
        super(DESCRIPTION, db, rocksDBResourceGuard, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, instanceBasePath, backendUID);
        this.stateUploader = rocksDBStateUploader;
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources snapshotResources, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
            return registry -> SnapshotResult.empty();
        }
        return new RocksDBNativeFullSnapshotOperation(checkpointId, checkpointStreamFactory, snapshotResources.snapshotDirectory, snapshotResources.stateMetaInfoSnapshots);
    }

    public void notifyCheckpointComplete(long completedCheckpointId) {
    }

    public void notifyCheckpointAborted(long abortedCheckpointId) {
    }

    @Override
    protected RocksDBSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        for (Map.Entry stateMetaInfoEntry : this.kvStateInformation.entrySet()) {
            stateMetaInfoSnapshots.add(((RocksDBKeyedStateBackend.RocksDbKvStateInfo)stateMetaInfoEntry.getValue()).metaInfo.snapshot());
        }
        return EMPTY_PREVIOUS_SNAPSHOT;
    }

    @Override
    public void close() {
        this.stateUploader.close();
    }

    private final class RocksDBNativeFullSnapshotOperation
    extends RocksDBSnapshotStrategyBase.RocksDBSnapshotOperation {
        private RocksDBNativeFullSnapshotOperation(@Nonnull long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull SnapshotDirectory localBackupDirectory, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            super(checkpointId, checkpointStreamFactory, localBackupDirectory, stateMetaInfoSnapshots);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
            boolean completed = false;
            SnapshotResult<StreamStateHandle> metaStateHandle = null;
            ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> privateFiles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
            try {
                metaStateHandle = RocksNativeFullSnapshotStrategy.this.materializeMetaData(snapshotCloseableRegistry, this.tmpResourcesRegistry, this.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                Preconditions.checkNotNull(metaStateHandle, (String)"Metadata was not properly created.");
                Preconditions.checkNotNull((Object)metaStateHandle.getJobManagerOwnedSnapshot(), (String)"Metadata for job manager was not properly created.");
                long checkpointedSize = metaStateHandle.getStateSize();
                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksNativeFullSnapshotStrategy.this.backendUID, RocksNativeFullSnapshotStrategy.this.keyGroupRange, this.checkpointId, Collections.emptyList(), privateFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize += this.uploadSnapshotFiles(privateFiles, snapshotCloseableRegistry, this.tmpResourcesRegistry));
                Optional<KeyedStateHandle> localSnapshot = this.getLocalSnapshot((StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), Collections.emptyList());
                SnapshotResult snapshotResult = localSnapshot.map(keyedStateHandle -> SnapshotResult.withLocalState((StateObject)jmIncrementalKeyedStateHandle, (StateObject)keyedStateHandle)).orElseGet(() -> SnapshotResult.of((StateObject)jmIncrementalKeyedStateHandle));
                completed = true;
                SnapshotResult snapshotResult2 = snapshotResult;
                return snapshotResult2;
            }
            finally {
                if (!completed) {
                    RocksNativeFullSnapshotStrategy.this.cleanupIncompleteSnapshot(this.tmpResourcesRegistry, this.localBackupDirectory);
                }
            }
        }

        private long uploadSnapshotFiles(@Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> privateFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception {
            Preconditions.checkState((boolean)this.localBackupDirectory.exists());
            Path[] files = this.localBackupDirectory.listDirectory();
            long uploadedSize = 0L;
            if (files != null) {
                List<IncrementalKeyedStateHandle.HandleAndLocalPath> uploadedFiles = RocksNativeFullSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(Arrays.asList(files), this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, snapshotCloseableRegistry, tmpResourcesRegistry);
                uploadedSize += uploadedFiles.stream().mapToLong(e -> e.getStateSize()).sum();
                privateFiles.addAll(uploadedFiles);
            }
            return uploadedSize;
        }
    }
}

