/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import com.github.rholder.retry.RetryException;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable unit that executes a single task instance on the worker.
 *
 * <p>The thread prepares the execution context, resolves the task plugin for the
 * task type, runs the task, and always reports a {@link TaskExecuteResponseCommand}
 * back through the {@link TaskCallbackService}. It also implements {@link Delayed}
 * so instances can be ordered by their remaining delay.
 */
public class TaskExecuteThread implements Runnable, Delayed {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);

    /** Context describing the task instance to execute. */
    private TaskExecutionContext taskExecutionContext;

    /** Task created by the plugin channel; stays {@code null} until creation succeeds. */
    private AbstractTask task;

    /** Used to send the ack and the final result. */
    private TaskCallbackService taskCallbackService;

    /** Used to deliver alerts requested by the task. */
    private AlertClientService alertClientService;

    /** Source of task channels; {@code null} when the three-argument constructor is used. */
    private TaskPluginManager taskPluginManager;

    /**
     * Creates an execute thread without a plugin manager.
     *
     * <p>NOTE(review): {@link #run()} needs a plugin manager to resolve the task
     * channel; an instance built with this constructor reports the task as failed.
     *
     * @param taskExecutionContext context of the task instance
     * @param taskCallbackService  callback service for ack/result
     * @param alertClientService   alert client
     */
    public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, AlertClientService alertClientService) {
        this.taskExecutionContext = taskExecutionContext;
        this.taskCallbackService = taskCallbackService;
        this.alertClientService = alertClientService;
    }

    /**
     * Creates an execute thread.
     *
     * @param taskExecutionContext context of the task instance
     * @param taskCallbackService  callback service for ack/result
     * @param alertClientService   alert client
     * @param taskPluginManager    manager providing the task channel map
     */
    public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, AlertClientService alertClientService, TaskPluginManager taskPluginManager) {
        this.taskExecutionContext = taskExecutionContext;
        this.taskCallbackService = taskCallbackService;
        this.alertClientService = alertClientService;
        this.taskPluginManager = taskPluginManager;
    }

    /**
     * Executes the task and reports the outcome.
     *
     * <p>Steps: verify the tenant is a known OS user, send the running ack if the
     * context is not already in {@code RUNNING_EXECUTION}, download resources
     * (skipped unless {@code dryRun == 0}), build parameters, create the task via
     * its plugin channel, and run it. Any {@link Throwable} marks the task as
     * failed. The {@code finally} section always removes the context from the
     * cache, caches and sends the response, and clears the execution directory.
     */
    @Override
    public void run() {
        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
        try {
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
                String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
                logger.error(errorLog);
                responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
                responseCommand.setEndTime(new Date());
                // the finally section below still reports this failure
                return;
            }
            if (taskExecutionContext.getStartTime() == null) {
                taskExecutionContext.setStartTime(new Date());
            }
            if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) {
                changeTaskExecutionStatusToRunning();
            }
            logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

            int dryRun = taskExecutionContext.getDryRun();
            // resources are only fetched for real runs
            if (dryRun == 0) {
                downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
            }

            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
            taskExecutionContext.setDefinedParams(getGlobalParamsMap());
            taskExecutionContext.setTaskAppId(String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()));
            preBuildBusinessParams();

            // fail with a clear message instead of an anonymous NullPointerException
            if (taskPluginManager == null) {
                throw new IllegalStateException("TaskPluginManager is not set, unable to resolve a task plugin");
            }
            TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
            if (null == taskChannel) {
                throw new RuntimeException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
            }

            // the plugin-facing request is produced by a JSON round trip of the context
            TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class);
            String taskLogName = LoggerUtils.buildTaskId("TASK", taskExecutionContext.getProcessDefineCode(), taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
            taskRequest.setTaskLogName(taskLogName);
            Thread.currentThread().setName(String.format("TaskLogInfo-%s", taskLogName));

            task = taskChannel.createTask(taskRequest);
            task.init();
            task.getParameters().setVarPool(taskExecutionContext.getVarPool());

            if (dryRun == 0) {
                task.handle();
                if (task.getNeedAlert()) {
                    sendAlert(task.getTaskAlertInfo());
                }
                responseCommand.setStatus(task.getExitStatus().getCode());
            } else {
                // dry run: the task is initialised but never handled, and is reported as successful
                responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
                task.setExitStatusCode(0);
            }
            responseCommand.setEndTime(new Date());
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
            responseCommand.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
        } catch (Throwable e) {
            logger.error("task scheduler failure", e);
            kill();
            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
            responseCommand.setEndTime(new Date());
            // the failure may have happened before the task was created
            if (task != null) {
                responseCommand.setProcessId(task.getProcessId());
                responseCommand.setAppIds(task.getAppIds());
            }
        } finally {
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            clearTaskExecPath();
        }
    }

    /**
     * Sends the alert described by the task through the alert client.
     *
     * @param taskAlertInfo alert group id, title and content to send
     */
    private void sendAlert(TaskAlertInfo taskAlertInfo) {
        alertClientService.sendAlert(taskAlertInfo.getAlertGroupId().intValue(), taskAlertInfo.getTitle(), taskAlertInfo.getContent());
    }

    /**
     * Deletes the task's local execution directory unless develop mode is on.
     *
     * <p>An empty path or the root path {@code "/"} is never deleted. A failed
     * deletion is logged and otherwise ignored.
     */
    private void clearTaskExecPath() {
        logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
        if (CommonUtils.isDevelopMode()) {
            return;
        }
        String execLocalPath = taskExecutionContext.getExecutePath();
        if (StringUtils.isEmpty(execLocalPath)) {
            logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
            return;
        }
        if ("/".equals(execLocalPath)) {
            logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName());
            return;
        }
        try {
            FileUtils.deleteDirectory(new File(execLocalPath));
            logger.info("exec local path: {} cleared.", execLocalPath);
        } catch (IOException e) {
            logger.error("delete exec dir failed : {}", e.getMessage(), e);
        }
    }

    /**
     * Parses the context's global parameter JSON into a name-to-value map.
     *
     * @return a mutable map of the global parameters; empty when none are defined
     */
    private Map<String, String> getGlobalParamsMap() {
        Map<String, String> globalParamsMap = new HashMap<>(16);
        String globalParamsStr = taskExecutionContext.getGlobalParams();
        if (globalParamsStr != null) {
            List<Property> globalParamsList = JSONUtils.toList(globalParamsStr, Property.class);
            globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
        }
        return globalParamsMap;
    }

    /**
     * Cancels the running task and asks for its YARN job to be killed.
     *
     * <p>Does nothing when no task has been created yet. Exceptions are logged,
     * not propagated.
     */
    public void kill() {
        if (task == null) {
            return;
        }
        try {
            task.cancelApplication(true);
            ProcessUtils.killYarnJob(taskExecutionContext);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Copies every resource that is not yet present locally from HDFS into the
     * execution directory.
     *
     * @param execLocalPath local execution directory
     * @param projectRes    resources to fetch, keyed by full name with the tenant code as value
     * @param logger        logger used for progress and error messages
     * @throws RuntimeException if a resource cannot be copied; the original exception is the cause
     */
    private void downloadResource(String execLocalPath, Map<String, String> projectRes, Logger logger) {
        if (MapUtils.isEmpty(projectRes)) {
            return;
        }
        for (Map.Entry<String, String> resource : projectRes.entrySet()) {
            String fullName = resource.getKey();
            String tenantCode = resource.getValue();
            File resFile = new File(execLocalPath, fullName);
            if (resFile.exists()) {
                logger.info("file : {} exists ", resFile.getName());
                continue;
            }
            try {
                String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, fullName);
                logger.info("get resource file from hdfs :{}", resHdfsPath);
                HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                // keep the original exception as the cause so the stack trace is not lost
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    /**
     * Marks the context as {@code RUNNING_EXECUTION} and sends the ack command
     * through the retry helper. A failed send is logged and does not stop the task.
     */
    private void changeTaskExecutionStatusToRunning() {
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        Command ackCommand = buildAckCommand().convert2Command();
        try {
            RetryerUtils.retryCall(() -> {
                taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
                return Boolean.TRUE;
            });
        } catch (RetryException | ExecutionException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the ack command from the current context.
     *
     * <p>The execute path is set to {@code null} for SQL and PROCEDURE task types.
     *
     * @return the populated ack command
     */
    private TaskExecuteAckCommand buildAckCommand() {
        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
        ackCommand.setStartTime(taskExecutionContext.getStartTime());
        ackCommand.setLogPath(taskExecutionContext.getLogPath());
        ackCommand.setHost(taskExecutionContext.getHost());
        String taskType = taskExecutionContext.getTaskType();
        boolean withoutExecutePath = TaskType.SQL.getDesc().equalsIgnoreCase(taskType)
                || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskType);
        ackCommand.setExecutePath(withoutExecutePath ? null : taskExecutionContext.getExecutePath());
        return ackCommand;
    }

    /**
     * @return the execution context this thread was created with
     */
    public TaskExecutionContext getTaskExecutionContext() {
        return taskExecutionContext;
    }

    /**
     * Returns the remaining delay of this task in the requested unit.
     *
     * <p>The value comes from {@code DateUtils.getRemainTime}, fed with the first
     * submit time and the delay time multiplied by 60, and is converted from
     * seconds. NOTE(review): the factor suggests the delay time is in minutes —
     * confirm against the context definition.
     *
     * @param unit target time unit
     * @return remaining delay expressed in {@code unit}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainSeconds = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
        return unit.convert(remainSeconds, TimeUnit.SECONDS);
    }

    /**
     * Orders by remaining delay in milliseconds; a {@code null} argument sorts
     * before this instance.
     *
     * @param o the other delayed element
     * @return negative, zero or positive as this delay is less than, equal to or greater than the other
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == null) {
            return 1;
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Stores the business parameter map on the context. When a schedule time is
     * set, the map contains {@code system.datetime} formatted as
     * {@code yyyyMMddHHmmss}; otherwise it is empty.
     */
    private void preBuildBusinessParams() {
        Map<String, Property> paramsMap = new HashMap<>();
        Date scheduleTime = taskExecutionContext.getScheduleTime();
        if (scheduleTime != null) {
            Property p = new Property();
            p.setValue(DateUtils.format(scheduleTime, "yyyyMMddHHmmss"));
            p.setProp("system.datetime");
            paramsMap.put("system.datetime", p);
        }
        taskExecutionContext.setParamsMap(paramsMap);
    }

    /**
     * @return the task created by the plugin channel, or {@code null} if none has been created
     */
    public AbstractTask getTask() {
        return task;
    }
}

