package org.apache.doris.load.routineload;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/routineload/RoutineLoadTaskScheduler.class */
public class RoutineLoadTaskScheduler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class);
    private static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10000;
    private static final long SLOT_FULL_SLEEP_MS = 10000;
    private RoutineLoadManager routineLoadManager;
    private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue;
    private long lastBackendSlotUpdateTime;

    @VisibleForTesting
    public RoutineLoadTaskScheduler() {
        super("Routine load task scheduler", 0L);
        this.needScheduleTasksQueue = Queues.newLinkedBlockingQueue();
        this.lastBackendSlotUpdateTime = -1L;
        this.routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
    }

    public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) {
        super("Routine load task scheduler", 1L);
        this.needScheduleTasksQueue = Queues.newLinkedBlockingQueue();
        this.lastBackendSlotUpdateTime = -1L;
        this.routineLoadManager = routineLoadManager;
    }

    @Override // org.apache.doris.common.util.MasterDaemon
    protected void runAfterCatalogReady() {
        try {
            process();
        } catch (Throwable th) {
            LOG.warn("Failed to process one round of RoutineLoadTaskScheduler", th);
        }
    }

    private void process() throws UserException, InterruptedException {
        updateBackendSlotIfNecessary();
        if (this.routineLoadManager.getClusterIdleSlotNum() == 0) {
            Thread.sleep(10000L);
            return;
        }
        try {
            RoutineLoadTaskInfo take = this.needScheduleTasksQueue.take();
            if (System.currentTimeMillis() - take.getLastScheduledTime() < take.getTimeoutMs()) {
                this.needScheduleTasksQueue.put(take);
            } else {
                scheduleOneTask(take);
            }
        } catch (Exception e) {
            LOG.warn("Taking routine load task from queue has been interrupted", e);
        }
    }

    private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
        routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis());
        LOG.debug("schedule routine load task info {} for job {}", routineLoadTaskInfo.id, Long.valueOf(routineLoadTaskInfo.getJobId()));
        if (!this.routineLoadManager.checkTaskInJob(routineLoadTaskInfo)) {
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg", "task has been abandoned when scheduling task").build());
            return;
        }
        if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
            this.needScheduleTasksQueue.put(routineLoadTaskInfo);
            return;
        }
        try {
            if (!allocateTaskToBe(routineLoadTaskInfo)) {
                this.needScheduleTasksQueue.put(routineLoadTaskInfo);
                return;
            }
            try {
                if (!routineLoadTaskInfo.beginTxn()) {
                    routineLoadTaskInfo.setBeId(-1L);
                    this.needScheduleTasksQueue.put(routineLoadTaskInfo);
                    return;
                }
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    TRoutineLoadTask createRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
                    LOG.debug("create routine load task cost(ms): {}, job id: {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(routineLoadTaskInfo.getJobId()));
                    try {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        submitTask(routineLoadTaskInfo.getBeId(), createRoutineLoadTask);
                        LOG.debug("send routine load task cost(ms): {}, job id: {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2), Long.valueOf(routineLoadTaskInfo.getJobId()));
                        if (createRoutineLoadTask.isSetKafkaLoadInfo()) {
                            LOG.debug("send kafka routine load task {} with partition offset: {}, job: {}", createRoutineLoadTask.label, createRoutineLoadTask.kafka_load_info.partition_begin_offset, Long.valueOf(createRoutineLoadTask.getJobId()));
                        }
                    } catch (LoadException e) {
                        LOG.warn("failed to submit routine load task {} to BE: {}, error: {}", DebugUtil.printId(routineLoadTaskInfo.getId()), Long.valueOf(routineLoadTaskInfo.getBeId()), e.getMessage());
                        this.routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage());
                    }
                    routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
                } catch (MetaNotFoundException e2) {
                    routineLoadTaskInfo.setBeId(-1L);
                    this.routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(RoutineLoadJob.JobState.CANCELLED, new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + e2.getMessage()), false);
                    throw e2;
                } catch (UserException e3) {
                    routineLoadTaskInfo.setBeId(-1L);
                    this.routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(e3.getErrorCode(), "failed to create task: " + e3.getMessage()), false);
                    throw e3;
                }
            } catch (Exception e4) {
                routineLoadTaskInfo.setBeId(-1L);
                this.routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR, "failed to begin txn: " + e4.getMessage()), false);
                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg", "begin task txn encounter exception: " + e4.getMessage()).build(), e4);
                throw e4;
            }
        } catch (UserException e5) {
            this.routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(e5.getErrorCode(), e5.getMessage()), false);
            throw e5;
        } catch (Exception e6) {
            this.routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR, "failed to allocate task: " + e6.getMessage()), false);
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg", "allocate task encounter exception: " + e6.getMessage()).build(), e6);
            throw e6;
        }
    }

    private void updateBackendSlotIfNecessary() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastBackendSlotUpdateTime == -1 || currentTimeMillis - this.lastBackendSlotUpdateTime > 10000) {
            this.routineLoadManager.updateBeIdToMaxConcurrentTasks();
            this.lastBackendSlotUpdateTime = currentTimeMillis;
            LOG.debug("update backend max slot for routine load task scheduling. current task num per BE: {}", Integer.valueOf(Config.max_routine_load_task_num_per_be));
        }
    }

    public void addTaskInQueue(RoutineLoadTaskInfo routineLoadTaskInfo) {
        this.needScheduleTasksQueue.add(routineLoadTaskInfo);
        LOG.debug("total tasks num in routine load task queue: {}", Integer.valueOf(this.needScheduleTasksQueue.size()));
    }

    public void addTasksInQueue(List<RoutineLoadTaskInfo> list) {
        this.needScheduleTasksQueue.addAll(list);
        LOG.debug("total tasks num in routine load task queue: {}", Integer.valueOf(this.needScheduleTasksQueue.size()));
    }

    private void submitTask(long j, TRoutineLoadTask tRoutineLoadTask) throws LoadException {
        Backend backend = Env.getCurrentSystemInfo().getBackend(j);
        if (backend == null) {
            throw new LoadException("failed to send tasks to backend " + j + " because not exist");
        }
        TNetworkAddress tNetworkAddress = new TNetworkAddress(backend.getHost(), backend.getBePort());
        try {
            try {
                BackendService.Client borrowObject = ClientPool.backendPool.borrowObject(tNetworkAddress);
                TStatus submitRoutineLoadTask = borrowObject.submitRoutineLoadTask(Lists.newArrayList(new TRoutineLoadTask[]{tRoutineLoadTask}));
                if (submitRoutineLoadTask.getStatusCode() != TStatusCode.OK) {
                    throw new LoadException("failed to submit task. error code: " + submitRoutineLoadTask.getStatusCode() + ", msg: " + (submitRoutineLoadTask.getErrorMsgsSize() > 0 ? (String) submitRoutineLoadTask.getErrorMsgs().get(0) : "NaN"));
                }
                LOG.debug("send routine load task {} to BE: {}", DebugUtil.printId(tRoutineLoadTask.id), Long.valueOf(j));
                if (1 != 0) {
                    ClientPool.backendPool.returnObject(tNetworkAddress, borrowObject);
                } else {
                    ClientPool.backendPool.invalidateObject(tNetworkAddress, borrowObject);
                }
            } catch (Exception e) {
                throw new LoadException("failed to send task: " + e.getMessage(), e);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                ClientPool.backendPool.returnObject(tNetworkAddress, null);
            } else {
                ClientPool.backendPool.invalidateObject(tNetworkAddress, null);
            }
            throw th;
        }
    }

    private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
        long availableBeForTask = this.routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(), routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
        if (availableBeForTask == -1) {
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("job_id", routineLoadTaskInfo.getJobId()).add("previous_be_id", routineLoadTaskInfo.getPreviousBeId()).add("assigned_be_id", availableBeForTask).build());
        }
        routineLoadTaskInfo.setBeId(availableBeForTask);
        return true;
    }
}
