/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.datax;

import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.log.SensitiveDataConverter;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.datax.DataxParameters;
import org.apache.dolphinscheduler.plugin.task.datax.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.datax.DataxUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataxTask
extends AbstractTask {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DataxTask.class);
    public static final String JVM_PARAM = "--jvm=\"-Xms%sG -Xmx%sG\" ";
    public static final String CUSTOM_PARAM = " -D%s='%s'";
    private static final String PYTHON_LAUNCHER = "${PYTHON_LAUNCHER}";
    private static final String SELECT_ALL_CHARACTER = "*";
    private static final String POST_JDBC_INFO_REGEX = "(?<=(post jdbc info:)).*(?=)";
    private static final String DATAX_LAUNCHER = "${DATAX_LAUNCHER}";
    private static final int DATAX_CHANNEL_COUNT = 1;
    private DataxParameters dataXParameters;
    private ShellCommandExecutor shellCommandExecutor;
    private TaskExecutionContext taskExecutionContext;
    private DataxTaskExecutionContext dataxTaskExecutionContext;

    public DataxTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
        this.shellCommandExecutor = new ShellCommandExecutor(arg_0 -> ((DataxTask)this).logHandle(arg_0), taskExecutionContext);
    }

    public void init() {
        this.dataXParameters = (DataxParameters)((Object)JSONUtils.parseObject((String)this.taskExecutionContext.getTaskParams(), DataxParameters.class));
        log.info("Initialize datax task params {}", (Object)JSONUtils.toPrettyJsonString((Object)((Object)this.dataXParameters)));
        if (this.dataXParameters == null || !this.dataXParameters.checkParameters()) {
            throw new RuntimeException("datax task params is not valid");
        }
        SensitiveDataConverter.addMaskPattern((String)POST_JDBC_INFO_REGEX);
        this.dataxTaskExecutionContext = this.dataXParameters.generateExtendedContext(this.taskExecutionContext.getResourceParametersHelper());
    }

    public void handle(TaskCallBack taskCallBack) throws TaskException {
        try {
            Map paramsMap = this.taskExecutionContext.getPrepareParamsMap();
            IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder().properties(ParameterUtils.convert((Map)paramsMap)).appendScript(this.buildCommand(this.buildDataxJsonFile(paramsMap), paramsMap));
            TaskResponse commandExecuteResult = this.shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
            this.setExitStatusCode(commandExecuteResult.getExitStatusCode());
            this.setProcessId(commandExecuteResult.getProcessId());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("The current DataX task has been interrupted", (Throwable)e);
            this.setExitStatusCode(-1);
            throw new TaskException("The current DataX task has been interrupted", (Throwable)e);
        }
        catch (Exception e) {
            log.error("datax task error", (Throwable)e);
            this.setExitStatusCode(-1);
            throw new TaskException("Execute DataX task failed", (Throwable)e);
        }
    }

    public void cancel() throws TaskException {
        try {
            this.shellCommandExecutor.cancelApplication();
        }
        catch (Exception e) {
            throw new TaskException("cancel application error", (Throwable)e);
        }
    }

    private String buildDataxJsonFile(Map<String, Property> paramsMap) throws Exception {
        String json;
        String fileName = String.format("%s/%s_job.json", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId());
        Path path = new File(fileName).toPath();
        if (Files.exists(path, new LinkOption[0])) {
            return fileName;
        }
        if (this.dataXParameters.getCustomConfig() == Flag.YES.ordinal()) {
            json = this.dataXParameters.getJson().replaceAll("\\r\\n", System.lineSeparator());
        } else {
            ObjectNode job = JSONUtils.createObjectNode();
            job.putArray("content").addAll(this.buildDataxJobContentJson());
            job.set("setting", (JsonNode)this.buildDataxJobSettingJson());
            ObjectNode root = JSONUtils.createObjectNode();
            root.set("job", (JsonNode)job);
            root.set("core", (JsonNode)this.buildDataxCoreJson());
            json = root.toString();
        }
        json = ParameterUtils.convertParameterPlaceholders((String)json, (Map)ParameterUtils.convert(paramsMap));
        log.debug("datax job json : {}", (Object)json);
        FileUtils.writeStringToFile((File)new File(fileName), (String)json, (Charset)StandardCharsets.UTF_8);
        return fileName;
    }

    private List<ObjectNode> buildDataxJobContentJson() {
        BaseConnectionParam dataSourceCfg = (BaseConnectionParam)DataSourceUtils.buildConnectionParams((DbType)this.dataxTaskExecutionContext.getSourcetype(), (String)this.dataxTaskExecutionContext.getSourceConnectionParams());
        BaseConnectionParam dataTargetCfg = (BaseConnectionParam)DataSourceUtils.buildConnectionParams((DbType)this.dataxTaskExecutionContext.getTargetType(), (String)this.dataxTaskExecutionContext.getTargetConnectionParams());
        ArrayList<ObjectNode> readerConnArr = new ArrayList<ObjectNode>();
        ObjectNode readerConn = JSONUtils.createObjectNode();
        ArrayNode sqlArr = readerConn.putArray("querySql");
        for (String sql : new String[]{this.dataXParameters.getSql()}) {
            sqlArr.add(sql);
        }
        ArrayNode urlArr = readerConn.putArray("jdbcUrl");
        urlArr.add(DataSourceUtils.getJdbcUrl((DbType)DbType.valueOf((String)this.dataXParameters.getDsType()), (ConnectionParam)dataSourceCfg));
        readerConnArr.add(readerConn);
        ObjectNode readerParam = JSONUtils.createObjectNode();
        readerParam.put("username", dataSourceCfg.getUser());
        readerParam.put("password", PasswordUtils.decodePassword((String)dataSourceCfg.getPassword()));
        readerParam.putArray("connection").addAll(readerConnArr);
        ObjectNode reader = JSONUtils.createObjectNode();
        reader.put("name", DataxUtils.getReaderPluginName(this.dataxTaskExecutionContext.getSourcetype()));
        reader.set("parameter", (JsonNode)readerParam);
        ArrayList<ObjectNode> writerConnArr = new ArrayList<ObjectNode>();
        ObjectNode writerConn = JSONUtils.createObjectNode();
        ArrayNode tableArr = writerConn.putArray("table");
        tableArr.add(this.dataXParameters.getTargetTable());
        writerConn.put("jdbcUrl", DataSourceUtils.getJdbcUrl((DbType)DbType.valueOf((String)this.dataXParameters.getDtType()), (ConnectionParam)dataTargetCfg));
        writerConnArr.add(writerConn);
        ObjectNode writerParam = JSONUtils.createObjectNode();
        writerParam.put("username", dataTargetCfg.getUser());
        writerParam.put("password", PasswordUtils.decodePassword((String)dataTargetCfg.getPassword()));
        String[] columns = this.parsingSqlColumnNames(this.dataxTaskExecutionContext.getSourcetype(), this.dataxTaskExecutionContext.getTargetType(), dataSourceCfg, this.dataXParameters.getSql());
        ArrayNode columnArr = writerParam.putArray("column");
        for (String column : columns) {
            columnArr.add(column);
        }
        writerParam.putArray("connection").addAll(writerConnArr);
        if (CollectionUtils.isNotEmpty(this.dataXParameters.getPreStatements())) {
            ArrayNode preSqlArr = writerParam.putArray("preSql");
            for (String preSql : this.dataXParameters.getPreStatements()) {
                preSqlArr.add(preSql);
            }
        }
        if (CollectionUtils.isNotEmpty(this.dataXParameters.getPostStatements())) {
            ArrayNode postSqlArr = writerParam.putArray("postSql");
            for (String postSql : this.dataXParameters.getPostStatements()) {
                postSqlArr.add(postSql);
            }
        }
        ObjectNode writer = JSONUtils.createObjectNode();
        writer.put("name", DataxUtils.getWriterPluginName(this.dataxTaskExecutionContext.getTargetType()));
        writer.set("parameter", (JsonNode)writerParam);
        ArrayList<ObjectNode> contentList = new ArrayList<ObjectNode>();
        ObjectNode content = JSONUtils.createObjectNode();
        content.set("reader", (JsonNode)reader);
        content.set("writer", (JsonNode)writer);
        contentList.add(content);
        return contentList;
    }

    private ObjectNode buildDataxJobSettingJson() {
        ObjectNode speed = JSONUtils.createObjectNode();
        speed.put("channel", 1);
        if (this.dataXParameters.getJobSpeedByte() > 0) {
            speed.put("byte", this.dataXParameters.getJobSpeedByte());
        }
        if (this.dataXParameters.getJobSpeedRecord() > 0) {
            speed.put("record", this.dataXParameters.getJobSpeedRecord());
        }
        ObjectNode errorLimit = JSONUtils.createObjectNode();
        errorLimit.put("record", 0);
        errorLimit.put("percentage", 0);
        ObjectNode setting = JSONUtils.createObjectNode();
        setting.set("speed", (JsonNode)speed);
        setting.set("errorLimit", (JsonNode)errorLimit);
        return setting;
    }

    private ObjectNode buildDataxCoreJson() {
        ObjectNode speed = JSONUtils.createObjectNode();
        speed.put("channel", 1);
        if (this.dataXParameters.getJobSpeedByte() > 0) {
            speed.put("byte", this.dataXParameters.getJobSpeedByte());
        }
        if (this.dataXParameters.getJobSpeedRecord() > 0) {
            speed.put("record", this.dataXParameters.getJobSpeedRecord());
        }
        ObjectNode channel = JSONUtils.createObjectNode();
        channel.set("speed", (JsonNode)speed);
        ObjectNode transport = JSONUtils.createObjectNode();
        transport.set("channel", (JsonNode)channel);
        ObjectNode core = JSONUtils.createObjectNode();
        core.set("transport", (JsonNode)transport);
        return core;
    }

    protected String buildCommand(String jobConfigFilePath, Map<String, Property> paramsMap) {
        return "${PYTHON_LAUNCHER} ${DATAX_LAUNCHER} " + this.loadJvmEnv(this.dataXParameters) + this.addCustomParameters(paramsMap) + " " + jobConfigFilePath;
    }

    private StringBuilder addCustomParameters(Map<String, Property> paramsMap) {
        if (paramsMap == null || paramsMap.size() == 0) {
            return new StringBuilder();
        }
        StringBuilder customParameters = new StringBuilder("-p \"");
        for (Map.Entry<String, Property> entry : paramsMap.entrySet()) {
            customParameters.append(String.format(CUSTOM_PARAM, entry.getKey(), entry.getValue().getValue()));
        }
        customParameters.replace(4, 5, "");
        customParameters.append("\"");
        return customParameters;
    }

    public String loadJvmEnv(DataxParameters dataXParameters) {
        int xms = Math.max(dataXParameters.getXms(), 1);
        int xmx = Math.max(dataXParameters.getXmx(), 1);
        return String.format(JVM_PARAM, xms, xmx);
    }

    private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) {
        String[] columnNames = this.tryGrammaticalAnalysisSqlColumnNames(sourceType, sql, dataSourceCfg.getCompatibleMode());
        if (columnNames == null || columnNames.length == 0) {
            log.info("try to execute sql analysis query column name");
            columnNames = this.tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql);
        }
        this.notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
        return DataxUtils.convertKeywordsColumns(targetType, columnNames);
    }

    private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql, String compatibleMode) {
        String[] columnNames;
        try {
            SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
            if (StringUtils.isNotBlank((CharSequence)compatibleMode)) {
                parser = DataxUtils.getSqlStatementParser(compatibleMode, sql);
            }
            if (parser == null) {
                log.warn("database driver [{}] is not support grammatical analysis sql", (Object)dbType);
                return new String[0];
            }
            SQLStatement sqlStatement = parser.parseStatement();
            SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement;
            SQLSelect sqlSelect = sqlSelectStatement.getSelect();
            List selectItemList = null;
            if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
                SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery();
                selectItemList = block.getSelectList();
            } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
                SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery();
                SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight();
                selectItemList = block.getSelectList();
            }
            this.notNull(selectItemList, String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
            columnNames = new String[selectItemList.size()];
            for (int i = 0; i < selectItemList.size(); ++i) {
                SQLSelectItem item = (SQLSelectItem)selectItemList.get(i);
                String columnName = null;
                if (item.getAlias() != null) {
                    columnName = item.getAlias();
                } else if (item.getExpr() != null) {
                    SQLPropertyExpr expr;
                    if (item.getExpr() instanceof SQLPropertyExpr) {
                        expr = (SQLPropertyExpr)item.getExpr();
                        columnName = expr.getName();
                    } else if (item.getExpr() instanceof SQLIdentifierExpr) {
                        expr = (SQLIdentifierExpr)item.getExpr();
                        columnName = expr.getName();
                    }
                } else {
                    throw new RuntimeException(String.format("grammatical analysis sql column [ %s ] failed", item));
                }
                if (SELECT_ALL_CHARACTER.equals(item.toString())) {
                    log.info("sql contains *, grammatical analysis failed");
                    return new String[0];
                }
                if (columnName == null) {
                    throw new RuntimeException(String.format("grammatical analysis sql column [ %s ] failed", item));
                }
                columnNames[i] = columnName;
            }
        }
        catch (Exception e) {
            log.warn(e.getMessage(), (Throwable)e);
            return new String[0];
        }
        return columnNames;
    }

    public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) {
        String[] columnNames;
        sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
        sql = sql.replace(";", "");
        try (Connection connection = DataSourceClientProvider.getAdHocConnection((DbType)sourceType, (ConnectionParam)baseDataSource);
             PreparedStatement stmt = connection.prepareStatement(sql);
             ResultSet resultSet = stmt.executeQuery();){
            ResultSetMetaData md = resultSet.getMetaData();
            int num = md.getColumnCount();
            columnNames = new String[num];
            for (int i = 1; i <= num; ++i) {
                columnNames[i - 1] = md.getColumnName(i).replace("t.", "");
            }
        }
        catch (SQLException | ExecutionException e) {
            log.error(e.getMessage(), (Throwable)e);
            return null;
        }
        return columnNames;
    }

    public AbstractParameters getParameters() {
        return this.dataXParameters;
    }

    private void notNull(Object obj, String message) {
        if (obj == null) {
            throw new RuntimeException(message);
        }
    }
}

