/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.spark;

import io.fabric8.kubernetes.client.Config;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.spark.ProgramType;
import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkTask
extends AbstractYarnTask {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SparkTask.class);
    private SparkParameters sparkParameters;
    private final TaskExecutionContext taskExecutionContext;

    public SparkTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
    }

    public void init() {
        this.sparkParameters = (SparkParameters)((Object)JSONUtils.parseObject((String)this.taskExecutionContext.getTaskParams(), SparkParameters.class));
        if (null == this.sparkParameters) {
            log.error("Spark params is null");
            return;
        }
        if (!this.sparkParameters.checkParameters()) {
            throw new RuntimeException("spark task params is not valid");
        }
        log.info("Initialize spark task params {}", (Object)JSONUtils.toPrettyJsonString((Object)((Object)this.sparkParameters)));
    }

    protected String getScript() {
        ArrayList<String> args = new ArrayList<String>();
        String sparkCommand = this.sparkParameters.getProgramType() == ProgramType.SQL ? "${SPARK_HOME}/bin/spark-sql" : "${SPARK_HOME}/bin/spark-submit";
        args.add(sparkCommand);
        args.addAll(this.populateSparkOptions());
        return args.stream().collect(Collectors.joining(" "));
    }

    private List<String> populateSparkOptions() {
        String yarnQueue;
        ArrayList<String> args = new ArrayList<String>();
        String deployMode = StringUtils.isNotEmpty((CharSequence)this.sparkParameters.getDeployMode()) ? this.sparkParameters.getDeployMode() : "local";
        boolean onLocal = "local".equals(deployMode);
        boolean onNativeKubernetes = StringUtils.isNotEmpty((CharSequence)this.sparkParameters.getNamespace());
        String masterUrl = StringUtils.isNotEmpty((CharSequence)this.sparkParameters.getMaster()) ? this.sparkParameters.getMaster() : (onLocal ? deployMode : (onNativeKubernetes ? "k8s://" + Config.fromKubeconfig((String)this.taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl() : "yarn"));
        args.add("--master");
        args.add(masterUrl);
        if (!onLocal) {
            args.add("--deploy-mode");
            args.add(deployMode);
        }
        ProgramType programType = this.sparkParameters.getProgramType();
        String mainClass = this.sparkParameters.getMainClass();
        if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty((CharSequence)mainClass)) {
            args.add("--class");
            args.add(mainClass);
        }
        this.populateSparkResourceDefinitions(args);
        String appName = this.sparkParameters.getAppName();
        if (StringUtils.isNotEmpty((CharSequence)appName)) {
            args.add("--name");
            args.add(ArgsUtils.escape((String)appName));
        }
        String others = this.sparkParameters.getOthers();
        if (!"local".equals(deployMode) && (StringUtils.isEmpty((CharSequence)others) || !others.contains("--queue")) && StringUtils.isNotEmpty((CharSequence)(yarnQueue = this.sparkParameters.getYarnQueue()))) {
            args.add("--queue");
            args.add(yarnQueue);
        }
        if (StringUtils.isNotEmpty((CharSequence)others)) {
            args.add(others);
        }
        if (onNativeKubernetes) {
            args.add(String.format("--conf spark.kubernetes.driver.label.%s=%s", "dolphinscheduler-label", this.taskExecutionContext.getTaskAppId()));
            args.add(String.format("--conf spark.kubernetes.namespace=%s", JSONUtils.toMap((String)this.sparkParameters.getNamespace()).get("name")));
        }
        ResourceInfo mainJar = this.sparkParameters.getMainJar();
        if (programType != ProgramType.SQL) {
            ResourceContext resourceContext = this.taskExecutionContext.getResourceContext();
            args.add(resourceContext.getResourceItem(mainJar.getResourceName()).getResourceAbsolutePathInLocal());
        }
        String mainArgs = this.sparkParameters.getMainArgs();
        if (programType != ProgramType.SQL && StringUtils.isNotEmpty((CharSequence)mainArgs)) {
            args.add(mainArgs);
        }
        if (ProgramType.SQL == programType) {
            String sqlContent = "";
            String resourceFileName = "";
            args.add("-f");
            if ("FILE".equals(this.sparkParameters.getSqlExecutionType())) {
                List<ResourceInfo> resourceInfos = this.sparkParameters.getResourceList();
                if (resourceInfos.size() > 1) {
                    log.warn("more than 1 files detected, use the first one by default");
                }
                try {
                    resourceFileName = resourceInfos.get(0).getResourceName();
                    ResourceContext resourceContext = this.taskExecutionContext.getResourceContext();
                    sqlContent = FileUtils.readFileToString((File)new File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()), (Charset)StandardCharsets.UTF_8);
                }
                catch (IOException e) {
                    log.error("read sql content from file {} error ", (Object)resourceFileName, (Object)e);
                    throw new TaskException("read sql content error", (Throwable)e);
                }
            } else {
                sqlContent = this.sparkParameters.getRawScript();
            }
            args.add(this.generateScriptFile(sqlContent));
        }
        return args;
    }

    private void populateSparkResourceDefinitions(List<String> args) {
        String executorMemory;
        int executorCores;
        int numExecutors;
        String driverMemory;
        int driverCores = this.sparkParameters.getDriverCores();
        if (driverCores > 0) {
            args.add(String.format("--conf spark.driver.cores=%d", driverCores));
        }
        if (StringUtils.isNotEmpty((CharSequence)(driverMemory = this.sparkParameters.getDriverMemory()))) {
            args.add(String.format("--conf spark.driver.memory=%s", driverMemory));
        }
        if ((numExecutors = this.sparkParameters.getNumExecutors()) > 0) {
            args.add(String.format("--conf spark.executor.instances=%d", numExecutors));
        }
        if ((executorCores = this.sparkParameters.getExecutorCores()) > 0) {
            args.add(String.format("--conf spark.executor.cores=%d", executorCores));
        }
        if (StringUtils.isNotEmpty((CharSequence)(executorMemory = this.sparkParameters.getExecutorMemory()))) {
            args.add(String.format("--conf spark.executor.memory=%s", executorMemory));
        }
    }

    private String generateScriptFile(String sqlContent) {
        String scriptFileName = String.format("%s/%s_node.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId());
        File file = new File(scriptFileName);
        Path path = file.toPath();
        if (!Files.exists(path, new LinkOption[0])) {
            String script = this.replaceParam(sqlContent);
            log.info("raw script : {}", (Object)script);
            log.info("task execute path : {}", (Object)this.taskExecutionContext.getExecutePath());
            Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
            FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
            try {
                if (SystemUtils.IS_OS_WINDOWS) {
                    Files.createFile(path, new FileAttribute[0]);
                } else {
                    if (!file.getParentFile().exists()) {
                        file.getParentFile().mkdirs();
                    }
                    Files.createFile(path, attr);
                }
                Files.write(path, script.getBytes(), StandardOpenOption.APPEND);
            }
            catch (IOException e) {
                throw new RuntimeException("generate spark sql script error", e);
            }
        }
        return scriptFileName;
    }

    private String replaceParam(String script) {
        script = script.replaceAll("\\r\\n", System.lineSeparator());
        Map paramsMap = this.taskExecutionContext.getPrepareParamsMap();
        script = ParameterUtils.convertParameterPlaceholders((String)script, (Map)ParameterUtils.convert((Map)paramsMap));
        return script;
    }

    public AbstractParameters getParameters() {
        return this.sparkParameters;
    }
}

