package org.apache.doris.load.sync;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.doris.analysis.CreateDataSyncJobStmt;
import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.sync.SyncFailMsg;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.canal.CanalDestination;
import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.persist.Storage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/sync/SyncJobManager.class */
public class SyncJobManager implements Writable {
    private static final Logger LOG = LogManager.getLogger(SyncJobManager.class);
    private Map<Long, SyncJob> idToSyncJob = Maps.newConcurrentMap();
    private Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newConcurrentMap();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    public void addDataSyncJob(CreateDataSyncJobStmt createDataSyncJobStmt) throws DdlException {
        SyncJob fromStmt = SyncJob.fromStmt(Env.getCurrentEnv().getNextId(), createDataSyncJobStmt);
        writeLock();
        try {
            checkDuplicateRemote(fromStmt);
            unprotectedAddSyncJob(fromStmt);
            Env.getCurrentEnv().getEditLog().logCreateSyncJob(fromStmt);
            writeUnlock();
            LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(fromStmt.getId())).add(Storage.NODE_NAME, fromStmt.getJobName()).add("type", fromStmt.getJobType()).add("config", fromStmt.getJobConfig()).add("msg", "add sync job.").build());
        } catch (Throwable th) {
            writeUnlock();
            throw th;
        }
    }

    private void checkDuplicateRemote(SyncJob syncJob) throws DdlException {
        if (syncJob.getJobType() == DataSyncJobType.CANAL) {
            CanalDestination remote = ((CanalSyncJob) syncJob).getRemote();
            for (SyncJob syncJob2 : (List) this.idToSyncJob.values().stream().filter(syncJob3 -> {
                return !syncJob3.isCompleted();
            }).collect(Collectors.toList())) {
                if ((syncJob2 instanceof CanalSyncJob) && ((CanalSyncJob) syncJob2).getRemote().equals(remote)) {
                    throw new DdlException("Remote Canal instance already exists. conflict destination: " + remote);
                }
            }
        }
    }

    private void unprotectedAddSyncJob(SyncJob syncJob) {
        this.idToSyncJob.put(Long.valueOf(syncJob.getId()), syncJob);
        long dbId = syncJob.getDbId();
        if (!this.dbIdToJobNameToSyncJobs.containsKey(Long.valueOf(dbId))) {
            this.dbIdToJobNameToSyncJobs.put(Long.valueOf(syncJob.getDbId()), Maps.newConcurrentMap());
        }
        Map<String, List<SyncJob>> map = this.dbIdToJobNameToSyncJobs.get(Long.valueOf(dbId));
        if (!map.containsKey(syncJob.getJobName())) {
            map.put(syncJob.getJobName(), Lists.newArrayList());
        }
        map.get(syncJob.getJobName()).add(syncJob);
    }

    public void pauseSyncJob(PauseSyncJobStmt pauseSyncJobStmt) throws UserException {
        String dbFullName = pauseSyncJobStmt.getDbFullName();
        String jobName = pauseSyncJobStmt.getJobName();
        Database dbOrDdlException = Env.getCurrentInternalCatalog().getDbOrDdlException(dbFullName);
        ArrayList newArrayList = Lists.newArrayList();
        readLock();
        try {
            List<SyncJob> syncJobsByDbAndJobName = getSyncJobsByDbAndJobName(dbOrDdlException.getId(), jobName);
            if (syncJobsByDbAndJobName.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }
            List list = (List) syncJobsByDbAndJobName.stream().filter((v0) -> {
                return v0.isRunning();
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                throw new DdlException("There is no running job with jobName `" + pauseSyncJobStmt.getJobName() + "` to pause");
            }
            newArrayList.addAll(list);
            readUnlock();
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                ((SyncJob) it.next()).pause();
            }
        } catch (Throwable th) {
            readUnlock();
            throw th;
        }
    }

    public void resumeSyncJob(ResumeSyncJobStmt resumeSyncJobStmt) throws UserException {
        String dbFullName = resumeSyncJobStmt.getDbFullName();
        String jobName = resumeSyncJobStmt.getJobName();
        Database dbOrDdlException = Env.getCurrentInternalCatalog().getDbOrDdlException(dbFullName);
        ArrayList newArrayList = Lists.newArrayList();
        readLock();
        try {
            List<SyncJob> syncJobsByDbAndJobName = getSyncJobsByDbAndJobName(dbOrDdlException.getId(), jobName);
            if (syncJobsByDbAndJobName.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }
            List list = (List) syncJobsByDbAndJobName.stream().filter((v0) -> {
                return v0.isPaused();
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                throw new DdlException("There is no paused job with jobName `" + resumeSyncJobStmt.getJobName() + "` to resume");
            }
            newArrayList.addAll(list);
            readUnlock();
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                ((SyncJob) it.next()).resume();
            }
        } catch (Throwable th) {
            readUnlock();
            throw th;
        }
    }

    public void stopSyncJob(StopSyncJobStmt stopSyncJobStmt) throws UserException {
        String dbFullName = stopSyncJobStmt.getDbFullName();
        String jobName = stopSyncJobStmt.getJobName();
        Database dbOrDdlException = Env.getCurrentInternalCatalog().getDbOrDdlException(dbFullName);
        ArrayList newArrayList = Lists.newArrayList();
        readLock();
        try {
            List<SyncJob> syncJobsByDbAndJobName = getSyncJobsByDbAndJobName(dbOrDdlException.getId(), jobName);
            if (syncJobsByDbAndJobName.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }
            List list = (List) syncJobsByDbAndJobName.stream().filter(syncJob -> {
                return !syncJob.isCompleted();
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                throw new DdlException("There is no uncompleted job with jobName `" + stopSyncJobStmt.getJobName() + "`");
            }
            newArrayList.addAll(list);
            readUnlock();
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                ((SyncJob) it.next()).cancel(SyncFailMsg.MsgType.USER_CANCEL, "user cancel");
            }
        } catch (Throwable th) {
            readUnlock();
            throw th;
        }
    }

    private List<SyncJob> getSyncJobsByDbAndJobName(long j, String str) {
        ArrayList newArrayList = Lists.newArrayList();
        Map<String, List<SyncJob>> map = this.dbIdToJobNameToSyncJobs.get(Long.valueOf(j));
        if (map != null && map.containsKey(str)) {
            newArrayList.addAll(map.get(str));
        }
        return newArrayList;
    }

    public List<List<Comparable>> getSyncJobsInfoByDbId(long j) {
        LinkedList linkedList = new LinkedList();
        readLock();
        try {
            if (!this.dbIdToJobNameToSyncJobs.containsKey(Long.valueOf(j))) {
                return linkedList;
            }
            Map<String, List<SyncJob>> map = this.dbIdToJobNameToSyncJobs.get(Long.valueOf(j));
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.addAll((Collection) map.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList()));
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                linkedList.add(((SyncJob) it.next()).getShowInfo());
            }
            readUnlock();
            return linkedList;
        } finally {
            readUnlock();
        }
    }

    public List<SyncJob> getSyncJobs(SyncJob.JobState jobState) {
        ArrayList newArrayList = Lists.newArrayList();
        readLock();
        try {
            for (SyncJob syncJob : this.idToSyncJob.values()) {
                if (syncJob.getJobState() == jobState) {
                    newArrayList.add(syncJob);
                }
            }
            return newArrayList;
        } finally {
            readUnlock();
        }
    }

    public boolean isJobNameExist(String str, String str2) throws DdlException {
        Database dbOrDdlException = Env.getCurrentInternalCatalog().getDbOrDdlException(str);
        boolean z = false;
        readLock();
        try {
            Map<String, List<SyncJob>> map = this.dbIdToJobNameToSyncJobs.get(Long.valueOf(dbOrDdlException.getId()));
            if (map != null && map.containsKey(str2)) {
                Iterator<SyncJob> it = map.get(str2).iterator();
                while (it.hasNext()) {
                    if (!it.next().isCancelled()) {
                        z = true;
                    }
                }
            }
            return z;
        } finally {
            readUnlock();
        }
    }

    public void updateNeedSchedule() throws UserException {
        for (SyncJob syncJob : this.idToSyncJob.values()) {
            if (!syncJob.isCompleted()) {
                syncJob.checkAndDoUpdate();
            }
        }
    }

    public void cleanOldSyncJobs() {
        LOG.debug("begin to clean old sync jobs ");
        long currentTimeMillis = System.currentTimeMillis();
        writeLock();
        try {
            Iterator<Map.Entry<Long, SyncJob>> it = this.idToSyncJob.entrySet().iterator();
            while (it.hasNext()) {
                SyncJob value = it.next().getValue();
                if (value.isExpired(currentTimeMillis)) {
                    if (this.dbIdToJobNameToSyncJobs.containsKey(Long.valueOf(value.getDbId()))) {
                        Map<String, List<SyncJob>> map = this.dbIdToJobNameToSyncJobs.get(Long.valueOf(value.getDbId()));
                        List<SyncJob> list = map.get(value.getJobName());
                        list.remove(value);
                        if (list.isEmpty()) {
                            map.remove(value.getJobName());
                        }
                        if (map.isEmpty()) {
                            this.dbIdToJobNameToSyncJobs.remove(Long.valueOf(value.getDbId()));
                        }
                        it.remove();
                        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(value.getId())).add("finishTimeMs", value.getFinishTimeMs()).add("currentTimeMs", currentTimeMillis).add("jobState", value.getJobState()).add("msg", "old sync job has been cleaned"));
                    }
                }
            }
        } finally {
            writeUnlock();
        }
    }

    public SyncJob getSyncJobById(long j) {
        return this.idToSyncJob.get(Long.valueOf(j));
    }

    public void readLock() {
        this.lock.readLock().lock();
    }

    public void readUnlock() {
        this.lock.readLock().unlock();
    }

    private void writeLock() {
        this.lock.writeLock().lock();
    }

    private void writeUnlock() {
        this.lock.writeLock().unlock();
    }

    public void write(DataOutput dataOutput) throws IOException {
        Collection<SyncJob> values = this.idToSyncJob.values();
        dataOutput.writeInt(values.size());
        Iterator<SyncJob> it = values.iterator();
        while (it.hasNext()) {
            it.next().write(dataOutput);
        }
    }

    public void readField(DataInput dataInput) throws IOException {
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            unprotectedAddSyncJob(SyncJob.read(dataInput));
        }
    }

    public void replayAddSyncJob(SyncJob syncJob) {
        writeLock();
        try {
            unprotectedAddSyncJob(syncJob);
            LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(syncJob.getId())).add("msg", "replay create sync job.").build());
        } finally {
            writeUnlock();
        }
    }

    public void replayUpdateSyncJobState(SyncJob.SyncJobUpdateStateInfo syncJobUpdateStateInfo) {
        writeLock();
        try {
            long id = syncJobUpdateStateInfo.getId();
            SyncJob syncJob = this.idToSyncJob.get(Long.valueOf(id));
            if (syncJob == null) {
                LOG.warn(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(id)).add("msg", "replay update sync job state failed. Job was not found.").build());
                writeUnlock();
            } else {
                syncJob.replayUpdateSyncJobState(syncJobUpdateStateInfo);
                writeUnlock();
            }
        } catch (Throwable th) {
            writeUnlock();
            throw th;
        }
    }
}
