/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.filemerging.BlockingPhysicalFilePool;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingMetricGroup;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.NonBlockingPhysicalFilePool;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FileMergingSnapshotManagerBase
implements FileMergingSnapshotManager {
    /** Logger used by this class and by the nested {@link DirectoryHandleWithReferenceTrack}. */
    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
    // NOTE(review): presumably the cap on how many checkpoint ids the notification bookkeeping
    // (notifiedSubtaskCheckpoint / notifiedCheckpoint) retains before evicting the oldest — confirm.
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    /** Identifier of this manager; also used (URI-escaped) as the name of the exclusive-scope managed directory. */
    private final String id;
    /** Executor on which physical file deletions are run. */
    protected final Executor ioExecutor;
    /** Guards file system initialization and {@link #uploadedStates}. */
    protected final Object lock = new Object();
    /** Logical files recorded per checkpoint id, ordered by checkpoint id. */
    @GuardedBy(value="lock")
    protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap();
    /** Every logical file currently tracked by this manager, keyed by its id. */
    private final Map<LogicalFile.LogicalFileId, LogicalFile> knownLogicalFiles = new ConcurrentHashMap<LogicalFile.LogicalFileId, LogicalFile>();
    // The following five fields are assigned by initFileSystem(...) and are unset before that call.
    protected FileSystem fs;
    protected Path checkpointDir;
    protected Path sharedStateDir;
    protected Path taskOwnedStateDir;
    protected int writeBufferSize;
    /** Set to true once initFileSystem(...) has completed successfully. */
    private boolean fileSystemInitiated = false;
    /** Result of {@link #shouldSyncAfterClosingLogicalFile(FileSystem)} for the configured file system. */
    protected boolean shouldSyncAfterClosingLogicalFile;
    /** Size limit handed to the physical file pool created by createPhysicalPool(). */
    protected long maxPhysicalFileSize;
    /** Selects which physical file pool implementation createPhysicalPool() instantiates. */
    protected PhysicalFilePool.Type filePoolType;
    /** Allowed physical/logical size ratio; {@code Float.MAX_VALUE} means space control is disabled. */
    protected final float maxSpaceAmplification;
    /** Deleter handed to every {@link PhysicalFile}; delegates to deletePhysicalFile(Path, long). */
    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
    /** Guards the two notification bookkeeping collections below. */
    private final Object notifyLock = new Object();
    /** Per checkpoint id, the subtasks that have already reported abort/subsume for it. */
    @GuardedBy(value="notifyLock")
    private final TreeMap<Long, Set<FileMergingSnapshotManager.SubtaskKey>> notifiedSubtaskCheckpoint = new TreeMap();
    /** Checkpoint ids for which discardCheckpoint(long) has already been triggered. */
    @GuardedBy(value="notifyLock")
    private final TreeSet<Long> notifiedCheckpoint = new TreeSet();
    /** Managed shared-scope directory of each registered subtask. */
    private final Map<FileMergingSnapshotManager.SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<FileMergingSnapshotManager.SubtaskKey, Path>();
    /** Reference-tracking handle for the managed shared-scope directory of each registered subtask. */
    private final Map<FileMergingSnapshotManager.SubtaskKey, DirectoryHandleWithReferenceTrack> managedSharedStateDirHandles = new ConcurrentHashMap<FileMergingSnapshotManager.SubtaskKey, DirectoryHandleWithReferenceTrack>();
    /** Managed exclusive-scope directory (one per manager); assigned by initFileSystem(...). */
    protected Path managedExclusiveStateDir;
    /** Reference-tracking handle for {@link #managedExclusiveStateDir}; assigned by initFileSystem(...). */
    protected DirectoryHandleWithReferenceTrack managedExclusiveStateDirHandle;
    /** Counters for logical/physical file counts and sizes. */
    protected FileMergingSnapshotManager.SpaceStat spaceStat;
    /** Metric group constructed from the parent metric group and {@link #spaceStat}. */
    protected FileMergingMetricGroup metricGroup;

    /**
     * Creates a manager that is not yet bound to a file system; {@code initFileSystem} must be
     * called before files can be created.
     *
     * @param id identifier of this manager
     * @param maxFileSize size limit passed to the physical file pool
     * @param filePoolType which physical file pool implementation to use
     * @param maxSpaceAmplification allowed physical/logical size ratio; any value below 1 is
     *     replaced by {@code Float.MAX_VALUE}, which disables space control
     * @param ioExecutor executor used for physical file deletion
     * @param parentMetricGroup parent group under which the file-merging metrics are created
     */
    public FileMergingSnapshotManagerBase(String id, long maxFileSize, PhysicalFilePool.Type filePoolType, float maxSpaceAmplification, Executor ioExecutor, MetricGroup parentMetricGroup) {
        this.id = id;
        this.ioExecutor = ioExecutor;
        this.filePoolType = filePoolType;
        this.maxPhysicalFileSize = maxFileSize;
        if (maxSpaceAmplification < 1.0f) {
            // An amplification below 1 can never be satisfied, so treat it as "unlimited".
            this.maxSpaceAmplification = Float.MAX_VALUE;
        } else {
            this.maxSpaceAmplification = maxSpaceAmplification;
        }
        // The metric group reads from spaceStat, so the stat object has to exist first.
        this.spaceStat = new FileMergingSnapshotManager.SpaceStat();
        this.metricGroup = new FileMergingMetricGroup(parentMetricGroup, this.spaceStat);
    }

    /**
     * Binds this manager to a file system and its checkpoint directories, and creates the managed
     * exclusive-scope directory. Only the first call initializes; later calls merely verify that
     * the directories match those of the first call.
     *
     * @param fileSystem the file system to write to
     * @param checkpointBaseDir base checkpoint directory
     * @param sharedStateDir directory for shared-scope state
     * @param taskOwnedStateDir directory for task-owned (exclusive-scope) state
     * @param writeBufferSize buffer size used by the output streams created by this manager
     * @throws IllegalArgumentException if already initialized with different directories
     */
    @Override
    public void initFileSystem(FileSystem fileSystem, Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir, int writeBufferSize) throws IllegalArgumentException {
        synchronized (this.lock) {
            if (!this.fileSystemInitiated) {
                this.fs = fileSystem;
                this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
                this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
                this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
                this.shouldSyncAfterClosingLogicalFile = FileMergingSnapshotManagerBase.shouldSyncAfterClosingLogicalFile(fileSystem);

                // The exclusive-scope directory is named after this manager's (escaped) id.
                Path exclusiveDir = new Path(taskOwnedStateDir, FileMergingSnapshotManagerBase.uriEscape(this.id));
                boolean createdByUs = this.createManagedDirectory(exclusiveDir);
                this.managedExclusiveStateDir = exclusiveDir;
                this.managedExclusiveStateDirHandle = DirectoryHandleWithReferenceTrack.wrap(DirectoryStreamStateHandle.of(exclusiveDir), createdByUs);

                this.writeBufferSize = writeBufferSize;
                // Flip the flag last so a failure above leaves the manager uninitialized.
                this.fileSystemInitiated = true;
                return;
            }

            // Already initialized: every subtask has to agree on the directories.
            Preconditions.checkArgument(checkpointBaseDir.equals(this.checkpointDir), "The checkpoint base dir is not deterministic across subtasks.");
            Preconditions.checkArgument(sharedStateDir.equals(this.sharedStateDir), "The shared checkpoint dir is not deterministic across subtasks.");
            Preconditions.checkArgument(taskOwnedStateDir.equals(this.taskOwnedStateDir), "The task-owned checkpoint dir is not deterministic across subtasks.");
        }
    }

    /**
     * Registers a subtask and makes sure its managed shared-scope directory exists. Registering a
     * subtask that is already known is a no-op.
     *
     * @param subtaskKey the subtask to register
     */
    @Override
    public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        String dirName = subtaskKey.getManagedDirName();
        Path subtaskDir = new Path(this.sharedStateDir, FileMergingSnapshotManagerBase.uriEscape(dirName));
        if (this.managedSharedStateDir.containsKey(subtaskKey)) {
            return;
        }
        boolean createdByUs = this.createManagedDirectory(subtaskDir);
        this.managedSharedStateDir.put(subtaskKey, subtaskDir);
        DirectoryHandleWithReferenceTrack trackedHandle = DirectoryHandleWithReferenceTrack.wrap(DirectoryStreamStateHandle.of(subtaskDir), createdByUs);
        this.managedSharedStateDirHandles.put(subtaskKey, trackedHandle);
    }

    /**
     * Unregisters a subtask and attempts a best-effort cleanup of its managed shared-scope
     * directory. Unregistering an unknown subtask is a no-op.
     *
     * @param subtaskKey the subtask to unregister
     */
    @Override
    public void unregisterSubtask(FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        // Rely on the value returned by remove() rather than containsKey()+get(): both maps are
        // concurrent, so a separate check can be stale by the time the entry is read.
        if (this.managedSharedStateDir.remove(subtaskKey) == null) {
            return;
        }
        DirectoryHandleWithReferenceTrack handle = this.managedSharedStateDirHandles.remove(subtaskKey);
        // The handle is stored after the directory during registration, so it may be absent.
        if (handle != null) {
            handle.tryCleanupQuietly();
        }
    }

    /**
     * Creates a logical file covering a segment of the given physical file and starts tracking it.
     *
     * @param physicalFile the physical file that contains the segment
     * @param startOffset offset of the segment within the physical file
     * @param length length of the segment
     * @param subtaskKey the subtask the logical file belongs to
     * @return the newly created, tracked logical file
     */
    protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, long startOffset, long length, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        LogicalFile.LogicalFileId newId = LogicalFile.LogicalFileId.generateRandomId();
        LogicalFile created = new LogicalFile(newId, physicalFile, startOffset, length, subtaskKey);
        this.knownLogicalFiles.put(newId, created);
        // Space statistics only account for physical files this manager owns.
        if (!physicalFile.isOwned()) {
            return created;
        }
        this.spaceStat.onLogicalFileCreate(length);
        this.spaceStat.onPhysicalFileUpdate(length);
        return created;
    }

    /**
     * Opens a new physical file in the managed directory of the given subtask and scope. Creation
     * is attempted up to ten times, each time with a freshly generated file path.
     *
     * @param subtaskKey the subtask requesting the file
     * @param scope the state scope, which selects the managed directory
     * @return the newly created physical file
     * @throws IOException if no managed directory is known for the subtask/scope, or if every
     *     attempt failed (the last failure is attached as the cause)
     */
    @Nonnull
    protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) throws IOException {
        Path managedDir = this.getManagedDir(subtaskKey, scope);
        if (managedDir == null) {
            throw new IOException("Could not get " + scope + " path for subtask " + subtaskKey + ", the directory may have not been created.");
        }

        Exception lastFailure = null;
        int remainingAttempts = 10;
        while (remainingAttempts-- > 0) {
            try {
                Path candidate = this.generatePhysicalFilePath(managedDir);
                OutputStreamAndPath opened = EntropyInjector.createEntropyAware(this.fs, candidate, FileSystem.WriteMode.NO_OVERWRITE);
                Path resolvedPath = opened.path();
                PhysicalFile physicalFile = new PhysicalFile(opened.stream(), resolvedPath, this.physicalFileDeleter, scope);
                this.updateFileCreationMetrics(resolvedPath);
                return physicalFile;
            }
            catch (Exception e) {
                // Remember the failure and retry with a new random file name.
                lastFailure = e;
            }
        }
        throw new IOException("Could not open output stream for state file merging.", lastFailure);
    }

    /**
     * Creates an output stream whose data is appended to a (possibly reused) physical file. The
     * stream talks back to this manager through a proxy that lazily obtains the physical file,
     * registers the written segment as a logical file on close, and cleans up on failure.
     *
     * @param subtaskKey the subtask writing the state
     * @param checkpointId the checkpoint the state belongs to
     * @param scope the state scope
     * @return a new file-merging output stream
     */
    @Override
    public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(final FileMergingSnapshotManager.SubtaskKey subtaskKey, final long checkpointId, final CheckpointedStateScope scope) {
        final FileMergingSnapshotManagerBase manager = this;
        FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy proxy = new FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy(){
            /** Physical file backing the stream; null until providePhysicalFile() has been called. */
            PhysicalFile physicalFile;
            /** Logical file created on successful close; null before that. */
            LogicalFile logicalFile;

            @Override
            public Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException {
                this.physicalFile = manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey, checkpointId, scope);
                FSDataOutputStream out = this.physicalFile.getOutputStream();
                Path path = this.physicalFile.getFilePath();
                return new Tuple2<FSDataOutputStream, Path>(out, path);
            }

            @Override
            public SegmentFileStateHandle closeStreamAndCreateStateHandle(Path filePath, long startPos, long stateSize) throws IOException {
                // Nothing was ever written through a physical file, so there is no handle.
                if (this.physicalFile == null) {
                    return null;
                }
                this.logicalFile = manager.createLogicalFile(this.physicalFile, startPos, stateSize, subtaskKey);
                this.logicalFile.advanceLastCheckpointId(checkpointId);
                synchronized (manager.lock) {
                    Set<LogicalFile> uploadedForCheckpoint = manager.uploadedStates.computeIfAbsent(checkpointId, key -> new HashSet<>());
                    uploadedForCheckpoint.add(this.logicalFile);
                }
                // Hand the physical file back so that later streams can append to it.
                manager.returnPhysicalFileForNextReuse(subtaskKey, checkpointId, this.physicalFile);
                return new SegmentFileStateHandle(this.physicalFile.getFilePath(), startPos, stateSize, scope, this.logicalFile.getFileId());
            }

            @Override
            public void closeStreamExceptionally() throws IOException {
                if (this.physicalFile == null) {
                    return;
                }
                if (this.logicalFile == null) {
                    // No segment was registered yet: close the file and delete it if possible.
                    this.physicalFile.close();
                    this.physicalFile.deleteIfNecessary();
                    return;
                }
                manager.discardSingleLogicalFile(this.logicalFile, checkpointId);
            }
        };
        return new FileMergingCheckpointStateOutputStream(this.writeBufferSize, proxy);
    }

    /**
     * Records the creation of a physical file in the space statistics and logs it at debug level.
     *
     * @param path path of the physical file that was just created
     */
    private void updateFileCreationMetrics(Path path) {
        spaceStat.onPhysicalFileCreate();
        LOG.debug("Create a new physical file {} for checkpoint file merging.", path);
    }

    /**
     * Generates a path for a new physical file: a random UUID used as the file name inside the
     * given directory.
     *
     * @param dirPath directory in which the file should live
     * @return the generated file path
     */
    protected Path generatePhysicalFilePath(Path dirPath) {
        return new Path(dirPath, UUID.randomUUID().toString());
    }

    /**
     * Tells whether the file sits directly inside one of this manager's managed directories
     * (the exclusive-scope directory or any registered subtask's shared-scope directory).
     *
     * @param filePath the file to check
     * @return true if the file's parent directory is a managed directory
     */
    @VisibleForTesting
    boolean isResponsibleForFile(Path filePath) {
        Path containingDir = filePath.getParent();
        if (containingDir.equals(this.managedExclusiveStateDir)) {
            return true;
        }
        return this.managedSharedStateDir.containsValue(containingDir);
    }

    /**
     * Asynchronously deletes a physical file on the I/O executor and, on success, updates the
     * space statistics. A failed deletion is logged (including its cause) and not propagated.
     *
     * @param filePath the file to delete
     * @param size the size reported to the space statistics once the file is deleted
     */
    protected final void deletePhysicalFile(Path filePath, long size) {
        this.ioExecutor.execute(() -> {
            try {
                this.fs.delete(filePath, false);
                this.spaceStat.onPhysicalFileDelete(size);
                LOG.debug("Physical file deleted: {}.", filePath);
            }
            catch (IOException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                LOG.warn("Fail to delete file: {}", filePath, e);
            }
        });
    }

    /**
     * Instantiates the physical file pool selected by {@code filePoolType}. The pool creates new
     * files through {@code createPhysicalFile}.
     *
     * @return a new physical file pool
     * @throws UnsupportedOperationException if the configured pool type is not handled here
     */
    protected final PhysicalFilePool createPhysicalPool() {
        final PhysicalFilePool pool;
        switch (this.filePoolType) {
            case BLOCKING: {
                pool = new BlockingPhysicalFilePool(this.maxPhysicalFileSize, this::createPhysicalFile);
                break;
            }
            case NON_BLOCKING: {
                pool = new NonBlockingPhysicalFilePool(this.maxPhysicalFileSize, this::createPhysicalFile);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unsupported type of physical file pool: " + this.filePoolType);
            }
        }
        return pool;
    }

    /**
     * Supplies the physical file that a checkpoint output stream should write to. Invoked when a
     * stream created by {@code createCheckpointStateOutputStream} requests its physical file.
     *
     * @param subtaskKey the subtask writing the state
     * @param checkpointId the checkpoint being written
     * @param scope the state scope
     * @return the physical file to write to, never null
     * @throws IOException if the file cannot be provided
     */
    @Nonnull
    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) throws IOException;

    /**
     * Hands a physical file back to the implementation after a stream has finished writing a
     * segment to it and the segment has been registered as a logical file.
     *
     * @param subtaskKey the subtask that wrote the segment
     * @param checkpointId the checkpoint the segment belongs to
     * @param physicalFile the physical file being returned
     * @throws IOException if returning the file fails
     */
    protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException;

    /**
     * Hook invoked (via tryDiscardCheckpoint) at most once per checkpoint id when the checkpoint is
     * released. This base implementation ignores the id and only runs space control.
     *
     * @param checkpointId id of the checkpoint being discarded (unused here)
     * @throws IOException declared for overriding implementations
     */
    protected void discardCheckpoint(long checkpointId) throws IOException {
        this.controlSpace();
    }

    /**
     * Records that a checkpoint has started, adding it as a reference on the subtask's shared
     * directory handle (if registered) and on the exclusive directory handle. Does nothing before
     * the file system has been initialized.
     *
     * @param subtaskKey the subtask reporting the start
     * @param checkpointId the checkpoint that started
     */
    @Override
    public void notifyCheckpointStart(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) {
        if (!this.fileSystemInitiated) {
            return;
        }
        this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (key, sharedDirHandle) -> {
            sharedDirHandle.addReferenceWhenCheckpointStart(checkpointId);
            return sharedDirHandle;
        });
        this.managedExclusiveStateDirHandle.addReferenceWhenCheckpointStart(checkpointId);
    }

    /**
     * Records that a checkpoint has completed, handing over ownership of the subtask's shared
     * directory (if registered) and of the exclusive directory. Does nothing before the file
     * system has been initialized.
     *
     * @param subtaskKey the subtask reporting completion
     * @param checkpointId the checkpoint that completed
     * @throws Exception declared by the interface; not thrown by the visible code paths
     */
    @Override
    public void notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws Exception {
        if (!this.fileSystemInitiated) {
            return;
        }
        this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (key, sharedDirHandle) -> {
            sharedDirHandle.handoverOwnershipWhenCheckpointComplete(checkpointId);
            return sharedDirHandle;
        });
        this.managedExclusiveStateDirHandle.handoverOwnershipWhenCheckpointComplete(checkpointId);
    }

    /**
     * Handles an aborted checkpoint: drops the checkpoint's reference on the managed directory
     * handles, discards the subtask's logical files recorded for that checkpoint, and finally
     * reports the release of the checkpoint for this subtask.
     *
     * @param subtaskKey the subtask reporting the abort
     * @param checkpointId the aborted checkpoint
     * @throws Exception if discarding logical files fails
     */
    @Override
    public void notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws Exception {
        if (this.fileSystemInitiated) {
            this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (key, sharedDirHandle) -> {
                sharedDirHandle.removeReferenceWhenCheckpointAbort(checkpointId);
                return sharedDirHandle;
            });
            this.managedExclusiveStateDirHandle.removeReferenceWhenCheckpointAbort(checkpointId);
        }
        synchronized (this.lock) {
            Set<LogicalFile> uploadedForCheckpoint = this.uploadedStates.get(checkpointId);
            if (uploadedForCheckpoint == null) {
                // Nothing recorded for this checkpoint; the release notification is skipped too.
                return;
            }
            boolean nothingLeft = this.discardLogicalFiles(subtaskKey, checkpointId, uploadedForCheckpoint);
            if (nothingLeft) {
                this.uploadedStates.remove(checkpointId);
            }
        }
        this.notifyReleaseCheckpoint(subtaskKey, checkpointId);
    }

    /**
     * Handles a subsumed checkpoint: hands over ownership of the managed directories, discards the
     * subtask's logical files of this and all earlier checkpoints that are no longer in use, and
     * finally reports the release of the checkpoint for this subtask.
     *
     * @param subtaskKey the subtask reporting the subsumption
     * @param checkpointId the subsumed checkpoint
     * @throws Exception if discarding logical files fails
     */
    @Override
    public void notifyCheckpointSubsumed(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws Exception {
        if (this.fileSystemInitiated) {
            this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (k, v) -> {
                v.handoverOwnershipWhenCheckpointSubsumed(checkpointId);
                return v;
            });
            this.managedExclusiveStateDirHandle.handoverOwnershipWhenCheckpointSubsumed(checkpointId);
        }
        synchronized (this.lock) {
            // Typed iterator over all checkpoints up to and including the subsumed one; removing
            // through the iterator also removes the entry from uploadedStates.
            Iterator<Map.Entry<Long, Set<LogicalFile>>> uploadedStatesIterator = this.uploadedStates.headMap(checkpointId, true).entrySet().iterator();
            while (uploadedStatesIterator.hasNext()) {
                Map.Entry<Long, Set<LogicalFile>> entry = uploadedStatesIterator.next();
                // Drop the checkpoint entry only once none of its logical files remain.
                if (this.discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) {
                    uploadedStatesIterator.remove();
                }
            }
        }
        this.notifyReleaseCheckpoint(subtaskKey, checkpointId);
    }

    /**
     * Records that a subtask has released (aborted or subsumed) a checkpoint. Once every
     * registered subtask has released it, the checkpoint is discarded. Bookkeeping is bounded to
     * {@code NUM_GHOST_CHECKPOINT_IDS} checkpoints; the oldest entry is evicted beyond that.
     *
     * @param subtaskKey the subtask releasing the checkpoint
     * @param checkpointId the released checkpoint
     * @throws IOException if discarding the checkpoint fails
     */
    private void notifyReleaseCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws IOException {
        synchronized (this.notifyLock) {
            // Already discarded: nothing left to track for this checkpoint.
            if (this.notifiedCheckpoint.contains(checkpointId)) {
                return;
            }
            Set<FileMergingSnapshotManager.SubtaskKey> knownSubtask = this.notifiedSubtaskCheckpoint.computeIfAbsent(checkpointId, e -> new HashSet<>());
            knownSubtask.add(subtaskKey);
            if (knownSubtask.containsAll(this.managedSharedStateDir.keySet())) {
                this.tryDiscardCheckpoint(checkpointId);
            }
            // Keep the bookkeeping bounded by dropping the oldest checkpoint id.
            if (this.notifiedSubtaskCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS) {
                this.notifiedSubtaskCheckpoint.pollFirstEntry();
            }
        }
    }

    /**
     * Discards a checkpoint at most once: the first call for a given id marks it as notified and
     * invokes {@code discardCheckpoint}; later calls for the same id are no-ops for as long as the
     * id is still remembered. At most {@code NUM_GHOST_CHECKPOINT_IDS} ids are remembered.
     *
     * @param checkpointId the checkpoint to discard
     * @throws IOException if {@code discardCheckpoint} fails
     */
    private void tryDiscardCheckpoint(long checkpointId) throws IOException {
        synchronized (this.notifyLock) {
            if (this.notifiedCheckpoint.contains(checkpointId)) {
                return;
            }
            this.notifiedCheckpoint.add(checkpointId);
            this.notifiedSubtaskCheckpoint.remove(checkpointId);
            this.discardCheckpoint(checkpointId);
            // Keep the set bounded by forgetting the smallest remembered checkpoint id.
            if (this.notifiedCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS) {
                this.notifiedCheckpoint.pollFirst();
            }
        }
    }

    /**
     * Marks the logical files behind the given state handles as still in use by the given
     * checkpoint. Handles that are neither segment handles nor file-merged placeholders, and
     * handles whose logical file is unknown to this manager, are skipped.
     *
     * @param checkpointId the checkpoint that reuses the handles
     * @param stateHandles the handles being reused
     */
    @Override
    public void reusePreviousStateHandle(long checkpointId, Collection<? extends StreamStateHandle> stateHandles) {
        for (StreamStateHandle handle : stateHandles) {
            LogicalFile.LogicalFileId logicalFileId;
            if (handle instanceof SegmentFileStateHandle) {
                logicalFileId = ((SegmentFileStateHandle)handle).getLogicalFileId();
            } else if (handle instanceof PlaceholderStreamStateHandle && ((PlaceholderStreamStateHandle)handle).isFileMerged()) {
                // A placeholder carries the logical file id as the key of its handle id.
                logicalFileId = new LogicalFile.LogicalFileId(handle.getStreamStateHandleID().getKeyString());
            } else {
                continue;
            }
            LogicalFile known = this.knownLogicalFiles.get(logicalFileId);
            if (known != null) {
                known.advanceLastCheckpointId(checkpointId);
            }
        }
    }

    /**
     * Enforces the configured space amplification. When the total physical size exceeds
     * {@code logicalSize * maxSpaceAmplification}, closed physical files are visited in order of
     * decreasing wasted size and asked (via {@code checkReuseOnSpaceAmplification}) whether they
     * may still be reused, until the remaining reusable size fits the goal. Does nothing when
     * space control is disabled ({@code maxSpaceAmplification == Float.MAX_VALUE}).
     */
    private void controlSpace() {
        if (this.maxSpaceAmplification == Float.MAX_VALUE) {
            return;
        }
        // Computed in double: Math.round(float) returns an int and would cap the goal at
        // Integer.MAX_VALUE bytes, while Math.round(double) returns a long.
        double allowedPhysicalSize = (double)this.spaceStat.logicalFileSize.get() * (double)this.maxSpaceAmplification;
        if (!(allowedPhysicalSize < (double)this.spaceStat.physicalFileSize.get())) {
            return;
        }
        long goalPhysicalSize = Math.round(allowedPhysicalSize);

        // Collect each reusable physical file once and sum up their sizes.
        Set<PhysicalFile> reusableFiles = new HashSet<>();
        long aliveSize = 0L;
        for (LogicalFile logicalFile : this.knownLogicalFiles.values()) {
            PhysicalFile physicalFile = logicalFile.getPhysicalFile();
            if (physicalFile.isCouldReuse() && reusableFiles.add(physicalFile)) {
                aliveSize += physicalFile.getSize();
            }
        }
        if (aliveSize <= goalPhysicalSize) {
            return;
        }

        // Group closed files by wasted size. Grouping (rather than a TreeSet ordered by wasted
        // size) keeps files with equal wasted size instead of treating them as duplicates.
        TreeMap<Long, Set<PhysicalFile>> closedFilesByWastedSize = new TreeMap<>();
        for (PhysicalFile physicalFile : reusableFiles) {
            if (physicalFile.closed()) {
                closedFilesByWastedSize.computeIfAbsent(physicalFile.wastedSize(), size -> new HashSet<>()).add(physicalFile);
            }
        }

        // Most wasteful files first; stop as soon as the goal is reached.
        for (Set<PhysicalFile> sameWastedSize : closedFilesByWastedSize.descendingMap().values()) {
            for (PhysicalFile candidate : sameWastedSize) {
                if (candidate.checkReuseOnSpaceAmplification(this.maxSpaceAmplification)) {
                    continue;
                }
                aliveSize -= candidate.wastedSize();
                if (aliveSize <= goalPhysicalSize) {
                    return;
                }
            }
        }
    }

    /**
     * Tells whether the physical file behind a previous state handle may still be reused.
     *
     * @param stateHandle a segment handle or a file-merged placeholder handle
     * @return true only if the handle maps to a logical file known to this manager and that file's
     *     physical file reports that it can be reused; false in every other case
     */
    @Override
    public boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle) {
        LogicalFile.LogicalFileId logicalFileId;
        if (stateHandle instanceof SegmentFileStateHandle) {
            logicalFileId = ((SegmentFileStateHandle)stateHandle).getLogicalFileId();
        } else if (stateHandle instanceof PlaceholderStreamStateHandle && ((PlaceholderStreamStateHandle)stateHandle).isFileMerged()) {
            logicalFileId = new LogicalFile.LogicalFileId(stateHandle.getStreamStateHandleID().getKeyString());
        } else {
            return false;
        }
        LogicalFile known = this.knownLogicalFiles.get(logicalFileId);
        return known != null && known.getPhysicalFile().isCouldReuse();
    }

    /**
     * Discards one logical file on behalf of the given checkpoint and, if its physical file is
     * owned by this manager, removes its length from the space statistics.
     *
     * @param logicalFile the logical file to discard
     * @param checkpointId the checkpoint triggering the discard
     * @throws IOException if discarding fails
     */
    public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId) throws IOException {
        logicalFile.discardWithCheckpointId(checkpointId);
        boolean ownedByManager = logicalFile.getPhysicalFile().isOwned();
        if (ownedByManager) {
            this.spaceStat.onLogicalFileDelete(logicalFile.getLength());
        }
    }

    /**
     * Discards, from the given set, every logical file that belongs to the subtask and was last
     * used by a checkpoint no newer than {@code checkpointId}. If the set ends up empty the
     * checkpoint itself is discarded.
     *
     * @param subtaskKey the subtask whose files may be discarded
     * @param checkpointId the checkpoint being released
     * @param logicalFiles the set to prune; modified in place
     * @return true if the set is empty after pruning
     * @throws Exception if discarding a file or the checkpoint fails
     */
    private boolean discardLogicalFiles(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles) throws Exception {
        for (Iterator<LogicalFile> it = logicalFiles.iterator(); it.hasNext(); ) {
            LogicalFile candidate = it.next();
            boolean belongsToSubtask = candidate.getSubtaskKey().equals(subtaskKey);
            if (belongsToSubtask && candidate.getLastUsedCheckpointID() <= checkpointId) {
                this.discardSingleLogicalFile(candidate, checkpointId);
                it.remove();
                this.knownLogicalFiles.remove(candidate.getFileId());
            }
        }
        boolean nothingLeft = logicalFiles.isEmpty();
        if (nothingLeft) {
            this.tryDiscardCheckpoint(checkpointId);
        }
        return nothingLeft;
    }

    /**
     * Returns the managed directory for the given subtask and scope.
     *
     * @param subtaskKey the subtask; only relevant for the shared scope
     * @param scope the state scope
     * @return for the shared scope, the subtask's directory or null if the subtask is not
     *     registered; otherwise the manager-wide exclusive directory
     */
    @Override
    public Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) {
        return scope.equals(CheckpointedStateScope.SHARED)
                ? this.managedSharedStateDir.get(subtaskKey)
                : this.managedExclusiveStateDir;
    }

    /**
     * Returns the directory state handle for the given subtask and scope.
     *
     * @param subtaskKey the subtask; only relevant for the shared scope
     * @param scope the state scope
     * @return for the shared scope, the subtask's directory handle or null if the subtask is not
     *     registered; otherwise the handle of the exclusive directory
     */
    @Override
    public DirectoryStreamStateHandle getManagedDirStateHandle(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) {
        if (!scope.equals(CheckpointedStateScope.SHARED)) {
            return this.managedExclusiveStateDirHandle.getHandle();
        }
        DirectoryHandleWithReferenceTrack tracked = this.managedSharedStateDirHandles.get(subtaskKey);
        if (tracked == null) {
            return null;
        }
        return tracked.getHandle();
    }

    /**
     * Decides whether a file sync should follow the closing of a logical file. The current
     * implementation ignores the file system and always answers {@code true}.
     *
     * @param fileSystem the file system in use (currently unused)
     * @return always {@code true}
     */
    static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
        return true;
    }

    /**
     * Makes a string safe to use as a path segment by replacing each of the characters
     * {@code ; / ? : @ & = + $ , [ ]} with a dash.
     *
     * @param input the raw name
     * @return the name with every listed character replaced by {@code -}
     */
    private static String uriEscape(String input) {
        final String reservedCharacters = "[;/?:@&=+$,\\[\\]]";
        return input.replaceAll(reservedCharacters, "-");
    }

    /**
     * Makes sure the given managed directory exists, creating it if necessary.
     *
     * @param managedPath the directory to create or reuse
     * @return true if the directory did not exist and was created by this call; false if an
     *     existing directory is reused
     * @throws FlinkRuntimeException if the path is occupied by a non-directory, or if the file
     *     system reports an I/O error
     */
    private boolean createManagedDirectory(Path managedPath) {
        try {
            FileStatus existing;
            try {
                existing = this.fs.getFileStatus(managedPath);
            }
            catch (FileNotFoundException ignored) {
                // A missing path is the expected case for a fresh directory.
                existing = null;
            }

            if (existing != null) {
                if (!existing.isDir()) {
                    throw new FlinkRuntimeException("The managed path " + managedPath + " for file-merging is occupied by another file. Cannot create directory.");
                }
                LOG.info("Reusing previous directory {} for checkpoint file-merging.", managedPath);
                return false;
            }

            this.fs.mkdirs(managedPath);
            LOG.info("Created a directory {} for checkpoint file-merging.", managedPath);
            return true;
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Cannot create directory " + managedPath + " for file-merging ", e);
        }
    }

    /**
     * Closes the manager. If the file system was initialized, a best-effort cleanup of the managed
     * directories is attempted; otherwise there is nothing to do.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        if (!this.fileSystemInitiated) {
            return;
        }
        this.quietlyCleanupManagedDir();
    }

    /**
     * Attempts a best-effort cleanup of every subtask's shared directory handle and then of the
     * exclusive directory handle.
     */
    private void quietlyCleanupManagedDir() {
        for (DirectoryHandleWithReferenceTrack sharedDirHandle : this.managedSharedStateDirHandles.values()) {
            sharedDirHandle.tryCleanupQuietly();
        }
        this.managedExclusiveStateDirHandle.tryCleanupQuietly();
    }

    /**
     * Returns the identifier this manager was constructed with.
     *
     * @return the manager id
     */
    @VisibleForTesting
    public String getId() {
        return this.id;
    }

    /**
     * Rebuilds the tracking state for segment handles restored from a checkpoint. For every handle
     * a logical file is created, registered under its original id, and recorded for the given
     * checkpoint. Physical files are shared between handles that point to the same path, including
     * physical files already known to this manager.
     *
     * @param checkpointId the checkpoint the handles were restored from
     * @param subtaskKey the subtask restoring the handles
     * @param stateHandles the restored segment handles
     * @throws RuntimeException wrapping an {@link IOException} if the size of a restored physical
     *     file cannot be determined
     */
    @Override
    public void restoreStateHandles(long checkpointId, FileMergingSnapshotManager.SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles) {
        synchronized (this.lock) {
            Set<LogicalFile> restoredLogicalFiles = this.uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>());

            // Index the physical files that are already tracked so they are reused, not duplicated.
            Map<Path, PhysicalFile> knownPhysicalFiles = new HashMap<>();
            for (LogicalFile known : this.knownLogicalFiles.values()) {
                PhysicalFile physicalFile = known.getPhysicalFile();
                knownPhysicalFiles.putIfAbsent(physicalFile.getFilePath(), physicalFile);
            }

            stateHandles.forEach(fileHandle -> {
                PhysicalFile physicalFile = knownPhysicalFiles.computeIfAbsent(fileHandle.getFilePath(), path -> this.createRestoredPhysicalFile(path, subtaskKey, fileHandle.getScope()));
                LogicalFile.LogicalFileId logicalFileId = fileHandle.getLogicalFileId();
                LogicalFile logicalFile = new LogicalFile(logicalFileId, physicalFile, fileHandle.getStartPos(), fileHandle.getStateSize(), subtaskKey);
                if (physicalFile.isOwned()) {
                    this.spaceStat.onLogicalFileCreate(logicalFile.getLength());
                }
                this.knownLogicalFiles.put(logicalFileId, logicalFile);
                logicalFile.advanceLastCheckpointId(checkpointId);
                restoredLogicalFiles.add(logicalFile);
            });
        }
    }

    /**
     * Creates the in-memory representation of a physical file found during restore, reads its size
     * from the file system, and accounts for it in the space statistics when it lives inside one of
     * this manager's managed directories.
     */
    private PhysicalFile createRestoredPhysicalFile(Path path, FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) {
        boolean managedByFileMergingManager = this.fileSystemInitiated && this.isManagedByFileMergingManager(path, subtaskKey, scope);
        // No output stream: a restored file is only read, never appended to.
        PhysicalFile file = new PhysicalFile(null, path, this.physicalFileDeleter, scope, managedByFileMergingManager);
        try {
            file.updateSize(this.getFileSize(file));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (managedByFileMergingManager) {
            this.spaceStat.onPhysicalFileCreate();
            this.spaceStat.onPhysicalFileUpdate(file.getSize());
        }
        return file;
    }

    /**
     * Looks up the on-disk length of a physical file, using the file system resolved from the
     * file's own path.
     *
     * @param file the physical file to measure
     * @return the file length reported by the file system
     * @throws IOException if the status cannot be read; a {@link FileNotFoundException} is thrown
     *     when no status is returned or the path denotes a directory
     */
    private long getFileSize(PhysicalFile file) throws IOException {
        FileStatus status = file.getFilePath().getFileSystem().getFileStatus(file.getFilePath());
        boolean isRegularFile = status != null && !status.isDir();
        if (!isRegularFile) {
            throw new FileNotFoundException("File " + file.getFilePath() + " does not exist.");
        }
        return status.getLen();
    }

    /**
     * Tells whether a file lies inside the managed directory that this manager uses for the given
     * subtask and scope.
     *
     * @param filePath the file to check
     * @param subtaskKey the subtask; only relevant for the shared scope
     * @param scope the state scope, which selects the managed directory
     * @return true if the file path is located under the managed directory; false otherwise,
     *     including when no managed directory is known for the subtask
     * @throws UnsupportedOperationException for a scope other than SHARED or EXCLUSIVE
     */
    private boolean isManagedByFileMergingManager(Path filePath, FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) {
        final Path managedDir;
        if (scope == CheckpointedStateScope.SHARED) {
            managedDir = this.managedSharedStateDir.get(subtaskKey);
        } else if (scope == CheckpointedStateScope.EXCLUSIVE) {
            managedDir = this.managedExclusiveStateDir;
        } else {
            throw new UnsupportedOperationException("Unsupported CheckpointStateScope " + scope);
        }
        // An unregistered subtask has no managed directory, so nothing can be managed for it.
        if (managedDir == null) {
            return false;
        }
        String dir = managedDir.toString();
        String file = filePath.toString();
        if (!file.startsWith(dir)) {
            return false;
        }
        // Require the prefix to end on a path boundary so that a sibling directory whose name
        // merely starts with the managed directory's name (e.g. ".../x_1_40" vs ".../x_1_4") is
        // not mistaken for it. Assumes '/' is the separator in Path#toString() — TODO confirm.
        return file.length() == dir.length() || dir.endsWith("/") || file.charAt(dir.length()) == '/';
    }

    /**
     * Looks up a tracked logical file by its id.
     *
     * @param fileId id of the logical file
     * @return the logical file, or null if no file with that id is tracked
     */
    @VisibleForTesting
    public LogicalFile getLogicalFile(LogicalFile.LogicalFileId fileId) {
        return this.knownLogicalFiles.get(fileId);
    }

    /**
     * Exposes the live (not copied) map of logical files per checkpoint id. The map is annotated
     * as guarded by {@code lock}, which this accessor does not acquire.
     *
     * @return the internal uploaded-states map
     */
    @VisibleForTesting
    TreeMap<Long, Set<LogicalFile>> getUploadedStates() {
        return this.uploadedStates;
    }

    /**
     * Tells whether the given checkpoint has already been discarded by this manager and is still
     * within the bounded set of remembered checkpoint ids.
     *
     * @param checkpointId the checkpoint to look up
     * @return true if a discard was triggered for this checkpoint and the id is still remembered
     */
    @VisibleForTesting
    boolean isCheckpointDiscard(long checkpointId) {
        // notifiedCheckpoint is a plain TreeSet guarded by notifyLock; read it under that lock.
        synchronized (this.notifyLock) {
            return this.notifiedCheckpoint.contains(checkpointId);
        }
    }

    /**
     * Wraps a {@link DirectoryStreamStateHandle} together with the set of ongoing checkpoints that
     * reference the directory. While {@code tracking} is true the directory is considered owned by
     * this side and may be cleaned up once no checkpoint references it; tracking stops for good
     * when a checkpoint completes or is subsumed.
     */
    protected static class DirectoryHandleWithReferenceTrack {
        /** The wrapped directory handle. */
        private final DirectoryStreamStateHandle directoryHandle;
        /** Ids of started checkpoints that have neither been aborted nor handed over yet. */
        private final Set<Long> refCheckpointIds;
        /** Whether references are still tracked, i.e. whether cleanup is still this side's job. */
        private boolean tracking;

        /**
         * @param directoryHandle the directory handle to wrap
         * @param own whether the directory is owned by the creator and should be tracked
         */
        DirectoryHandleWithReferenceTrack(DirectoryStreamStateHandle directoryHandle, boolean own) {
            this.directoryHandle = directoryHandle;
            this.refCheckpointIds = new HashSet<Long>();
            this.tracking = own;
        }

        /** Factory equivalent of the constructor. */
        static DirectoryHandleWithReferenceTrack wrap(DirectoryStreamStateHandle directoryHandle, boolean own) {
            return new DirectoryHandleWithReferenceTrack(directoryHandle, own);
        }

        /** Returns the wrapped directory handle. */
        DirectoryStreamStateHandle getHandle() {
            return this.directoryHandle;
        }

        /** Adds the checkpoint as a reference to the directory, if still tracking. */
        void addReferenceWhenCheckpointStart(long checkpointId) {
            if (this.tracking) {
                LOG.debug("checkpoint:{} start, add reference to file-merging managed shared dir : {}", checkpointId, this.directoryHandle.getDirectory());
                this.refCheckpointIds.add(checkpointId);
            }
        }

        /** Removes the aborted checkpoint's reference to the directory, if still tracking. */
        void removeReferenceWhenCheckpointAbort(long checkpointId) {
            if (this.tracking) {
                LOG.debug("checkpoint:{} aborted, remove reference to file-merging managed shared dir : {}", checkpointId, this.directoryHandle.getDirectory());
                this.refCheckpointIds.remove(checkpointId);
            }
        }

        /** Stops tracking and clears all references once a checkpoint has completed. */
        void handoverOwnershipWhenCheckpointComplete(long checkpointId) {
            if (this.tracking) {
                LOG.debug("checkpoint:{} complete, handover ownership of file-merging managed shared dir to JobManager : {}", checkpointId, this.directoryHandle.getDirectory());
                this.tracking = false;
                this.refCheckpointIds.clear();
            }
        }

        /** Stops tracking and clears all references once a checkpoint has been subsumed. */
        void handoverOwnershipWhenCheckpointSubsumed(long checkpointId) {
            if (this.tracking) {
                LOG.debug("checkpoint:{} subsumed, handover ownership of file-merging managed shared dir to JobManager : {}", checkpointId, this.directoryHandle.getDirectory());
                this.tracking = false;
                this.refCheckpointIds.clear();
            }
        }

        /**
         * Discards the directory if it is still tracked and no checkpoint references it. This is
         * best effort: a failure is logged and never propagated to the caller.
         */
        void tryCleanupQuietly() {
            if (this.tracking && this.refCheckpointIds.isEmpty() && this.directoryHandle != null) {
                try {
                    this.directoryHandle.discardState();
                }
                catch (Exception e) {
                    // Deliberately not rethrown, but leave a trace so leftover directories can be
                    // diagnosed.
                    LOG.warn("Failed to clean up file-merging managed dir : {}", this.directoryHandle.getDirectory(), e);
                }
            }
        }
    }
}

