package org.apache.doris.load.loadv2;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.doris.DorisFE;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.SetUserPropertyVar;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/loadv2/SparkRepository.class */
/**
 * Manages the Spark resources (the DPP jar and the spark-2x zip) that are kept in a remote
 * repository and accessed through a broker.
 *
 * <p>Remote layout, as built by this class:
 * <pre>
 * &lt;remoteRepositoryPath&gt;/__archive_&lt;dppVersion&gt;/__lib_&lt;md5sum&gt;_&lt;name&gt;[.ext]
 * </pre>
 * {@link #prepare()} checks whether the archive of the current DPP version exists remotely and
 * whether the md5sum embedded in each remote library name matches the local file; when it does
 * not, the local files are (re-)uploaded.
 */
public class SparkRepository {
    public static final String REPOSITORY_DIR = "__spark_repository__";
    public static final String PREFIX_ARCHIVE = "__archive_";
    public static final String PREFIX_LIB = "__lib_";
    public static final String SPARK_2X = "spark-2x";
    private static final String PATH_DELIMITER = "/";
    private static final String FILE_NAME_SEPARATOR = "_";
    private static final String DPP_RESOURCE_DIR = "/spark-dpp/";
    private static final String SPARK_RESOURCE = "/jars/spark-2x.zip";
    private static final Logger LOG = LogManager.getLogger(SparkRepository.class);
    public static final String SPARK_DPP_JAR = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies.jar";
    public static final String SPARK_DPP = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies";

    // All instance state is assigned in the constructor: the archive path is derived from
    // remoteRepositoryPath, so it must not be computed in a field initializer (field
    // initializers run before the constructor body has stored the path).
    private String remoteRepositoryPath;
    private BrokerDesc brokerDesc;
    private String localSpark2xPath;
    private String currentDppVersion;
    private SparkArchive currentArchive;
    private String localDppPath;

    /** A versioned directory in the remote repository together with the libraries found in it. */
    public static class SparkArchive {
        public String remotePath;
        public String version;
        public List<SparkLibrary> libraries = Lists.newArrayList();

        public SparkArchive(String remotePath, String version) {
            this.remotePath = remotePath;
            this.version = version;
        }

        /** @return the first library of type {@code DPP}, or {@code null} if there is none */
        public SparkLibrary getDppLibrary() {
            return findLibrary(SparkLibrary.LibType.DPP);
        }

        /** @return the first library of type {@code SPARK2X}, or {@code null} if there is none */
        public SparkLibrary getSpark2xLibrary() {
            return findLibrary(SparkLibrary.LibType.SPARK2X);
        }

        // Shared lookup for the two typed getters above.
        private SparkLibrary findLibrary(SparkLibrary.LibType wanted) {
            return this.libraries.stream()
                    .filter(library -> library.libType == wanted)
                    .findFirst()
                    .orElse(null);
        }
    }

    /** A single file of an archive: its remote path, md5sum, size and type. */
    public static class SparkLibrary {
        public String remotePath;
        public String md5sum;
        public long size;
        public LibType libType;

        /** The kinds of library stored in an archive. */
        public enum LibType {
            DPP,
            SPARK2X
        }

        public SparkLibrary(String remotePath, String md5sum, LibType libType, long size) {
            this.remotePath = remotePath;
            this.md5sum = md5sum;
            this.libType = libType;
            this.size = size;
        }
    }

