package org.apache.doris.fs.remote.dfs;

import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.fs.operations.HDFSFileOperations;
import org.apache.doris.fs.operations.HDFSOpParams;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/fs/remote/dfs/DFSFileSystem.class */
public class DFSFileSystem extends RemoteFileSystem {
    private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
    private HDFSFileOperations operations;

    public DFSFileSystem(Map<String, String> map) {
        this(StorageBackend.StorageType.HDFS, map);
    }

    public DFSFileSystem(StorageBackend.StorageType storageType, Map<String, String> map) {
        super(storageType.name(), storageType);
        this.operations = null;
        this.properties.putAll(map);
    }

    @Override // org.apache.doris.fs.remote.RemoteFileSystem
    protected FileSystem nativeFileSystem(String str) throws UserException {
        if (this.dfsFileSystem != null) {
            return this.dfsFileSystem;
        }
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        for (Map.Entry<String, String> entry : this.properties.entrySet()) {
            hdfsConfiguration.set(entry.getKey(), entry.getValue());
        }
        try {
            this.dfsFileSystem = (FileSystem) login(hdfsConfiguration).doAs(() -> {
                try {
                    return FileSystem.get(new Path(str).toUri(), hdfsConfiguration);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            Preconditions.checkNotNull(this.dfsFileSystem);
            this.operations = new HDFSFileOperations(this.dfsFileSystem);
            return this.dfsFileSystem;
        } catch (SecurityException e) {
            throw new UserException(e);
        }
    }

    private UserGroupInformation login(Configuration configuration) throws UserException {
        if (AuthType.KERBEROS.getDesc().equals(configuration.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, (String) null))) {
            try {
                UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
                String str = configuration.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
                LOG.debug("Current login user: {}", loginUser.getUserName());
                if (loginUser.hasKerberosCredentials() && loginUser.getUserName().equals(str)) {
                    loginUser.checkTGTAndReloginFromKeytab();
                    return loginUser;
                }
            } catch (IOException e) {
                LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
                return doLogin(configuration);
            }
        }
        return doLogin(configuration);
    }

    private UserGroupInformation doLogin(Configuration configuration) throws UserException {
        if (!AuthType.KERBEROS.getDesc().equals(configuration.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, (String) null))) {
            String str = configuration.get(HdfsResource.HADOOP_USER_NAME);
            if (str == null) {
                str = IcebergExternalCatalog.ICEBERG_HADOOP;
                LOG.debug(HdfsResource.HADOOP_USER_NAME + " is unset, use default user: hadoop");
            }
            UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(str);
            UserGroupInformation.setLoginUser(createRemoteUser);
            LOG.info("Login by proxy user, hadoop.username: {}", str);
            return createRemoteUser;
        }
        configuration.set(HdfsResource.HADOOP_KERBEROS_AUTHORIZATION, "true");
        String str2 = configuration.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
        String str3 = configuration.get(HdfsResource.HADOOP_KERBEROS_KEYTAB);
        UserGroupInformation.setConfiguration(configuration);
        try {
            UserGroupInformation loginUserFromKeytabAndReturnUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(str2, str3);
            UserGroupInformation.setLoginUser(loginUserFromKeytabAndReturnUGI);
            LOG.info("Login by kerberos authentication with principal: {}", str2);
            return loginUserFromKeytabAndReturnUGI;
        } catch (IOException e) {
            throw new UserException(e);
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status downloadWithFileSize(String str, String str2, long j) {
        LOG.debug("download from {} to {}, file size: {}.", str, str2, Long.valueOf(j));
        long currentTimeMillis = System.currentTimeMillis();
        HDFSOpParams of = OpParams.of(str);
        Status openReader = this.operations.openReader(of);
        if (openReader != Status.OK) {
            return openReader;
        }
        FSDataInputStream fsDataInputStream = of.fsDataInputStream();
        LOG.info("finished to open reader. download {} to {}.", str, str2);
        File file = new File(str2);
        if (file.exists()) {
            try {
                Files.walk(Paths.get(str2, new String[0]), FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder()).map((v0) -> {
                    return v0.toFile();
                }).forEach((v0) -> {
                    v0.delete();
                });
            } catch (IOException e) {
                return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + str2 + ", msg: " + e.getMessage());
            }
        }
        try {
            if (!file.createNewFile()) {
                return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + str2);
            }
            Status status = Status.OK;
            try {
                try {
                    BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath(), new OpenOption[0]));
                    Throwable th = null;
                    long j2 = j;
                    long j3 = 0;
                    while (j2 > 0) {
                        try {
                            try {
                                long min = Math.min(j2, SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES);
                                try {
                                    ByteBuffer readStreamBuffer = readStreamBuffer(fsDataInputStream, j3, min);
                                    if (min != readStreamBuffer.array().length) {
                                        LOG.warn("the actual read length does not equal to the expected read length: {} vs. {}, file: {}", Integer.valueOf(readStreamBuffer.array().length), Long.valueOf(min), str);
                                    }
                                    bufferedOutputStream.write(readStreamBuffer.array());
                                    j3 += readStreamBuffer.array().length;
                                    j2 -= readStreamBuffer.array().length;
                                } catch (Exception e2) {
                                    String format = String.format("failed to read. current read offset: %d, read length: %d, file size: %d, file: %s. msg: %s", Long.valueOf(j3), Long.valueOf(min), Long.valueOf(j), str, e2.getMessage());
                                    LOG.warn(format);
                                    status = new Status(Status.ErrCode.COMMON_ERROR, format);
                                }
                            } finally {
                            }
                        } catch (Throwable th2) {
                            if (bufferedOutputStream != null) {
                                if (th != null) {
                                    try {
                                        bufferedOutputStream.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    bufferedOutputStream.close();
                                }
                            }
                            throw th2;
                        }
                    }
                    if (bufferedOutputStream != null) {
                        if (0 != 0) {
                            try {
                                bufferedOutputStream.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            bufferedOutputStream.close();
                        }
                    }
                    Status closeReader = this.operations.closeReader(OpParams.of(fsDataInputStream));
                    if (!closeReader.ok()) {
                        LOG.warn(closeReader.getErrMsg());
                        if (status.ok()) {
                            status = closeReader;
                        }
                    }
                    LOG.info("finished to download from {} to {} with size: {}. cost {} ms", str, str2, Long.valueOf(j), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    return status;
                } catch (IOException e3) {
                    Status status2 = new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e3.getMessage());
                    Status closeReader2 = this.operations.closeReader(OpParams.of(fsDataInputStream));
                    if (!closeReader2.ok()) {
                        LOG.warn(closeReader2.getErrMsg());
                        if (status.ok()) {
                        }
                    }
                    return status2;
                }
            } catch (Throwable th5) {
                Status closeReader3 = this.operations.closeReader(OpParams.of(fsDataInputStream));
                if (!closeReader3.ok()) {
                    LOG.warn(closeReader3.getErrMsg());
                    if (status.ok()) {
                    }
                }
                throw th5;
            }
        } catch (IOException e4) {
            return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + str2 + ", msg: " + e4.getMessage());
        }
    }

