/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.fs;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalBlockLocation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.state.forst.ForStConfigurableOptions;
import org.apache.flink.state.forst.fs.ByteBufferReadableFSDataInputStream;
import org.apache.flink.state.forst.fs.ByteBufferWritableFSDataOutputStream;
import org.apache.flink.state.forst.fs.cache.BundledCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.CacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.CachedDataInputStream;
import org.apache.flink.state.forst.fs.cache.CachedDataOutputStream;
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.filemapping.FSDataOutputStreamWithEntry;
import org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource;
import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStFlinkFileSystem
extends FileSystem
implements Closeable {
    /** Logger shared by all instances of this file system. */
    private static final Logger LOG = LoggerFactory.getLogger(ForStFlinkFileSystem.class);
    /**
     * Has the same value (32) as the capacity argument that the {@code open(...)} methods hand to
     * {@link ByteBufferReadableFSDataInputStream}; what that capacity means is defined by that class.
     */
    private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
    /** Local file system; consulted for entries that {@link FileOwnershipDecider} reports as always local. */
    private final FileSystem localFS;
    /** File system supplied by the creator of this instance; most operations are forwarded to it. */
    private final FileSystem delegateFS;
    /** Remote base location in string form, as supplied at construction and exposed by {@link #getRemoteBase()}. */
    private final String remoteBase;
    /** Optional file-based cache; {@code null} means streams are never wrapped with cache streams. */
    @Nullable
    private final FileBasedCache fileBasedCache;
    /** Keeps the mapping from db file paths to their {@link MappingEntry}. */
    @Nonnull
    private final FileMappingManager fileMappingManager;

    /**
     * Creates a file system view on top of {@code delegateFS}.
     *
     * @param delegateFS file system that operations are forwarded to
     * @param remoteBase remote base location, handed to the {@link FileMappingManager}
     * @param localBase local base location, handed to the {@link FileMappingManager}
     * @param fileBasedCache optional cache; {@code null} disables cache wrapping of streams
     */
    public ForStFlinkFileSystem(
            FileSystem delegateFS,
            String remoteBase,
            String localBase,
            @Nullable FileBasedCache fileBasedCache) {
        this.localFS = FileSystem.getLocalFileSystem();
        this.fileMappingManager = new FileMappingManager(delegateFS, remoteBase, localBase);
        this.fileBasedCache = fileBasedCache;
        this.remoteBase = remoteBase;
        this.delegateFS = delegateFS;
    }

    /**
     * Copy constructor: the new instance shares every collaborator (file systems, cache and mapping
     * manager) with the given instance; nothing is cloned.
     *
     * @param forStFlinkFileSystem instance whose collaborators are reused
     */
    protected ForStFlinkFileSystem(ForStFlinkFileSystem forStFlinkFileSystem) {
        this.fileMappingManager = forStFlinkFileSystem.fileMappingManager;
        this.fileBasedCache = forStFlinkFileSystem.fileBasedCache;
        this.remoteBase = forStFlinkFileSystem.remoteBase;
        this.delegateFS = forStFlinkFileSystem.delegateFS;
        this.localFS = forStFlinkFileSystem.localFS;
    }

    /**
     * Creates an instance for {@code uri} without a cache, using the JVM temp directory
     * ({@code java.io.tmpdir}) as the local base.
     *
     * @param uri remote location; also used (in string form) as the remote base
     * @return a new file system instance
     * @throws IOException if the file system for {@code uri} cannot be obtained
     */
    public static ForStFlinkFileSystem get(URI uri) throws IOException {
        FileSystem remoteFileSystem = FileSystem.get(uri);
        String remoteBase = uri.toString();
        String localBase = System.getProperty("java.io.tmpdir");
        return new ForStFlinkFileSystem(remoteFileSystem, remoteBase, localBase, null);
    }

    /**
     * Creates an instance for {@code uri} with an explicit local base and an optional cache.
     *
     * @param uri remote location; also used (in string form) as the remote base
     * @param localBase local base path, must not be {@code null}
     * @param fileBasedCache cache to use, passed through unchanged to the constructor
     * @return a new file system instance
     * @throws IOException if the file system for {@code uri} cannot be obtained
     * @throws NullPointerException if {@code localBase} is {@code null}
     */
    public static ForStFlinkFileSystem get(URI uri, Path localBase, FileBasedCache fileBasedCache) throws IOException {
        Preconditions.checkNotNull(localBase, "localBase is null, remote uri: %s.", uri);
        FileSystem remoteFileSystem = FileSystem.get(uri);
        String remoteBase = uri.toString();
        return new ForStFlinkFileSystem(remoteFileSystem, remoteBase, localBase.toString(), fileBasedCache);
    }

    /**
     * Builds the {@link FileBasedCache} for the given cache directory, or returns {@code null} when
     * no cache should be used.
     *
     * <p>No cache is created when {@code cacheBase} is {@code null}, when neither a capacity nor a
     * reserved size is configured, when the cache path and the remote path resolve to equal file
     * systems, or when no limit policy applies. Note that the cache directory is created before the
     * limit policy is chosen, so it may exist even if {@code null} is returned in the last case.
     *
     * @param config configuration; read for {@link ForStConfigurableOptions#TARGET_FILE_SIZE_BASE}
     *     and handed to the cache
     * @param cacheBase directory of the cache, may be {@code null}
     * @param remoteForStPath primary (remote) ForSt path, only used for the same-file-system check
     * @param cacheCapacity size limit; a value {@code <= 0} disables the size-based policy
     * @param cacheReservedSize reserved space; a value {@code <= 0} disables the space-based policy
     * @param metricGroup metric group handed to the cache
     * @return the cache, or {@code null} if caching is disabled or not applicable
     * @throws IOException if the cache directory cannot be created
     */
    @Nullable
    public static FileBasedCache getFileBasedCache(ReadableConfig config, Path cacheBase, Path remoteForStPath, long cacheCapacity, long cacheReservedSize, MetricGroup metricGroup) throws IOException {
        if (cacheBase == null || (cacheCapacity <= 0L && cacheReservedSize <= 0L)) {
            return null;
        }
        if (cacheBase.getFileSystem().equals(remoteForStPath.getFileSystem())) {
            LOG.info("Skip creating ForSt cache since the cache and primary path are on the same file system.");
            return null;
        }
        if (!cacheBase.getFileSystem().mkdirs(cacheBase)) {
            throw new IOException(String.format("Could not create ForSt cache directory at %s.", cacheBase));
        }
        long targetSstFileSize = ((MemorySize) config.get(ForStConfigurableOptions.TARGET_FILE_SIZE_BASE)).getBytes();
        // One File handle is enough; File is an immutable path wrapper.
        File cacheDirectory = new File(cacheBase.toString());
        boolean useSizeBasedCache = cacheCapacity > 0L;
        // The space-based policy is only probed when a reserved size is actually configured.
        boolean useSpaceBasedCache = cacheReservedSize > 0L && SpaceBasedCacheLimitPolicy.worksOn(cacheDirectory);

        final CacheLimitPolicy cacheLimitPolicy;
        if (useSizeBasedCache && useSpaceBasedCache) {
            cacheLimitPolicy = new BundledCacheLimitPolicy(
                    new SizeBasedCacheLimitPolicy(cacheCapacity, targetSstFileSize),
                    new SpaceBasedCacheLimitPolicy(cacheDirectory, cacheReservedSize, targetSstFileSize));
        } else if (useSizeBasedCache) {
            cacheLimitPolicy = new SizeBasedCacheLimitPolicy(cacheCapacity, targetSstFileSize);
        } else if (useSpaceBasedCache) {
            cacheLimitPolicy = new SpaceBasedCacheLimitPolicy(cacheDirectory, cacheReservedSize, targetSstFileSize);
        } else {
            // Only a reserved size was requested, but the space-based policy does not work here.
            return null;
        }
        return new FileBasedCache(config, cacheLimitPolicy, getUnguardedFileSystem(cacheBase), cacheBase, metricGroup);
    }

    /**
     * Returns the file system this instance was constructed with.
     *
     * @return the delegate file system
     */
    public FileSystem getDelegateFS() {
        return delegateFS;
    }

    /**
     * Returns the remote base location exactly as it was passed to the constructor.
     *
     * @return the remote base string
     */
    public String getRemoteBase() {
        return remoteBase;
    }

    /**
     * Creates a file at {@code path}, using {@link FileSystem.WriteMode#OVERWRITE}.
     *
     * @param path db file path to create
     * @return output stream for the new file
     * @throws IOException if the file cannot be created
     */
    public ByteBufferWritableFSDataOutputStream create(Path path) throws IOException {
        return create(path, FileSystem.WriteMode.OVERWRITE);
    }

    /**
     * Registers a new mapping entry for {@code dbFilePath} and opens an output stream on the entry's
     * real source path.
     *
     * <p>The raw stream is wrapped in an {@link FSDataOutputStreamWithEntry} and, when a cache
     * stream is available for this file, additionally in a {@link CachedDataOutputStream}.
     *
     * @param dbFilePath db file path to create
     * @param overwriteMode write mode; also forwarded to the underlying file system
     * @return output stream for the new file
     * @throws IOException if the mapping entry, the underlying file or the cache stream cannot be
     *     created
     */
    public synchronized ByteBufferWritableFSDataOutputStream create(Path dbFilePath, FileSystem.WriteMode overwriteMode) throws IOException {
        MappingEntry createdMappingEntry = fileMappingManager.createNewFile(dbFilePath, overwriteMode == FileSystem.WriteMode.OVERWRITE, fileBasedCache);
        FileBackedMappingEntrySource source = (FileBackedMappingEntrySource) createdMappingEntry.getSource();
        Path sourceRealPath = source.getFilePath();
        FSDataOutputStream rawStream = getUnguardedFileSystem(sourceRealPath).create(sourceRealPath, overwriteMode);
        try {
            FSDataOutputStream entryStream = new FSDataOutputStreamWithEntry(rawStream, createdMappingEntry);
            CachedDataOutputStream cachedDataOutputStream = createCachedDataOutputStream(dbFilePath, sourceRealPath, entryStream, createdMappingEntry.getFileOwnership());
            LOG.trace("Create file: dbFilePath: {}, sourceRealPath: {}, cachedDataOutputStream: {}", dbFilePath, sourceRealPath, cachedDataOutputStream);
            return new ByteBufferWritableFSDataOutputStream(cachedDataOutputStream == null ? entryStream : cachedDataOutputStream);
        } catch (IOException | RuntimeException e) {
            // Do not leak the already opened file handle when wrapping fails. Only the raw stream
            // is closed so that the state of the mapping entry is left exactly as before this fix.
            try {
                rawStream.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Opens the file mapped to {@code dbFilePath} for reading with the given buffer size.
     *
     * <p>The returned stream receives a factory that opens the entry's source with
     * {@code bufferSize} and wraps it in a {@link CachedDataInputStream} when one is available.
     *
     * @param dbFilePath db file path to open; must have a mapping entry
     * @param bufferSize buffer size forwarded to {@link MappingEntrySource#openInputStream(int)}
     * @return input stream for the file
     * @throws IOException if the source size cannot be determined or the stream cannot be created
     * @throws NullPointerException if no mapping entry exists for {@code dbFilePath}
     */
    public synchronized ByteBufferReadableFSDataInputStream open(Path dbFilePath, int bufferSize) throws IOException {
        MappingEntry mappingEntry = fileMappingManager.mappingEntry(dbFilePath.toString());
        // Same exception type as before, but now it tells which path was missing.
        Preconditions.checkNotNull(mappingEntry, "No mapping entry found for db file path: %s.", dbFilePath);
        MappingEntrySource source = mappingEntry.getSource();
        return new ByteBufferReadableFSDataInputStream(() -> {
            FSDataInputStream inputStream = source.openInputStream(bufferSize);
            CachedDataInputStream cachedDataInputStream = createCachedDataInputStream(dbFilePath, source, inputStream, mappingEntry.getFileOwnership());
            return cachedDataInputStream == null ? inputStream : cachedDataInputStream;
        }, DEFAULT_INPUT_STREAM_CAPACITY, source.getSize());
    }

    /**
     * Opens the file mapped to {@code dbFilePath} for reading, letting the source choose its own
     * buffer size.
     *
     * <p>The returned stream receives a factory that opens the entry's source and wraps it in a
     * {@link CachedDataInputStream} when one is available.
     *
     * @param dbFilePath db file path to open; must have a mapping entry
     * @return input stream for the file
     * @throws IOException if the source size cannot be determined or the stream cannot be created
     * @throws NullPointerException if no mapping entry exists for {@code dbFilePath}
     */
    public synchronized ByteBufferReadableFSDataInputStream open(Path dbFilePath) throws IOException {
        MappingEntry mappingEntry = fileMappingManager.mappingEntry(dbFilePath.toString());
        // Same exception type as before, but now it tells which path was missing.
        Preconditions.checkNotNull(mappingEntry, "No mapping entry found for db file path: %s.", dbFilePath);
        MappingEntrySource source = mappingEntry.getSource();
        return new ByteBufferReadableFSDataInputStream(() -> {
            FSDataInputStream inputStream = source.openInputStream();
            CachedDataInputStream cachedDataInputStream = createCachedDataInputStream(dbFilePath, source, inputStream, mappingEntry.getFileOwnership());
            return cachedDataInputStream == null ? inputStream : cachedDataInputStream;
        }, DEFAULT_INPUT_STREAM_CAPACITY, source.getSize());
    }

    /**
     * Renames {@code src} to {@code dst} through the {@link FileMappingManager}.
     *
     * @param src current path
     * @param dst new path
     * @return the result reported by the mapping manager
     * @throws IOException if the mapping manager fails
     */
    public synchronized boolean rename(Path src, Path dst) throws IOException {
        String from = src.toString();
        String to = dst.toString();
        return fileMappingManager.renameFile(from, to);
    }

    /**
     * Returns the working directory of the delegate file system.
     *
     * @return the delegate's working directory
     */
    public synchronized Path getWorkingDirectory() {
        return delegateFS.getWorkingDirectory();
    }

    /**
     * Returns the home directory of the delegate file system.
     *
     * @return the delegate's home directory
     */
    public synchronized Path getHomeDirectory() {
        return delegateFS.getHomeDirectory();
    }

    /**
     * Returns the URI of the delegate file system.
     *
     * @return the delegate's URI
     */
    public synchronized URI getUri() {
        return delegateFS.getUri();
    }

    /**
     * Checks whether {@code f} exists from the point of view of this file system.
     *
     * <ul>
     *   <li>Without a mapping entry, the path only counts as existing if the delegate file system
     *       reports it as an existing directory.
     *   <li>Entries that must always be local are looked up on the local file system.
     *   <li>Entries that are still being written are reported as existing.
     *   <li>Otherwise the entry's source path is checked on the delegate file system.
     * </ul>
     *
     * @param f path to check
     * @return {@code true} if the path is considered to exist
     * @throws IOException if an underlying file system call fails
     */
    public synchronized boolean exists(Path f) throws IOException {
        MappingEntry mappingEntry = fileMappingManager.mappingEntry(f.toString());
        if (mappingEntry == null) {
            if (!delegateFS.exists(f)) {
                return false;
            }
            return delegateFS.getFileStatus(f).isDir();
        }
        if (FileOwnershipDecider.shouldAlwaysBeLocal(f, mappingEntry.getFileOwnership())) {
            return localFS.exists(mappingEntry.getSourcePath());
        }
        // The writing flag is read while holding the entry's monitor.
        boolean stillWriting;
        synchronized (mappingEntry) {
            stillWriting = mappingEntry.isWriting();
        }
        if (stillWriting) {
            return true;
        }
        return delegateFS.exists(mappingEntry.getSourcePath());
    }

    /**
     * Returns the status of {@code path}, always reporting {@code path} itself as the status' path.
     *
     * <ul>
     *   <li>Without a mapping entry, the delegate file system is asked for {@code path}.
     *   <li>Entries that must always be local are looked up on the local file system.
     *   <li>Entries that are still being written yield a {@link DummyFSFileStatus}.
     *   <li>Otherwise the delegate file system is asked for the entry's source path.
     * </ul>
     *
     * @param path path to query
     * @return a {@link FileStatusWrapper} or {@link DummyFSFileStatus} for {@code path}
     * @throws IOException if an underlying file system call fails
     */
    public synchronized FileStatus getFileStatus(Path path) throws IOException {
        MappingEntry mappingEntry = fileMappingManager.mappingEntry(path.toString());
        if (mappingEntry == null) {
            FileStatus remoteStatus = delegateFS.getFileStatus(path);
            return new FileStatusWrapper(remoteStatus, path);
        }
        if (FileOwnershipDecider.shouldAlwaysBeLocal(path, mappingEntry.getFileOwnership())) {
            FileStatus localStatus = localFS.getFileStatus(mappingEntry.getSourcePath());
            return new FileStatusWrapper(localStatus, path);
        }
        // The writing flag is read while holding the entry's monitor.
        boolean stillWriting;
        synchronized (mappingEntry) {
            stillWriting = mappingEntry.isWriting();
        }
        if (stillWriting) {
            return new DummyFSFileStatus(path);
        }
        FileStatus sourceStatus = delegateFS.getFileStatus(mappingEntry.getSourcePath());
        return new FileStatusWrapper(sourceStatus, path);
    }

    /**
     * Returns block locations for a status object previously produced by this file system.
     *
     * <p>A {@link FileStatusWrapper} is unwrapped and forwarded to the local or the delegate file
     * system, depending on {@link FileOwnershipDecider#shouldAlwaysBeLocal(Path)}. A
     * {@link DummyFSFileStatus} yields a single {@link LocalBlockLocation} of length 0.
     *
     * @param file status object obtained from this file system
     * @param start start offset, forwarded unchanged
     * @param len length, forwarded unchanged
     * @return the block locations
     * @throws IOException if {@code file} is of any other type, or the underlying call fails
     */
    public synchronized BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
        Path path = file.getPath();
        if (file instanceof FileStatusWrapper) {
            FileSystem targetFileSystem = FileOwnershipDecider.shouldAlwaysBeLocal(path) ? localFS : delegateFS;
            FileStatus wrappedStatus = ((FileStatusWrapper) file).delegate;
            return targetFileSystem.getFileBlockLocations(wrappedStatus, start, len);
        }
        if (!(file instanceof DummyFSFileStatus)) {
            throw new IOException("file is not an instance from ForStFlinkFileSystem.");
        }
        return new BlockLocation[] {new LocalBlockLocation(0L)};
    }

    /**
     * Lists the mapped files that are direct children of {@code path}.
     *
     * <p>Candidates come from {@link FileMappingManager#listByPrefix(String)} using {@code path}
     * with a trailing slash; entries whose remainder still contains a slash (i.e. files in nested
     * directories) are skipped.
     *
     * @param path directory to list
     * @return statuses of the direct child files, possibly empty
     * @throws IOException if a status lookup fails
     */
    public synchronized FileStatus[] listStatus(Path path) throws IOException {
        List<FileStatus> children = new ArrayList<>();
        String rawPath = path.toString();
        String directoryPrefix = rawPath.endsWith("/") ? rawPath : rawPath + "/";
        for (String mappingFile : fileMappingManager.listByPrefix(directoryPrefix)) {
            String relativePath = mappingFile.substring(directoryPrefix.length());
            boolean directChild = relativePath.indexOf('/') == -1;
            if (directChild) {
                children.add(getFileStatus(new Path(mappingFile)));
            }
        }
        return children.toArray(new FileStatus[0]);
    }

    /**
     * Deletes a file or directory through the {@link FileMappingManager}.
     *
     * @param path path to delete
     * @param recursive forwarded unchanged to the mapping manager
     * @return the result reported by the mapping manager
     * @throws IOException if the mapping manager fails
     */
    public synchronized boolean delete(Path path, boolean recursive) throws IOException {
        return fileMappingManager.deleteFileOrDirectory(path, recursive);
    }

    /**
     * Creates {@code path} on the delegate file system.
     *
     * @param path directory to create
     * @return the result reported by the delegate file system
     * @throws IOException if the delegate file system fails
     */
    public synchronized boolean mkdirs(Path path) throws IOException {
        return delegateFS.mkdirs(path);
    }

    /**
     * Reports whether the delegate file system is distributed.
     *
     * @return the answer of the delegate file system
     */
    public synchronized boolean isDistributedFS() {
        return delegateFS.isDistributedFS();
    }

    /**
     * Links {@code dst} to {@code src} through the {@link FileMappingManager}.
     *
     * @param src existing path
     * @param dst path of the link
     * @return the value returned by {@link FileMappingManager#link}; its meaning is defined there
     * @throws IOException if the mapping manager fails
     */
    public synchronized int link(Path src, Path dst) throws IOException {
        String source = src.toString();
        String target = dst.toString();
        return fileMappingManager.link(source, target);
    }

    /**
     * Links {@code dst} to {@code src}, where the source is already given as a string.
     *
     * @param src existing path in string form, forwarded unchanged
     * @param dst path of the link
     * @return the value returned by {@link FileMappingManager#link}; its meaning is defined there
     * @throws IOException if the mapping manager fails
     */
    public synchronized int link(String src, Path dst) throws IOException {
        String target = dst.toString();
        return fileMappingManager.link(src, target);
    }

    /**
     * Registers a restored, reused file with the {@link FileMappingManager}, passing along this
     * instance's cache (which may be {@code null}).
     *
     * @param key key forwarded unchanged to the mapping manager
     * @param stateHandle state handle of the restored file
     * @param dbFilePath db file path the restored file is registered under
     */
    public synchronized void registerReusedRestoredFile(String key, StreamStateHandle stateHandle, Path dbFilePath) {
        // The entry returned by the mapping manager is not needed here.
        fileMappingManager.registerReusedRestoredFile(key, stateHandle, dbFilePath, fileBasedCache);
    }

    /**
     * Looks up the mapping entry registered for {@code path}.
     *
     * @param path db file path; its string form is used as the lookup key
     * @return the mapping entry, or {@code null} if none is registered
     */
    @Nullable
    public synchronized MappingEntry getMappingEntry(Path path) {
        String key = path.toString();
        return fileMappingManager.mappingEntry(key);
    }

    /**
     * Forwards an ownership hand-over for {@code path} to the {@link FileMappingManager}.
     *
     * @param path db file path whose ownership is given up
     * @param stateHandle state handle forwarded unchanged to the mapping manager
     */
    public synchronized void giveUpOwnership(Path path, StreamStateHandle stateHandle) {
        fileMappingManager.giveUpOwnership(path, stateHandle);
    }

    /**
     * Resolves the unguarded file system for the URI of {@code path}.
     *
     * @param path path whose URI selects the file system
     * @return the result of {@link FileSystem#getUnguardedFileSystem(URI)}
     * @throws IOException if the file system cannot be obtained
     */
    private static FileSystem getUnguardedFileSystem(Path path) throws IOException {
        URI pathUri = path.toUri();
        return FileSystem.getUnguardedFileSystem(pathUri);
    }

    /**
     * Wraps {@code outputStream} with a cache stream if caching applies to this file.
     *
     * @param dbFilePath db file path being written
     * @param srcRealPath real path of the file, forwarded to the cache
     * @param outputStream stream to wrap
     * @param fileOwnership ownership of the file, used for the always-local decision
     * @return the cache stream, or {@code null} if the file must always be local or no cache is
     *     configured
     * @throws IOException if the cache fails to create the stream
     */
    @Nullable
    private CachedDataOutputStream createCachedDataOutputStream(Path dbFilePath, Path srcRealPath, FSDataOutputStream outputStream, FileOwnership fileOwnership) throws IOException {
        boolean alwaysLocal = FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership);
        if (alwaysLocal || fileBasedCache == null) {
            return null;
        }
        return fileBasedCache.create(outputStream, srcRealPath);
    }

    /**
     * Wraps {@code inputStream} with a cache stream if caching applies to this file.
     *
     * @param dbFilePath db file path being read
     * @param source source of the mapping entry; asked whether it is cacheable and for its path
     * @param inputStream stream to wrap
     * @param fileOwnership ownership of the file, used for the always-local decision
     * @return the cache stream, or {@code null} if the file must always be local, the source is
     *     not cacheable, or no cache is configured
     * @throws IOException if the cache fails to open the stream
     */
    @Nullable
    private CachedDataInputStream createCachedDataInputStream(Path dbFilePath, MappingEntrySource source, FSDataInputStream inputStream, FileOwnership fileOwnership) throws IOException {
        // cacheable() is only consulted for files that are not forced to be local.
        boolean eligible = !FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership) && source.cacheable();
        if (!eligible || fileBasedCache == null) {
            return null;
        }
        return fileBasedCache.open(source.getFilePath(), inputStream);
    }

    /**
     * Closes the file-based cache, if one is configured. Nothing else is closed here.
     *
     * @throws IOException if closing the cache fails
     */
    @Override
    public void close() throws IOException {
        if (fileBasedCache == null) {
            return;
        }
        fileBasedCache.close();
    }

    /**
     * Placeholder status that only carries a path: every numeric attribute is reported as zero and
     * the entry is never a directory.
     */
    static class DummyFSFileStatus
    implements FileStatus {
        /** Path reported by {@link #getPath()}. */
        private final Path path;

        /**
         * @param path path this placeholder status stands for
         */
        DummyFSFileStatus(Path path) {
            this.path = path;
        }

        /** @return always {@code 0} */
        public long getLen() {
            return 0L;
        }

        /** @return always {@code 0} */
        public long getBlockSize() {
            return 0L;
        }

        /** @return always {@code 0} */
        public short getReplication() {
            return (short) 0;
        }

        /** @return always {@code 0} */
        public long getModificationTime() {
            return 0L;
        }

        /** @return always {@code 0} */
        public long getAccessTime() {
            return 0L;
        }

        /** @return always {@code false} */
        public boolean isDir() {
            return false;
        }

        /** @return the path given at construction */
        public Path getPath() {
            return path;
        }
    }

    /**
     * Status that takes all attributes from a wrapped {@link FileStatus} but reports a different
     * path.
     */
    public static class FileStatusWrapper
    implements FileStatus {
        /** Status that supplies every attribute except the path. */
        private final FileStatus delegate;
        /** Path reported by {@link #getPath()} instead of the delegate's own path. */
        private final Path path;

        /**
         * @param delegate status whose attributes are exposed
         * @param path path to report for this status
         */
        public FileStatusWrapper(FileStatus delegate, Path path) {
            this.path = path;
            this.delegate = delegate;
        }

        /** @return the length reported by the wrapped status */
        public long getLen() {
            return delegate.getLen();
        }

        /** @return the block size reported by the wrapped status */
        public long getBlockSize() {
            return delegate.getBlockSize();
        }

        /** @return the replication reported by the wrapped status */
        public short getReplication() {
            return delegate.getReplication();
        }

        /** @return the modification time reported by the wrapped status */
        public long getModificationTime() {
            return delegate.getModificationTime();
        }

        /** @return the access time reported by the wrapped status */
        public long getAccessTime() {
            return delegate.getAccessTime();
        }

        /** @return whether the wrapped status is a directory */
        public boolean isDir() {
            return delegate.isDir();
        }

        /** @return the path given at construction, not the wrapped status' path */
        public Path getPath() {
            return path;
        }
    }
}

