/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.api.service.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowBackFillRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowTriggerRequest;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.executor.workflow.ExecutorClient;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTOValidator;
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowRequestTransformer;
import org.apache.dolphinscheduler.api.validator.workflow.TriggerWorkflowDTO;
import org.apache.dolphinscheduler.api.validator.workflow.TriggerWorkflowDTOValidator;
import org.apache.dolphinscheduler.api.validator.workflow.TriggerWorkflowRequestTransformer;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ExecutorServiceImpl
extends BaseServiceImpl
implements ExecutorService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutorServiceImpl.class);
    @Autowired
    private ProjectService projectService;
    @Autowired
    private WorkflowDefinitionMapper workflowDefinitionMapper;
    @Lazy
    @Autowired
    private ProcessService processService;
    @Autowired
    private WorkflowInstanceDao workflowInstanceDao;
    @Autowired
    private CommandService commandService;
    @Autowired
    private TaskDefinitionLogMapper taskDefinitionLogMapper;
    @Autowired
    private TaskDefinitionMapper taskDefinitionMapper;
    @Autowired
    private WorkflowTaskRelationMapper workflowTaskRelationMapper;
    @Autowired
    private TaskGroupQueueMapper taskGroupQueueMapper;
    @Autowired
    private WorkerGroupService workerGroupService;
    @Autowired
    private TenantMapper tenantMapper;
    @Autowired
    private WorkflowLineageService workflowLineageService;
    @Autowired
    private TriggerWorkflowRequestTransformer triggerWorkflowRequestTransformer;
    @Autowired
    private TriggerWorkflowDTOValidator triggerWorkflowDTOValidator;
    @Autowired
    private BackfillWorkflowRequestTransformer backfillWorkflowRequestTransformer;
    @Autowired
    private BackfillWorkflowDTOValidator backfillWorkflowDTOValidator;
    @Autowired
    private ExecutorClient executorClient;

    @Override
    @Transactional
    public Integer triggerWorkflowDefinition(WorkflowTriggerRequest triggerRequest) {
        TriggerWorkflowDTO triggerWorkflowDTO = this.triggerWorkflowRequestTransformer.transform(triggerRequest);
        this.triggerWorkflowDTOValidator.validate(triggerWorkflowDTO);
        return this.executorClient.triggerWorkflowDefinition().execute(triggerWorkflowDTO);
    }

    @Override
    @Transactional
    public List<Integer> backfillWorkflowDefinition(WorkflowBackFillRequest workflowBackFillRequest) {
        BackfillWorkflowDTO backfillWorkflowDTO = this.backfillWorkflowRequestTransformer.transform(workflowBackFillRequest);
        this.backfillWorkflowDTOValidator.validate(backfillWorkflowDTO);
        return this.executorClient.backfillWorkflowDefinition().execute(backfillWorkflowDTO);
    }

    @Override
    public void checkWorkflowDefinitionValid(long projectCode, WorkflowDefinition workflowDefinition, long workflowDefinitionCode, Integer version) {
        if (projectCode != workflowDefinition.getProjectCode()) {
            throw new ServiceException(Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowDefinition.getCode());
        }
        if (workflowDefinition.getReleaseState() != ReleaseState.ONLINE) {
            throw new ServiceException(Status.WORKFLOW_DEFINITION_NOT_RELEASE, workflowDefinition.getCode(), workflowDefinition.getVersion());
        }
        if (!this.checkSubWorkflowDefinitionValid(workflowDefinition)) {
            throw new ServiceException(Status.SUB_WORKFLOW_DEFINITION_NOT_RELEASE);
        }
    }

    @Override
    public boolean checkSubWorkflowDefinitionValid(WorkflowDefinition workflowDefinition) {
        List workflowTaskRelations = this.workflowTaskRelationMapper.queryDownstreamByWorkflowDefinitionCode(workflowDefinition.getCode());
        if (workflowTaskRelations.isEmpty()) {
            return true;
        }
        Set relationCodes = workflowTaskRelations.stream().map(WorkflowTaskRelation::getPostTaskCode).collect(Collectors.toSet());
        List taskDefinitions = this.taskDefinitionMapper.queryByCodeList(relationCodes);
        HashSet workflowDefinitionCodeSet = new HashSet();
        taskDefinitions.stream().filter(task -> TaskTypeUtils.isSubWorkflowTask((String)task.getTaskType())).forEach(taskDefinition -> workflowDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString((String)taskDefinition.getTaskParams(), (String)"processDefinitionCode"))));
        if (workflowDefinitionCodeSet.isEmpty()) {
            return true;
        }
        List workflowDefinitions = this.workflowDefinitionMapper.queryByCodes(workflowDefinitionCodeSet);
        return workflowDefinitions.stream().filter(definition -> definition.getReleaseState().equals((Object)ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty();
    }

    private void checkValidTenant(String tenantCode) {
        Tenant tenant;
        if (!"default".equals(tenantCode) && (tenant = this.tenantMapper.queryByTenantCode(tenantCode)) == null) {
            throw new ServiceException(Status.TENANT_NOT_EXIST, tenantCode);
        }
    }

    @Override
    public void controlWorkflowInstance(User loginUser, Integer workflowInstanceId, ExecuteType executeType) {
        Preconditions.checkNotNull((Object)workflowInstanceId, (Object)"workflowInstanceId cannot be null");
        Preconditions.checkNotNull((Object)((Object)executeType), (Object)"executeType cannot be null");
        WorkflowInstance workflowInstance = (WorkflowInstance)this.workflowInstanceDao.queryOptionalById((Serializable)workflowInstanceId).orElseThrow(() -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId));
        this.projectService.checkProjectAndAuthThrowException(loginUser, workflowInstance.getProjectCode(), ApiFuncIdentificationConstant.map.get((Object)executeType));
        switch (executeType) {
            case REPEAT_RUNNING: {
                this.executorClient.repeatRunningWorkflowInstance().onWorkflowInstance(workflowInstance).byUser(loginUser).execute();
                return;
            }
            case START_FAILURE_TASK_PROCESS: {
                this.executorClient.recoverFailureTaskInstance().onWorkflowInstance(workflowInstance).byUser(loginUser).execute();
                return;
            }
            case RECOVER_SUSPENDED_PROCESS: {
                this.executorClient.recoverSuspendedWorkflowInstanceOperation().onWorkflowInstance(workflowInstance).byUser(loginUser).execute();
                return;
            }
            case PAUSE: {
                this.executorClient.pauseWorkflowInstance().onWorkflowInstance(workflowInstance).byUser(loginUser).execute();
                return;
            }
            case STOP: {
                this.executorClient.stopWorkflowInstance().onWorkflowInstance(workflowInstance).byUser(loginUser).execute();
                return;
            }
        }
        throw new ServiceException("Unsupported executeType: " + (Object)((Object)executeType));
    }

    @Override
    public WorkflowExecuteResponse executeTask(User loginUser, long projectCode, Integer workflowInstanceId, String startNodeList, TaskDependType taskDependType) {
        long startNodeListLong;
        WorkflowExecuteResponse response = new WorkflowExecuteResponse();
        this.projectService.checkProjectAndAuthThrowException(loginUser, projectCode, ApiFuncIdentificationConstant.map.get((Object)ExecuteType.EXECUTE_TASK));
        WorkflowInstance workflowInstance = (WorkflowInstance)this.processService.findWorkflowInstanceDetailById(workflowInstanceId.intValue()).orElseThrow(() -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId));
        if (!workflowInstance.getState().isFinished()) {
            log.error("Can not execute task for workflow instance which is not finished, workflowInstanceId:{}.", (Object)workflowInstanceId);
            this.putMsg((Result)response, Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED, new Object[0]);
            return response;
        }
        WorkflowDefinition workflowDefinition = this.processService.findWorkflowDefinition(workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getWorkflowDefinitionVersion());
        workflowDefinition.setReleaseState(ReleaseState.ONLINE);
        this.checkWorkflowDefinitionValid(projectCode, workflowDefinition, workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getWorkflowDefinitionVersion());
        try {
            startNodeListLong = Long.parseLong(startNodeList);
        }
        catch (NumberFormatException e) {
            log.error("startNodeList is not a number");
            this.putMsg((Result)response, Status.REQUEST_PARAMS_NOT_VALID_ERROR, new Object[]{startNodeList});
            return response;
        }
        if (this.taskDefinitionLogMapper.queryMaxVersionForDefinition(startNodeListLong) == null) {
            this.putMsg((Result)response, Status.EXECUTE_NOT_DEFINE_TASK, new Object[0]);
            return response;
        }
        HashMap<String, Object> cmdParam = new HashMap<String, Object>();
        cmdParam.put("ProcessInstanceId", workflowInstanceId);
        cmdParam.put("StartNodeList", startNodeList);
        Command command = new Command();
        command.setCommandType(CommandType.EXECUTE_TASK);
        command.setWorkflowDefinitionCode(workflowDefinition.getCode());
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(loginUser.getId().intValue());
        command.setWorkflowDefinitionVersion(workflowDefinition.getVersion());
        command.setWorkflowInstanceId(workflowInstanceId.intValue());
        command.setTestFlag(workflowInstance.getTestFlag());
        command.setTaskDependType(taskDependType);
        if (!this.commandService.verifyIsNeedCreateCommand(command)) {
            log.warn("workflow instance is executing the command, workflowDefinitionCode:{}, workflowDefinitionVersion:{}, workflowInstanceId:{}.", new Object[]{workflowDefinition.getCode(), workflowDefinition.getVersion(), workflowInstanceId});
            this.putMsg((Result)response, Status.WORKFLOW_INSTANCE_EXECUTING_COMMAND, new Object[]{String.valueOf(workflowDefinition.getCode())});
            return response;
        }
        log.info("Creating command, commandInfo:{}.", (Object)command);
        int create = this.commandService.createCommand(command);
        if (create > 0) {
            log.info("Create {} command complete, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.", new Object[]{command.getCommandType().getDescp(), command.getWorkflowDefinitionCode(), workflowDefinition.getVersion()});
            this.putMsg((Result)response, Status.SUCCESS, new Object[0]);
        } else {
            log.error("Execute workflow instance failed because create {} command error, workflowDefinitionCode:{}, workflowDefinitionVersion:{}\uff0c workflowInstanceId:{}.", new Object[]{command.getCommandType().getDescp(), command.getWorkflowDefinitionCode(), workflowDefinition.getVersion(), workflowInstanceId});
            this.putMsg((Result)response, Status.EXECUTE_WORKFLOW_INSTANCE_ERROR, new Object[0]);
        }
        return response;
    }

    @Override
    public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        TaskGroupQueue taskGroupQueue = (TaskGroupQueue)this.taskGroupQueueMapper.selectById((Serializable)Integer.valueOf(queueId));
        this.workflowInstanceDao.queryOptionalById((Serializable)taskGroupQueue.getWorkflowInstanceId()).orElseThrow(() -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, taskGroupQueue.getWorkflowInstanceId()));
        if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
            throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
        }
        taskGroupQueue.setForceStart(Flag.YES.getCode());
        taskGroupQueue.setUpdateTime(new Date());
        this.taskGroupQueueMapper.updateById((Object)taskGroupQueue);
        result.put("status", (Object)Status.SUCCESS);
        return result;
    }

    private int createComplementCommand(Long triggerCode, Command command, Map<String, String> cmdParam, List<ZonedDateTime> dateTimeList, List<Schedule> schedules, ComplementDependentMode complementDependentMode, boolean allLevelDependent) {
        String dateTimeListStr = dateTimeList.stream().map(item -> DateUtils.dateToString((ZonedDateTime)item)).collect(Collectors.joining(","));
        cmdParam.put("complementScheduleDateList", dateTimeListStr);
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        log.info("Creating command, commandInfo:{}.", (Object)command);
        int createCount = this.commandService.createCommand(command);
        if (createCount > 0) {
            log.info("Create {} command complete, workflowDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getWorkflowDefinitionCode());
        } else {
            log.error("Create {} command error, workflowDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getWorkflowDefinitionCode());
        }
        if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
            log.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, workflowDefinitionCode:{}.", (Object)command.getWorkflowDefinitionCode());
        } else {
            log.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, workflowDefinitionCode:{}.", (Object)command.getWorkflowDefinitionCode());
            this.createComplementDependentCommand(schedules, command, allLevelDependent);
        }
        return createCount;
    }

    protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode, Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode, boolean allLevelDependent, ExecutionOrder executionOrder) throws CronParseException {
        String dateList;
        int createCount = 0;
        int dependentWorkflowDefinitionCreateCount = 0;
        runMode = runMode == null ? RunMode.RUN_MODE_SERIAL : runMode;
        Map cmdParam = JSONUtils.toMap((String)command.getCommandParam());
        Map scheduleParam = JSONUtils.toMap((String)scheduleTimeParam);
        if (Objects.isNull(executionOrder)) {
            executionOrder = ExecutionOrder.DESC_ORDER;
        }
        List schedules = this.processService.queryReleaseSchedulerListByWorkflowDefinitionCode(command.getWorkflowDefinitionCode());
        List<Object> listDate = new ArrayList();
        if (scheduleParam.containsKey("complementStartDate") && scheduleParam.containsKey("complementEndDate")) {
            String startDate = (String)scheduleParam.get("complementStartDate");
            String endDate = (String)scheduleParam.get("complementEndDate");
            if (startDate != null && endDate != null) {
                listDate = CronUtils.getSelfFireDateList((ZonedDateTime)DateUtils.stringToZoneDateTime((String)startDate), (ZonedDateTime)DateUtils.stringToZoneDateTime((String)endDate), (List)schedules);
            }
        }
        if (scheduleParam.containsKey("complementScheduleDateList") && StringUtils.isNotBlank((CharSequence)(dateList = (String)scheduleParam.get("complementScheduleDateList")))) {
            listDate = Splitter.on((String)",").splitToStream((CharSequence)dateList).map(item -> DateUtils.stringToZoneDateTime((String)item.trim())).distinct().collect(Collectors.toList());
        }
        if (CollectionUtils.isEmpty(listDate)) {
            throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
        }
        if (executionOrder.equals((Object)ExecutionOrder.DESC_ORDER)) {
            Collections.sort(listDate, Collections.reverseOrder());
        } else {
            Collections.sort(listDate);
        }
        switch (runMode) {
            case RUN_MODE_SERIAL: {
                log.info("RunMode of {} command is serial run, workflowDefinitionCode:{}.", (Object)command.getCommandType().getDescp(), (Object)command.getWorkflowDefinitionCode());
                createCount = this.createComplementCommand(triggerCode, command, cmdParam, listDate, schedules, complementDependentMode, allLevelDependent);
                break;
            }
            case RUN_MODE_PARALLEL: {
                log.info("RunMode of {} command is parallel run, workflowDefinitionCode:{}.", (Object)command.getCommandType().getDescp(), (Object)command.getWorkflowDefinitionCode());
                int queueNum = 0;
                if (!CollectionUtils.isNotEmpty(listDate)) break;
                queueNum = listDate.size();
                if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                    queueNum = Math.min(queueNum, expectedParallelismNumber);
                }
                log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", (Object)queueNum);
                List[] queues = new List[queueNum];
                for (int i = 0; i < listDate.size(); ++i) {
                    if (Objects.isNull(queues[i % queueNum])) {
                        queues[i % queueNum] = new ArrayList();
                    }
                    queues[i % queueNum].add(listDate.get(i));
                }
                for (List queue : queues) {
                    createCount = this.createComplementCommand(triggerCode, command, cmdParam, queue, schedules, complementDependentMode, allLevelDependent);
                }
                break;
            }
        }
        log.info("Create complement command count:{}, Create dependent complement command count:{}", (Object)createCount, (Object)dependentWorkflowDefinitionCreateCount);
        return createCount;
    }

    public int createComplementDependentCommand(List<Schedule> schedules, Command command, boolean allLevelDependent) {
        Command dependentCommand;
        int dependentWorkflowDefinitionCreateCount = 0;
        try {
            dependentCommand = (Command)BeanUtils.cloneBean((Object)command);
        }
        catch (Exception e) {
            log.error("Copy dependent command error.", (Throwable)e);
            return dependentWorkflowDefinitionCreateCount;
        }
        List<DependentWorkflowDefinition> dependentWorkflowDefinitionList = this.getComplementDependentDefinitionList(dependentCommand.getWorkflowDefinitionCode(), CronUtils.getMaxCycle((String)schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup(), allLevelDependent);
        dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
        for (DependentWorkflowDefinition dependentWorkflowDefinition : dependentWorkflowDefinitionList) {
            dependentCommand.setId(null);
            dependentCommand.setWorkflowDefinitionCode(dependentWorkflowDefinition.getWorkflowDefinitionCode());
            dependentCommand.setWorkflowDefinitionVersion(dependentWorkflowDefinition.getWorkflowDefinitionVersion());
            dependentCommand.setWorkerGroup(dependentWorkflowDefinition.getWorkerGroup());
            Map cmdParam = JSONUtils.toMap((String)dependentCommand.getCommandParam());
            cmdParam.put("StartNodeList", String.valueOf(dependentWorkflowDefinition.getTaskDefinitionCode()));
            dependentCommand.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
            log.info("Creating complement dependent command, commandInfo:{}.", (Object)command);
            dependentWorkflowDefinitionCreateCount += this.commandService.createCommand(dependentCommand);
        }
        return dependentWorkflowDefinitionCreateCount;
    }

    private List<DependentWorkflowDefinition> getComplementDependentDefinitionList(long workflowDefinitionCode, CycleEnum workflowDefinitionCycle, String workerGroup, boolean allLevelDependent) {
        List<DependentWorkflowDefinition> dependentWorkflowDefinitionList = this.checkDependentWorkflowDefinitionValid(this.workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowDefinitionCode), workflowDefinitionCycle, workerGroup, workflowDefinitionCode);
        if (dependentWorkflowDefinitionList.isEmpty()) {
            return dependentWorkflowDefinitionList;
        }
        if (allLevelDependent) {
            List childDependentList;
            ArrayList<Object> childList = new ArrayList<DependentWorkflowDefinition>(dependentWorkflowDefinitionList);
            while (!(childDependentList = childList.stream().flatMap(dependentWorkflowDefinition -> this.checkDependentWorkflowDefinitionValid(this.workflowLineageService.queryDownstreamDependentWorkflowDefinitions(dependentWorkflowDefinition.getWorkflowDefinitionCode()), workflowDefinitionCycle, workerGroup, dependentWorkflowDefinition.getWorkflowDefinitionCode()).stream()).collect(Collectors.toList())).isEmpty()) {
                dependentWorkflowDefinitionList.addAll(childDependentList);
                childList = new ArrayList(childDependentList);
            }
        }
        return dependentWorkflowDefinitionList;
    }

    private List<DependentWorkflowDefinition> checkDependentWorkflowDefinitionValid(List<DependentWorkflowDefinition> dependentWorkflowDefinitionList, CycleEnum workflowDefinitionCycle, String workerGroup, long upstreamWorkflowDefinitionCode) {
        ArrayList<DependentWorkflowDefinition> validDependentWorkflowDefinitionList = new ArrayList<DependentWorkflowDefinition>();
        List<Long> workflowDefinitionCodeList = dependentWorkflowDefinitionList.stream().map(DependentWorkflowDefinition::getWorkflowDefinitionCode).collect(Collectors.toList());
        Map<Long, String> workflowDefinitionWorkerGroupMap = this.workerGroupService.queryWorkerGroupByWorkflowDefinitionCodes(workflowDefinitionCodeList);
        for (DependentWorkflowDefinition dependentWorkflowDefinition : dependentWorkflowDefinitionList) {
            if (dependentWorkflowDefinition.getDependentCycle(upstreamWorkflowDefinitionCode) != workflowDefinitionCycle) continue;
            if (workflowDefinitionWorkerGroupMap.get(dependentWorkflowDefinition.getWorkflowDefinitionCode()) == null) {
                dependentWorkflowDefinition.setWorkerGroup(workerGroup);
            }
            validDependentWorkflowDefinitionList.add(dependentWorkflowDefinition);
        }
        return validDependentWorkflowDefinitionList;
    }

    private boolean isValidateScheduleTime(String schedule) {
        Map scheduleResult = JSONUtils.toMap((String)schedule);
        if (scheduleResult == null) {
            return false;
        }
        if (scheduleResult.containsKey("complementScheduleDateList") && scheduleResult.get("complementScheduleDateList") == null) {
            return false;
        }
        if (scheduleResult.containsKey("complementStartDate")) {
            String startDate = (String)scheduleResult.get("complementStartDate");
            String endDate = (String)scheduleResult.get("complementEndDate");
            if (startDate == null || endDate == null) {
                return false;
            }
            try {
                ZonedDateTime start = DateUtils.stringToZoneDateTime((String)startDate);
                ZonedDateTime end = DateUtils.stringToZoneDateTime((String)endDate);
                if (start == null || end == null) {
                    return false;
                }
                if (start.isAfter(end)) {
                    log.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", (Object)start, (Object)end);
                    return false;
                }
            }
            catch (Exception ex) {
                log.warn("Parse schedule time error, startDate:{}, endDate:{}.", (Object)startDate, (Object)endDate);
                return false;
            }
        }
        return true;
    }

    @Override
    public void execStreamTaskInstance(User loginUser, long projectCode, long taskDefinitionCode, int taskDefinitionVersion, int warningGroupId, String workerGroup, String tenantCode, Long environmentCode, Map<String, String> startParams, int dryRun) {
        throw new ServiceException("Not supported");
    }
}