    private static ByteBuffer readStreamBuffer(FSDataInputStream fSDataInputStream, long j, long j2) throws IOException {
        ByteBuffer wrap;
        synchronized (fSDataInputStream) {
            try {
                long pos = fSDataInputStream.getPos();
                if (pos != j) {
                    LOG.debug("invalid offset, current read offset is " + pos + " is not equal to request offset " + j + " seek to it");
                    try {
                        fSDataInputStream.seek(j);
                    } catch (IOException e) {
                        throw new IOException(String.format("current read offset %d is not equal to %d, and could not seek to it, msg: %s", Long.valueOf(pos), Long.valueOf(j), e.getMessage()));
                    }
                }
                byte[] bArr = j2 > 131072 ? new byte[131072] : new byte[(int) j2];
                try {
                    int readBytesFully = readBytesFully(fSDataInputStream, bArr);
                    if (readBytesFully < 0) {
                        throw new IOException("end of file reached");
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("read buffer from input stream, buffer size:" + bArr.length + ", read length:" + readBytesFully);
                    }
                    wrap = ByteBuffer.wrap(bArr, 0, readBytesFully);
                } catch (IOException e2) {
                    LOG.error("errors while read data from stream", e2);
                    throw new IOException("errors while read data from stream " + e2.getMessage());
                }
            } catch (IOException e3) {
                LOG.error("errors while get file pos from output stream", e3);
                throw new IOException("errors while get file pos from output stream", e3);
            }
        }
        return wrap;
    }

