/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.processor;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecuteProcessor
implements NettyRequestProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
    private final WorkerConfig workerConfig;
    private final TaskCallbackService taskCallbackService = (TaskCallbackService)SpringApplicationContext.getBean(TaskCallbackService.class);
    private AlertClientService alertClientService;
    private TaskPluginManager taskPluginManager;
    private final WorkerManagerThread workerManager;

    public TaskExecuteProcessor() {
        this.workerConfig = (WorkerConfig)SpringApplicationContext.getBean(WorkerConfig.class);
        this.workerManager = (WorkerManagerThread)SpringApplicationContext.getBean(WorkerManagerThread.class);
    }

    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
        TaskExecutionContext preTaskCache = new TaskExecutionContext();
        preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        TaskRequest taskRequest = (TaskRequest)JSONUtils.parseObject((String)JSONUtils.toJsonString((Object)taskExecutionContext), TaskRequest.class);
        TaskExecutionContextCacheManager.cacheTaskExecutionContext((TaskRequest)taskRequest);
    }

    public TaskExecuteProcessor(AlertClientService alertClientService, TaskPluginManager taskPluginManager) {
        this();
        this.alertClientService = alertClientService;
        this.taskPluginManager = taskPluginManager;
    }

    public void process(Channel channel, Command command) {
        Preconditions.checkArgument((CommandType.TASK_EXECUTE_REQUEST == command.getType() ? 1 : 0) != 0, (Object)String.format("invalid command type : %s", command.getType()));
        TaskExecuteRequestCommand taskRequestCommand = (TaskExecuteRequestCommand)JSONUtils.parseObject((byte[])command.getBody(), TaskExecuteRequestCommand.class);
        logger.info("received command : {}", (Object)taskRequestCommand);
        if (taskRequestCommand == null) {
            logger.error("task execute request command is null");
            return;
        }
        String contextJson = taskRequestCommand.getTaskExecutionContext();
        TaskExecutionContext taskExecutionContext = (TaskExecutionContext)JSONUtils.parseObject((String)contextJson, TaskExecutionContext.class);
        if (taskExecutionContext == null) {
            logger.error("task execution context is null");
            return;
        }
        this.setTaskCache(taskExecutionContext);
        taskExecutionContext.setHost(NetUtils.getAddr((int)this.workerConfig.getListenPort()));
        if (CommonUtils.isSudoEnable() && this.workerConfig.getWorkerTenantAutoCreate()) {
            OSUtils.createUserIfAbsent((String)taskExecutionContext.getTenantCode());
        }
        ResponceCache.get().removeRecallCache(taskExecutionContext.getTaskInstanceId());
        this.taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
        boolean offer = this.workerManager.offer(new TaskExecuteThread(taskExecutionContext, this.taskCallbackService, this.alertClientService, this.taskPluginManager));
        if (!offer) {
            logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}", (Object)this.workerManager.getWaitSubmitQueueSize(), (Object)taskExecutionContext.getTaskInstanceId());
            this.sendRecallCommand(taskExecutionContext, channel);
            TaskExecutionContextCacheManager.removeByTaskInstanceId((Integer)taskExecutionContext.getTaskInstanceId());
        }
    }

    private void sendRecallCommand(TaskExecutionContext taskExecutionContext, Channel channel) {
        TaskRecallCommand taskRecallCommand = this.buildRecallCommand(taskExecutionContext);
        Command command = taskRecallCommand.convert2Command();
        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), command, Event.WORKER_REJECT);
        this.taskCallbackService.changeRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
        this.taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), command);
        logger.info("send recall command successfully, taskId:{}, opaque:{}", (Object)taskExecutionContext.getTaskInstanceId(), (Object)command.getOpaque());
    }

    private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
        TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
        taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
        taskRecallCommand.setHost(taskExecutionContext.getHost());
        taskRecallCommand.setEvent(Event.WORKER_REJECT);
        taskRecallCommand.setStatus(ExecutionStatus.SUBMITTED_SUCCESS.getCode());
        return taskRecallCommand;
    }
}

