package org.apache.doris.load;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.DorisFE;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.CommandResult;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/DppScheduler.class */
public class DppScheduler {
    /** Name of the sub-directory, below an ETL output path, whose files {@code getEtlFiles} lists. */
    private static final String DPP_OUTPUT_DIR = "export";
    /** File name of the JSON job configuration written into the per-job config directory. */
    private static final String JOB_CONFIG_FILE = "jobconfig.json";
    /** Default number of reduce tasks. */
    private static final int DEFAULT_REDUCE_NUM = 1000;
    /** Number of bytes in one gibibyte (2^30). */
    private static final long GB = 1073741824;
    /** Format used by {@code getEtlOutputPath}: two string prefixes, a numeric id and two trailing path segments. */
    private static final String ETL_OUTPUT_PATH = "%s%s/%d/%s/%s";
    /** Format of the hadoop job name; {@code submitEtlJob} fills it with its {@code str3} and {@code str} arguments. */
    private static final String ETL_JOB_NAME = "palo2__%s__%s";
    /**
     * Command line used to submit the ETL job. Placeholders, in order: hadoop client, hadoop
     * config options, job name, input path(s), output path, hadoop config options (handed to the
     * reducer script), applications path (three times, one per cache archive), number of reduce
     * tasks, local job config file.
     */
    private static final String HADOOP_BISTREAMING_CMD = "%s bistreaming %s -D mapred.job.name=\"%s\" -input %s -output %s -mapper \"sh mapred/mapper.sh\" -reducer \"sh mapred/reducer.sh '\\\"%s\\\"'\" -partitioner com.baidu.sos.mapred.lib.MapIntPartitioner -cacheArchive %s/dpp/x86_64-scm-linux-gnu.tar.gz#tc -cacheArchive %s/dpp/pypy.tar.gz#pypy -cacheArchive %s/dpp/palo_dpp_mr.tar.gz#mapred -numReduceTasks %d -file \"%s\" ";
    // The command templates below all take: hadoop client, hadoop config options, then the
    // command-specific argument(s).
    /** Queries the status of a hadoop job by id. */
    private static final String HADOOP_STATUS_CMD = "%s job %s -status %s";
    /** Kills a hadoop job by id. */
    private static final String HADOOP_KILL_CMD = "%s job %s -kill %s";
    /** Lists a path. */
    private static final String HADOOP_LS_CMD = "%s fs %s -ls %s";
    /** Runs {@code -count} on one or more paths; callers read the size from the third output column. */
    private static final String HADOOP_COUNT_CMD = "%s fs %s -count %s";
    /** Runs {@code -test} with a flag (this class uses {@code -d}) on a path. */
    private static final String HADOOP_TEST_CMD = "%s fs %s -test %s %s";
    /** Creates a directory. */
    private static final String HADOOP_MKDIR_CMD = "%s fs %s -mkdir %s";
    /** Recursively removes a path. */
    private static final String HADOOP_RMR_CMD = "%s fs %s -rmr %s";
    /** Uploads a local path to a remote directory. */
    private static final String HADOOP_PUT_CMD = "%s fs %s -put %s %s";
    /** Value passed as {@code -D speed.limit.kb} when uploading the dpp files. */
    private static final long HADOOP_SPEED_LIMIT_KB = 10240;
    /** Hadoop {@code -D key=value} options string, built once in the constructor from the DppConfig. */
    private String hadoopConfig;
    /** Remote applications root: the DppConfig's fs default name followed by its applications path. */
    private String applicationsPath;
    private static final Logger LOG = LogManager.getLogger(DppScheduler.class);
    /** Path of the hadoop client executable, relative to the Doris home directory. */
    private static final String HADOOP_CLIENT = DorisFE.DORIS_HOME_DIR + Config.dpp_hadoop_client_path;
    /** Local directory under which one config sub-directory per job id is created. */
    private static final String JOB_CONFIG_DIR = DorisFE.DORIS_HOME_DIR + "/temp/job_conf";
    /** Local directory holding the dpp files that are uploaded to {@code <applicationsPath>/dpp}. */
    private static final String LOCAL_DPP_DIR = DorisFE.DORIS_HOME_DIR + "/lib/dpp/" + FeConstants.dpp_version;
    /** One lock object per key; {@code submitEtlJob} synchronizes on it while preparing the dpp applications. */
    private static final ConcurrentMap<String, Object> DPP_LOCK_MAP = Maps.newConcurrentMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/doris/load/DppScheduler$InputSizeInvalidException.class */
    public class InputSizeInvalidException extends LoadException {
        public InputSizeInvalidException(String str) {
            super(str);
        }
    }

