/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.api.controller;

import com.google.common.collect.Lists;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.api.audit.OperatorLog;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.controller.BaseController;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowBackFillRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowTriggerRequest;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.utils.WorkflowUtils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@Tag(name="EXECUTOR_TAG")
@RestController
@RequestMapping(value={"projects/{projectCode}/executors"})
public class ExecutorController
extends BaseController {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutorController.class);
    @Autowired
    private ExecutorService execService;

    @Operation(summary="startWorkflowInstance", description="RUN_WORKFLOW_INSTANCE_NOTES")
    @Parameters(value={@Parameter(name="workflowDefinitionCode", description="WORKFLOW_DEFINITION_CODE", required=true, schema=@Schema(implementation=Long.class), example="100"), @Parameter(name="scheduleTime", description="SCHEDULE_TIME", required=true, schema=@Schema(implementation=String.class), example="2022-04-06 00:00:00,2022-04-06 00:00:00"), @Parameter(name="failureStrategy", description="FAILURE_STRATEGY", required=true, schema=@Schema(implementation=FailureStrategy.class)), @Parameter(name="startNodeList", description="START_NODE_LIST", schema=@Schema(implementation=String.class)), @Parameter(name="taskDependType", description="TASK_DEPEND_TYPE", schema=@Schema(implementation=TaskDependType.class)), @Parameter(name="execType", description="COMMAND_TYPE", schema=@Schema(implementation=CommandType.class)), @Parameter(name="warningType", description="WARNING_TYPE", required=true, schema=@Schema(implementation=WarningType.class)), @Parameter(name="warningGroupId", description="WARNING_GROUP_ID", schema=@Schema(implementation=int.class, example="100")), @Parameter(name="runMode", description="RUN_MODE", schema=@Schema(implementation=RunMode.class)), @Parameter(name="workflowInstancePriority", description="WORKFLOW_INSTANCE_PRIORITY", required=true, schema=@Schema(implementation=Priority.class)), @Parameter(name="workerGroup", description="WORKER_GROUP", schema=@Schema(implementation=String.class, example="default")), @Parameter(name="tenantCode", description="TENANT_CODE", schema=@Schema(implementation=String.class, example="default")), @Parameter(name="environmentCode", description="ENVIRONMENT_CODE", schema=@Schema(implementation=Long.class, example="-1")), @Parameter(name="timeout", description="TIMEOUT", schema=@Schema(implementation=int.class, example="100")), @Parameter(name="expectedParallelismNumber", description="EXPECTED_PARALLELISM_NUMBER", schema=@Schema(implementation=int.class, example="8")), @Parameter(name="dryRun", description="DRY_RUN", schema=@Schema(implementation=int.class, example="0")), @Parameter(name="testFlag", description="TEST_FLAG", schema=@Schema(implementation=int.class, example="0")), @Parameter(name="complementDependentMode", description="COMPLEMENT_DEPENDENT_MODE", schema=@Schema(implementation=ComplementDependentMode.class)), @Parameter(name="allLevelDependent", description="ALL_LEVEL_DEPENDENT", schema=@Schema(implementation=boolean.class, example="false")), @Parameter(name="executionOrder", description="EXECUTION_ORDER", schema=@Schema(implementation=ExecutionOrder.class))})
    @PostMapping(value={"start-workflow-instance"})
    @ResponseStatus(value=HttpStatus.OK)
    @ApiException(value=Status.START_WORKFLOW_INSTANCE_ERROR)
    @OperatorLog(auditType=AuditType.WORKFLOW_START)
    public Result<List<Integer>> triggerWorkflowDefinition(@Parameter(hidden=true) @RequestAttribute(value="session.user") User loginUser, @RequestParam(value="workflowDefinitionCode") long workflowDefinitionCode, @RequestParam(value="scheduleTime") String scheduleTime, @RequestParam(value="failureStrategy") FailureStrategy failureStrategy, @RequestParam(value="startNodeList", required=false) String startNodeList, @RequestParam(value="taskDependType", required=false, defaultValue="TASK_POST") TaskDependType taskDependType, @RequestParam(value="execType", required=false, defaultValue="START_PROCESS") CommandType execType, @RequestParam(value="warningType") WarningType warningType, @RequestParam(value="warningGroupId", required=false) Integer warningGroupId, @RequestParam(value="runMode", required=false) RunMode runMode, @RequestParam(value="workflowInstancePriority", required=false) Priority workflowInstancePriority, @RequestParam(value="workerGroup", required=false, defaultValue="default") String workerGroup, @RequestParam(value="tenantCode", required=false, defaultValue="default") String tenantCode, @RequestParam(value="environmentCode", required=false, defaultValue="-1") Long environmentCode, @RequestParam(value="startParams", required=false) String startParams, @RequestParam(value="expectedParallelismNumber", required=false) Integer expectedParallelismNumber, @RequestParam(value="dryRun", defaultValue="0", required=false) int dryRun, @RequestParam(value="testFlag", defaultValue="0") int testFlag, @RequestParam(value="complementDependentMode", required=false) ComplementDependentMode complementDependentMode, @RequestParam(value="allLevelDependent", required=false, defaultValue="false") boolean allLevelDependent, @RequestParam(value="executionOrder", required=false) ExecutionOrder executionOrder) {
        switch (execType) {
            case START_PROCESS: {
                WorkflowTriggerRequest workflowTriggerRequest = WorkflowTriggerRequest.builder().loginUser(loginUser).workflowDefinitionCode(workflowDefinitionCode).startNodes(startNodeList).failureStrategy(failureStrategy).taskDependType(taskDependType).execType(execType).warningType(warningType).warningGroupId(warningGroupId).workflowInstancePriority(workflowInstancePriority).workerGroup(workerGroup).tenantCode(tenantCode).environmentCode(environmentCode).startParamList(startParams).dryRun(Flag.of((int)dryRun)).testFlag(Flag.of((int)testFlag)).build();
                return Result.success(Lists.newArrayList((Object[])new Integer[]{this.execService.triggerWorkflowDefinition(workflowTriggerRequest)}));
            }
            case COMPLEMENT_DATA: {
                WorkflowBackFillRequest workflowBackFillRequest = WorkflowBackFillRequest.builder().loginUser(loginUser).workflowDefinitionCode(workflowDefinitionCode).startNodes(startNodeList).failureStrategy(failureStrategy).taskDependType(taskDependType).execType(execType).warningType(warningType).warningGroupId(warningGroupId).backfillRunMode(runMode).workflowInstancePriority(workflowInstancePriority).workerGroup(workerGroup).tenantCode(tenantCode).environmentCode(environmentCode).startParamList(startParams).dryRun(Flag.of((int)dryRun)).testFlag(Flag.of((int)testFlag)).backfillTime(WorkflowUtils.parseBackfillTime(scheduleTime)).expectedParallelismNumber(expectedParallelismNumber).backfillDependentMode(complementDependentMode).allLevelDependent(allLevelDependent).executionOrder(executionOrder).build();
                return Result.success(this.execService.backfillWorkflowDefinition(workflowBackFillRequest));
            }
        }
        throw new ServiceException("The execType: " + execType + " is invalid");
    }

    @Operation(summary="batchStartWorkflowInstance", description="BATCH_RUN_WORKFLOW_INSTANCE_NOTES")
    @Parameters(value={@Parameter(name="workflowDefinitionCodes", description="WORKFLOW_DEFINITION_CODE_LIST", required=true, schema=@Schema(implementation=String.class, example="1,2,3")), @Parameter(name="scheduleTime", description="SCHEDULE_TIME", required=true, schema=@Schema(implementation=String.class, example="2022-04-06 00:00:00,2022-04-06 00:00:00")), @Parameter(name="failureStrategy", description="FAILURE_STRATEGY", required=true, schema=@Schema(implementation=FailureStrategy.class)), @Parameter(name="startNodeList", description="START_NODE_LIST", schema=@Schema(implementation=String.class)), @Parameter(name="taskDependType", description="TASK_DEPEND_TYPE", schema=@Schema(implementation=TaskDependType.class)), @Parameter(name="execType", description="COMMAND_TYPE", schema=@Schema(implementation=CommandType.class)), @Parameter(name="warningType", description="WARNING_TYPE", required=true, schema=@Schema(implementation=WarningType.class)), @Parameter(name="warningGroupId", description="WARNING_GROUP_ID", required=true, schema=@Schema(implementation=int.class, example="100")), @Parameter(name="runMode", description="RUN_MODE", schema=@Schema(implementation=RunMode.class)), @Parameter(name="workflowInstancePriority", description="WORKFLOW_INSTANCE_PRIORITY", required=true, schema=@Schema(implementation=Priority.class)), @Parameter(name="workerGroup", description="WORKER_GROUP", schema=@Schema(implementation=String.class, example="default")), @Parameter(name="tenantCode", description="TENANT_CODE", schema=@Schema(implementation=String.class, example="default")), @Parameter(name="environmentCode", description="ENVIRONMENT_CODE", schema=@Schema(implementation=Long.class, example="-1")), @Parameter(name="expectedParallelismNumber", description="EXPECTED_PARALLELISM_NUMBER", schema=@Schema(implementation=int.class, example="8")), @Parameter(name="dryRun", description="DRY_RUN", schema=@Schema(implementation=int.class, example="0")), @Parameter(name="testFlag", description="TEST_FLAG", schema=@Schema(implementation=int.class, example="0")), @Parameter(name="complementDependentMode", description="COMPLEMENT_DEPENDENT_MODE", schema=@Schema(implementation=ComplementDependentMode.class)), @Parameter(name="allLevelDependent", description="ALL_LEVEL_DEPENDENT", schema=@Schema(implementation=boolean.class, example="false")), @Parameter(name="executionOrder", description="EXECUTION_ORDER", schema=@Schema(implementation=ExecutionOrder.class))})
    @PostMapping(value={"batch-start-workflow-instance"})
    @ResponseStatus(value=HttpStatus.OK)
    @ApiException(value=Status.BATCH_START_WORKFLOW_INSTANCE_ERROR)
    @OperatorLog(auditType=AuditType.WORKFLOW_BATCH_START)
    public Result<List<Integer>> batchTriggerWorkflowDefinitions(@Parameter(hidden=true) @RequestAttribute(value="session.user") User loginUser, @RequestParam(value="workflowDefinitionCodes") String workflowDefinitionCodes, @RequestParam(value="scheduleTime") String scheduleTime, @RequestParam(value="failureStrategy") FailureStrategy failureStrategy, @RequestParam(value="startNodeList", required=false) String startNodeList, @RequestParam(value="taskDependType", required=false) TaskDependType taskDependType, @RequestParam(value="execType", required=false) CommandType execType, @RequestParam(value="warningType") WarningType warningType, @RequestParam(value="warningGroupId", required=false) Integer warningGroupId, @RequestParam(value="runMode", required=false) RunMode runMode, @RequestParam(value="workflowInstancePriority", required=false) Priority workflowInstancePriority, @RequestParam(value="workerGroup", required=false, defaultValue="default") String workerGroup, @RequestParam(value="tenantCode", required=false, defaultValue="default") String tenantCode, @RequestParam(value="environmentCode", required=false, defaultValue="-1") Long environmentCode, @RequestParam(value="startParams", required=false) String startParams, @RequestParam(value="expectedParallelismNumber", required=false) Integer expectedParallelismNumber, @RequestParam(value="dryRun", defaultValue="0", required=false) int dryRun, @RequestParam(value="testFlag", defaultValue="0") int testFlag, @RequestParam(value="complementDependentMode", required=false) ComplementDependentMode complementDependentMode, @RequestParam(value="allLevelDependent", required=false, defaultValue="false") boolean allLevelDependent, @RequestParam(value="executionOrder", required=false) ExecutionOrder executionOrder) {
        List workflowDefinitionCodeList = Arrays.stream(workflowDefinitionCodes.split(",")).map(Long::parseLong).collect(Collectors.toList());
        ArrayList result = new ArrayList();
        for (Long workflowDefinitionCode : workflowDefinitionCodeList) {
            Result<List<Integer>> workflowInstanceIds = this.triggerWorkflowDefinition(loginUser, workflowDefinitionCode, scheduleTime, failureStrategy, startNodeList, taskDependType, execType, warningType, warningGroupId, runMode, workflowInstancePriority, workerGroup, tenantCode, environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode, allLevelDependent, executionOrder);
            result.addAll(workflowInstanceIds.getData());
        }
        return Result.success(result);
    }

    @Operation(summary="execute", description="EXECUTE_ACTION_TO_WORKFLOW_INSTANCE_NOTES")
    @Parameters(value={@Parameter(name="workflowInstanceId", description="WORKFLOW_INSTANCE_ID", required=true, schema=@Schema(implementation=int.class, example="100")), @Parameter(name="executeType", description="EXECUTE_TYPE", required=true, schema=@Schema(implementation=ExecuteType.class))})
    @PostMapping(value={"/execute"})
    @ResponseStatus(value=HttpStatus.OK)
    @ApiException(value=Status.EXECUTE_WORKFLOW_INSTANCE_ERROR)
    @OperatorLog(auditType=AuditType.WORKFLOW_EXECUTE)
    public Result<Void> controlWorkflowInstance(@Parameter(hidden=true) @RequestAttribute(value="session.user") User loginUser, @RequestParam(value="workflowInstanceId") Integer workflowInstanceId, @RequestParam(value="executeType") ExecuteType executeType) {
        this.execService.controlWorkflowInstance(loginUser, workflowInstanceId, executeType);
        return Result.success();
    }

    @Operation(summary="batchExecute", description="BATCH_EXECUTE_ACTION_TO_WORKFLOW_INSTANCE_NOTES")
    @Parameters(value={@Parameter(name="projectCode", description="PROJECT_CODE", required=true, schema=@Schema(implementation=int.class)), @Parameter(name="workflowInstanceIds", description="WORKFLOW_INSTANCE_IDS", required=true, schema=@Schema(implementation=String.class)), @Parameter(name="executeType", description="EXECUTE_TYPE", required=true, schema=@Schema(implementation=ExecuteType.class))})
    @PostMapping(value={"/batch-execute"})
    @ResponseStatus(value=HttpStatus.OK)
    @ApiException(value=Status.BATCH_EXECUTE_WORKFLOW_INSTANCE_ERROR)
    @OperatorLog(auditType=AuditType.WORKFLOW_BATCH_RERUN)
    public Result<Void> batchControlWorkflowInstance(@RequestAttribute(value="session.user") User loginUser, @RequestParam(value="workflowInstanceIds") String workflowInstanceIds, @RequestParam(value="executeType") ExecuteType executeType) {
        String[] workflowInstanceIdArray = workflowInstanceIds.split(",");
        ArrayList<String> errorMessage = new ArrayList<String>();
        for (String strWorkflowInstanceId : workflowInstanceIdArray) {
            int workflowInstanceId = Integer.parseInt(strWorkflowInstanceId);
            try {
                this.execService.controlWorkflowInstance(loginUser, workflowInstanceId, executeType);
                log.info("Success do action {} on workflowInstance: {}", (Object)executeType, (Object)workflowInstanceId);
            }
            catch (Exception e) {
                errorMessage.add("Failed do action " + (Object)((Object)executeType) + " on workflowInstance: " + workflowInstanceId + "reason: " + e.getMessage());
                log.error("Failed do action {} on workflowInstance: {}, error: {}", new Object[]{executeType, workflowInstanceId, e});
            }
        }
        if (CollectionUtils.isNotEmpty(errorMessage)) {
            throw new ServiceException(String.join((CharSequence)"\n", errorMessage));
        }
        return Result.success();
    }

    @Operation(summary="startTaskInstance", description="RUN_TASK_INSTANCE_NOTES")
    @Parameters(value={@Parameter(name="version", description="VERSION", schema=@Schema(implementation=int.class, example="1")), @Parameter(name="failureStrategy", description="FAILURE_STRATEGY", required=true, schema=@Schema(implementation=FailureStrategy.class)), @Parameter(name="execType", description="COMMAND_TYPE", schema=@Schema(implementation=CommandType.class)), @Parameter(name="warningType", description="WARNING_TYPE", required=true, schema=@Schema(implementation=WarningType.class)), @Parameter(name="warningGroupId", description="WARNING_GROUP_ID", schema=@Schema(implementation=int.class, example="100")), @Parameter(name="workerGroup", description="WORKER_GROUP", schema=@Schema(implementation=String.class, example="default")), @Parameter(name="tenantCode", description="TENANT_CODE", schema=@Schema(implementation=String.class, example="default")), @Parameter(name="environmentCode", description="ENVIRONMENT_CODE", schema=@Schema(implementation=long.class, example="-1")), @Parameter(name="timeout", description="TIMEOUT", schema=@Schema(implementation=int.class, example="100")), @Parameter(name="dryRun", description="DRY_RUN", schema=@Schema(implementation=int.class, example="0"))})
    @PostMapping(value={"/task-instance/{code}/start"})
    @ResponseStatus(value=HttpStatus.OK)
    @ApiException(value=Status.START_TASK_INSTANCE_ERROR)
    @OperatorLog(auditType=AuditType.TASK_START)
    public Result<Boolean> startStreamTaskInstance(@Parameter(hidden=true) @RequestAttribute(value="session.user") User loginUser, @Parameter(name="projectCode", description="PROJECT_CODE", required=true) @PathVariable long projectCode, @Parameter(name="code", description="TASK_CODE", required=true) @PathVariable long code, @RequestParam(value="version", required=true) int version, @RequestParam(value="warningGroupId", required=false, defaultValue="0") Integer warningGroupId, @RequestParam(value="workerGroup", required=false, defaultValue="default") String workerGroup, @RequestParam(value="tenantCode", required=false, defaultValue="default") String tenantCode, @RequestParam(value="environmentCode", required=false, defaultValue="-1") Long environmentCode, @RequestParam(value="startParams", required=false) String startParams, @RequestParam(value="dryRun", defaultValue="0", required=false) int dryRun) {
        Map startParamMap = null;
        if (startParams != null) {
            startParamMap = JSONUtils.toMap((String)startParams);
        }
        log.info("Start to execute stream task instance, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.", new Object[]{projectCode, code, version});
        this.execService.execStreamTaskInstance(loginUser, projectCode, code, version, warningGroupId, workerGroup, tenantCode, environmentCode, startParamMap, dryRun);
        return Result.success(true);
    }

    @Operation(summary="execute-task", description="EXECUTE_ACTION_TO_WORKFLOW_INSTANCE_NOTES")
    @Parameters(value={@Parameter(name="workflowInstanceId", description="WORKFLOW_INSTANCE_ID", required=true, schema=@Schema(implementation=int.class, example="100")), @Parameter(name="startNodeList", description="START_NODE_LIST", required=true, schema=@Schema(implementation=String.class)), @Parameter(name="taskDependType", description="TASK_DEPEND_TYPE", required=true, schema=@Schema(implementation=TaskDependType.class))})
    @PostMapping(value={"/execute-task"})
    @ResponseStatus(value=HttpStatus.OK)
    @ApiException(value=Status.EXECUTE_WORKFLOW_INSTANCE_ERROR)
    @OperatorLog(auditType=AuditType.WORKFLOW_EXECUTE)
    public Result executeTask(@Parameter(hidden=true) @RequestAttribute(value="session.user") User loginUser, @Parameter(name="projectCode", description="PROJECT_CODE", required=true) @PathVariable long projectCode, @RequestParam(value="workflowInstanceId") Integer workflowInstanceId, @RequestParam(value="startNodeList") String startNodeList, @RequestParam(value="taskDependType") TaskDependType taskDependType) {
        log.info("Start to execute task in workflow instance, projectCode:{}, workflowInstanceId:{}.", (Object)projectCode, (Object)workflowInstanceId);
        return this.execService.executeTask(loginUser, projectCode, workflowInstanceId, startNodeList, taskDependType);
    }
}

