package org.apache.doris.load.loadv2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.UserManager;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/loadv2/LoadManager.class */
public class LoadManager implements Writable {
    private static final Logger LOG = LogManager.getLogger(LoadManager.class);
    private LoadJobScheduler loadJobScheduler;
    private Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
    private Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private TokenManager tokenManager = new TokenManager();
    private MysqlLoadManager mysqlLoadManager = new MysqlLoadManager(this.tokenManager);

    /**
     * Creates a load manager bound to the scheduler that will receive newly created load jobs.
     *
     * @param loadJobScheduler scheduler used by this manager to submit jobs
     */
    public LoadManager(LoadJobScheduler loadJobScheduler) {
        this.loadJobScheduler = loadJobScheduler;
    }

    /**
     * Starts the helper components owned by this manager: first the token manager,
     * then the MySQL load manager.
     */
    public void start() {
        tokenManager.start();
        mysqlLoadManager.start();
    }

    /**
     * Creates a bulk load job from a LOAD statement, registers it, writes it to the edit log and
     * submits it to the scheduler.
     *
     * <p>Validation and registration happen under the write lock. The lock is released exactly
     * once, in a {@code finally} block, before the edit log write and the scheduler submission,
     * so a failure in either of those later steps can no longer trigger a second unlock.
     *
     * @param loadStmt the analyzed LOAD statement
     * @return the id of the created load job
     * @throws DdlException if the database does not exist, the label is already used (or, for a
     *         multi-load broker, is not an uncommitted label), neither a broker nor a resource is
     *         given, or too many unfinished jobs are pending
     */
    public long createLoadJobFromStmt(LoadStmt loadStmt) throws DdlException {
        long dbId = checkDb(loadStmt.getLabel().getDbName()).getId();
        BulkLoadJob loadJob;
        writeLock();
        try {
            if (loadStmt.getBrokerDesc() != null && loadStmt.getBrokerDesc().isMultiLoadBroker()) {
                // Multi-load: the label must already have been registered and not yet committed.
                if (!Env.getCurrentEnv().getLoadInstance().isUncommittedLabel(dbId, loadStmt.getLabel().getLabelName())) {
                    throw new DdlException("label: " + loadStmt.getLabel().getLabelName() + " not found!");
                }
            } else {
                checkLabelUsed(dbId, loadStmt.getLabel().getLabelName());
                if (loadStmt.getBrokerDesc() == null && loadStmt.getResourceDesc() == null) {
                    throw new DdlException("LoadManager only support the broker and spark load.");
                }
                if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
                    throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, please retry later. You can use `SHOW LOAD` to view submitted jobs");
                }
            }
            loadJob = BulkLoadJob.fromLoadStmt(loadStmt);
            createLoadJob(loadJob);
        } finally {
            writeUnlock();
        }
        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
        loadJobScheduler.submitJob(loadJob);
        return loadJob.getId();
    }

    /**
     * Counts the registered jobs whose state is neither FINISHED nor CANCELLED.
     * This method takes no lock itself.
     *
     * @return number of unfinished jobs
     */
    private long unprotectedGetUnfinishedJobNum() {
        long unfinished = 0L;
        for (LoadJob job : idToLoadJob.values()) {
            if (job.getState() != JobState.FINISHED && job.getState() != JobState.CANCELLED) {
                unfinished++;
            }
        }
        return unfinished;
    }

    /**
     * Verifies under the write lock that the label is not in use in the given database and then
     * registers it as a mini label with the load instance, stamped with the current time.
     *
     * @param str  database name
     * @param str2 label to register
     * @throws DdlException if the database does not exist or the label is already used
     */
    public void createLoadJobV1FromMultiStart(String str, String str2) throws DdlException {
        Database db = checkDb(str);
        writeLock();
        try {
            checkLabelUsed(db.getId(), str2);
            Env.getCurrentEnv().getLoadInstance().registerMiniLabel(str, str2, System.currentTimeMillis());
        } finally {
            writeUnlock();
        }
    }

    /**
     * @return the MySQL load manager owned by this instance
     */
    public MysqlLoadManager getMysqlLoadManager() {
        return mysqlLoadManager;
    }

    /**
     * @return the token manager owned by this instance
     */
    public TokenManager getTokenManager() {
        return tokenManager;
    }

    /**
     * Re-registers a load job while replaying the edit log and logs the replay.
     *
     * @param loadJob the job read back from the edit log
     */
    public void replayCreateLoadJob(LoadJob loadJob) {
        createLoadJob(loadJob);
        LogBuilder message = new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(loadJob.getId()));
        message.add("msg", "replay create load job");
        LOG.info(message.build());
    }

    /**
     * Registers a job in the in-memory indexes unless it has already expired, and adds it as a
     * transaction callback when it is not yet completed.
     *
     * @param loadJob the job to register
     */
    private void createLoadJob(LoadJob loadJob) {
        boolean expired = loadJob.isExpired(System.currentTimeMillis());
        if (!expired) {
            addLoadJob(loadJob);
            if (!loadJob.isCompleted()) {
                Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
            }
        }
    }

    /**
     * Inserts the job into both indexes: by job id, and by database id and label.
     * The per-database map and the per-label list are created on first use.
     *
     * @param loadJob the job to index
     */
    private void addLoadJob(LoadJob loadJob) {
        idToLoadJob.put(loadJob.getId(), loadJob);
        Map<String, List<LoadJob>> labelToJobs =
                dbIdToLabelToLoadJobs.computeIfAbsent(loadJob.getDbId(), id -> new ConcurrentHashMap<>());
        List<LoadJob> sameLabelJobs = labelToJobs.computeIfAbsent(loadJob.getLabel(), label -> new ArrayList<>());
        sameLabelJobs.add(loadJob);
    }

    /**
     * Records an already finished load. Only {@code EtlJobType.INSERT} is handled: an
     * {@link InsertLoadJob} is built from the arguments, indexed, and written to the edit log.
     * Every other job type is ignored.
     *
     * @param str          passed as the first argument of the InsertLoadJob constructor
     * @param j            passed as the second argument of the InsertLoadJob constructor
     * @param str2         database name, resolved to a database id
     * @param j2           forwarded to the InsertLoadJob constructor
     * @param etlJobType   type of the finished job
     * @param j3           forwarded to the InsertLoadJob constructor
     * @param str3         forwarded to the InsertLoadJob constructor
     * @param str4         forwarded to the InsertLoadJob constructor
     * @param userIdentity forwarded to the InsertLoadJob constructor
     * @throws MetaNotFoundException if the database cannot be found
     */
    public void recordFinishedLoadJob(String str, long j, String str2, long j2, EtlJobType etlJobType, long j3, String str3, String str4, UserIdentity userIdentity) throws MetaNotFoundException {
        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(str2);
        switch (etlJobType) {
            case INSERT: {
                InsertLoadJob finishedJob = new InsertLoadJob(str, j, db.getId(), j2, j3, str3, str4, userIdentity);
                addLoadJob(finishedJob);
                Env.getCurrentEnv().getEditLog().logCreateLoadJob(finishedJob);
                break;
            }
            default:
                // Other job types are not recorded here.
                break;
        }
    }

    /**
     * Selects from {@code list} the jobs targeted by a CANCEL LOAD statement and appends them to
     * {@code list2}. Jobs already in state CANCELLED are never selected.
     *
     * <p>A label containing {@code UserManager.ANY_HOST} is matched as a MySQL-style pattern,
     * otherwise labels are compared ignoring case. States are compared by name, ignoring case.
     * When the statement carries an operator, label and state conditions are combined with AND or
     * OR; otherwise the label condition is used if the label is non-empty, else the state
     * condition if the state is non-empty, else nothing matches.
     *
     * @param cancelLoadStmt the cancel statement providing label, state and operator
     * @param list           candidate jobs
     * @param list2          output list receiving the matched jobs
     * @throws AnalysisException if the label pattern cannot be created
     */
    @VisibleForTesting
    public static void addNeedCancelLoadJob(CancelLoadStmt cancelLoadStmt, List<LoadJob> list, List<LoadJob> list2) throws AnalysisException {
        String wantedLabel = cancelLoadStmt.getLabel();
        String wantedState = cancelLoadStmt.getState();
        PatternMatcher labelMatcher = PatternMatcherWrapper.createMysqlPattern(wantedLabel, CaseSensibility.LABEL.getCaseSensibility());
        // Collect into a scratch list first so list2 is only touched once every job was evaluated.
        List<LoadJob> selected = new ArrayList<>();
        for (LoadJob candidate : list) {
            if (candidate.getState() == JobState.CANCELLED) {
                continue;
            }
            boolean wanted;
            if (cancelLoadStmt.getOperator() != null) {
                boolean labelHit;
                if (wantedLabel.contains(UserManager.ANY_HOST)) {
                    labelHit = labelMatcher.match(candidate.getLabel());
                } else {
                    labelHit = candidate.getLabel().equalsIgnoreCase(wantedLabel);
                }
                boolean stateHit = candidate.getState().name().equalsIgnoreCase(wantedState);
                if (CompoundPredicate.Operator.AND.equals(cancelLoadStmt.getOperator())) {
                    wanted = labelHit && stateHit;
                } else {
                    wanted = labelHit || stateHit;
                }
            } else if (StringUtils.isNotEmpty(wantedLabel)) {
                if (wantedLabel.contains(UserManager.ANY_HOST)) {
                    wanted = labelMatcher.match(candidate.getLabel());
                } else {
                    wanted = candidate.getLabel().equalsIgnoreCase(wantedLabel);
                }
            } else if (StringUtils.isNotEmpty(wantedState)) {
                wanted = candidate.getState().name().equalsIgnoreCase(wantedState);
            } else {
                wanted = false;
            }
            if (wanted) {
                selected.add(candidate);
            }
        }
        list2.addAll(selected);
    }

    /**
     * Cancels every load job of the statement's database that matches the CANCEL LOAD statement
     * and whose transaction is not yet done. Runs under the read lock.
     *
     * @param cancelLoadStmt the cancel statement
     * @throws DdlException if the database does not exist, no job matches, every matching job is
     *         already done, or cancelling one of the jobs fails
     * @throws AnalysisException if the label pattern of the statement cannot be created
     */
    public void cancelLoadJob(CancelLoadStmt cancelLoadStmt) throws DdlException, AnalysisException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(cancelLoadStmt.getDbName());
        readLock();
        try {
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(db.getId());
            if (labelToJobs == null) {
                throw new DdlException("Load job does not exist");
            }
            List<LoadJob> allJobs = labelToJobs.values().stream()
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
            List<LoadJob> matchedJobs = Lists.newArrayList();
            addNeedCancelLoadJob(cancelLoadStmt, allJobs, matchedJobs);
            if (matchedJobs.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }
            List<LoadJob> uncompletedJobs = matchedJobs.stream()
                    .filter(job -> !job.isTxnDone())
                    .collect(Collectors.toList());
            if (uncompletedJobs.isEmpty()) {
                throw new DdlException("There is no uncompleted job");
            }
            for (LoadJob job : uncompletedJobs) {
                try {
                    job.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
                } catch (DdlException e) {
                    // Re-thrown with the job id and label so the user can tell which job failed.
                    throw new DdlException("Cancel load job [" + job.getId() + "] fail, label=[" + job.getLabel() + "] failed msg=" + e.getMessage());
                }
            }
        } finally {
            readUnlock();
        }
    }

    /**
     * Applies a replayed "end load job" operation to the matching job. If the job is unknown,
     * a warning is logged and nothing else happens.
     *
     * @param loadJobFinalOperation the final operation read from the edit log
     */
    public void replayEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {
        LoadJob target = idToLoadJob.get(Long.valueOf(loadJobFinalOperation.getId()));
        if (target == null) {
            LOG.warn("job does not exist when replaying end load job edit log: {}", loadJobFinalOperation);
            return;
        }
        target.unprotectReadEndOperation(loadJobFinalOperation);
        LOG.info(new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(loadJobFinalOperation.getId()))
                .add("operation", loadJobFinalOperation)
                .add("msg", "replay end load job")
                .build());
    }

    /**
     * Applies a replayed state update to the job it refers to. If the job is unknown, a warning
     * is logged and the update is dropped.
     *
     * @param loadJobStateUpdateInfo the state update read from the edit log
     */
    public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo loadJobStateUpdateInfo) {
        long targetId = loadJobStateUpdateInfo.getJobId();
        LoadJob target = idToLoadJob.get(Long.valueOf(targetId));
        if (target != null) {
            target.replayUpdateStateInfo(loadJobStateUpdateInfo);
            return;
        }
        LOG.warn("replay update load job state failed. error: job not found, id: {}", Long.valueOf(targetId));
    }

    /**
     * Counts the jobs of one database that are currently in the given state.
     *
     * <p>The read lock is released exactly once, in the {@code finally} block. The previous
     * version also unlocked explicitly before returning, so the normal path unlocked twice.
     *
     * @param jobState state to count
     * @param j        database id
     * @return number of jobs in that state, or 0 when the database has no registered jobs
     */
    public int getLoadJobNum(JobState jobState, long j) {
        readLock();
        try {
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(j);
            if (labelToJobs == null) {
                return 0;
            }
            return (int) labelToJobs.values().stream()
                    .flatMap(List::stream)
                    .filter(job -> job.getState() == jobState)
                    .count();
        } finally {
            readUnlock();
        }
    }

    /**
     * Counts the jobs in the given state across every database on which the current connection
     * has LOAD privilege.
     *
     * <p>Jobs are gathered per database into a flat list. The previous version merged the
     * per-database label maps into a single {@code HashMap}, so two databases using the same
     * label overwrote each other and jobs were under-counted.
     *
     * @param jobState state to count
     * @return number of visible jobs in that state
     */
    public int getLoadJobNum(JobState jobState) {
        readLock();
        try {
            List<LoadJob> visibleJobs = new ArrayList<>();
            for (Map.Entry<Long, Map<String, List<LoadJob>>> dbEntry : dbIdToLabelToLoadJobs.entrySet()) {
                long dbId = dbEntry.getKey();
                // NOTE(review): getDbNullable may return null for a dropped database, which would
                // throw here exactly as it did before; confirm whether such entries can exist.
                String dbFullName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName();
                if (Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), dbFullName, PrivPredicate.LOAD)) {
                    for (List<LoadJob> sameLabelJobs : dbEntry.getValue().values()) {
                        visibleJobs.addAll(sameLabelJobs);
                    }
                }
            }
            return (int) visibleJobs.stream()
                    .filter(job -> job.getState() == jobState)
                    .count();
        } finally {
            readUnlock();
        }
    }

    /**
     * Counts, under the read lock, all registered jobs that are in the given state and of the
     * given job type.
     *
     * @param jobState   state to match
     * @param etlJobType job type to match
     * @return number of matching jobs
     */
    public long getLoadJobNum(JobState jobState, EtlJobType etlJobType) {
        readLock();
        try {
            long matching = 0L;
            for (LoadJob job : idToLoadJob.values()) {
                if (job.getState() == jobState && job.getJobType() == etlJobType) {
                    matching++;
                }
            }
            return matching;
        } finally {
            readUnlock();
        }
    }

    /**
     * Removes every expired job from both indexes under the write lock. For Spark load jobs the
     * launcher log is cleared as well. Label lists and per-database maps that become empty are
     * dropped.
     */
    public void removeOldLoadJob() {
        long now = System.currentTimeMillis();
        writeLock();
        try {
            Iterator<LoadJob> jobs = idToLoadJob.values().iterator();
            while (jobs.hasNext()) {
                LoadJob job = jobs.next();
                if (!job.isExpired(now)) {
                    continue;
                }
                jobs.remove();
                Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(Long.valueOf(job.getDbId()));
                List<LoadJob> sameLabelJobs = labelToJobs.get(job.getLabel());
                sameLabelJobs.remove(job);
                if (job instanceof SparkLoadJob) {
                    ((SparkLoadJob) job).clearSparkLauncherLog();
                }
                if (sameLabelJobs.isEmpty()) {
                    labelToJobs.remove(job.getLabel());
                }
                if (labelToJobs.isEmpty()) {
                    dbIdToLabelToLoadJobs.remove(Long.valueOf(job.getDbId()));
                }
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Refreshes the ETL status of every Spark load job that is in state ETL.
     *
     * <p>A data quality failure or any other {@code UserException} cancels the job with the
     * matching cancel type; any other exception is only logged.
     */
    public void processEtlStateJobs() {
        for (LoadJob job : idToLoadJob.values()) {
            if (job.jobType != EtlJobType.SPARK || job.state != JobState.ETL) {
                continue;
            }
            try {
                ((SparkLoadJob) job).updateEtlStatus();
            } catch (DataQualityException qualityError) {
                LOG.info("update load job etl status failed. job id: {}", Long.valueOf(job.getId()), qualityError);
                FailMsg reason = new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG);
                job.cancelJobWithoutCheck(reason, true, true);
            } catch (UserException userError) {
                LOG.warn("update load job etl status failed. job id: {}", Long.valueOf(job.getId()), userError);
                FailMsg reason = new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, userError.getMessage());
                job.cancelJobWithoutCheck(reason, true, true);
            } catch (Exception unexpected) {
                LOG.warn("update load job etl status failed. job id: {}", Long.valueOf(job.getId()), unexpected);
            }
        }
    }

    /**
     * Refreshes the loading status of every Spark load job that is in state LOADING.
     *
     * <p>A {@code UserException} cancels the job with {@code LOAD_RUN_FAIL}; any other exception
     * is only logged.
     */
    public void processLoadingStateJobs() {
        for (LoadJob job : idToLoadJob.values()) {
            if (job.jobType != EtlJobType.SPARK || job.state != JobState.LOADING) {
                continue;
            }
            try {
                ((SparkLoadJob) job).updateLoadingStatus();
            } catch (UserException userError) {
                LOG.warn("update load job loading status failed. job id: {}", Long.valueOf(job.getId()), userError);
                FailMsg reason = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, userError.getMessage());
                job.cancelJobWithoutCheck(reason, true, true);
            } catch (Exception unexpected) {
                LOG.warn("update load job loading status failed. job id: {}", Long.valueOf(job.getId()), unexpected);
            }
        }
    }

    /**
     * Returns, for every job registered under the given label, the job id paired with the text of
     * the statement that created it.
     *
     * <p>The statement is obtained reflectively through a public no-arg {@code getOriginStmt}
     * method on the job's runtime class; job classes without such a method are rejected.
     * {@code Class.getMethod} never returns {@code null}, so the former null check was dead code
     * and has been removed.
     *
     * @param j   database id
     * @param str label
     * @return list of (job id, original statement text) pairs
     * @throws DdlException if the database has no registered jobs, the label is unknown, or a job
     *         does not expose {@code getOriginStmt}
     */
    public List<Pair<Long, String>> getCreateLoadStmt(long j, String str) throws DdlException {
        List<Pair<Long, String>> result = new ArrayList<>();
        readLock();
        try {
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(j);
            if (labelToJobs == null) {
                throw new DdlException("Database does not exist");
            }
            List<LoadJob> jobs = labelToJobs.get(str);
            if (jobs == null) {
                throw new DdlException("Label does not exist: " + str);
            }
            for (LoadJob job : jobs) {
                try {
                    Method originStmtGetter = job.getClass().getMethod("getOriginStmt");
                    OriginStatement origin = (OriginStatement) originStmtGetter.invoke(job);
                    result.add(Pair.of(Long.valueOf(job.getId()), origin.originStmt));
                } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                    throw new DdlException("Not support load job type: " + job.getClass().getName());
                }
            }
            return result;
        } finally {
            readUnlock();
        }
    }

    /**
     * Returns the show-info rows of the jobs of one database, filtered by label and state.
     *
     * <p>The read lock is released exactly once, in the {@code finally} block; the previous
     * version unlocked twice on the normal path. The per-database map is also re-checked for
     * {@code null} inside the lock, because it may have been removed after the unlocked fast-path
     * check.
     *
     * @param j   database id
     * @param str label filter; null or empty selects every label
     * @param z   when true the label must match exactly, otherwise it is used as a MySQL-style
     *            pattern
     * @param set state names to include; null or empty selects every state, and names that are
     *            not valid {@code JobState} values are ignored
     * @return matching rows, possibly empty
     * @throws AnalysisException if the label pattern cannot be created
     */
    public List<List<Comparable>> getLoadJobInfosByDb(long j, String str, boolean z, Set<String> set) throws AnalysisException {
        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
        if (!dbIdToLabelToLoadJobs.containsKey(j)) {
            return loadJobInfos;
        }

        Set<JobState> wantedStates = EnumSet.noneOf(JobState.class);
        if (set == null || set.isEmpty()) {
            wantedStates.addAll(EnumSet.allOf(JobState.class));
        } else {
            for (String stateName : set) {
                try {
                    wantedStates.add(JobState.valueOf(stateName));
                } catch (IllegalArgumentException ignored) {
                    // Unknown state names are deliberately skipped.
                }
            }
        }

        readLock();
        try {
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(j);
            if (labelToJobs == null) {
                // The database entry disappeared between the fast-path check and taking the lock.
                return loadJobInfos;
            }
            List<LoadJob> candidates = Lists.newArrayList();
            if (Strings.isNullOrEmpty(str)) {
                for (List<LoadJob> sameLabelJobs : labelToJobs.values()) {
                    candidates.addAll(sameLabelJobs);
                }
            } else if (z) {
                List<LoadJob> exact = labelToJobs.get(str);
                if (exact == null) {
                    return loadJobInfos;
                }
                candidates.addAll(exact);
            } else {
                PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(str, CaseSensibility.LABEL.getCaseSensibility());
                for (Map.Entry<String, List<LoadJob>> entry : labelToJobs.entrySet()) {
                    if (matcher.match(entry.getKey())) {
                        candidates.addAll(entry.getValue());
                    }
                }
            }
            for (LoadJob job : candidates) {
                try {
                    if (wantedStates.contains(job.getState())) {
                        loadJobInfos.add(job.getShowInfo());
                    }
                } catch (DdlException ignored) {
                    // Best effort: a job whose show info cannot be built is left out.
                }
            }
            return loadJobInfos;
        } finally {
            readUnlock();
        }
    }

    /**
     * Returns the show-info rows of every job in every database on which the current connection
     * has LOAD privilege.
     *
     * <p>Jobs are visited per database. The previous version merged the per-database label maps
     * into a single {@code HashMap}, so two databases using the same label overwrote each other
     * and rows were lost.
     *
     * @return rows of all visible jobs, possibly empty
     */
    public List<List<Comparable>> getAllLoadJobInfos() {
        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
        readLock();
        try {
            for (Map.Entry<Long, Map<String, List<LoadJob>>> dbEntry : dbIdToLabelToLoadJobs.entrySet()) {
                long dbId = dbEntry.getKey();
                // NOTE(review): getDbNullable may return null for a dropped database, which would
                // throw here exactly as it did before; confirm whether such entries can exist.
                String dbFullName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName();
                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), dbFullName, PrivPredicate.LOAD)) {
                    continue;
                }
                for (List<LoadJob> sameLabelJobs : dbEntry.getValue().values()) {
                    for (LoadJob job : sameLabelJobs) {
                        try {
                            loadJobInfos.add(job.getShowInfo());
                        } catch (DdlException ignored) {
                            // Best effort: a job whose show info cannot be built is left out.
                        }
                    }
                }
            }
            return loadJobInfos;
        } finally {
            readUnlock();
        }
    }

    /**
     * Fills {@code jobInfo} from the most recently added job registered under its label.
     * {@code jobInfo.dbName} is first replaced by the cluster-qualified full name.
     *
     * @param jobInfo in/out holder; supplies cluster name, database name and label
     * @throws DdlException if the database does not exist, has no jobs, or the label is unknown
     */
    public void getLoadJobInfo(Load.JobInfo jobInfo) throws DdlException {
        String fullDbName = ClusterNamespace.getFullName(jobInfo.clusterName, jobInfo.dbName);
        jobInfo.dbName = fullDbName;
        Database db = checkDb(jobInfo.dbName);
        readLock();
        try {
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(Long.valueOf(db.getId()));
            if (labelToJobs == null) {
                throw new DdlException("No jobs belong to database(" + jobInfo.dbName + ")");
            }
            List<LoadJob> sameLabelJobs = labelToJobs.get(jobInfo.label);
            if (sameLabelJobs == null || sameLabelJobs.isEmpty()) {
                throw new DdlException("Unknown job(" + jobInfo.label + ")");
            }
            // The last element is the job that was added most recently for this label.
            LoadJob latest = sameLabelJobs.get(sameLabelJobs.size() - 1);
            latest.getJobInfo(jobInfo);
        } finally {
            readUnlock();
        }
    }

    /**
     * Looks up a job by id.
     *
     * @param j job id
     * @return the job, or {@code null} if no job with that id is registered
     */
    public LoadJob getLoadJob(long j) {
        Long jobId = Long.valueOf(j);
        return idToLoadJob.get(jobId);
    }

    /**
     * Analyzes all pending jobs and then submits all pending jobs to the scheduler.
     */
    public void prepareJobs() {
        this.analyzeLoadJobs();
        this.submitJobs();
    }

    /**
     * Hands every job whose state is PENDING to the scheduler in a single batch.
     */
    private void submitJobs() {
        List<LoadJob> pendingJobs = new ArrayList<>();
        for (LoadJob job : idToLoadJob.values()) {
            if (job.state == JobState.PENDING) {
                pendingJobs.add(job);
            }
        }
        loadJobScheduler.submitJob(pendingJobs);
    }

    /**
     * Calls {@code analyze()} on every job whose state is PENDING.
     */
    private void analyzeLoadJobs() {
        idToLoadJob.values().stream()
                .filter(job -> job.getState() == JobState.PENDING)
                .forEach(job -> job.analyze());
    }

    /**
     * Resolves a database by name in the internal catalog.
     *
     * @param str database name
     * @return the database
     * @throws DdlException if the database does not exist
     */
    private Database checkDb(String str) throws DdlException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(str);
        return db;
    }

    /**
     * Verifies that a label is free in the given database. The load instance is consulted first
     * (its return value is not used here), then this manager's own index: the label counts as
     * used when at least one job registered under it is not CANCELLED.
     *
     * @param j   database id
     * @param str label to check
     * @throws DdlException if the label is already used ({@link LabelAlreadyUsedException} for a
     *         hit in this manager's index)
     */
    private void checkLabelUsed(long j, String str) throws DdlException {
        Env.getCurrentEnv().getLoadInstance().isLabelUsed(j, str);
        Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(Long.valueOf(j));
        if (labelToJobs == null) {
            return;
        }
        List<LoadJob> sameLabelJobs = labelToJobs.get(str);
        if (sameLabelJobs == null) {
            return;
        }
        boolean hasLiveJob = sameLabelJobs.stream().anyMatch(job -> job.getState() != JobState.CANCELLED);
        if (hasLiveJob) {
            LOG.warn("Failed to add load job when label {} has been used.", str);
            throw new LabelAlreadyUsedException(str);
        }
    }

    /**
     * Executes a CLEAN LABEL statement: resolves the database and cleans the label as a
     * non-replay operation.
     *
     * @param cleanLabelStmt the statement carrying database name and label
     * @throws DdlException if the database does not exist
     */
    public void cleanLabel(CleanLabelStmt cleanLabelStmt) throws DdlException {
        String dbName = cleanLabelStmt.getDb();
        long dbId = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName).getId();
        String label = cleanLabelStmt.getLabel();
        cleanLabelInternal(dbId, label, false);
    }

    /**
     * Re-applies a clean-label operation read from the edit log, as a replay operation.
     *
     * @param cleanLabelOperationLog the logged operation carrying database id and label
     */
    public void replayCleanLabel(CleanLabelOperationLog cleanLabelOperationLog) {
        long dbId = cleanLabelOperationLog.getDbId();
        String label = cleanLabelOperationLog.getLabel();
        cleanLabelInternal(dbId, label, true);
    }

    /**
     * Removes completed jobs for a label (or for every label when {@code str} is null or empty)
     * from this manager, then cleans the label in the database transaction manager and, unless
     * replaying, writes the operation to the edit log.
     *
     * <p>The write lock only covers the in-memory cleanup and is released exactly once, in the
     * {@code finally} block; the previous version unlocked twice on the normal path. As before,
     * when a non-empty label has no job list in an existing database entry the method returns
     * without touching the transaction manager or the edit log.
     *
     * @param j   database id
     * @param str label to clean; null or empty means all labels of the database
     * @param z   true when called from edit log replay (no new edit log entry is written)
     */
    private void cleanLabelInternal(long j, String str, boolean z) {
        int removedCount = 0;
        writeLock();
        try {
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(j);
            if (labelToJobs != null) {
                if (Strings.isNullOrEmpty(str)) {
                    Iterator<Map.Entry<String, List<LoadJob>>> entries = labelToJobs.entrySet().iterator();
                    while (entries.hasNext()) {
                        List<LoadJob> jobs = entries.next().getValue();
                        removedCount += removeCompletedJobs(jobs);
                        if (jobs.isEmpty()) {
                            entries.remove();
                        }
                    }
                } else {
                    List<LoadJob> jobs = labelToJobs.get(str);
                    if (jobs == null) {
                        return;
                    }
                    removedCount += removeCompletedJobs(jobs);
                    if (jobs.isEmpty()) {
                        labelToJobs.remove(str);
                    }
                }
            }
        } finally {
            writeUnlock();
        }
        LOG.info("clean {} labels on db {} with label '{}' in load mgr.", Integer.valueOf(removedCount), Long.valueOf(j), str);

        try {
            Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(j).cleanLabel(str);
        } catch (AnalysisException ignored) {
            // Deliberately ignored: cleaning is best effort and must not fail here.
        }

        if (!z) {
            Env.getCurrentEnv().getEditLog().logCleanLabel(new CleanLabelOperationLog(j, str));
        }
        LOG.info("finished to clean label on db {} with label {}. is replay: {}", Long.valueOf(j), str, Boolean.valueOf(z));
    }

    /**
     * Removes every completed job from {@code jobs} and from the id index.
     * Callers must hold the write lock.
     *
     * @param jobs jobs registered under one label; modified in place
     * @return number of jobs removed
     */
    private int removeCompletedJobs(List<LoadJob> jobs) {
        int removed = 0;
        Iterator<LoadJob> it = jobs.iterator();
        while (it.hasNext()) {
            LoadJob job = it.next();
            if (job.isCompleted()) {
                it.remove();
                idToLoadJob.remove(Long.valueOf(job.getId()));
                removed++;
            }
        }
        return removed;
    }

    /** Acquires the read lock of this manager. */
    private void readLock() {
        lock.readLock().lock();
    }

    /** Releases the read lock of this manager. */
    private void readUnlock() {
        lock.readLock().unlock();
    }

    /** Acquires the write lock of this manager. */
    private void writeLock() {
        lock.writeLock().lock();
    }

    /** Releases the write lock of this manager. */
    private void writeUnlock() {
        lock.writeLock().unlock();
    }

    /**
     * Forwards progress initialization to the job with the given id. Unknown ids are ignored.
     *
     * @param l         job id
     * @param tUniqueId forwarded to {@code LoadJob.initLoadProgress}
     * @param set       forwarded to {@code LoadJob.initLoadProgress}
     * @param list      forwarded to {@code LoadJob.initLoadProgress}
     */
    public void initJobProgress(Long l, TUniqueId tUniqueId, Set<TUniqueId> set, List<Long> list) {
        LoadJob target = idToLoadJob.get(l);
        if (target == null) {
            return;
        }
        target.initLoadProgress(tUniqueId, set, list);
    }

    /**
     * Forwards a progress update to the job with the given id. Unknown ids are ignored.
     *
     * @param l          job id
     * @param l2         forwarded to {@code LoadJob.updateProgress}
     * @param tUniqueId  forwarded to {@code LoadJob.updateProgress}
     * @param tUniqueId2 forwarded to {@code LoadJob.updateProgress}
     * @param j          forwarded to {@code LoadJob.updateProgress}
     * @param j2         forwarded to {@code LoadJob.updateProgress}
     * @param z          forwarded to {@code LoadJob.updateProgress}
     */
    public void updateJobProgress(Long l, Long l2, TUniqueId tUniqueId, TUniqueId tUniqueId2, long j, long j2, boolean z) {
        LoadJob target = idToLoadJob.get(l);
        if (target == null) {
            return;
        }
        target.updateProgress(l2, tUniqueId, tUniqueId2, j, j2, z);
    }

    /**
     * Serializes the jobs that are neither expired nor {@code MiniLoadJob} instances: first the
     * number of jobs as an int, then each job in turn.
     *
     * @param dataOutput destination
     * @throws IOException if writing fails
     */
    public void write(DataOutput dataOutput) throws IOException {
        long now = System.currentTimeMillis();
        List<LoadJob> persistedJobs = new ArrayList<>();
        for (LoadJob job : idToLoadJob.values()) {
            if (!job.isExpired(now) && !(job instanceof MiniLoadJob)) {
                persistedJobs.add(job);
            }
        }
        dataOutput.writeInt(persistedJobs.size());
        for (LoadJob job : persistedJobs) {
            job.write(dataOutput);
        }
    }

    /**
     * Restores jobs from a stream: an int count followed by that many serialized jobs.
     * Expired jobs are dropped, jobs of type MINI are skipped with a warning, and every other job
     * is added to both indexes; jobs that are not completed are also added as transaction
     * callbacks.
     *
     * @param dataInput source
     * @throws IOException if reading fails
     */
    public void readFields(DataInput dataInput) throws IOException {
        long now = System.currentTimeMillis();
        int jobCount = dataInput.readInt();
        for (int index = 0; index < jobCount; index++) {
            LoadJob job = LoadJob.read(dataInput);
            if (job.isExpired(now)) {
                continue;
            }
            if (job.getJobType() == EtlJobType.MINI) {
                LOG.warn("skip mini load job {} in db {} as it is no longer supported", Long.valueOf(job.getId()), Long.valueOf(job.getDbId()));
                continue;
            }
            idToLoadJob.put(Long.valueOf(job.getId()), job);
            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(Long.valueOf(job.getDbId()));
            if (labelToJobs == null) {
                labelToJobs = Maps.newConcurrentMap();
                dbIdToLabelToLoadJobs.put(Long.valueOf(job.getDbId()), labelToJobs);
            }
            List<LoadJob> sameLabelJobs = labelToJobs.get(job.getLabel());
            if (sameLabelJobs == null) {
                sameLabelJobs = Lists.newArrayList();
                labelToJobs.put(job.getLabel(), sameLabelJobs);
            }
            sameLabelJobs.add(job);
            if (!job.isCompleted()) {
                Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(job);
            }
        }
    }

    /**
     * Creates a bulk load job from an INSERT statement, registers it, writes it to the edit log
     * and submits it to the scheduler.
     *
     * <p>Validation and registration happen under the write lock, which is released exactly once
     * in a {@code finally} block. In the previous version the non-multi-load path ran outside the
     * {@code try}, so any validation failure left the write lock held; the multi-load path could
     * also unlock twice when the edit log write or the submission failed.
     *
     * @param insertStmt the analyzed INSERT statement
     * @return the id of the created load job
     * @throws DdlException if the database does not exist, the label is already used (or, for a
     *         multi-load broker, is not an uncommitted label), no resource is given, or too many
     *         unfinished jobs are pending
     */
    public long createLoadJobFromStmt(InsertStmt insertStmt) throws DdlException {
        long dbId = checkDb(insertStmt.getLoadLabel().getDbName()).getId();
        // Cast before taking the lock so a failing cast cannot leave the lock held.
        BrokerDesc brokerDesc = (BrokerDesc) insertStmt.getResourceDesc();
        BulkLoadJob loadJob;
        writeLock();
        try {
            if (brokerDesc != null && brokerDesc.isMultiLoadBroker()) {
                // Multi-load: the label must already have been registered and not yet committed.
                if (!Env.getCurrentEnv().getLoadInstance().isUncommittedLabel(dbId, insertStmt.getLoadLabel().getLabelName())) {
                    throw new DdlException("label: " + insertStmt.getLoadLabel().getLabelName() + " not found!");
                }
            } else {
                checkLabelUsed(dbId, insertStmt.getLoadLabel().getLabelName());
                if (brokerDesc == null && insertStmt.getResourceDesc() == null) {
                    throw new DdlException("LoadManager only support the broker and spark load.");
                }
                if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
                    throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, please retry later. You can use `SHOW LOAD` to view submitted jobs");
                }
            }
            loadJob = BulkLoadJob.fromInsertStmt(insertStmt);
            createLoadJob(loadJob);
        } finally {
            writeUnlock();
        }
        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
        loadJobScheduler.submitJob(loadJob);
        return loadJob.getId();
    }
}
