package org.apache.doris.load.loadv2;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.CommandResult;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.loadv2.SparkLauncherMonitor;
import org.apache.doris.load.loadv2.SparkLoadAppHandle;
import org.apache.doris.load.loadv2.SparkRepository;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TEtlState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;

/* loaded from: input_file:org/apache/doris/load/loadv2/SparkEtlJobHandler.class */
/**
 * Submits, polls and kills the Spark ETL jobs used by spark load.
 *
 * <p>Jobs are launched through {@link SparkLauncher}. When the resource reports a YARN master,
 * status and kill requests are issued by running the yarn client command line; otherwise they
 * are served from the {@link SparkLoadAppHandle}.
 */
public class SparkEtlJobHandler {
    private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class);
    /** File name of the serialized job config written below the ETL output path. */
    private static final String CONFIG_FILE_NAME = "jobconfig.json";
    /** Directory (below the ETL output path) that holds {@link #CONFIG_FILE_NAME}. */
    private static final String JOB_CONFIG_DIR = "configs";
    /** Spark application name pattern; the argument is the load label passed to submitEtlJob. */
    private static final String ETL_JOB_NAME = "doris__%s";
    /** Launcher log file name pattern; the arguments are the load job id and the load label. */
    private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log";
    /** Submit timeout handed to the launcher log monitor. */
    private static final long GET_APPID_TIMEOUT_MS = 300000;
    /** Timeout handed to {@code Util.executeCommand} for yarn client commands. */
    private static final long EXEC_CMD_TIMEOUT_MS = 30000;
    private static final String YARN_STATUS_CMD = "%s --config %s application -status %s";
    private static final String YARN_KILL_CMD = "%s --config %s application -kill %s";
    private static final String SPARK_ETL_JOB_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob";

    /**
     * Launches the Spark ETL application for a load job and records its app id and handle on
     * the pending-task attachment.
     *
     * <p>Steps: delete the previous ETL output path, write the job config file through the
     * broker, start the launcher process, then (outside unit tests) wait for the launcher log
     * monitor to finish before reading the app id and state from the handle.
     *
     * @param j load job id (used in the launcher log file name and in error messages)
     * @param str load label (used in the Spark app name and the launcher log file name)
     * @param etlJobConfig ETL job config; its {@code outputPath} is cleared and receives the config file
     * @param sparkResource resource supplying the archive, spark configs, env configs, master and deploy mode
     * @param brokerDesc broker used to delete the output path and write the config file
     * @param sparkLoadAppHandle handle that receives the launched process and exposes app id and state
     * @param sparkPendingTaskAttachment attachment that receives the app id and handle on success
     * @throws LoadException if the config file cannot be written, the launcher cannot be started,
     *         the wait is interrupted, the app ends in a cancelled state, or no app id was obtained
     */
    public void submitEtlJob(long j, String str, EtlJobConfig etlJobConfig, SparkResource sparkResource, BrokerDesc brokerDesc, SparkLoadAppHandle sparkLoadAppHandle, SparkPendingTaskAttachment sparkPendingTaskAttachment) throws LoadException {
        deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
        if (!FeConstants.runningUnitTest) {
            initLocalDir();
        }

        SparkRepository.SparkArchive archive = sparkResource.prepareArchive();
        SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
        SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary();
        String sparkHome = Config.spark_home_default_dir;
        String jobConfigPath = etlJobConfig.outputPath + S3URI.PATH_DELIM + JOB_CONFIG_DIR + S3URI.PATH_DELIM
                + CONFIG_FILE_NAME;
        String appResourcePath = dppLibrary.remotePath;
        String archivePath = spark2xLibrary.remotePath;
        String stageDir = sparkResource.getWorkingDir();
        String launcherLogPath = Config.spark_launcher_log_dir + S3URI.PATH_DELIM
                + String.format(LAUNCHER_LOG, j, str);

        // Fill in defaults only for settings the resource leaves unset.
        Map<String, String> sparkConfigs = sparkResource.getSparkConfigs();
        if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.archive"))) {
            sparkConfigs.put("spark.yarn.archive", archivePath);
        }
        if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.stage.dir"))) {
            sparkConfigs.put("spark.yarn.stage.dir", stageDir);
        }
        LOG.info("submit etl spark job, sparkConfigs:{}", sparkConfigs);

        // NOTE(review): exceptions raised anywhere in this block that are UserException subtypes
        // (presumably including LoadException) are re-wrapped by the outer catch - confirm intended.
        try {
            BrokerUtil.writeFile(etlJobConfig.configToJson().getBytes("UTF-8"), jobConfigPath, brokerDesc);
            Map<String, String> envConfigs = sparkResource.getEnvConfigsWithoutPrefix();
            LOG.info("submit etl job,env:{}", envConfigs);
            SparkLauncher launcher = new SparkLauncher(envConfigs);
            launcher.setMaster(sparkResource.getMaster())
                    .setDeployMode(sparkResource.getDeployMode().name().toLowerCase())
                    .setAppResource(appResourcePath)
                    .setMainClass(SPARK_ETL_JOB_CLASS)
                    .setAppName(String.format(ETL_JOB_NAME, str))
                    .setSparkHome(sparkHome)
                    .addAppArgs(jobConfigPath)
                    .redirectError();
            // Iterate the map that received the defaults above instead of fetching it again, so
            // the defaults reach the launcher even if getSparkConfigs() hands out a fresh map.
            for (Map.Entry<String, String> conf : sparkConfigs.entrySet()) {
                launcher.setConf(conf.getKey(), conf.getValue());
            }
            try {
                sparkLoadAppHandle.setProcess(launcher.launch());
                if (!FeConstants.runningUnitTest) {
                    SparkLauncherMonitor.LogMonitor logMonitor =
                            SparkLauncherMonitor.createLogMonitor(sparkLoadAppHandle, sparkConfigs);
                    logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
                    logMonitor.setRedirectLogPath(launcherLogPath);
                    logMonitor.start();
                    try {
                        logMonitor.join();
                    } catch (InterruptedException e) {
                        logMonitor.interrupt();
                        // Restore the interrupt flag: it is cleared when InterruptedException is
                        // thrown, and code further up the stack may need to observe it.
                        Thread.currentThread().interrupt();
                        LOG.warn("interrupted while waiting for spark launcher log monitor. load job id: {}", j, e);
                        throw new LoadException("start spark app failed. error: " + e.getMessage());
                    }
                }
                String appId = sparkLoadAppHandle.getAppId();
                SparkLoadAppHandle.State state = sparkLoadAppHandle.getState();
                if (fromSparkState(state) == TEtlState.CANCELLED) {
                    throw new LoadException("start spark app failed. error: spark app state: " + state + ", loadJobId:" + j);
                }
                if (appId == null) {
                    throw new LoadException("start spark app failed. error: Waiting too much time to get appId from handle. spark app state: " + state + ", loadJobId:" + j);
                }
                sparkPendingTaskAttachment.setAppId(appId);
                sparkPendingTaskAttachment.setHandle(sparkLoadAppHandle);
            } catch (IOException e) {
                LOG.warn("start spark app failed. error: ", e);
                throw new LoadException("start spark app failed. error: " + e.getMessage());
            }
        } catch (UnsupportedEncodingException | UserException e) {
            throw new LoadException(e.getMessage());
        }
    }

    /**
     * Reports the current state of a Spark ETL application.
     *
     * <p>For a YARN master the state comes from {@code yarn application -status}; a non-zero
     * exit code of that command is reported as {@link TEtlState#CANCELLED} with stderr as the
     * failure message. Otherwise the state comes from the app handle, and a {@code null}
     * handle is reported as cancelled. Once the state is finished or cancelled, the DPP result
     * file is read through the broker and attached; a failure to read it is only logged.
     *
     * @param sparkLoadAppHandle handle of the launched app; may be {@code null}
     * @param str Spark application id; must be non-empty
     * @param j load job id, used for logging
     * @param str2 ETL output path from which the DPP result file path is derived
     * @param sparkResource resource describing the Spark cluster
     * @param brokerDesc broker used to read the DPP result file
     * @return the ETL status, never {@code null}
     * @throws LoadException declared for the resource/yarn helper calls made here
     * @throws IllegalStateException if {@code str} is null or empty
     */
    public EtlStatus getEtlJobStatus(SparkLoadAppHandle sparkLoadAppHandle, String str, long j, String str2, SparkResource sparkResource, BrokerDesc brokerDesc) throws LoadException {
        EtlStatus etlStatus = new EtlStatus();
        Preconditions.checkState(!Strings.isNullOrEmpty(str));
        if (sparkResource.isYarnMaster()) {
            String statusCmd = String.format(YARN_STATUS_CMD, sparkResource.getYarnClientPath(), sparkResource.prepareYarnConfig(), str);
            LOG.info(statusCmd);
            String[] env = buildCommandEnv(sparkResource);
            LOG.info("getEtlJobStatus,appId:{}, loadJobId:{}, env:{},resource:{}", str, Long.valueOf(j), env, sparkResource);
            CommandResult result = Util.executeCommand(statusCmd, env, EXEC_CMD_TIMEOUT_MS);
            if (result.getReturnCode() != 0) {
                String stderr = result.getStderr();
                if (stderr != null && stderr.contains("doesn't exist in RM")) {
                    LOG.warn("spark app not found. spark app id: {}, load job id: {}", str, Long.valueOf(j));
                }
                LOG.warn("yarn application status failed. spark app id: {}, load job id: {}, timeout: {}, msg: {}", str, Long.valueOf(j), Long.valueOf(EXEC_CMD_TIMEOUT_MS), stderr);
                // Any failure of the status command cancels the job, whether or not the app is known to the RM.
                etlStatus.setState(TEtlState.CANCELLED);
                etlStatus.setFailMsg(stderr);
                return etlStatus;
            }
            ApplicationReport report = new YarnApplicationReport(result.getStdout()).getReport();
            LOG.info("yarn application -status {}. load job id: {}, output: {}, report: {}", str, Long.valueOf(j), result.getStdout(), report);
            YarnApplicationState yarnState = report.getYarnApplicationState();
            FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
            etlStatus.setState(fromYarnState(yarnState, finalStatus));
            if (etlStatus.getState() == TEtlState.CANCELLED) {
                if (yarnState == YarnApplicationState.FINISHED) {
                    // YARN finished the app but the app itself did not succeed.
                    etlStatus.setFailMsg("spark app state: " + finalStatus);
                } else {
                    etlStatus.setFailMsg("yarn app state: " + yarnState);
                }
            }
            // The handle may be absent on this path; fall back to the tracking url from the report.
            String handleUrl = sparkLoadAppHandle == null ? null : sparkLoadAppHandle.getUrl();
            etlStatus.setTrackingUrl(handleUrl != null ? handleUrl : report.getTrackingUrl());
            etlStatus.setProgress((int) (report.getProgress() * 100.0f));
        } else {
            if (sparkLoadAppHandle == null) {
                etlStatus.setFailMsg("spark app handle is null");
                etlStatus.setState(TEtlState.CANCELLED);
                return etlStatus;
            }
            SparkLoadAppHandle.State state = sparkLoadAppHandle.getState();
            etlStatus.setState(fromSparkState(state));
            if (etlStatus.getState() == TEtlState.CANCELLED) {
                etlStatus.setFailMsg("spark app state: " + state);
            }
            LOG.info("spark app id: {}, load job id: {}, app state: {}", str, Long.valueOf(j), state);
        }

        if (etlStatus.getState() == TEtlState.FINISHED || etlStatus.getState() == TEtlState.CANCELLED) {
            String dppResultFilePath = EtlJobConfig.getDppResultFilePath(str2);
            try {
                String dppResultJson = new String(BrokerUtil.readFile(dppResultFilePath, brokerDesc, 0L), "UTF-8");
                DppResult dppResult = new Gson().fromJson(dppResultJson, DppResult.class);
                if (dppResult != null) {
                    etlStatus.setDppResult(dppResult);
                    // Prefer the failure reason recorded in the DPP result over the state-derived message.
                    if (etlStatus.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
                        etlStatus.setFailMsg(dppResult.failedReason);
                    }
                }
            } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
                // Best effort: the status is still returned without a DPP result.
                LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
            }
        }
        return etlStatus;
    }

    /**
     * Kills a Spark ETL application.
     *
     * <p>For a non-YARN master the handle (if any) is stopped. For a YARN master the app id is
     * taken from {@code str} or, when that is empty, from the handle; if no app id is available
     * the handle is killed directly, otherwise {@code yarn application -kill} is executed. A
     * failing kill command is only logged.
     *
     * @param sparkLoadAppHandle handle of the launched app; may be {@code null}
     * @param str Spark application id; may be null or empty
     * @param j load job id, used for logging
     * @param sparkResource resource describing the Spark cluster
     * @throws LoadException declared for the resource/yarn helper calls made here
     */
    public void killEtlJob(SparkLoadAppHandle sparkLoadAppHandle, String str, long j, SparkResource sparkResource) throws LoadException {
        if (!sparkResource.isYarnMaster()) {
            if (sparkLoadAppHandle != null) {
                sparkLoadAppHandle.stop();
            }
            return;
        }

        String appId = str;
        if (Strings.isNullOrEmpty(appId)) {
            if (sparkLoadAppHandle == null) {
                // Neither an app id nor a handle: there is nothing that could be killed.
                LOG.warn("cannot kill spark app, both app id and app handle are missing. load job id: {}", Long.valueOf(j));
                return;
            }
            appId = sparkLoadAppHandle.getAppId();
            if (Strings.isNullOrEmpty(appId)) {
                sparkLoadAppHandle.kill();
                return;
            }
        }

        String killCmd = String.format(YARN_KILL_CMD, sparkResource.getYarnClientPath(), sparkResource.prepareYarnConfig(), appId);
        LOG.info(killCmd);
        String[] env = buildCommandEnv(sparkResource);
        // Cast to Object so the array is logged as one parameter; an uncast String[] would be
        // spread over the varargs and only its first element would fill the single placeholder.
        LOG.info("killEtlJob, env:{}", (Object) env);
        CommandResult result = Util.executeCommand(killCmd, env, EXEC_CMD_TIMEOUT_MS);
        LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout());
        if (result.getReturnCode() != 0) {
            LOG.warn("yarn application kill failed. app id: {}, load job id: {}, msg: {}", appId, Long.valueOf(j), result.getStderr());
        }
    }

    /**
     * Lists the non-directory entries directly matched by {@code str + "/*"}.
     *
     * @param str directory path to list
     * @param brokerDesc broker used for the listing
     * @return map from file path to file size
     * @throws Exception wrapping the {@link UserException} raised when the listing fails
     */
    public Map<String, Long> getEtlFilePaths(String str, BrokerDesc brokerDesc) throws Exception {
        Map<String, Long> filePathToSize = Maps.newHashMap();
        ArrayList<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
        try {
            BrokerUtil.parseFile(str + "/*", brokerDesc, fileStatuses);
        } catch (UserException e) {
            throw new Exception(e);
        }
        for (TBrokerFileStatus fileStatus : fileStatuses) {
            if (!fileStatus.isDir) {
                filePathToSize.put(fileStatus.getPath(), fileStatus.getSize());
            }
        }
        LOG.debug("get spark etl file paths. files map: {}", filePathToSize);
        return filePathToSize;
    }

    /**
     * Creates the local spark launcher log directory ({@code Config.spark_launcher_log_dir})
     * if it does not exist yet. A failure to create it is logged, not thrown.
     */
    public static synchronized void initLocalDir() {
        File logDir = new File(Config.spark_launcher_log_dir);
        if (logDir.exists()) {
            return;
        }
        if (!logDir.mkdirs() && !logDir.isDirectory()) {
            LOG.warn("failed to create spark launcher log dir. path: {}", logDir.getAbsolutePath());
        }
    }

    /**
     * Deletes the given path through the broker. Failures are logged and otherwise ignored.
     *
     * @param str path to delete
     * @param brokerDesc broker used for the deletion
     */
    public void deleteEtlOutputPath(String str, BrokerDesc brokerDesc) {
        try {
            BrokerUtil.deletePath(str, brokerDesc);
            LOG.info("delete path success. path: {}", str);
        } catch (UserException e) {
            LOG.warn("delete path failed. path: {}", str, e);
        }
    }

    /**
     * Builds the environment ("KEY=VALUE" entries) for yarn client commands: {@code LC_ALL}
     * set to {@code Config.locale}, followed by the resource's env configs.
     */
    private static String[] buildCommandEnv(SparkResource sparkResource) {
        Map<String, String> envConfigs = sparkResource.getEnvConfigsWithoutPrefix();
        ArrayList<String> env = new ArrayList<>(envConfigs.size() + 1);
        env.add("LC_ALL=" + Config.locale);
        for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
            env.add(entry.getKey() + "=" + entry.getValue());
        }
        return env.toArray(new String[0]);
    }

    /**
     * Maps a YARN application state to an ETL state: FINISHED is a success only when the final
     * application status is SUCCEEDED, FAILED and KILLED are cancelled, anything else is running.
     */
    private TEtlState fromYarnState(YarnApplicationState yarnApplicationState, FinalApplicationStatus finalApplicationStatus) {
        switch (yarnApplicationState) {
            case FINISHED:
                return finalApplicationStatus == FinalApplicationStatus.SUCCEEDED ? TEtlState.FINISHED : TEtlState.CANCELLED;
            case FAILED:
            case KILLED:
                return TEtlState.CANCELLED;
            default:
                return TEtlState.RUNNING;
        }
    }

    /**
     * Maps a spark app handle state to an ETL state: FINISHED is finished; FAILED, KILLED and
     * LOST are cancelled; anything else is running.
     */
    private TEtlState fromSparkState(SparkLoadAppHandle.State state) {
        switch (state) {
            case FINISHED:
                return TEtlState.FINISHED;
            case FAILED:
            case KILLED:
            case LOST:
                return TEtlState.CANCELLED;
            default:
                return TEtlState.RUNNING;
        }
    }
}