    private static int readBytesFully(FSDataInputStream fSDataInputStream, byte[] bArr) throws IOException {
        int i;
        int read;
        int i2 = 0;
        while (true) {
            i = i2;
            if (i >= bArr.length || (read = fSDataInputStream.read(bArr, i, bArr.length - i)) <= 0) {
                break;
            }
            i2 = i + read;
        }
        return i;
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status exists(String str) {
        try {
            return !nativeFileSystem(str).exists(new Path(URI.create(str).getPath())) ? new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + str) : Status.OK;
        } catch (Exception e) {
            LOG.error("errors while check path exist " + str, e);
            return new Status(Status.ErrCode.COMMON_ERROR, "failed to check remote path exist: " + str + ". msg: " + e.getMessage());
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status directUpload(String str, String str2) {
        HDFSOpParams of = OpParams.of(str2);
        Status openWriter = this.operations.openWriter(of);
        if (openWriter != Status.OK) {
            return openWriter;
        }
        FSDataOutputStream fsDataOutputStream = of.fsDataOutputStream();
        LOG.info("finished to open writer. directly upload to remote path {}.", str2);
        Status status = Status.OK;
        try {
            try {
                fsDataOutputStream.writeBytes(str);
                Status closeWriter = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
                if (!closeWriter.ok()) {
                    LOG.warn(closeWriter.getErrMsg());
                    if (status.ok()) {
                        status = closeWriter;
                    }
                }
            } catch (IOException e) {
                LOG.error("errors while write data to output stream", e);
                status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage());
                Status closeWriter2 = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
                if (!closeWriter2.ok()) {
                    LOG.warn(closeWriter2.getErrMsg());
                    if (status.ok()) {
                        status = closeWriter2;
                    }
                }
            }
            return status;
        } catch (Throwable th) {
            Status closeWriter3 = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
            if (!closeWriter3.ok()) {
                LOG.warn(closeWriter3.getErrMsg());
                if (status.ok()) {
                }
            }
            throw th;
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status upload(String str, String str2) {
        long currentTimeMillis = System.currentTimeMillis();
        LOG.debug("local path {}, remote path {}", str, str2);
        HDFSOpParams of = OpParams.of(str2);
        Status openWriter = this.operations.openWriter(of);
        if (openWriter != Status.OK) {
            return openWriter;
        }
        FSDataOutputStream fsDataOutputStream = of.fsDataOutputStream();
        LOG.info("finished to open writer. directly upload to remote path {}.", str2);
        File file = new File(str);
        long length = file.length();
        byte[] bArr = new byte[MysqlServerStatusFlag.SERVER_STATUS_METADATA_CHANGED];
        Status status = new Status(Status.ErrCode.OK, "");
        try {
            try {
                try {
                    BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
                    Throwable th = null;
                    long j = 0;
                    while (true) {
                        try {
                            try {
                                int read = bufferedInputStream.read(bArr);
                                if (read == -1) {
                                    break;
                                }
                                try {
                                    fsDataOutputStream.write(bArr, 0, read);
                                    j += read;
                                } catch (IOException e) {
                                    LOG.error("errors while write data to output stream", e);
                                    status = new Status(Status.ErrCode.COMMON_ERROR, String.format("failed to write hdfs. current write offset: %d, write length: %d, file length: %d, file: %s, msg: errors while write data to output stream", Long.valueOf(j), Integer.valueOf(read), Long.valueOf(length), str2));
                                }
                            } finally {
                            }
                        } catch (Throwable th2) {
                            if (bufferedInputStream != null) {
                                if (th != null) {
                                    try {
                                        bufferedInputStream.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    bufferedInputStream.close();
                                }
                            }
                            throw th2;
                        }
                    }
                    if (bufferedInputStream != null) {
                        if (0 != 0) {
                            try {
                                bufferedInputStream.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            bufferedInputStream.close();
                        }
                    }
                    Status closeWriter = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
                    if (!closeWriter.ok()) {
                        LOG.warn(closeWriter.getErrMsg());
                        if (status.ok()) {
                            status = closeWriter;
                        }
                    }
                    if (status.ok()) {
                        LOG.info("finished to upload {} to remote path {}. cost: {} ms", str, str2, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    }
                    return status;
                } catch (FileNotFoundException e2) {
                    Status status2 = new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e2.getMessage());
                    Status closeWriter2 = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
                    if (!closeWriter2.ok()) {
                        LOG.warn(closeWriter2.getErrMsg());
                        if (status.ok()) {
                        }
                    }
                    return status2;
                }
            } catch (IOException e3) {
                Status status3 = new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e3.getMessage());
                Status closeWriter3 = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
                if (!closeWriter3.ok()) {
                    LOG.warn(closeWriter3.getErrMsg());
                    if (status.ok()) {
                    }
                }
                return status3;
            }
        } catch (Throwable th5) {
            Status closeWriter4 = this.operations.closeWriter(OpParams.of(fsDataOutputStream));
            if (!closeWriter4.ok()) {
                LOG.warn(closeWriter4.getErrMsg());
                if (status.ok()) {
                }
            }
            throw th5;
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status rename(String str, String str2) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            URI create = URI.create(str);
            URI create2 = URI.create(str2);
            if (!create.getAuthority().trim().equals(create2.getAuthority().trim())) {
                return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system");
            }
            if (!nativeFileSystem(str2).rename(new Path(create.getPath()), new Path(create2.getPath()))) {
                return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + str + " to " + str2);
            }
            LOG.info("finished to rename {} to  {}. cost: {} ms", str, str2, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            return Status.OK;
        } catch (IOException e) {
            LOG.error("errors while rename path from " + str + " to " + str2);
            return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename remote " + str + " to " + str2 + ", msg: " + e.getMessage());
        } catch (UserException e2) {
            return new Status(Status.ErrCode.COMMON_ERROR, e2.getMessage());
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status delete(String str) {
        try {
            nativeFileSystem(str).delete(new Path(URI.create(str).getPath()), true);
            LOG.info("finished to delete remote path {}.", str);
            return Status.OK;
        } catch (IOException e) {
            LOG.error("errors while delete path " + str);
            return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete remote path: " + str + ", msg: " + e.getMessage());
        } catch (UserException e2) {
            return new Status(Status.ErrCode.COMMON_ERROR, e2.getMessage());
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status list(String str, List<RemoteFile> list, boolean z) {
        try {
            FileStatus[] globStatus = nativeFileSystem(str).globStatus(new Path(URI.create(str).getPath()));
            if (globStatus == null) {
                LOG.info("no files in path " + str);
                return Status.OK;
            }
            for (FileStatus fileStatus : globStatus) {
                list.add(new RemoteFile(z ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1L : fileStatus.getLen(), fileStatus.getBlockSize(), fileStatus.getModificationTime()));
            }
            LOG.info("finish list path {}", str);
            return Status.OK;
        } catch (FileNotFoundException e) {
            LOG.info("file not found: " + e.getMessage());
            return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
        } catch (Exception e2) {
            LOG.error("errors while get file status ", e2);
            return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e2.getMessage());
        }
    }

    @Override // org.apache.doris.fs.FileSystem
    public Status makeDir(String str) {
        return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented.");
    }
}
