/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import java.util.Map;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RetryReportTaskStatusThread
implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
    @Autowired
    WorkerConfig workerConfig;
    private final TaskCallbackService taskCallbackService = (TaskCallbackService)SpringApplicationContext.getBean(TaskCallbackService.class);

    public void start() {
        Thread thread = new Thread((Runnable)this, "RetryReportTaskStatusThread");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void run() {
        ResponceCache responceCache = ResponceCache.get();
        long interval = (long)(this.workerConfig.getRetryReportTaskStatusInterval() * 1000) * 60L;
        while (Stopper.isRunning()) {
            ThreadUtils.sleep((long)60000L);
            long nowTimeMillis = System.currentTimeMillis();
            try {
                this.retrySendCommand(responceCache.getAckCache(), interval, nowTimeMillis);
                this.retrySendCommand(responceCache.getResponseCache(), interval, nowTimeMillis);
                this.retrySendCommand(responceCache.getKillResponseCache(), interval, nowTimeMillis);
                this.retrySendCommand(responceCache.getRecallCache(), interval, nowTimeMillis);
            }
            catch (Exception e) {
                this.logger.warn("retry report task status error", (Throwable)e);
            }
        }
    }

    private void retrySendCommand(Map<Integer, Command> cache, long interval, long nowTimeMillis) {
        for (Map.Entry<Integer, Command> entry : cache.entrySet()) {
            Command command = entry.getValue();
            if (nowTimeMillis - command.getGenCommandTimeMillis() <= interval) continue;
            Integer taskInstanceId = entry.getKey();
            this.taskCallbackService.sendResult(taskInstanceId, command);
            this.logger.info("retry send command successfully, the command type {}, the task id:{}", (Object)command.getType(), (Object)taskInstanceId);
        }
    }
}

