/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker.core.tracker.task.heavy;

import akka.actor.ActorSelection;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker;
import tech.powerjob.worker.persistence.TaskDO;

public class CommonTaskTracker
extends HeavyTaskTracker {
    private static final Logger log = LoggerFactory.getLogger(CommonTaskTracker.class);
    public static final String ROOT_TASK_ID = "0";
    public static final String LAST_TASK_ID = "9999";

    protected CommonTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
        super(req, workerRuntime);
    }

    @Override
    protected void initTaskTracker(ServerScheduleJobReq req) {
        String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d";
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
        this.scheduledPool = Executors.newScheduledThreadPool(2, factory);
        this.persistenceRootTask();
        int delay = Integer.parseInt(System.getProperty("powerjob.worker.status-check.normal.period", "13"));
        this.scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3L, delay, TimeUnit.SECONDS);
        ExecuteType executeType = ExecuteType.valueOf((String)req.getExecuteType());
        if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
            this.scheduledPool.scheduleAtFixedRate(new HeavyTaskTracker.WorkerDetector(), 1L, 1L, TimeUnit.MINUTES);
        }
        this.scheduledPool.scheduleWithFixedDelay(new HeavyTaskTracker.Dispatcher(), 10L, 5000L, TimeUnit.MILLISECONDS);
    }

    @Override
    public InstanceDetail fetchRunningStatus() {
        InstanceDetail detail = new InstanceDetail();
        detail.setActualTriggerTime(Long.valueOf(this.createTime));
        detail.setStatus(Integer.valueOf(InstanceStatus.RUNNING.getV()));
        detail.setTaskTrackerAddress(this.workerRuntime.getWorkerAddress());
        HeavyTaskTracker.InstanceStatisticsHolder holder = this.getInstanceStatisticsHolder(this.instanceId);
        InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
        taskDetail.setSucceedTaskNum(holder.succeedNum);
        taskDetail.setFailedTaskNum(holder.failedNum);
        taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
        detail.setTaskDetail(taskDetail);
        return detail;
    }

    private void persistenceRootTask() {
        TaskDO rootTask = new TaskDO();
        rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
        rootTask.setInstanceId(this.instanceInfo.getInstanceId());
        rootTask.setTaskId(ROOT_TASK_ID);
        rootTask.setFailedCnt(0);
        rootTask.setAddress(this.workerRuntime.getWorkerAddress());
        rootTask.setTaskName("OMS_ROOT_TASK");
        rootTask.setCreatedTime(System.currentTimeMillis());
        rootTask.setLastModifiedTime(System.currentTimeMillis());
        rootTask.setLastReportTime(-1L);
        rootTask.setSubInstanceId(this.instanceId);
        if (!this.taskPersistenceService.save(rootTask)) {
            log.error("[TaskTracker-{}] create root task failed.", (Object)this.instanceId);
            throw new PowerJobException("create root task failed for instance: " + this.instanceId);
        }
        log.info("[TaskTracker-{}] create root task successfully.", (Object)this.instanceId);
    }

    public String toString() {
        return "CommonTaskTracker()";
    }

    private class StatusCheckRunnable
    implements Runnable {
        private static final long DISPATCH_TIME_OUT_MS = 15000L;

        private StatusCheckRunnable() {
        }

        private void innerRun() {
            List<String> disconnectedPTs;
            HeavyTaskTracker.InstanceStatisticsHolder holder = CommonTaskTracker.this.getInstanceStatisticsHolder(CommonTaskTracker.this.instanceId);
            long finishedNum = holder.succeedNum + holder.failedNum;
            long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
            log.debug("[TaskTracker-{}] status check result: {}", (Object)CommonTaskTracker.this.instanceId, (Object)holder);
            TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
            req.setAppId(CommonTaskTracker.this.workerRuntime.getAppId());
            req.setJobId(CommonTaskTracker.this.instanceInfo.getJobId());
            req.setInstanceId(Long.valueOf(CommonTaskTracker.this.instanceId));
            req.setWfInstanceId(CommonTaskTracker.this.instanceInfo.getWfInstanceId());
            req.setTotalTaskNum(finishedNum + unfinishedNum);
            req.setSucceedTaskNum(holder.succeedNum);
            req.setFailedTaskNum(holder.failedNum);
            req.setReportTime(System.currentTimeMillis());
            req.setStartTime(CommonTaskTracker.this.createTime);
            req.setSourceAddress(CommonTaskTracker.this.workerRuntime.getWorkerAddress());
            boolean success = false;
            String result = null;
            if (unfinishedNum == 0L) {
                if (finishedNum == 0L) {
                    CommonTaskTracker.this.finished.set(true);
                    result = "create root task failed";
                } else {
                    ExecuteType executeType = ExecuteType.valueOf((String)CommonTaskTracker.this.instanceInfo.getExecuteType());
                    switch (executeType) {
                        case STANDALONE: {
                            CommonTaskTracker.this.finished.set(true);
                            List<TaskDO> allTask = CommonTaskTracker.this.taskPersistenceService.getAllTask(CommonTaskTracker.this.instanceId, CommonTaskTracker.this.instanceId);
                            if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                                success = false;
                                result = "unknown bug";
                                log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", (Object)CommonTaskTracker.this.instanceId);
                                break;
                            }
                            result = allTask.get(0).getResult();
                            success = allTask.get(0).getStatus().intValue() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                            break;
                        }
                        case MAP: {
                            CommonTaskTracker.this.finished.set(true);
                            success = holder.failedNum == 0L;
                            result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                            break;
                        }
                        default: {
                            Optional<TaskDO> lastTaskOptional = CommonTaskTracker.this.taskPersistenceService.getLastTask(CommonTaskTracker.this.instanceId, CommonTaskTracker.this.instanceId);
                            if (lastTaskOptional.isPresent()) {
                                TaskDO resultTask = lastTaskOptional.get();
                                TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
                                if (lastTaskStatus != TaskStatus.WORKER_PROCESS_SUCCESS && lastTaskStatus != TaskStatus.WORKER_PROCESS_FAILED) break;
                                CommonTaskTracker.this.finished.set(true);
                                success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                result = resultTask.getResult();
                                break;
                            }
                            TaskDO newLastTask = new TaskDO();
                            newLastTask.setTaskName("OMS_LAST_TASK");
                            newLastTask.setTaskId(CommonTaskTracker.LAST_TASK_ID);
                            newLastTask.setSubInstanceId(CommonTaskTracker.this.instanceId);
                            newLastTask.setAddress(CommonTaskTracker.this.workerRuntime.getWorkerAddress());
                            CommonTaskTracker.this.submitTask(Lists.newArrayList((Object[])new TaskDO[]{newLastTask}));
                        }
                    }
                }
            }
            if (this.isTimeout()) {
                CommonTaskTracker.this.finished.set(true);
                success = false;
                result = "instance execute timeout";
            }
            String serverPath = AkkaUtils.getServerActorPath(CommonTaskTracker.this.workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
            ActorSelection serverActor = CommonTaskTracker.this.workerRuntime.getActorSystem().actorSelection(serverPath);
            if (CommonTaskTracker.this.finished.get()) {
                req.setResult(result);
                req.setAppendedWfContext(CommonTaskTracker.this.appendedWfContext);
                req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
                CommonTaskTracker.this.reportFinalStatusThenDestroy(serverActor, req);
                return;
            }
            req.setInstanceStatus(InstanceStatus.RUNNING.getV());
            serverActor.tell((Object)req, null);
            long currentMS = System.currentTimeMillis();
            if (holder.workerUnreceivedNum != 0L) {
                CommonTaskTracker.this.taskPersistenceService.getTaskByStatus(CommonTaskTracker.this.instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
                    long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
                    if (elapsedTime > 15000L) {
                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        if (!"OMS_LAST_TASK".equals(uncheckTask.getTaskName())) {
                            updateEntity.setAddress("N/A");
                        }
                        updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);
                        CommonTaskTracker.this.taskPersistenceService.updateTask(CommonTaskTracker.this.instanceId, uncheckTask.getTaskId(), updateEntity);
                        log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.", new Object[]{CommonTaskTracker.this.instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName()});
                    }
                });
            }
            if (!(disconnectedPTs = CommonTaskTracker.this.ptStatusHolder.getAllDisconnectedProcessorTrackers()).isEmpty()) {
                log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", (Object)CommonTaskTracker.this.instanceId, disconnectedPTs);
                if (CommonTaskTracker.this.taskPersistenceService.updateLostTasks(CommonTaskTracker.this.instanceId, disconnectedPTs, true)) {
                    CommonTaskTracker.this.ptStatusHolder.remove(disconnectedPTs);
                    log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", (Object)CommonTaskTracker.this.instanceId, disconnectedPTs);
                }
            }
        }

        public boolean isTimeout() {
            if (CommonTaskTracker.this.instanceInfo.getInstanceTimeoutMS() > 0L) {
                return System.currentTimeMillis() - CommonTaskTracker.this.createTime > CommonTaskTracker.this.instanceInfo.getInstanceTimeoutMS();
            }
            return false;
        }

        @Override
        public void run() {
            try {
                this.innerRun();
            }
            catch (Exception e) {
                log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", (Object)CommonTaskTracker.this.instanceId, (Object)e);
            }
        }
    }
}

