package org.apache.doris.load.routineload;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/routineload/RoutineLoadManager.class */
public class RoutineLoadManager implements Writable {
    private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class);
    private Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
    private Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
    private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap();
    private ConcurrentHashMap<Long, Long> multiLoadTaskTxnIdToRoutineLoadJobId = new ConcurrentHashMap<>();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    private void readLock() {
        this.lock.readLock().lock();
    }

    private void readUnlock() {
        this.lock.readLock().unlock();
    }

    private void writeLock() {
        this.lock.writeLock().lock();
    }

    private void writeUnlock() {
        this.lock.writeLock().unlock();
    }

    public void addMultiLoadTaskTxnIdToRoutineLoadJobId(long j, long j2) {
        this.multiLoadTaskTxnIdToRoutineLoadJobId.put(Long.valueOf(j), Long.valueOf(j2));
    }

    public RoutineLoadJob getRoutineLoadJobByMultiLoadTaskTxnId(long j) {
        long longValue = this.multiLoadTaskTxnIdToRoutineLoadJobId.get(Long.valueOf(j)).longValue();
        if (longValue == 0) {
            return null;
        }
        return this.idToRoutineLoadJob.get(Long.valueOf(longValue));
    }

    public void removeMultiLoadTaskTxnIdToRoutineLoadJobId(long j) {
        this.multiLoadTaskTxnIdToRoutineLoadJobId.remove(Long.valueOf(j));
    }

    public void updateBeIdToMaxConcurrentTasks() {
        this.beIdToMaxConcurrentTasks = (Map) Env.getCurrentSystemInfo().getAllBackendIds(true).stream().collect(Collectors.toMap(l -> {
            return l;
        }, l2 -> {
            return Integer.valueOf(Config.max_routine_load_task_num_per_be);
        }));
    }

    public int getTotalMaxConcurrentTaskNum() {
        return this.beIdToMaxConcurrentTasks.values().stream().mapToInt(num -> {
            return num.intValue();
        }).sum();
    }

    private Map<Long, Integer> getBeCurrentTasksNumMap() {
        HashMap newHashMap = Maps.newHashMap();
        Iterator<RoutineLoadJob> it = getRoutineLoadJobByState(Sets.newHashSet(new RoutineLoadJob.JobState[]{RoutineLoadJob.JobState.RUNNING})).iterator();
        while (it.hasNext()) {
            for (Map.Entry<Long, Integer> entry : it.next().getBeCurrentTasksNumMap().entrySet()) {
                if (newHashMap.containsKey(entry.getKey())) {
                    newHashMap.put(entry.getKey(), Integer.valueOf(((Integer) newHashMap.get(entry.getKey())).intValue() + entry.getValue().intValue()));
                } else {
                    newHashMap.put(entry.getKey(), entry.getValue());
                }
            }
        }
        return newHashMap;
    }

    public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) throws UserException {
        if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), createRoutineLoadStmt.getDBName(), createRoutineLoadStmt.getTableName(), PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), createRoutineLoadStmt.getDBName(), createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
        }
        LoadDataSourceType valueOf = LoadDataSourceType.valueOf(createRoutineLoadStmt.getTypeName());
        switch (valueOf) {
            case KAFKA:
                KafkaRoutineLoadJob fromCreateStmt = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
                fromCreateStmt.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
                fromCreateStmt.setComment(createRoutineLoadStmt.getComment());
                addRoutineLoadJob(fromCreateStmt, createRoutineLoadStmt.getDBName());
                return;
            default:
                throw new UserException("Unknown data source type: " + valueOf);
        }
    }

    public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String str) throws DdlException {
        writeLock();
        try {
            if (isNameUsed(Long.valueOf(routineLoadJob.getDbId()), routineLoadJob.getName())) {
                throw new DdlException("Name " + routineLoadJob.getName() + " already used in db " + str);
            }
            if (getRoutineLoadJobByState(Sets.newHashSet(new RoutineLoadJob.JobState[]{RoutineLoadJob.JobState.NEED_SCHEDULE, RoutineLoadJob.JobState.RUNNING, RoutineLoadJob.JobState.PAUSED})).size() > Config.max_routine_load_job_num) {
                throw new DdlException("There are more than " + Config.max_routine_load_job_num + " routine load jobs are running. exceed limit.");
            }
            unprotectedAddJob(routineLoadJob);
            Env.getCurrentEnv().getEditLog().logCreateRoutineLoadJob(routineLoadJob);
            LOG.info("create routine load job: id: {}, name: {}", Long.valueOf(routineLoadJob.getId()), routineLoadJob.getName());
        } finally {
            writeUnlock();
        }
    }

    private void unprotectedAddJob(RoutineLoadJob routineLoadJob) {
        this.idToRoutineLoadJob.put(Long.valueOf(routineLoadJob.getId()), routineLoadJob);
        this.dbToNameToRoutineLoadJob.computeIfAbsent(Long.valueOf(routineLoadJob.getDbId()), l -> {
            return Maps.newConcurrentMap();
        }).computeIfAbsent(routineLoadJob.getName(), str -> {
            return Lists.newArrayList();
        }).add(routineLoadJob);
        Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(routineLoadJob);
    }

    private boolean isNameUsed(Long l, String str) {
        if (!this.dbToNameToRoutineLoadJob.containsKey(l)) {
            return false;
        }
        Map<String, List<RoutineLoadJob>> map = this.dbToNameToRoutineLoadJob.get(l);
        if (map.containsKey(str)) {
            return map.get(str).stream().filter(routineLoadJob -> {
                return routineLoadJob.getName().equals(str);
            }).filter(routineLoadJob2 -> {
                return !routineLoadJob2.getState().isFinalState();
            }).findFirst().isPresent();
        }
        return false;
    }

    public RoutineLoadJob checkPrivAndGetJob(String str, String str2) throws MetaNotFoundException, DdlException, AnalysisException {
        RoutineLoadJob job = getJob(str, str2);
        if (job == null) {
            throw new DdlException("There is not operable routine load job with name " + str2);
        }
        try {
            String dbFullName = job.getDbFullName();
            String tableName = job.getTableName();
            if (job.isMultiTable()) {
                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), dbFullName, PrivPredicate.LOAD)) {
                    ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), dbFullName);
                }
                return job;
            }
            if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbFullName, tableName, PrivPredicate.LOAD)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), dbFullName + ": " + tableName);
            }
            return job;
        } catch (MetaNotFoundException e) {
            throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
        }
    }

    public List<RoutineLoadJob> checkPrivAndGetAllJobs(String str) throws MetaNotFoundException, DdlException {
        ArrayList newArrayList = Lists.newArrayList();
        Map<String, List<RoutineLoadJob>> map = this.dbToNameToRoutineLoadJob.get(Long.valueOf(Env.getCurrentInternalCatalog().getDbOrDdlException(str).getId()));
        if (map == null) {
            return newArrayList;
        }
        Iterator<List<RoutineLoadJob>> it = map.values().iterator();
        while (it.hasNext()) {
            for (RoutineLoadJob routineLoadJob : it.next()) {
                if (!routineLoadJob.getState().isFinalState()) {
                    String tableName = routineLoadJob.getTableName();
                    if (routineLoadJob.isMultiTable() || Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), str, tableName, PrivPredicate.LOAD)) {
                        newArrayList.add(routineLoadJob);
                    }
                }
            }
        }
        return newArrayList;
    }

    public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throws UserException {
        List<RoutineLoadJob> newArrayList = Lists.newArrayList();
        if (pauseRoutineLoadStmt.isAll()) {
            newArrayList = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
        } else {
            newArrayList.add(checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(), pauseRoutineLoadStmt.getName()));
        }
        for (RoutineLoadJob routineLoadJob : newArrayList) {
            try {
                routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"), false);
                LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(routineLoadJob.getId())).add("current_state", routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg", "routine load job has been paused by user").build());
            } catch (UserException e) {
                LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
                if (!pauseRoutineLoadStmt.isAll()) {
                    throw e;
                }
            }
        }
    }

    public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
        List<RoutineLoadJob> newArrayList = Lists.newArrayList();
        if (resumeRoutineLoadStmt.isAll()) {
            newArrayList = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName());
        } else {
            newArrayList.add(checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(), resumeRoutineLoadStmt.getName()));
        }
        for (RoutineLoadJob routineLoadJob : newArrayList) {
            try {
                routineLoadJob.jobStatistic.errorRowsAfterResumed = 0L;
                routineLoadJob.autoResumeCount = 0L;
                routineLoadJob.firstResumeTimestamp = 0L;
                routineLoadJob.autoResumeLock = false;
                routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false);
                LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(routineLoadJob.getId())).add("current_state", routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg", "routine load job has been resumed by user").build());
            } catch (UserException e) {
                LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
                if (!resumeRoutineLoadStmt.isAll()) {
                    throw e;
                }
            }
        }
    }

    public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws UserException {
        RoutineLoadJob checkPrivAndGetJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(), stopRoutineLoadStmt.getName());
        checkPrivAndGetJob.updateState(RoutineLoadJob.JobState.STOPPED, new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR, "User  " + ConnectContext.get().getQualifiedUser() + " stop routine load job"), false);
        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(checkPrivAndGetJob.getId())).add("current_state", checkPrivAndGetJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg", "routine load job has been stopped by user").build());
    }

    public int getSizeOfIdToRoutineLoadTask() {
        int i = 0;
        Iterator<RoutineLoadJob> it = this.idToRoutineLoadJob.values().iterator();
        while (it.hasNext()) {
            i += it.next().getSizeOfRoutineLoadTaskInfoList();
        }
        return i;
    }

    public int getClusterIdleSlotNum() {
        readLock();
        try {
            int i = 0;
            Map<Long, Integer> beCurrentTasksNumMap = getBeCurrentTasksNumMap();
            for (Map.Entry<Long, Integer> entry : this.beIdToMaxConcurrentTasks.entrySet()) {
                i = beCurrentTasksNumMap.containsKey(entry.getKey()) ? i + (entry.getValue().intValue() - beCurrentTasksNumMap.get(entry.getKey()).intValue()) : i + entry.getValue().intValue();
            }
            return i;
        } finally {
            readUnlock();
        }
    }

    public long getMinTaskBeId(String str) throws LoadException {
        List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
        if (allBackendIds == null) {
            throw new LoadException("The " + str + " has been deleted");
        }
        readLock();
        try {
            long j = -1;
            int i = 0;
            updateBeIdToMaxConcurrentTasks();
            Map<Long, Integer> beCurrentTasksNumMap = getBeCurrentTasksNumMap();
            for (Long l : allBackendIds) {
                if (this.beIdToMaxConcurrentTasks.containsKey(l)) {
                    int intValue = beCurrentTasksNumMap.containsKey(l) ? this.beIdToMaxConcurrentTasks.get(l).intValue() - beCurrentTasksNumMap.get(l).intValue() : Config.max_routine_load_task_num_per_be;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", l, Integer.valueOf(intValue), beCurrentTasksNumMap.get(l), this.beIdToMaxConcurrentTasks.get(l));
                    }
                    j = i < intValue ? l.longValue() : j;
                    i = Math.max(i, intValue);
                }
            }
            return j;
        } finally {
            readUnlock();
        }
    }

    public long getAvailableBeForTask(long j, long j2, String str) throws LoadException {
        Backend backend;
        List<Long> availableBackendIds = getAvailableBackendIds(j, str);
        readLock();
        try {
            Map<Long, Integer> beCurrentTasksNumMap = getBeCurrentTasksNumMap();
            if (j2 != -1 && availableBackendIds.contains(Long.valueOf(j2)) && (backend = Env.getCurrentSystemInfo().getBackend(j2)) != null && backend.isLoadAvailable()) {
                if ((!this.beIdToMaxConcurrentTasks.containsKey(Long.valueOf(j2)) ? 0 : beCurrentTasksNumMap.containsKey(Long.valueOf(j2)) ? this.beIdToMaxConcurrentTasks.get(Long.valueOf(j2)).intValue() - beCurrentTasksNumMap.get(Long.valueOf(j2)).intValue() : this.beIdToMaxConcurrentTasks.get(Long.valueOf(j2)).intValue()) > 0) {
                    return j2;
                }
            }
            long j3 = -1;
            int i = 0;
            for (Long l : availableBackendIds) {
                if (this.beIdToMaxConcurrentTasks.containsKey(l)) {
                    int intValue = beCurrentTasksNumMap.containsKey(l) ? this.beIdToMaxConcurrentTasks.get(l).intValue() - beCurrentTasksNumMap.get(l).intValue() : Config.max_routine_load_task_num_per_be;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", l, Integer.valueOf(intValue), beCurrentTasksNumMap.get(l), this.beIdToMaxConcurrentTasks.get(l));
                    }
                    j3 = i < intValue ? l.longValue() : j3;
                    i = Math.max(i, intValue);
                }
            }
            long j4 = j3;
            readUnlock();
            return j4;
        } finally {
            readUnlock();
        }
    }

    private List<Long> getAvailableBackendIds(long j, String str) throws LoadException {
        Set<Tag> resourceTags;
        RoutineLoadJob job = getJob(j);
        if (job == null) {
            throw new LoadException("job " + j + " does not exist");
        }
        if (job.getUserIdentity() == null) {
            resourceTags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
        } else {
            resourceTags = Env.getCurrentEnv().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
            if (resourceTags == UserProperty.INVALID_RESOURCE_TAGS) {
                resourceTags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
            }
        }
        return Env.getCurrentSystemInfo().selectBackendIdsByPolicy(new BeSelectionPolicy.Builder().needLoadAvailable().addTags(resourceTags).build(), -1);
    }

    private Set<Tag> getTagsFromReplicaAllocation(long j, long j2) throws LoadException {
        try {
            OlapTable olapTable = (OlapTable) Env.getCurrentInternalCatalog().getDbOrMetaException(j).getTableOrMetaException(j2, TableIf.TableType.OLAP);
            olapTable.readLock();
            try {
                PartitionInfo partitionInfo = olapTable.getPartitionInfo();
                Iterator<Partition> it = olapTable.getPartitions().iterator();
                if (it.hasNext()) {
                    Set<Tag> keySet = partitionInfo.getReplicaAllocation(it.next().getId()).getAllocMap().keySet();
                    olapTable.readUnlock();
                    return keySet;
                }
                HashSet newHashSet = Sets.newHashSet();
                olapTable.readUnlock();
                return newHashSet;
            } catch (Throwable th) {
                olapTable.readUnlock();
                throw th;
            }
        } catch (MetaNotFoundException e) {
            throw new LoadException(e.getMessage());
        }
    }

    public RoutineLoadJob getJob(long j) {
        return this.idToRoutineLoadJob.get(Long.valueOf(j));
    }

    public RoutineLoadJob getJob(String str, String str2) throws MetaNotFoundException {
        List<RoutineLoadJob> job = getJob(str, str2, false, null);
        if (CollectionUtils.isEmpty(job)) {
            return null;
        }
        return job.get(0);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v45, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r0v50, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r6v0, types: [org.apache.doris.load.routineload.RoutineLoadManager] */
    public List<RoutineLoadJob> getJob(String str, String str2, boolean z, PatternMatcher patternMatcher) throws MetaNotFoundException {
        ArrayList arrayList;
        Preconditions.checkArgument(str2 == null || patternMatcher == null, "jobName and matcher cannot be not null at the same time");
        if (str == null) {
            arrayList = new ArrayList(this.idToRoutineLoadJob.values());
            sortRoutineLoadJob(arrayList);
        } else {
            long id = Env.getCurrentInternalCatalog().getDbOrMetaException(str).getId();
            if (!this.dbToNameToRoutineLoadJob.containsKey(Long.valueOf(id))) {
                arrayList = new ArrayList();
            } else if (str2 == null) {
                arrayList = Lists.newArrayList();
                Iterator<List<RoutineLoadJob>> it = this.dbToNameToRoutineLoadJob.get(Long.valueOf(id)).values().iterator();
                while (it.hasNext()) {
                    ArrayList arrayList2 = new ArrayList(it.next());
                    sortRoutineLoadJob(arrayList2);
                    arrayList.addAll(arrayList2);
                }
            } else {
                if (!this.dbToNameToRoutineLoadJob.get(Long.valueOf(id)).containsKey(str2)) {
                    return null;
                }
                arrayList = new ArrayList(this.dbToNameToRoutineLoadJob.get(Long.valueOf(id)).get(str2));
                sortRoutineLoadJob(arrayList);
            }
        }
        if (!z) {
            arrayList = (List) arrayList.stream().filter(routineLoadJob -> {
                return !routineLoadJob.getState().isFinalState();
            }).collect(Collectors.toList());
        }
        if (patternMatcher != null) {
            arrayList = (List) arrayList.stream().filter(routineLoadJob2 -> {
                return patternMatcher.match(routineLoadJob2.getName());
            }).collect(Collectors.toList());
        }
        return arrayList;
    }

    public List<RoutineLoadJob> getJobByName(String str) {
        ArrayList newArrayList = Lists.newArrayList();
        for (Map<String, List<RoutineLoadJob>> map : this.dbToNameToRoutineLoadJob.values()) {
            if (map.containsKey(str)) {
                ArrayList arrayList = new ArrayList(map.get(str));
                sortRoutineLoadJob(arrayList);
                newArrayList.addAll(arrayList);
            }
        }
        return newArrayList;
    }

    private void sortRoutineLoadJob(List<RoutineLoadJob> list) {
        if (list == null) {
            return;
        }
        int i = 0;
        int size = list.size() - 1;
        while (i < size) {
            while (!list.get(i).isFinal() && i < size) {
                i++;
            }
            while (list.get(size).isFinal() && i < size) {
                size--;
            }
            if (i < size) {
                RoutineLoadJob routineLoadJob = list.get(i);
                list.set(i, list.get(size));
                list.set(size, routineLoadJob);
            }
        }
    }

    public boolean checkTaskInJob(RoutineLoadTaskInfo routineLoadTaskInfo) {
        RoutineLoadJob routineLoadJob = this.idToRoutineLoadJob.get(Long.valueOf(routineLoadTaskInfo.getJobId()));
        if (routineLoadJob == null) {
            return false;
        }
        return routineLoadJob.containsTask(routineLoadTaskInfo.getId());
    }

    public List<RoutineLoadJob> getRoutineLoadJobByState(Set<RoutineLoadJob.JobState> set) {
        return (List) this.idToRoutineLoadJob.values().stream().filter(routineLoadJob -> {
            return set.contains(routineLoadJob.getState());
        }).collect(Collectors.toList());
    }

    public void processTimeoutTasks() {
        Iterator<RoutineLoadJob> it = this.idToRoutineLoadJob.values().iterator();
        while (it.hasNext()) {
            it.next().processTimeoutTasks();
        }
    }

    public void cleanOldRoutineLoadJobs() {
        LOG.debug("begin to clean old routine load jobs ");
        writeLock();
        try {
            Iterator<Map.Entry<Long, RoutineLoadJob>> it = this.idToRoutineLoadJob.entrySet().iterator();
            long currentTimeMillis = System.currentTimeMillis();
            while (it.hasNext()) {
                RoutineLoadJob value = it.next().getValue();
                if (value.needRemove()) {
                    unprotectedRemoveJobFromDb(value);
                    it.remove();
                    Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(new RoutineLoadOperation(value.getId(), value.getState()));
                    LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(value.getId())).add("end_timestamp", value.getEndTimestamp()).add("current_timestamp", currentTimeMillis).add("job_state", value.getState()).add("msg", "old job has been cleaned"));
                }
            }
        } finally {
            writeUnlock();
        }
    }

    public void replayRemoveOldRoutineLoad(RoutineLoadOperation routineLoadOperation) {
        writeLock();
        try {
            RoutineLoadJob remove = this.idToRoutineLoadJob.remove(Long.valueOf(routineLoadOperation.getId()));
            if (remove != null) {
                unprotectedRemoveJobFromDb(remove);
            }
            LOG.info("replay remove routine load job: {}", Long.valueOf(routineLoadOperation.getId()));
        } finally {
            writeUnlock();
        }
    }

    private void unprotectedRemoveJobFromDb(RoutineLoadJob routineLoadJob) {
        this.dbToNameToRoutineLoadJob.get(Long.valueOf(routineLoadJob.getDbId())).get(routineLoadJob.getName()).remove(routineLoadJob);
        if (this.dbToNameToRoutineLoadJob.get(Long.valueOf(routineLoadJob.getDbId())).get(routineLoadJob.getName()).isEmpty()) {
            this.dbToNameToRoutineLoadJob.get(Long.valueOf(routineLoadJob.getDbId())).remove(routineLoadJob.getName());
        }
        if (this.dbToNameToRoutineLoadJob.get(Long.valueOf(routineLoadJob.getDbId())).isEmpty()) {
            this.dbToNameToRoutineLoadJob.remove(Long.valueOf(routineLoadJob.getDbId()));
        }
    }

    public void updateRoutineLoadJob() throws UserException {
        for (RoutineLoadJob routineLoadJob : this.idToRoutineLoadJob.values()) {
            if (!routineLoadJob.state.isFinalState()) {
                routineLoadJob.update();
            }
        }
    }

    public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
        unprotectedAddJob(routineLoadJob);
        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(routineLoadJob.getId())).add("msg", "replay create routine load job").build());
    }

    public void replayChangeRoutineLoadJob(RoutineLoadOperation routineLoadOperation) {
        try {
            getJob(routineLoadOperation.getId()).updateState(routineLoadOperation.getJobState(), null, true);
        } catch (UserException e) {
            LOG.error("should not happened", e);
        }
        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(routineLoadOperation.getId())).add("current_state", routineLoadOperation.getJobState()).add("msg", "replay change routine load job").build());
    }

    public void alterRoutineLoadJob(AlterRoutineLoadStmt alterRoutineLoadStmt) throws UserException {
        RoutineLoadJob checkPrivAndGetJob = checkPrivAndGetJob(alterRoutineLoadStmt.getDbName(), alterRoutineLoadStmt.getLabel());
        if (alterRoutineLoadStmt.hasDataSourceProperty() && !alterRoutineLoadStmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(checkPrivAndGetJob.dataSourceType.name())) {
            throw new DdlException("The specified job type is not: " + alterRoutineLoadStmt.getDataSourceProperties().getDataSourceType());
        }
        checkPrivAndGetJob.modifyProperties(alterRoutineLoadStmt);
    }

    public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog alterRoutineLoadJobOperationLog) {
        RoutineLoadJob job = getJob(alterRoutineLoadJobOperationLog.getJobId());
        Preconditions.checkNotNull(job, Long.valueOf(alterRoutineLoadJobOperationLog.getJobId()));
        job.replayModifyProperties(alterRoutineLoadJobOperationLog);
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.idToRoutineLoadJob.size());
        Iterator<RoutineLoadJob> it = this.idToRoutineLoadJob.values().iterator();
        while (it.hasNext()) {
            it.next().write(dataOutput);
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            RoutineLoadJob read = RoutineLoadJob.read(dataInput);
            this.idToRoutineLoadJob.put(Long.valueOf(read.getId()), read);
            Map<String, List<RoutineLoadJob>> map = this.dbToNameToRoutineLoadJob.get(Long.valueOf(read.getDbId()));
            if (map == null) {
                map = Maps.newConcurrentMap();
                this.dbToNameToRoutineLoadJob.put(Long.valueOf(read.getDbId()), map);
            }
            List<RoutineLoadJob> list = map.get(read.getName());
            if (list == null) {
                list = Lists.newArrayList();
                map.put(read.getName(), list);
            }
            list.add(read);
            if (!read.getState().isFinalState()) {
                Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(read);
            }
        }
    }
}