    /**
     * Creates a scheduler bound to one DPP configuration.
     *
     * <p>The hadoop options of the configuration are rendered once into a command-line fragment,
     * and the remote applications root is the configuration's fs default name immediately
     * followed by its applications path.
     *
     * @param dppConfig configuration supplying hadoop options, fs default name and applications path
     */
    public DppScheduler(DppConfig dppConfig) {
        Map<String, String> hadoopOptions = dppConfig.getHadoopConfigs();
        this.hadoopConfig = getHadoopConfigsStr(hadoopOptions);
        this.applicationsPath = dppConfig.getFsDefaultName() + dppConfig.getApplicationsPath();
    }

    /**
     * Renders hadoop configuration entries as a command-line fragment of the form
     * {@code -D k1=v1 -D k2=v2}, in the iteration order of the map.
     *
     * @param map configuration key/value pairs; may be {@code null} or empty
     * @return the options fragment, or an empty string when there are no entries (a bare
     *         {@code "-D "} would make hadoop consume the next command token as a property)
     */
    private String getHadoopConfigsStr(Map<String, String> map) {
        if (map == null || map.isEmpty()) {
            return "";
        }
        List<String> options = new ArrayList<>(map.size());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            options.add("-D " + entry.getKey() + "=" + entry.getValue());
        }
        return String.join(" ", options);
    }

    /**
     * Submits a hadoop bistreaming ETL job and reads the hadoop client's stderr until the job id
     * is reported.
     *
     * <p>Steps: (1) on a retry, re-check/upload the dpp applications under a per-key lock;
     * (2) recreate the local per-job config directory and write the job configuration as JSON;
     * (3) compute the reduce task count as the minimum of the input-size based and tablet based
     * estimates; (4) remove the remote output path; (5) launch the hadoop client and scan at most
     * 1000 stderr lines for {@code "Running job"}. The local config directory is deleted once the
     * client has been launched, whatever the outcome.
     *
     * <p>A failure to close the stderr reader is only logged: it neither replaces the result of a
     * job that was already submitted nor hides an exception that is propagating.
     *
     * @param j    job id; names the local config directory and appears in log messages
     * @param str  load label; second component of the hadoop job name
     * @param str2 key of the lock guarding dpp application preparation (presumably the cluster
     *             name - confirm against callers)
     * @param str3 first component of the hadoop job name (presumably the database name - confirm)
     * @param map  job configuration; must contain {@code "tables"} and {@code "output_path"}
     * @param i    retry count; any value above zero triggers the dpp application check
     * @return never {@code null}; status is {@code OK} with the hadoop job id on success,
     *         otherwise {@code CANCELLED} with a {@code null} job id and the collected messages
     */
    public EtlSubmitResult submitEtlJob(long j, String str, String str2, String str3, Map<String, Object> map, int i) {
        TStatus status = new TStatus();
        status.setStatusCode(TStatusCode.OK);
        List<String> failMsgs = new ArrayList<>();
        status.setErrorMsgs(failMsgs);

        if (i > 0) {
            // A previous attempt failed: make sure the remote dpp applications are intact.
            LOG.warn("submit etl retry[{}] > 0. check dpp application", i);
            DPP_LOCK_MAP.putIfAbsent(str2, new Object());
            Preconditions.checkState(DPP_LOCK_MAP.containsKey(str2));
            synchronized (DPP_LOCK_MAP.get(str2)) {
                try {
                    prepareDppApplications();
                } catch (LoadException e) {
                    return cancelSubmit(status, failMsgs, e.getMessage());
                }
            }
        }

        // Start from an empty config directory for this job.
        String configDirPath = JOB_CONFIG_DIR + S3URI.PATH_DELIM + j;
        File configDir = new File(configDirPath);
        if (!Util.deleteDirectory(configDir)) {
            String msg = "delete config dir error. job: " + j;
            LOG.warn(msg + ", path: {}", configDirPath);
            return cancelSubmit(status, failMsgs, msg);
        }
        if (!configDir.mkdirs()) {
            String msg = "create config file dir error. job: " + j;
            LOG.warn(msg + ", path: {}", configDirPath);
            return cancelSubmit(status, failMsgs, msg);
        }

        // Write the job configuration; a failed close counts as a failed write because buffered
        // data may not have reached the file.
        String configFilePath = configDirPath + S3URI.PATH_DELIM + JOB_CONFIG_FILE;
        File configFile = new File(configFilePath);
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(configFile), "UTF-8"))) {
            writer.write(new Gson().toJson(map));
            writer.flush();
        } catch (IOException e) {
            Util.deleteDirectory(configDir);
            String msg = "create config file error. job: " + j;
            LOG.warn(msg + ", file: {}", configFilePath, e);
            return cancelSubmit(status, failMsgs, msg);
        }

        Set<String> inputPaths = getInputPaths(map);
        String inputArg = StringUtils.join(inputPaths, " -input ");

        int reduceNumByInputSize;
        try {
            reduceNumByInputSize = calcReduceNumByInputSize(inputPaths);
        } catch (InputSizeInvalidException e) {
            return cancelSubmit(status, failMsgs, e.getMessage());
        }
        int reduceNumByTablet = calcReduceNumByTablet(map);
        int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet);
        LOG.info("calculate reduce num. reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}",
                reduceNum, reduceNumByInputSize, reduceNumByTablet);

        // The output path must not exist when the job starts.
        String outputPath = (String) map.get("output_path");
        deleteEtlOutputPath(outputPath);

        String submitCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, this.hadoopConfig,
                String.format(ETL_JOB_NAME, str3, str), inputArg, outputPath, this.hadoopConfig,
                this.applicationsPath, this.applicationsPath, this.applicationsPath,
                reduceNum, configFile.getAbsolutePath());
        LOG.info(submitCmd);
        String[] cmds = (String[]) Util.shellSplit(submitCmd).toArray(new String[0]);

        String etlJobId;
        BufferedReader errorReader = null;
        long startTime = System.currentTimeMillis();
        try {
            Process process = Runtime.getRuntime().exec(cmds);
            errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            etlJobId = readEtlJobId(process, errorReader, failMsgs, j, str);
        } catch (IOException e) {
            LOG.warn("submit etl job error", e);
            return cancelSubmit(status, failMsgs, e.getMessage());
        } finally {
            Util.deleteDirectory(configDir);
            LOG.info("finished submit hadoop job: {}. cost: {} ms", j, System.currentTimeMillis() - startTime);
            closeErrorReader(errorReader);
        }

        if (etlJobId == null) {
            status.setStatusCode(TStatusCode.CANCELLED);
        }
        return new EtlSubmitResult(status, etlJobId);
    }

    /** Marks {@code status} as cancelled, records {@code msg} and builds a result without a job id. */
    private static EtlSubmitResult cancelSubmit(TStatus status, List<String> failMsgs, String msg) {
        status.setStatusCode(TStatusCode.CANCELLED);
        failMsgs.add(msg);
        return new EtlSubmitResult(status, null);
    }

    /**
     * Scans at most 1000 stderr lines of the hadoop client: collects lines mentioning
     * "error"/"exception" into {@code failMsgs} and returns the last delimiter-separated token of
     * the "Running job" line, or {@code null} if the output ends (or the line budget is used up)
     * before that line is seen.
     */
    private static String readEtlJobId(Process process, BufferedReader errorReader, List<String> failMsgs,
                                       long jobId, String label) throws IOException {
        final int maxLines = 1000;
        for (int lineNo = 0; lineNo < maxLines; lineNo++) {
            String line = errorReader.readLine();
            LOG.info(line);
            if (Strings.isNullOrEmpty(line)) {
                // End of output (or a blank line) before the job id was reported.
                LOG.warn("submit etl job fail. job id: {}, label: {}", jobId, label);
                return null;
            }
            String lowerCased = line.toLowerCase();
            if (lowerCased.contains("error") || lowerCased.contains("exception")) {
                failMsgs.add(line);
            }
            if (line.contains("Running job")) {
                String[] parts = line.split(ClusterNamespace.CLUSTER_DELIMITER);
                // The job is now running on the cluster; the local client is stopped so this
                // call does not wait for the job to complete.
                process.destroy();
                return parts[parts.length - 1].trim();
            }
        }
        return null;
    }

    /** Closes the client's stderr reader if it was opened; a close failure is logged only. */
    private static void closeErrorReader(BufferedReader errorReader) {
        if (errorReader == null) {
            return;
        }
        try {
            errorReader.close();
        } catch (IOException e) {
            LOG.warn("close buffered reader error", e);
        }
    }

    /**
     * Makes sure the remote directory {@code <applicationsPath>/dpp} holds the same files as the
     * local dpp directory, re-uploading everything when it does not.
     *
     * <p>When the remote directory is missing, lacks a local file, or holds a file whose size
     * differs from the local one, the remote directory is removed, recreated, and every entry of
     * the local directory is uploaded again.
     *
     * @throws LoadException if the local dpp directory is missing or cannot be listed, if the
     *                       remote count command fails or prints an unparsable size, or if an
     *                       upload fails
     */
    private void prepareDppApplications() throws LoadException {
        String[] envp = {"LC_ALL=" + Config.locale};
        String hadoopDppDir = this.applicationsPath + "/dpp";

        File localDppDir = new File(LOCAL_DPP_DIR);
        if (!localDppDir.exists() || !localDppDir.isDirectory()) {
            LOG.warn("dpp dir does not exist");
            throw new LoadException("dpp dir does not exist");
        }
        // listFiles() returns null on an I/O error even for an existing directory.
        File[] localFiles = localDppDir.listFiles();
        if (localFiles == null) {
            String msg = "list dpp dir error. dir: " + LOCAL_DPP_DIR;
            LOG.warn(msg);
            throw new LoadException(msg);
        }

        if (isHadoopDppDirUpToDate(hadoopDppDir, localFiles, envp)) {
            return;
        }

        // Removing and recreating the directory is best effort (the directory may simply not
        // exist yet); a real problem surfaces as a failed upload below.
        String rmrCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, this.hadoopConfig, hadoopDppDir);
        LOG.info(rmrCmd);
        Util.executeCommand(rmrCmd, envp);
        String mkdirCmd = String.format(HADOOP_MKDIR_CMD, HADOOP_CLIENT, this.hadoopConfig, hadoopDppDir);
        LOG.info(mkdirCmd);
        Util.executeCommand(mkdirCmd, envp);

        String putConfig = this.hadoopConfig + String.format(" -D speed.limit.kb=%d", HADOOP_SPEED_LIMIT_KB);
        for (File localFile : localFiles) {
            String putCmd = String.format(HADOOP_PUT_CMD, HADOOP_CLIENT, putConfig,
                    LOCAL_DPP_DIR + S3URI.PATH_DELIM + localFile.getName(), hadoopDppDir);
            LOG.info(putCmd);
            CommandResult putResult = Util.executeCommand(putCmd, envp);
            if (putResult.getReturnCode() != 0) {
                LOG.warn("hadoop put fail. result: {}", putResult);
                throw new LoadException("hadoop put fail. msg: " + putResult.getStderr());
            }
        }
    }

    /**
     * Checks whether the remote dpp directory exists and contains every regular local file with
     * an identical size. Sizes are taken from the third column of the count output, file names
     * from the last path segment of the fourth column.
     */
    private boolean isHadoopDppDirUpToDate(String hadoopDppDir, File[] localFiles, String[] envp)
            throws LoadException {
        String testCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, this.hadoopConfig, "-d", hadoopDppDir);
        LOG.info(testCmd);
        if (Util.executeCommand(testCmd, envp).getReturnCode() != 0) {
            LOG.info("hadoop dir does not exist. dir: {}", hadoopDppDir);
            return false;
        }

        String countCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, this.hadoopConfig, hadoopDppDir + "/*");
        LOG.info(countCmd);
        CommandResult countResult = Util.executeCommand(countCmd, envp);
        if (countResult.getReturnCode() != 0) {
            LOG.warn("hadoop count error, result: {}", countResult);
            throw new LoadException("hadoop count error. msg: " + countResult.getStderr());
        }

        Map<String, Long> hadoopFileSizes = new HashMap<>();
        for (String line : countResult.getStdout().split("\n")) {
            String[] columns = line.trim().split(" +");
            if (columns.length != 4) {
                continue;
            }
            String path = columns[3];
            String fileName = path.substring(path.lastIndexOf(S3URI.PATH_DELIM) + 1);
            try {
                hadoopFileSizes.put(fileName, Long.parseLong(columns[2]));
            } catch (NumberFormatException e) {
                LOG.warn("hadoop count output format error. line: {}", line);
                throw new LoadException("hadoop count output format error. line: " + line);
            }
        }

        for (File localFile : localFiles) {
            if (!localFile.isFile()) {
                continue;
            }
            String name = localFile.getName();
            Long hadoopSize = hadoopFileSizes.get(name);
            if (hadoopSize == null) {
                LOG.info("hadoop dpp file does not exist. file: {}", name);
                return false;
            }
            long localSize = localFile.length();
            if (localSize != hadoopSize.longValue()) {
                LOG.info("dpp files size are different. file: {}, local: {}, hadoop: {}", name, localSize, hadoopSize);
                return false;
            }
        }
        return true;
    }

    /**
     * Collects the distinct input file URLs of a job configuration.
     *
     * <p>Walks {@code map["tables"]}, then each table's {@code "source_file_schema"} map, and
     * gathers every schema's {@code "file_urls"} list into one set.
     *
     * @param map job configuration containing a {@code "tables"} map
     * @return the union of all {@code "file_urls"} entries
     */
    @SuppressWarnings("unchecked")
    private Set<String> getInputPaths(Map<String, Object> map) {
        Set<String> inputPaths = new HashSet<>();
        Map<?, ?> tables = (Map<?, ?>) map.get("tables");
        for (Object table : tables.values()) {
            Map<?, ?> schemas = (Map<?, ?>) ((Map<?, ?>) table).get("source_file_schema");
            for (Object schema : schemas.values()) {
                // Unchecked: the element type is whatever the caller put into the config.
                List<String> fileUrls = (List<String>) ((Map<?, ?>) schema).get("file_urls");
                inputPaths.addAll(fileUrls);
            }
        }
        return inputPaths;
    }

    /**
     * Estimates the number of reduce tasks from the total size of the input paths.
     *
     * <p>Runs the hadoop count command on all paths and sums the third column of every
     * four-column output line. If the command fails, the default reduce number is returned.
     *
     * @param set input paths handed, space separated, to the count command
     * @return {@code 0} when the total size is zero, otherwise
     *         {@code totalSize / Config.dpp_bytes_per_reduce + 1}; the default reduce number when
     *         the count command fails
     * @throws InputSizeInvalidException if a non-zero size limit is exceeded
     */
    private int calcReduceNumByInputSize(Set<String> set) throws InputSizeInvalidException {
        // NOTE(review): the limit is a literal 0 in this source, which disables the check below.
        // Presumably it was a compile-time configuration constant - confirm before changing it.
        final long inputSizeLimitGb = 0L;

        String[] envp = {"LC_ALL=" + Config.locale};
        String countCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, this.hadoopConfig, StringUtils.join(set, " "));
        LOG.info(countCmd);
        CommandResult countResult = Util.executeCommand(countCmd, envp);
        if (countResult.getReturnCode() != 0) {
            LOG.warn("hadoop count error, result: {}", countResult);
            return DEFAULT_REDUCE_NUM;
        }

        long totalSize = 0;
        for (String line : countResult.getStdout().split("\n")) {
            String[] columns = line.trim().split(" +");
            if (columns.length == 4) {
                totalSize += Long.parseLong(columns[2]);
            }
        }

        if (inputSizeLimitGb != 0 && totalSize > inputSizeLimitGb * GB) {
            String msg = "Input file size[" + ((float) totalSize / GB) + "GB] exceeds system limit["
                    + inputSizeLimitGb + "GB]";
            LOG.warn(msg);
            throw new InputSizeInvalidException(msg);
        }

        if (totalSize == 0) {
            return 0;
        }
        return ((int) (totalSize / Config.dpp_bytes_per_reduce)) + 1;
    }

    /**
     * Estimates the number of reduce tasks from the views of every table in the job
     * configuration.
     *
     * <p>For each entry of every table's {@code "views"} map, adds the numeric
     * {@code "hash_mod"} value if that key is present, otherwise the number of entries of
     * {@code "key_ranges"} if that key is present.
     *
     * @param map job configuration containing a {@code "tables"} map
     * @return the accumulated count; {@code 0} when no view carries either key
     */
    private int calcReduceNumByTablet(Map<String, Object> map) {
        int reduceNum = 0;
        Map<?, ?> tables = (Map<?, ?>) map.get("tables");
        for (Object table : tables.values()) {
            Map<?, ?> views = (Map<?, ?>) ((Map<?, ?>) table).get("views");
            for (Object viewObj : views.values()) {
                Map<?, ?> view = (Map<?, ?>) viewObj;
                if (view.containsKey("hash_mod")) {
                    // Accept any Number so a value that went through a JSON round trip
                    // (e.g. Long or Double) does not fail with a ClassCastException.
                    reduceNum += ((Number) view.get("hash_mod")).intValue();
                } else if (view.containsKey("key_ranges")) {
                    reduceNum += ((List<?>) view.get("key_ranges")).size();
                }
            }
        }
        return reduceNum;
    }

    /**
     * Queries hadoop for the status of an ETL job and converts the command output into an
     * {@link EtlStatus}.
     *
     * <p>Every output line that splits into exactly two parts around the cluster delimiter
     * (presumably {@code ":"} - confirm) becomes a stat; every line that splits into exactly two
     * parts around {@code "="} becomes a counter. The tracking URL is rebuilt from the counter
     * whose key starts with {@code "tracking URL"}. The {@code "job state"} stat selects the
     * state: 2 maps to FINISHED, 3, 5 and 6 map to CANCELLED, anything else to RUNNING.
     *
     * <p>The state stays RUNNING when the command fails for a reason other than
     * "Could not find job", when it prints nothing, or when the job state is not a number.
     *
     * @param str hadoop job id
     * @return the parsed status, never {@code null}
     */
    public EtlStatus getEtlJobStatus(String str) {
        EtlStatus etlStatus = new EtlStatus();
        etlStatus.setState(TEtlState.RUNNING);

        String statusCmd = String.format(HADOOP_STATUS_CMD, HADOOP_CLIENT, this.hadoopConfig, str);
        LOG.info(statusCmd);
        CommandResult result = Util.executeCommand(statusCmd, new String[]{"LC_ALL=" + Config.locale});
        String stdout = result.getStdout();
        if (result.getReturnCode() != 0) {
            if (stdout != null && stdout.contains("Could not find job")) {
                LOG.warn("cannot find hadoop etl job: {}", str);
                etlStatus.setState(TEtlState.CANCELLED);
            }
            return etlStatus;
        }
        if (stdout == null) {
            LOG.warn("hadoop status command returned no output. job: {}", str);
            return etlStatus;
        }

        Map<String, String> stats = new HashMap<>();
        Map<String, String> counters = new HashMap<>();
        for (String line : stdout.split("\n")) {
            String[] statParts = line.split(ClusterNamespace.CLUSTER_DELIMITER);
            if (statParts.length == 2) {
                stats.put(statParts[0].trim(), statParts[1].trim());
            }
            String[] counterParts = line.split("=");
            if (counterParts.length == 2) {
                counters.put(counterParts[0].trim(), counterParts[1].trim());
            }
        }
        etlStatus.setStats(stats);
        etlStatus.setCounters(counters);

        // A tracking URL line containing exactly one '=' was split into a counter above; strip
        // the 14-character label prefix from the key and glue the two halves back together.
        final int trackingUrlPrefixLength = 14;
        for (Map.Entry<String, String> counter : counters.entrySet()) {
            String key = counter.getKey();
            if (key.startsWith("tracking URL")) {
                if (key.length() >= trackingUrlPrefixLength) {
                    etlStatus.setTrackingUrl(key.substring(trackingUrlPrefixLength) + "=" + counter.getValue());
                } else {
                    LOG.warn("unexpected tracking url line. key: {}", key);
                }
                break;
            }
        }

        String jobState = stats.get("job state");
        if (jobState != null) {
            try {
                int stateCode = Integer.parseInt(jobState);
                if (stateCode == 3 || stateCode == 5 || stateCode == 6) {
                    etlStatus.setState(TEtlState.CANCELLED);
                } else if (stateCode == 2) {
                    etlStatus.setState(TEtlState.FINISHED);
                } else {
                    etlStatus.setState(TEtlState.RUNNING);
                }
            } catch (NumberFormatException e) {
                // Leave the job RUNNING so the caller can poll again.
                LOG.warn("job state format error. job: {}, state: {}", str, jobState);
            }
        }
        return etlStatus;
    }

    /**
     * Lists the files below {@code <str>/export} together with their sizes.
     *
     * <p>When the listing succeeds, every output line with exactly eight space separated columns
     * contributes an entry: the last column is the key and the fifth column, parsed as a long, is
     * the value ({@code -1} if it is not a number).
     *
     * <p>When the listing fails, two directory tests decide the result: {@code null} if
     * {@code str} is not a directory, {@code null} if the export directory exists (so the listing
     * failed for another reason), and an empty map if only the export directory is missing.
     *
     * @param str ETL output path
     * @return file path to size map, an empty map, or {@code null} as described above
     */
    public Map<String, Long> getEtlFiles(String str) {
        String[] envp = {"LC_ALL=" + Config.locale};
        Map<String, Long> fileSizes = Maps.newHashMap();
        String exportDir = str + S3URI.PATH_DELIM + DPP_OUTPUT_DIR;

        String lsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, this.hadoopConfig, exportDir);
        LOG.info(lsCmd);
        CommandResult lsResult = Util.executeCommand(lsCmd, envp);
        if (lsResult.getReturnCode() == 0) {
            for (String line : lsResult.getStdout().split("\n")) {
                String[] columns = line.split(" +");
                if (columns.length != 8) {
                    continue;
                }
                long fileSize = -1;
                try {
                    fileSize = Long.parseLong(columns[4]);
                } catch (NumberFormatException e) {
                    LOG.warn("file size format error. line: {}", line);
                }
                fileSizes.put(columns[columns.length - 1], fileSize);
            }
            return fileSizes;
        }

        // Listing failed: find out which directory is missing.
        String testOutputCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, this.hadoopConfig, "-d", str);
        LOG.info(testOutputCmd);
        boolean outputDirExists = Util.executeCommand(testOutputCmd, envp).getReturnCode() == 0;
        if (!outputDirExists) {
            LOG.info("hadoop dir does not exist. dir: {}", str);
            return null;
        }

        String testExportCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, this.hadoopConfig, "-d", exportDir);
        LOG.info(testExportCmd);
        boolean exportDirExists = Util.executeCommand(testExportCmd, envp).getReturnCode() == 0;
        if (exportDirExists) {
            return null;
        }
        LOG.info("hadoop dir does not exist. dir: {}", exportDir);
        return fileSizes;
    }

    /**
     * Issues the hadoop kill command for a job. The result of the command is not inspected.
     *
     * @param str hadoop job id
     */
    public void killEtlJob(String str) {
        String killCmd = String.format(HADOOP_KILL_CMD, HADOOP_CLIENT, this.hadoopConfig, str);
        LOG.info(killCmd);
        Util.executeCommand(killCmd, new String[] {"LC_ALL=" + Config.locale});
    }

    /**
     * Recursively removes a remote path with the hadoop rmr command. The result of the command
     * is not inspected.
     *
     * @param str remote path to remove
     */
    public void deleteEtlOutputPath(String str) {
        String rmrCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, this.hadoopConfig, str);
        LOG.info(rmrCmd);
        Util.executeCommand(rmrCmd, new String[] {"LC_ALL=" + Config.locale});
    }

    /**
     * Builds an ETL output path of the form {@code <str><str2>/<j>/<str3>/<str4>}.
     *
     * @param str  first prefix, concatenated with {@code str2} without a separator
     * @param str2 second prefix
     * @param j    numeric path segment
     * @param str3 fourth path component
     * @param str4 last path component
     * @return the formatted path
     */
    public static String getEtlOutputPath(String str, String str2, long j, String str3, String str4) {
        Object[] pathParts = {str, str2, j, str3, str4};
        return String.format(ETL_OUTPUT_PATH, pathParts);
    }
}
