/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FileMergingSnapshotManagerBase
implements FileMergingSnapshotManager {
    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
    private final String id;
    protected final Executor ioExecutor;
    protected FileSystem fs;
    protected Path checkpointDir;
    protected Path sharedStateDir;
    protected Path taskOwnedStateDir;
    private boolean fileSystemInitiated = false;
    protected boolean shouldSyncAfterClosingLogicalFile;
    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
    private final Map<FileMergingSnapshotManager.SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<FileMergingSnapshotManager.SubtaskKey, Path>();
    protected Path managedExclusiveStateDir;

    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
        this.id = id;
        this.ioExecutor = ioExecutor;
    }

    @Override
    public void initFileSystem(FileSystem fileSystem, Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir) throws IllegalArgumentException {
        if (this.fileSystemInitiated) {
            Preconditions.checkArgument((boolean)checkpointBaseDir.equals((Object)this.checkpointDir), (Object)"The checkpoint base dir is not deterministic across subtasks.");
            Preconditions.checkArgument((boolean)sharedStateDir.equals((Object)this.sharedStateDir), (Object)"The shared checkpoint dir is not deterministic across subtasks.");
            Preconditions.checkArgument((boolean)taskOwnedStateDir.equals((Object)this.taskOwnedStateDir), (Object)"The task-owned checkpoint dir is not deterministic across subtasks.");
            return;
        }
        this.fs = fileSystem;
        this.checkpointDir = (Path)Preconditions.checkNotNull((Object)checkpointBaseDir);
        this.sharedStateDir = (Path)Preconditions.checkNotNull((Object)sharedStateDir);
        this.taskOwnedStateDir = (Path)Preconditions.checkNotNull((Object)taskOwnedStateDir);
        this.fileSystemInitiated = true;
        this.shouldSyncAfterClosingLogicalFile = FileMergingSnapshotManagerBase.shouldSyncAfterClosingLogicalFile(fileSystem);
        Path managedExclusivePath = new Path(taskOwnedStateDir, this.id);
        this.createManagedDirectory(managedExclusivePath);
        this.managedExclusiveStateDir = managedExclusivePath;
    }

    @Override
    public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        String managedDirName = subtaskKey.getManagedDirName();
        Path managedPath = new Path(this.sharedStateDir, managedDirName);
        if (!this.managedSharedStateDir.containsKey(subtaskKey)) {
            this.createManagedDirectory(managedPath);
            this.managedSharedStateDir.put(subtaskKey, managedPath);
        }
    }

    protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, int startOffset, int length, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        LogicalFile.LogicalFileId fileID = LogicalFile.LogicalFileId.generateRandomId();
        return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
    }

    @Nonnull
    protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) throws IOException {
        Exception latestException = null;
        Path dirPath = this.getManagedDir(subtaskKey, scope);
        if (dirPath == null) {
            throw new IOException("Could not get " + (Object)((Object)scope) + " path for subtask " + subtaskKey + ", the directory may have not been created.");
        }
        for (int attempt = 0; attempt < 10; ++attempt) {
            try {
                OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware((FileSystem)this.fs, (Path)this.generatePhysicalFilePath(dirPath), (FileSystem.WriteMode)FileSystem.WriteMode.NO_OVERWRITE);
                FSDataOutputStream outputStream = streamAndPath.stream();
                Path filePath = streamAndPath.path();
                PhysicalFile result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope);
                this.updateFileCreationMetrics(filePath);
                return result;
            }
            catch (Exception e) {
                latestException = e;
                continue;
            }
        }
        throw new IOException("Could not open output stream for state file merging.", latestException);
    }

    private void updateFileCreationMetrics(Path path) {
        LOG.debug("Create a new physical file {} for checkpoint file merging.", (Object)path);
    }

    protected Path generatePhysicalFilePath(Path dirPath) {
        String fileName = UUID.randomUUID().toString();
        return new Path(dirPath, fileName);
    }

    protected final void deletePhysicalFile(Path filePath) {
        this.ioExecutor.execute(() -> {
            try {
                this.fs.delete(filePath, false);
                LOG.debug("Physical file deleted: {}.", (Object)filePath);
            }
            catch (IOException e) {
                LOG.warn("Fail to delete file: {}", (Object)filePath);
            }
        });
    }

    @Nonnull
    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey var1, long var2, CheckpointedStateScope var4) throws IOException;

    protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey var1, long var2, PhysicalFile var4) throws IOException;

    @Override
    public Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) {
        if (scope.equals((Object)CheckpointedStateScope.SHARED)) {
            return this.managedSharedStateDir.get(subtaskKey);
        }
        return this.managedExclusiveStateDir;
    }

    static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
        return true;
    }

    private void createManagedDirectory(Path managedPath) {
        block6: {
            try {
                FileStatus fileStatus = null;
                try {
                    fileStatus = this.fs.getFileStatus(managedPath);
                }
                catch (FileNotFoundException fileNotFoundException) {
                    // empty catch block
                }
                if (fileStatus == null) {
                    this.fs.mkdirs(managedPath);
                    LOG.info("Created a directory {} for checkpoint file-merging.", (Object)managedPath);
                    break block6;
                }
                if (fileStatus.isDir()) {
                    LOG.info("Reusing previous directory {} for checkpoint file-merging.", (Object)managedPath);
                    break block6;
                }
                throw new FlinkRuntimeException("The managed path " + managedPath + " for file-merging is occupied by another file. Cannot create directory.");
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Cannot create directory " + managedPath + " for file-merging ", (Throwable)e);
            }
        }
    }

    @Override
    public void close() throws IOException {
    }
}