    /**
     * Creates a repository handle. No remote access happens here; call {@link #prepare()}.
     *
     * @param remoteRepositoryPath root path of the remote repository; it is joined with the
     *                             archive directory name using a Guava {@code Joiner}, which
     *                             rejects {@code null} elements
     * @param brokerDesc           broker used for all remote file operations
     */
    public SparkRepository(String remoteRepositoryPath, BrokerDesc brokerDesc) {
        this.remoteRepositoryPath = remoteRepositoryPath;
        this.brokerDesc = brokerDesc;
        this.currentDppVersion = Config.spark_dpp_version;
        // Must come after remoteRepositoryPath is stored: the archive path is built from it.
        this.currentArchive = new SparkArchive(getRemoteArchivePath(this.currentDppVersion), this.currentDppVersion);
        this.localDppPath = DorisFE.DORIS_HOME_DIR + DPP_RESOURCE_DIR + SPARK_DPP_JAR;
        if (Strings.isNullOrEmpty(Config.spark_resource_path)) {
            this.localSpark2xPath = Config.spark_home_default_dir + SPARK_RESOURCE;
        } else {
            this.localSpark2xPath = Config.spark_resource_path;
        }
    }

    /**
     * Makes sure the remote archive of the current DPP version is present and up to date.
     *
     * @throws LoadException if listing, hashing or uploading the libraries fails
     */
    public void prepare() throws LoadException {
        initRepository();
    }

    /**
     * Synchronizes the remote archive with the local libraries.
     *
     * <ul>
     *   <li>archive missing: upload both libraries;</li>
     *   <li>archive present but it does not hold exactly two recognized libraries, or a remote
     *       md5sum differs from the local file: delete the archive and upload again;</li>
     *   <li>otherwise: keep the libraries found remotely.</li>
     * </ul>
     * Nothing is checked or uploaded when the repository path is empty or no broker is set.
     */
    private void initRepository() throws LoadException {
        LOG.info("start to init remote repository. local dpp: {}", this.localDppPath);
        boolean needUpload = false;
        boolean needReplace = false;
        if (!Strings.isNullOrEmpty(this.remoteRepositoryPath) && this.brokerDesc != null) {
            if (checkCurrentArchiveExists()) {
                List<SparkLibrary> remoteLibraries = Lists.newArrayList();
                getLibraries(getRemoteArchivePath(this.currentDppVersion), remoteLibraries);
                if (remoteLibraries.size() != 2) {
                    needUpload = true;
                    needReplace = true;
                } else {
                    this.currentArchive.libraries.addAll(remoteLibraries);
                    for (SparkLibrary library : this.currentArchive.libraries) {
                        if (!getLocalMd5sum(library.libType).equals(library.md5sum)) {
                            needUpload = true;
                            needReplace = true;
                            // One stale library is enough to replace the whole archive.
                            break;
                        }
                    }
                }
            } else {
                needUpload = true;
            }
        }
        if (needUpload) {
            uploadArchive(needReplace);
        }
        LOG.info("init spark repository success, current dppVersion={}, archive path={}, libraries size={}", this.currentDppVersion, this.currentArchive.remotePath, this.currentArchive.libraries.size());
    }

    // Computes the md5sum of the local file that corresponds to the given library type.
    private String getLocalMd5sum(SparkLibrary.LibType libType) throws LoadException {
        switch (libType) {
            case DPP:
                return getMd5String(this.localDppPath);
            case SPARK2X:
                return getMd5String(this.localSpark2xPath);
            default:
                throw new IllegalStateException("wrong library type: " + libType);
        }
    }

    /**
     * Checks through the broker whether the archive directory of the current version exists.
     *
     * @return {@code true} if it exists; {@code false} if it does not or if the check failed
     *         (the failure is logged, not propagated)
     */
    private boolean checkCurrentArchiveExists() {
        Preconditions.checkNotNull(this.remoteRepositoryPath);
        String remoteArchivePath = getRemoteArchivePath(this.currentDppVersion);
        boolean exists = false;
        try {
            exists = BrokerUtil.checkPathExist(remoteArchivePath, this.brokerDesc);
            LOG.info("check archive exists in repository, {}", exists);
        } catch (UserException e) {
            // Best effort: a failed check is treated as "not present"; keep the cause in the log.
            LOG.warn("Failed to check remote archive exist, path={}, version={}", remoteArchivePath, this.currentDppVersion, e);
        }
        return exists;
    }

