/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.server.core.workflow;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
import tech.powerjob.server.core.uid.IdGenerateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.extension.defaultimpl.alarm.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.defaultimpl.alarm.module.WorkflowInstanceAlarm;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;

@Service
public class WorkflowInstanceManager {
    private static final Logger log = LoggerFactory.getLogger(WorkflowInstanceManager.class);
    private final AlarmCenter alarmCenter;
    private final IdGenerateService idGenerateService;
    private final JobInfoRepository jobInfoRepository;
    private final UserService userService;
    private final WorkflowInfoRepository workflowInfoRepository;
    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
    private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
    private final WorkflowNodeHandleService workflowNodeHandleService;

    /**
     * Creates a new workflow instance for the given workflow definition and persists it.
     *
     * <p>The workflow's DAG is parsed and validated, every node is initialised from its
     * persisted node definition, and all nodes are put into {@code WAITING_DISPATCH}.
     * If anything goes wrong the instance is still persisted, but in {@code FAILED}
     * status with the exception message as result.
     *
     * @param wfInfo             workflow definition the instance is created from
     * @param initParams         initial parameters of the instance; for a nested workflow
     *                           this value is also used verbatim as the workflow context
     * @param expectTriggerTime  expected trigger time stored on the instance
     * @param parentWfInstanceId id of the parent workflow instance, or {@code null} when
     *                           this is not a nested workflow
     * @return the id allocated for the new instance (returned even when creation failed)
     */
    public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long parentWfInstanceId) {
        Long wfId = wfInfo.getId();
        Long wfInstanceId = this.idGenerateService.allocate();
        WorkflowInstanceInfoDO newWfInstance = this.constructWfInstance(wfInfo, initParams, expectTriggerTime, wfId, wfInstanceId);
        if (parentWfInstanceId != null) {
            // Nested workflow: remember the parent and pass the given params through as context.
            newWfInstance.setParentWfInstanceId(parentWfInstanceId);
            newWfInstance.setWfContext(initParams);
        }
        PEWorkflowDAG dag = null;
        try {
            dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
            if (!WorkflowDAGUtils.valid(dag)) {
                log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId);
                throw new PowerJobException("invalid dag");
            }
            this.initNodeInfo(dag);
            // Collect the ids of all job nodes and reset every node to its initial status.
            Set<Long> allJobIds = Sets.newHashSet();
            for (PEWorkflowDAG.Node node : dag.getNodes()) {
                if (node.getNodeType().intValue() == WorkflowNodeType.JOB.getCode()) {
                    allJobIds.add(node.getJobId());
                }
                node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
            }
            // Every referenced job must still be known to the repository (status ENABLE or DISABLE).
            int needNum = allJobIds.size();
            long dbNum = this.jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds);
            log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
            if (dbNum < needNum) {
                log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum);
                throw new PowerJobException("can't find some job");
            }
            newWfInstance.setDag(JSON.toJSONString(dag));
            this.workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
        }
        catch (Exception e) {
            // PowerJobExceptions raised above were already logged with context; anything
            // else (parse errors, repository failures, ...) would otherwise vanish silently.
            if (!(e instanceof PowerJobException)) {
                log.error("[Workflow-{}|{}] create workflow instance failed.", wfId, wfInstanceId, e);
            }
            if (dag != null) {
                // Keep whatever DAG state was reached so it is stored with the failed instance.
                newWfInstance.setDag(JSON.toJSONString(dag));
            }
            this.handleWfInstanceFinalStatus(newWfInstance, e.getMessage(), WorkflowInstanceStatus.FAILED);
        }
        return wfInstanceId;
    }

    /**
     * Populates every node of the DAG from its persisted node definition.
     *
     * <p>A node definition without a type is treated as a JOB node. For JOB nodes the
     * referenced job must exist; its job params are used as node params whenever the
     * node definition itself has blank params.
     *
     * @param dag the DAG whose nodes are filled in place
     * @throws PowerJobException if a node definition or a referenced job cannot be found,
     *                           or a JOB node has no job id
     */
    private void initNodeInfo(PEWorkflowDAG dag) {
        for (PEWorkflowDAG.Node dagNode : dag.getNodes()) {
            WorkflowNodeInfoDO nodeInfo = this.workflowNodeInfoRepository
                    .findById(dagNode.getNodeId())
                    .orElseThrow(() -> new PowerJobException("can't find some node"));
            // Definitions without a type default to JOB.
            if (nodeInfo.getType() == null) {
                nodeInfo.setType(WorkflowNodeType.JOB.getCode());
            }
            dagNode.setNodeType(nodeInfo.getType());
            dagNode.setJobId(nodeInfo.getJobId());
            dagNode.setNodeName(nodeInfo.getNodeName());
            dagNode.setNodeParams(nodeInfo.getNodeParams());
            dagNode.setEnable(nodeInfo.getEnable());
            dagNode.setSkipWhenFailed(nodeInfo.getSkipWhenFailed());

            boolean jobNode = dagNode.getNodeType().intValue() == WorkflowNodeType.JOB.getCode();
            if (jobNode) {
                if (nodeInfo.getJobId() == null) {
                    throw new PowerJobException("illegal node info");
                }
                JobInfoDO job = this.jobInfoRepository
                        .findById(nodeInfo.getJobId())
                        .orElseThrow(() -> new PowerJobException("can't find some job"));
                // Node-level params win; fall back to the job's own params when they are blank.
                String ownParams = nodeInfo.getNodeParams();
                dagNode.setNodeParams(StringUtils.isBlank(ownParams) ? job.getJobParams() : ownParams);
            }
        }
    }

    /**
     * Builds (without persisting) a workflow instance in {@code WAITING} status.
     *
     * <p>The workflow context is derived from {@code initParams}: when the value parses
     * as a non-empty JSON object of string values it is used as the context directly,
     * otherwise it is wrapped as {@code {"initParams": <value>}}.
     *
     * @param wfInfo            workflow definition (only its app id is read here)
     * @param initParams        initial parameters of the instance
     * @param expectTriggerTime expected trigger time
     * @param wfId              id of the workflow definition
     * @param wfInstanceId      id allocated for the new instance
     * @return the freshly populated instance object
     */
    private WorkflowInstanceInfoDO constructWfInstance(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long wfId, Long wfInstanceId) {
        Date createdAt = new Date();
        WorkflowInstanceInfoDO instance = new WorkflowInstanceInfoDO();
        instance.setAppId(wfInfo.getAppId());
        instance.setWfInstanceId(wfInstanceId);
        instance.setWorkflowId(wfId);
        instance.setStatus(WorkflowInstanceStatus.WAITING.getV());
        instance.setExpectedTriggerTime(expectTriggerTime);
        instance.setActualTriggerTime(System.currentTimeMillis());
        instance.setWfInitParams(initParams);

        // Decide whether initParams can serve as the context as-is.
        boolean usableAsContext;
        try {
            Map<String, String> parsed = JSON.parseObject(initParams, new TypeReference<Map<String, String>>(){});
            usableAsContext = parsed != null && !parsed.isEmpty();
        }
        catch (Exception ignored) {
            // Not a JSON map: fall through to the wrapping branch below.
            usableAsContext = false;
        }

        if (usableAsContext) {
            instance.setWfContext(initParams);
        } else {
            Map<String, String> wrapped = Maps.newHashMap();
            wrapped.put("initParams", initParams);
            instance.setWfContext(JsonUtils.toJSONString(wrapped));
        }
        instance.setGmtCreate(createdAt);
        instance.setGmtModified(createdAt);
        return instance;
    }

    /**
     * Starts a previously created workflow instance.
     *
     * <p>Nothing happens when the instance cannot be found or is no longer in
     * {@code WAITING} status. When the workflow limits its concurrent instances and the
     * limit is exceeded, the instance is finished as {@code FAILED}. Otherwise control
     * nodes are handled until only task nodes are ready; with no ready node left the
     * instance finishes as {@code SUCCEED}, else it is switched to {@code RUNNING} and the
     * ready task nodes are handed to the node handle service. Any exception finishes the
     * instance as {@code FAILED}.
     *
     * @param wfInfo       workflow definition of the instance
     * @param wfInstanceId id of the instance to start
     */
    @UseCacheLock(type="processWfInstance", key="#wfInfo.getMaxWfInstanceNum() > 0 ? #wfInfo.getId() : #wfInstanceId", concurrencyLevel=1024)
    public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
        Optional<WorkflowInstanceInfoDO> lookup = this.workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
        if (!lookup.isPresent()) {
            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
            return;
        }
        WorkflowInstanceInfoDO instance = lookup.get();

        // Only instances still waiting may be started.
        boolean waiting = instance.getStatus().intValue() == WorkflowInstanceStatus.WAITING.getV();
        if (!waiting) {
            log.info("[Workflow-{}|{}] workflowInstance({}) needn't running any more.", wfInfo.getId(), wfInstanceId, instance);
            return;
        }

        // Enforce the per-workflow concurrency limit (a value <= 0 disables the check).
        if (wfInfo.getMaxWfInstanceNum() > 0) {
            int running = this.workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
            if (running > wfInfo.getMaxWfInstanceNum()) {
                String reason = String.format("too many instances(%d>%d)", running, wfInfo.getMaxWfInstanceNum());
                this.handleWfInstanceFinalStatus(instance, reason, WorkflowInstanceStatus.FAILED);
                return;
            }
        }

        try {
            PEWorkflowDAG dag = JSON.parseObject(instance.getDag(), PEWorkflowDAG.class);
            List<PEWorkflowDAG.Node> ready = WorkflowDAGUtils.listReadyNodes(dag);
            // Keep resolving control nodes until none of the ready nodes is a control node.
            for (List<PEWorkflowDAG.Node> control = this.findControlNodes(ready); !control.isEmpty(); control = this.findControlNodes(ready)) {
                this.workflowNodeHandleService.handleControlNodes(control, dag, instance);
                ready = WorkflowDAGUtils.listReadyNodes(dag);
            }
            if (ready.isEmpty()) {
                // Nothing to run at all: the instance completes right away.
                instance.setFinishedTime(System.currentTimeMillis());
                instance.setDag(JSON.toJSONString(dag));
                log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, instance);
                this.handleWfInstanceFinalStatus(instance, "no enabled nodes", WorkflowInstanceStatus.SUCCEED);
                return;
            }
            instance.setStatus(WorkflowInstanceStatus.RUNNING.getV());
            this.workflowNodeHandleService.handleTaskNodes(ready, dag, instance);
            log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
        }
        catch (Exception e) {
            log.error("[Workflow-{}|{}] start workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
            this.handleWfInstanceFinalStatus(instance, e.getMessage(), WorkflowInstanceStatus.FAILED);
        }
    }

    /**
     * Advances a workflow instance after one of its nodes has finished.
     *
     * <p>The node whose instance id equals {@code instanceId} is updated with the given
     * status and result, the DAG is written back to the workflow instance, and then the
     * workflow is either finished (FAILED / STOPPED / SUCCEED) or the next ready nodes
     * are handed to the node handle service. Any exception finishes the workflow
     * instance as FAILED.
     *
     * @param wfInstanceId id of the workflow instance to advance
     * @param instanceId   instance id of the node that just finished
     * @param status       final status reported for that node
     * @param result       result reported for that node
     */
    @UseCacheLock(type="processWfInstance", key="#wfInstanceId", concurrencyLevel=1024)
    public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
        Optional wfInstanceInfoOpt = this.workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
        if (!wfInstanceInfoOpt.isPresent()) {
            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", (Object)wfInstanceId);
            return;
        }
        WorkflowInstanceInfoDO wfInstance = (WorkflowInstanceInfoDO)wfInstanceInfoOpt.get();
        Long wfId = wfInstance.getWorkflowId();
        // A STOPPED report for a workflow that is no longer running is ignored entirely.
        if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
            return;
        }
        try {
            PEWorkflowDAG dag = (PEWorkflowDAG)JSON.parseObject((String)wfInstance.getDag(), PEWorkflowDAG.class);
            // Stays true only if, after the update below, no node is in a running-like status.
            boolean allFinished = true;
            PEWorkflowDAG.Node instanceNode = null;
            for (PEWorkflowDAG.Node node : dag.getNodes()) {
                if (instanceId.equals(node.getInstanceId())) {
                    // This is the node that just finished: record its outcome.
                    node.setStatus(Integer.valueOf(status.getV()));
                    node.setResult(result);
                    node.setFinishedTime(CommonUtils.formatTime((Long)System.currentTimeMillis()));
                    instanceNode = node;
                    log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", new Object[]{wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result});
                }
                if (!InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) continue;
                allFinished = false;
            }
            // No node of this DAG carries the reported instance id: nothing to do.
            if (instanceNode == null) {
                log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignore! ", new Object[]{wfId, wfInstanceId, instanceId});
                return;
            }
            wfInstance.setGmtModified(new Date());
            wfInstance.setDag(JSON.toJSONString((Object)dag));
            // Workflow already reached a non-running status: only persist the updated DAG.
            if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
                this.workflowInstanceInfoRepository.saveAndFlush((Object)wfInstance);
                log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", new Object[]{wfId, wfInstanceId, wfInstance.getStatus()});
                return;
            }
            // A failed node that may not be skipped fails the whole workflow instance.
            if (status == InstanceStatus.FAILED && WorkflowDAGUtils.isNotAllowSkipWhenFailed(instanceNode)) {
                log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", new Object[]{wfId, wfInstanceId, instanceId});
                this.handleWfInstanceFinalStatus(wfInstance, "middle job failed", WorkflowInstanceStatus.FAILED);
                return;
            }
            // A stopped node stops the whole workflow instance.
            if (status == InstanceStatus.STOPPED) {
                this.handleWfInstanceFinalStatus(wfInstance, "middle job stopped by user", WorkflowInstanceStatus.STOPPED);
                log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", new Object[]{wfId, wfInstanceId, instanceId});
                return;
            }
            // NOTE(review): the DAG is re-serialized after this call below, which suggests
            // listReadyNodes may modify node state - confirm against WorkflowDAGUtils.
            List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
            // Re-check completion once the ready nodes have been computed.
            if (readyNodes.isEmpty() && this.isFinish(dag)) {
                allFinished = true;
            }
            if (allFinished) {
                wfInstance.setDag(JSON.toJSONString((Object)dag));
                this.handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
                log.info("[Workflow-{}|{}] process successfully.", (Object)wfId, (Object)wfInstanceId);
                return;
            }
            // Handle control nodes repeatedly until no ready node is a control node.
            List<PEWorkflowDAG.Node> controlNodes = this.findControlNodes(readyNodes);
            while (!controlNodes.isEmpty()) {
                this.workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstance);
                readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
                controlNodes = this.findControlNodes(readyNodes);
            }
            if (readyNodes.isEmpty()) {
                // Nothing left to dispatch: either everything is done ...
                if (this.isFinish(dag)) {
                    wfInstance.setDag(JSON.toJSONString((Object)dag));
                    this.handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
                    log.info("[Workflow-{}|{}] process successfully.", (Object)wfId, (Object)wfInstanceId);
                    return;
                }
                // ... or some nodes are still running; persist the DAG and wait for them.
                wfInstance.setDag(JSON.toJSONString((Object)dag));
                this.workflowInstanceInfoRepository.saveAndFlush((Object)wfInstance);
                return;
            }
            // Hand the remaining ready (non-control) nodes to the node handle service.
            this.workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstance);
        }
        catch (Exception e) {
            this.handleWfInstanceFinalStatus(wfInstance, "MOVE NEXT STEP FAILED: " + e.getMessage(), WorkflowInstanceStatus.FAILED);
            log.error("[Workflow-{}|{}] update failed.", new Object[]{wfId, wfInstanceId, e});
        }
    }

    /**
     * Merges the given key/value pairs into the context of a workflow instance and
     * persists the result. Existing keys are overwritten.
     *
     * <p>This is a best-effort operation: failures are logged and never propagated.
     * A {@code null} or empty {@code appendedWfContextData} is a no-op, and an instance
     * whose stored context is missing is treated as having an empty context.
     *
     * @param wfInstanceId          id of the workflow instance whose context is updated
     * @param appendedWfContextData entries to add to (or overwrite in) the context
     */
    @UseCacheLock(type="processWfInstance", key="#wfInstanceId", concurrencyLevel=1024)
    public void updateWorkflowContext(Long wfInstanceId, Map<String, String> appendedWfContextData) {
        if (appendedWfContextData == null || appendedWfContextData.isEmpty()) {
            // Nothing to merge; avoid a pointless load/save round trip.
            return;
        }
        try {
            Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = this.workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
            if (!wfInstanceInfoOpt.isPresent()) {
                log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
                return;
            }
            WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
            Map<String, String> wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference<HashMap<String, String>>(){});
            if (wfContext == null) {
                // The parser yields null for a null/empty stored context; start from an empty one
                // instead of failing with a NullPointerException and dropping the update.
                wfContext = new HashMap<>();
            }
            for (Map.Entry<String, String> entry : appendedWfContextData.entrySet()) {
                String key = entry.getKey();
                String originValue = wfContext.put(key, entry.getValue());
                log.info("[Workflow-{}|{}] update workflow context {} : {} -> {}", wfInstance.getWorkflowId(), wfInstance.getWfInstanceId(), key, originValue, entry.getValue());
            }
            wfInstance.setWfContext(JSON.toJSONString(wfContext));
            this.workflowInstanceInfoRepository.saveAndFlush(wfInstance);
        }
        catch (Exception e) {
            log.error("[WorkflowInstanceManager] update workflow(workflowInstanceId={}) context failed.", wfInstanceId, e);
        }
    }

    /**
     * Puts a workflow instance into a final status, persists it and performs the
     * follow-up actions.
     *
     * <ul>
     *   <li>For a nested workflow (parent instance id present) the parent is advanced via
     *       {@link #move}; on {@code SUCCEED} the child's context is first merged into the
     *       parent's context.</li>
     *   <li>On {@code FAILED} a failure alarm is sent to the workflow's notify users.
     *       Alarming is best-effort: errors are logged but never propagated.</li>
     * </ul>
     *
     * @param wfInstance             the instance to finish (modified and saved)
     * @param result                 result text stored on the instance
     * @param workflowInstanceStatus the final status to set
     */
    private void handleWfInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) {
        wfInstance.setStatus(workflowInstanceStatus.getV());
        wfInstance.setResult(result);
        wfInstance.setFinishedTime(System.currentTimeMillis());
        wfInstance.setGmtModified(new Date());
        this.workflowInstanceInfoRepository.saveAndFlush(wfInstance);

        Long parentWfInstanceId = wfInstance.getParentWfInstanceId();
        if (parentWfInstanceId != null) {
            // Go through the Spring-managed bean rather than "this" so that the
            // annotations on updateWorkflowContext/move are honoured.
            WorkflowInstanceManager manager = (WorkflowInstanceManager)SpringUtils.getBean(this.getClass());
            if (workflowInstanceStatus == WorkflowInstanceStatus.SUCCEED) {
                Map<String, String> wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference<HashMap<String, String>>(){});
                // A missing/empty child context has nothing to contribute to the parent.
                if (wfContext != null && !wfContext.isEmpty()) {
                    manager.updateWorkflowContext(parentWfInstanceId, wfContext);
                }
            }
            manager.move(parentWfInstanceId, wfInstance.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result);
        }

        if (workflowInstanceStatus == WorkflowInstanceStatus.FAILED) {
            try {
                this.workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> {
                    WorkflowInstanceAlarm content = new WorkflowInstanceAlarm();
                    BeanUtils.copyProperties(wfInfo, content);
                    BeanUtils.copyProperties(wfInstance, content);
                    content.setResult(result);
                    List<UserInfoDO> userList = this.userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
                    this.alarmCenter.alarmFailed((Alarm)content, userList);
                });
            }
            catch (Exception e) {
                // The instance status is already persisted; a broken alarm channel must not
                // change the outcome, but it should be visible in the logs.
                log.warn("[Workflow-{}|{}] send failure alarm failed.", wfInstance.getWorkflowId(), wfInstance.getWfInstanceId(), e);
            }
        }
    }

    /**
     * Selects, from the given ready nodes, those whose node type is a control node.
     *
     * @param readyNodes nodes that are ready to be processed
     * @return a new list holding only the control nodes, in their original order
     */
    private List<PEWorkflowDAG.Node> findControlNodes(List<PEWorkflowDAG.Node> readyNodes) {
        return readyNodes.stream()
                .filter(candidate -> WorkflowNodeType.of((int)candidate.getNodeType()).isControlNode())
                .collect(Collectors.toList());
    }

    /**
     * Tells whether the DAG has finished, i.e. none of its nodes has a status contained
     * in {@code InstanceStatus.GENERALIZED_RUNNING_STATUS}.
     *
     * @param dag the DAG to inspect
     * @return {@code true} when no node is in a running-like status
     */
    private boolean isFinish(PEWorkflowDAG dag) {
        return dag.getNodes().stream()
                .noneMatch(node -> InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus()));
    }

    /**
     * Creates the manager with all of its collaborators (constructor injection).
     *
     * @param alarmCenter                    used to send failure alarms
     * @param idGenerateService              allocates workflow instance ids
     * @param jobInfoRepository              access to job definitions
     * @param userService                    resolves the users to notify
     * @param workflowInfoRepository         access to workflow definitions
     * @param workflowInstanceInfoRepository access to workflow instances
     * @param workflowNodeInfoRepository     access to workflow node definitions
     * @param workflowNodeHandleService      handles control and task nodes
     */
    public WorkflowInstanceManager(
            AlarmCenter alarmCenter,
            IdGenerateService idGenerateService,
            JobInfoRepository jobInfoRepository,
            UserService userService,
            WorkflowInfoRepository workflowInfoRepository,
            WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
            WorkflowNodeInfoRepository workflowNodeInfoRepository,
            WorkflowNodeHandleService workflowNodeHandleService) {
        // Repositories
        this.jobInfoRepository = jobInfoRepository;
        this.workflowInfoRepository = workflowInfoRepository;
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
        this.workflowNodeInfoRepository = workflowNodeInfoRepository;
        // Services
        this.alarmCenter = alarmCenter;
        this.idGenerateService = idGenerateService;
        this.userService = userService;
        this.workflowNodeHandleService = workflowNodeHandleService;
    }
}

