/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.fs.hadoop;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ReflectionUtils;

public class HadoopFileIO
implements FileIO {
    private static final long serialVersionUID = 1L;
    protected SerializableConfiguration hadoopConf;
    private Options options;
    protected volatile transient Map<Pair<String, String>, FileSystem> fsMap;
    private volatile transient AtomicReference<Method> renameMethodRef;

    @VisibleForTesting
    public void setFileSystem(Path path, FileSystem fs) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        this.getFileSystem(hadoopPath, p -> fs);
    }

    @Override
    public boolean isObjectStore() {
        return false;
    }

    @Override
    public void configure(CatalogContext context) {
        this.hadoopConf = new SerializableConfiguration(context.hadoopConf());
        this.options = context.options();
    }

    public Configuration hadoopConf() {
        return this.hadoopConf.get();
    }

    @Override
    public SeekableInputStream newInputStream(Path path) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        return new HadoopSeekableInputStream(this.getFileSystem(hadoopPath).open(hadoopPath));
    }

    @Override
    public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        return new HadoopPositionOutputStream(this.getFileSystem(hadoopPath).create(hadoopPath, overwrite));
    }

    @Override
    public FileStatus getFileStatus(Path path) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        return new HadoopFileStatus(this.getFileSystem(hadoopPath).getFileStatus(hadoopPath));
    }

    @Override
    public FileStatus[] listStatus(Path path) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        FileStatus[] statuses = new FileStatus[]{};
        org.apache.hadoop.fs.FileStatus[] hadoopStatuses = this.getFileSystem(hadoopPath).listStatus(hadoopPath);
        if (hadoopStatuses != null) {
            statuses = new FileStatus[hadoopStatuses.length];
            for (int i = 0; i < hadoopStatuses.length; ++i) {
                statuses[i] = new HadoopFileStatus(hadoopStatuses[i]);
            }
        }
        return statuses;
    }

    @Override
    public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        final org.apache.hadoop.fs.RemoteIterator hadoopIter = this.getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
        return new RemoteIterator<FileStatus>(){

            @Override
            public boolean hasNext() throws IOException {
                return hadoopIter.hasNext();
            }

            @Override
            public FileStatus next() throws IOException {
                org.apache.hadoop.fs.FileStatus hadoopStatus = (org.apache.hadoop.fs.FileStatus)hadoopIter.next();
                return new HadoopFileStatus(hadoopStatus);
            }
        };
    }

    @Override
    public boolean exists(Path path) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        return this.getFileSystem(hadoopPath).exists(hadoopPath);
    }

    @Override
    public boolean delete(Path path, boolean recursive) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        return this.getFileSystem(hadoopPath).delete(hadoopPath, recursive);
    }

    @Override
    public boolean mkdirs(Path path) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = this.path(path);
        return this.getFileSystem(hadoopPath).mkdirs(hadoopPath);
    }

    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        org.apache.hadoop.fs.Path hadoopSrc = this.path(src);
        org.apache.hadoop.fs.Path hadoopDst = this.path(dst);
        return this.getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
    }

    @Override
    public void overwriteFileUtf8(Path path, String content) throws IOException {
        boolean success = this.tryAtomicOverwriteViaRename(path, content);
        if (!success) {
            FileIO.super.overwriteFileUtf8(path, content);
        }
    }

    private org.apache.hadoop.fs.Path path(Path path) {
        return new org.apache.hadoop.fs.Path(path.toUri());
    }

    @VisibleForTesting
    FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
        return this.getFileSystem(path, this::createFileSystem);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private FileSystem getFileSystem(org.apache.hadoop.fs.Path path, FunctionWithException<org.apache.hadoop.fs.Path, FileSystem, IOException> creator) throws IOException {
        String authority;
        URI uri;
        String scheme;
        Pair key;
        Map<Pair<String, String>, FileSystem> map;
        FileSystem fs;
        if (this.fsMap == null) {
            HadoopFileIO hadoopFileIO = this;
            synchronized (hadoopFileIO) {
                if (this.fsMap == null) {
                    this.fsMap = new ConcurrentHashMap<Pair<String, String>, FileSystem>();
                }
            }
        }
        if ((fs = (map = this.fsMap).get(key = Pair.of((Object)(scheme = (uri = path.toUri()).getScheme()), (Object)(authority = uri.getAuthority())))) == null) {
            fs = creator.apply(path);
            map.put((Pair<String, String>)key, fs);
        }
        return fs;
    }

    protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
        Configuration conf = this.hadoopConf.get();
        FileSystem fileSystem = path.getFileSystem(conf);
        fileSystem = HadoopSecuredFileSystem.trySecureFileSystem(fileSystem, this.options, conf);
        return fileSystem;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean tryAtomicOverwriteViaRename(Path dst, String content) throws IOException {
        Method renameMethod;
        org.apache.hadoop.fs.Path hadoopDst = this.path(dst);
        FileSystem fs = this.getFileSystem(hadoopDst);
        if (this.renameMethodRef == null) {
            HadoopFileIO hadoopFileIO = this;
            synchronized (hadoopFileIO) {
                if (this.renameMethodRef == null) {
                    Method method;
                    try {
                        method = ReflectionUtils.getMethod(fs.getClass(), "rename", 3);
                    }
                    catch (NoSuchMethodException e) {
                        method = null;
                    }
                    this.renameMethodRef = new AtomicReference<Method>(method);
                }
            }
        }
        if ((renameMethod = this.renameMethodRef.get()) == null) {
            return false;
        }
        boolean renameDone = false;
        Path tempPath = dst.createTempPath();
        org.apache.hadoop.fs.Path hadoopTemp = this.path(tempPath);
        try {
            try (PositionOutputStream out2 = this.newOutputStream(tempPath, false);){
                OutputStreamWriter writer = new OutputStreamWriter((OutputStream)out2, StandardCharsets.UTF_8);
                writer.write(content);
                writer.flush();
            }
            renameMethod.invoke((Object)fs, hadoopTemp, hadoopDst, new Options.Rename[]{Options.Rename.OVERWRITE});
            renameDone = true;
            this.tryRemoveCrcFile(hadoopTemp);
            boolean out2 = true;
            return out2;
        }
        catch (IllegalAccessException | InvocationTargetException e) {
            throw new IOException(e);
        }
        finally {
            if (!renameDone) {
                this.deleteQuietly(tempPath);
            }
        }
    }

    private void tryRemoveCrcFile(org.apache.hadoop.fs.Path path) throws IOException {
        block3: {
            try {
                org.apache.hadoop.fs.Path checksumFile = new org.apache.hadoop.fs.Path(path.getParent(), String.format(".%s.crc", path.getName()));
                FileSystem fs = this.getFileSystem(checksumFile);
                if (fs.exists(checksumFile)) {
                    fs.delete(checksumFile, true);
                }
            }
            catch (Throwable t) {
                if (!(t instanceof VirtualMachineError) && !(t instanceof ThreadDeath) && !(t instanceof LinkageError)) break block3;
                throw t;
            }
        }
    }

    private static class HadoopFileStatus
    implements FileStatus {
        private final org.apache.hadoop.fs.FileStatus status;

        private HadoopFileStatus(org.apache.hadoop.fs.FileStatus status) {
            this.status = status;
        }

        @Override
        public long getLen() {
            return this.status.getLen();
        }

        @Override
        public boolean isDir() {
            return this.status.isDirectory();
        }

        @Override
        public Path getPath() {
            return new Path(this.status.getPath().toUri());
        }

        @Override
        public long getModificationTime() {
            return this.status.getModificationTime();
        }

        @Override
        public long getAccessTime() {
            return this.status.getAccessTime();
        }

        @Override
        public String getOwner() {
            return this.status.getOwner();
        }
    }

    private static class HadoopPositionOutputStream
    extends PositionOutputStream {
        private final FSDataOutputStream out;

        private HadoopPositionOutputStream(FSDataOutputStream out) {
            this.out = out;
        }

        @Override
        public long getPos() throws IOException {
            return this.out.getPos();
        }

        @Override
        public void write(int b) throws IOException {
            this.out.write(b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.out.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            this.out.hflush();
        }

        @Override
        public void close() throws IOException {
            this.out.close();
        }
    }

    private static class HadoopSeekableInputStream
    extends SeekableInputStream {
        private static final int MIN_SKIP_BYTES = 0x100000;
        private final FSDataInputStream in;

        private HadoopSeekableInputStream(FSDataInputStream in) {
            this.in = in;
        }

        @Override
        public void seek(long seekPos) throws IOException {
            long delta = seekPos - this.getPos();
            if (delta > 0L && delta <= 0x100000L) {
                this.skipFully(delta);
            } else if (delta != 0L) {
                this.forceSeek(seekPos);
            }
        }

        @Override
        public long getPos() throws IOException {
            return this.in.getPos();
        }

        @Override
        public int read() throws IOException {
            return this.in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return this.in.read(b, off, len);
        }

        @Override
        public void close() throws IOException {
            this.in.close();
        }

        public void forceSeek(long seekPos) throws IOException {
            this.in.seek(seekPos);
        }

        public void skipFully(long bytes) throws IOException {
            while (bytes > 0L) {
                bytes -= this.in.skip(bytes);
            }
        }
    }
}