    /**
     * Uploads the local DPP jar and spark-2x zip into the archive directory of the current version.
     *
     * @param isReplace when {@code true}, the remote archive directory is deleted and the known
     *                  libraries are cleared before uploading
     * @throws LoadException if any remote or local file operation fails
     */
    private void uploadArchive(boolean isReplace) throws LoadException {
        try {
            String remoteArchivePath = getRemoteArchivePath(this.currentDppVersion);
            if (isReplace) {
                BrokerUtil.deletePath(remoteArchivePath, this.brokerDesc);
                this.currentArchive.libraries.clear();
            }
            String dppFileName = getFileName(PATH_DELIMITER, this.localDppPath);
            uploadLibrary(this.localDppPath, remoteArchivePath, dppFileName, "", SparkLibrary.LibType.DPP);
            uploadLibrary(this.localSpark2xPath, remoteArchivePath, SPARK_2X, ".zip", SparkLibrary.LibType.SPARK2X);
            LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}", this.currentDppVersion, remoteArchivePath);
        } catch (UserException e) {
            throw new LoadException(e.getMessage());
        }
    }

    // Uploads one local file under a name with an empty md5 part, then renames it to the name
    // that carries the md5sum and registers it in the current archive. getLibraries() skips
    // names whose md5 part is empty, so a file that was uploaded but not yet renamed is ignored.
    private void uploadLibrary(String localPath, String remoteArchivePath, String name, String suffix,
            SparkLibrary.LibType libType) throws LoadException {
        String uploadPath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, "", name, suffix);
        upload(localPath, uploadPath);
        String md5sum = getMd5String(localPath);
        long size = getFileSize(localPath);
        String finalPath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, name, suffix);
        rename(uploadPath, finalPath);
        this.currentArchive.libraries.add(new SparkLibrary(finalPath, md5sum, libType, size));
    }

    /**
     * Lists the archive directory and appends every recognized library to {@code libraries}.
     *
     * <p>A file is recognized when its name starts with {@link #PREFIX_LIB} and, once the prefix
     * and extension are removed, splits on {@code "_"} into exactly a non-empty md5sum and a
     * library name. Other files are skipped.
     *
     * @param remoteArchivePath archive directory to list
     * @param libraries         output list the libraries found are added to
     * @throws LoadException if listing fails or a library name is neither the DPP jar nor spark-2x
     */
    private void getLibraries(String remoteArchivePath, List<SparkLibrary> libraries) throws LoadException {
        List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
        try {
            BrokerUtil.parseFile(remoteArchivePath + "/*", this.brokerDesc, fileStatuses);
            for (TBrokerFileStatus fileStatus : fileStatuses) {
                String fileName = getFileName(PATH_DELIMITER, fileStatus.path);
                if (!fileName.startsWith(PREFIX_LIB)) {
                    continue;
                }
                String[] md5AndName = unwrap(PREFIX_LIB, fileName).split(FILE_NAME_SEPARATOR);
                if (md5AndName.length != 2) {
                    continue;
                }
                String md5sum = md5AndName[0];
                if (Strings.isNullOrEmpty(md5sum)) {
                    continue;
                }
                String libraryName = md5AndName[1];
                SparkLibrary.LibType libType;
                if (libraryName.equals(SPARK_DPP)) {
                    libType = SparkLibrary.LibType.DPP;
                } else if (libraryName.equals(SPARK_2X)) {
                    libType = SparkLibrary.LibType.SPARK2X;
                } else {
                    throw new LoadException("Invalid library type: " + libraryName);
                }
                SparkLibrary library = new SparkLibrary(fileStatus.path, md5sum, libType, fileStatus.size);
                libraries.add(library);
                LOG.info("get Libraries from remote archive, archive path={}, library={}, md5sum={}, size={}", remoteArchivePath, library.remotePath, library.md5sum, library.size);
            }
        } catch (UserException e) {
            throw new LoadException(e.getMessage());
        }
    }

    /**
     * Computes the hex md5sum of a local file.
     *
     * @param filePath path of the local file
     * @return the md5sum as a hex string
     * @throws LoadException if the file does not exist or cannot be read
     */
    public String getMd5String(String filePath) throws LoadException {
        // try-with-resources: the stream is opened here, so it has to be closed here as well.
        try (FileInputStream in = new FileInputStream(new File(filePath))) {
            String md5sum = DigestUtils.md5Hex(in);
            Preconditions.checkNotNull(md5sum);
            LOG.debug("get md5sum from file {}, md5sum={}", filePath, md5sum);
            return md5sum;
        } catch (FileNotFoundException e) {
            throw new LoadException("file " + filePath + " does not exist");
        } catch (IOException e) {
            throw new LoadException("failed to get md5sum from file " + filePath);
        }
    }

    /**
     * Returns the size of a local file in bytes.
     *
     * @param filePath path of the local file
     * @return the file length, always positive
     * @throws LoadException if the reported length is zero or negative
     *                       ({@link File#length()} returns 0 for a missing file)
     */
    public long getFileSize(String filePath) throws LoadException {
        long size = new File(filePath).length();
        if (size <= 0) {
            throw new LoadException("failed to get size from file " + filePath);
        }
        return size;
    }

    // Writes a local file to the remote path through the broker.
    private void upload(String srcFilePath, String destFilePath) throws LoadException {
        try {
            BrokerUtil.writeFile(srcFilePath, destFilePath, this.brokerDesc);
            LOG.info("finished to upload file, localPath={}, remotePath={}", srcFilePath, destFilePath);
        } catch (UserException e) {
            throw new LoadException("failed to upload lib to repository, srcPath=" + srcFilePath + " destPath=" + destFilePath + " message=" + e.getMessage());
        }
    }

    // Renames a remote file through the broker.
    private void rename(String origFilePath, String destFilePath) throws LoadException {
        try {
            BrokerUtil.rename(origFilePath, destFilePath, this.brokerDesc);
            LOG.info("finished to rename file, originPath={}, destPath={}", origFilePath, destFilePath);
        } catch (UserException e) {
            throw new LoadException("failed to rename file from " + origFilePath + " to " + destFilePath + ", message=" + e.getMessage());
        }
    }

    /** @return the archive of the current DPP version, including the libraries known so far */
    public SparkArchive getCurrentArchive() {
        return this.currentArchive;
    }

    // Returns the part of path after the last delimiter (the whole path if there is none).
    private static String getFileName(String delimiter, String path) {
        return path.substring(path.lastIndexOf(delimiter) + 1);
    }

    // Strips the leading prefix and the trailing extension from a file name. The extension starts
    // at the last occurrence of SetUserPropertyVar.DOT_SEPARATOR (presumably "." -- the constant
    // is declared outside this file); when there is none, only the prefix is stripped.
    private static String unwrap(String prefix, String fileName) {
        int extensionStart = fileName.lastIndexOf(SetUserPropertyVar.DOT_SEPARATOR);
        int end = extensionStart > 0 ? extensionStart : fileName.length();
        return fileName.substring(prefix.length(), end);
    }

    private static String joinPrefix(String prefix, String fileName) {
        return prefix + fileName;
    }

    /**
     * Builds a library file name of the form {@code <prefix><md5sum>_<fileName><suffix>}.
     *
     * @param prefix   name prefix, e.g. {@link #PREFIX_LIB}
     * @param md5sum   md5sum part, may be empty
     * @param fileName library name
     * @param suffix   extension including the dot, may be empty
     * @return the assembled file name
     */
    public static String assemblyFileName(String prefix, String md5sum, String fileName, String suffix) {
        return prefix + md5sum + FILE_NAME_SEPARATOR + fileName + suffix;
    }

    // Returns "<remoteRepositoryPath>/__archive_<version>".
    private String getRemoteArchivePath(String version) {
        return Joiner.on(PATH_DELIMITER).join(this.remoteRepositoryPath, joinPrefix(PREFIX_ARCHIVE, version));
    }
}
