package org.apache.doris.load.loadv2;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.nereids.trees.expressions.functions.AggStateFunctionBuilder;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/loadv2/BrokerLoadJob.class */
/**
 * Bulk load job of type {@link EtlJobType#BROKER}.
 *
 * <p>Flow visible in this class: {@link #unprotectedExecuteJob()} submits one
 * {@link BrokerLoadPendingTask}; when it reports back, one {@link LoadLoadingTask} per
 * file-group aggregation key is created and submitted; when every task registered in
 * {@code idToTasks} has reported back and the data quality check passes, the transaction is
 * committed. Task callbacks run under the job write lock.
 */
public class BrokerLoadJob extends BulkLoadJob {
    private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);

    /** Profile of this job; assigned in {@link #createLoadingTask}, {@code null} before that. */
    private Profile jobProfile;

    /** True only when the creating session had profiling enabled (see the 5-arg constructor). */
    private boolean enableProfile;

    /** Creates an empty broker load job; profiling stays disabled. */
    public BrokerLoadJob() {
        super(EtlJobType.BROKER);
    }

    /**
     * Creates a broker load job and records whether the current session wants a profile.
     *
     * @param j              forwarded to the parent constructor (presumably the database id — confirm)
     * @param str            forwarded to the parent constructor (presumably the label — confirm)
     * @param brokerDesc     broker / storage description used by the pending and loading tasks
     * @param originStatement original statement, forwarded to the parent constructor
     * @param userIdentity   submitting user, forwarded to the parent constructor
     * @throws MetaNotFoundException declared by the parent constructor
     */
    public BrokerLoadJob(long j, String str, BrokerDesc brokerDesc, OriginStatement originStatement, UserIdentity userIdentity) throws MetaNotFoundException {
        super(EtlJobType.BROKER, j, str, originStatement, userIdentity);
        this.brokerDesc = brokerDesc;
        // There may be no connect context on this thread; profiling is then left disabled.
        ConnectContext context = ConnectContext.get();
        this.enableProfile = context != null && context.getSessionVariable().enableProfile();
    }

    /**
     * Begins the transaction for this job and stores its id in {@code transactionId}.
     * The transaction covers every table id known to {@code fileGroupAggInfo}.
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException, QuotaExceedException, MetaNotFoundException {
        this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                this.dbId,
                Lists.newArrayList(this.fileGroupAggInfo.getAllTableIds()),
                this.label,
                null,
                new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
                TransactionState.LoadJobSourceType.BATCH_LOAD_JOB,
                this.id,
                getTimeout());
    }

    /** Creates the pending task, registers it in {@code idToTasks} and submits it to the pending scheduler. */
    @Override // org.apache.doris.load.loadv2.LoadJob
    protected void unprotectedExecuteJob() {
        BrokerLoadPendingTask pendingTask = new BrokerLoadPendingTask(this, this.fileGroupAggInfo.getAggKeyToFileGroups(), this.brokerDesc, getPriority());
        this.idToTasks.put(Long.valueOf(pendingTask.getSignature()), pendingTask);
        Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(pendingTask);
    }

    /**
     * Dispatches a finished-task callback by attachment type. Attachments of any other type
     * are ignored.
     */
    @Override // org.apache.doris.load.loadv2.LoadJob, org.apache.doris.load.loadv2.LoadTaskCallback
    public void onTaskFinished(TaskAttachment taskAttachment) {
        if (taskAttachment instanceof BrokerPendingTaskAttachment) {
            onPendingTaskFinished((BrokerPendingTaskAttachment) taskAttachment);
        } else if (taskAttachment instanceof BrokerLoadingTaskAttachment) {
            onLoadingTaskFinished((BrokerLoadingTaskAttachment) taskAttachment);
        }
    }

    /** Starts a log line keyed by this job's id. */
    private LogBuilder jobLog() {
        return new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(this.id));
    }

    /** Logs that a task callback is dropped because the transaction is already done. */
    private void warnTaskIgnored() {
        LOG.warn(jobLog().add(AggStateFunctionBuilder.STATE, this.state).add("error_msg", "this task will be ignored when job is: " + this.state).build());
    }

    /**
     * Handles completion of the pending task: under the job write lock, drops late or duplicated
     * callbacks, then creates and submits the loading tasks. Any failure while doing so cancels
     * the job with {@code ETL_RUN_FAIL}.
     */
    private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) {
        writeLock();
        try {
            if (isTxnDone()) {
                warnTaskIgnored();
                return;
            }
            Long taskId = Long.valueOf(attachment.getTaskId());
            if (this.finishedTaskIds.contains(taskId)) {
                LOG.warn(jobLog().add("task_id", attachment.getTaskId()).add("error_msg", "this is a duplicated callback of pending task when broker already has loading task").build());
                return;
            }
            this.finishedTaskIds.add(taskId);
            try {
                createLoadingTask(getDb(), attachment);
                this.loadStartTimestamp = System.currentTimeMillis();
            } catch (RejectedExecutionException e) {
                // The scheduler refused one of the submitted tasks.
                LOG.warn(jobLog().add("database_id", this.dbId).add("error_msg", "the task queque is full.").build(), e);
                cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
            } catch (UserException e) {
                LOG.warn(jobLog().add("database_id", this.dbId).add("error_msg", "Failed to divide job into loading task.").build(), e);
                cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Builds one {@link LoadLoadingTask} per file-group aggregation key, registers each in
     * {@code idToTasks}, attaches the target table to the transaction state, and finally submits
     * all tasks. The involved tables are read-locked for the whole duration.
     *
     * @param database   database the job loads into
     * @param attachment result of the pending task (file statuses and file counts per key)
     * @throws UserException if a table cannot be resolved or the transaction no longer exists
     */
    private void createLoadingTask(Database database, BrokerPendingTaskAttachment attachment) throws UserException {
        List<Table> tables = database.getTablesOnIdOrderOrThrowException(Lists.newArrayList(this.fileGroupAggInfo.getAllTableIds()));
        List<LoadTask> loadingTasks = new ArrayList<>();
        this.jobProfile = new Profile("BrokerLoadJob " + this.id + ". " + this.label, true);
        Env.getCurrentProgressManager().registerProgressSimple(String.valueOf(this.id));
        MetaLockUtils.readLockTables(tables);
        try {
            for (Map.Entry<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> entry : this.fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
                BrokerFileGroupAggInfo.FileGroupAggKey aggKey = entry.getKey();
                List<BrokerFileGroup> fileGroups = entry.getValue();
                OlapTable olapTable = (OlapTable) database.getTableNullable(aggKey.getTableId());
                LoadLoadingTask loadingTask = new LoadLoadingTask(database, olapTable, this.brokerDesc, fileGroups, getDeadlineMs(), getExecMemLimit(), isStrictMode(), isPartialUpdate(), this.transactionId, this, getTimeZone(), getTimeout(), getLoadParallelism(), getSendBatchParallelism(), getMaxFilterRatio() <= 0.0d, this.enableProfile ? this.jobProfile : null, isSingleTabletLoadPerSink(), useNewLoadScanNode(), getPriority());
                // Each loading task gets its own random load id.
                UUID uuid = UUID.randomUUID();
                loadingTask.init(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey), getUserInfo());
                this.idToTasks.put(Long.valueOf(loadingTask.getSignature()), loadingTask);
                loadingTasks.add(loadingTask);
                TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(this.dbId, this.transactionId);
                if (txnState == null) {
                    throw new UserException("txn does not exist: " + this.transactionId);
                }
                txnState.addTableIndexes(olapTable);
                if (isPartialUpdate()) {
                    txnState.setSchemaForPartialUpdate(olapTable);
                }
            }
            // Submit only after every task was built, so a failure above submits nothing.
            for (LoadTask task : loadingTasks) {
                Env.getCurrentEnv().getLoadingLoadTaskScheduler().submit(task);
            }
        } finally {
            MetaLockUtils.readUnlockTables(tables);
        }
    }

    /**
     * Handles completion of one loading task: under the job write lock, drops late or duplicated
     * callbacks, merges the task's counters, and once every registered task has finished, checks
     * data quality and commits the transaction. Commit failures cancel the job with
     * {@code LOAD_RUN_FAIL}.
     */
    private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
        writeLock();
        try {
            if (isTxnDone()) {
                warnTaskIgnored();
                return;
            }
            Long taskId = Long.valueOf(attachment.getTaskId());
            if (this.finishedTaskIds.contains(taskId)) {
                LOG.warn(jobLog().add("task_id", attachment.getTaskId()).add("error_msg", "this is a duplicated callback of loading task").build());
                return;
            }
            this.finishedTaskIds.add(taskId);
            updateLoadingStatus(attachment);
            // Other tasks are still outstanding; the last one to report performs the commit.
            if (this.finishedTaskIds.size() != this.idToTasks.size()) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(jobLog().add("commit_infos", Joiner.on(",").join(this.commitInfos)).build());
            }
            if (!checkDataQuality()) {
                cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG), true, true);
                return;
            }
            commitTxn();
        } finally {
            writeUnlock();
        }
    }

    /**
     * Commits the job's transaction while holding the write locks of all involved tables.
     * Failure to resolve or lock the tables, or a failed commit, cancels the job.
     */
    private void commitTxn() {
        try {
            List<Table> tables = getDb().getTablesOnIdOrderOrThrowException(Lists.newArrayList(this.fileGroupAggInfo.getAllTableIds()));
            MetaLockUtils.writeLockTablesOrMetaException(tables);
            try {
                LOG.info(jobLog().add("txn_id", this.transactionId).add("msg", "Load job try to commit txn").build());
                Env.getCurrentGlobalTransactionMgr().commitTransaction(this.dbId, tables, this.transactionId, this.commitInfos, new LoadJobFinalOperation(this.id, this.loadingStatus, this.progress, this.loadStartTimestamp, this.finishTimestamp, this.state, this.failMsg));
            } catch (UserException e) {
                LOG.warn(jobLog().add("database_id", this.dbId).add("error_msg", "Failed to commit txn with error:" + e.getMessage()).build(), e);
                cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
            } finally {
                // Single release point: the table locks are released exactly once on every path.
                MetaLockUtils.writeUnlockTables(tables);
            }
        } catch (MetaNotFoundException e) {
            LOG.warn(jobLog().add("database_id", this.dbId).add("error_msg", "db has been deleted when job is loading").build(), e);
            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
        }
    }

    /** Updates the job profile with the final summary; does nothing when profiling is disabled. */
    private void writeProfile() {
        if (!this.enableProfile) {
            return;
        }
        this.jobProfile.update(this.createTimestamp, getSummaryInfo(true), true);
    }

    /**
     * Builds the profile summary map for this job.
     *
     * @param isFinished when true, end time and total time (relative to {@code createTimestamp})
     *                   are included
     * @return the summary entries produced by {@link SummaryProfile.SummaryBuilder}
     */
    private Map<String, String> getSummaryInfo(boolean isFinished) {
        long now = System.currentTimeMillis();
        SummaryProfile.SummaryBuilder builder = new SummaryProfile.SummaryBuilder();
        builder.profileId(String.valueOf(this.id));
        // NOTE(review): looks like a build-version constant inlined at compile time — confirm.
        builder.dorisVersion("doris-2.0.3-rc03dev-ffd7521080");
        builder.taskType(ProfileManager.ProfileType.LOAD.name());
        builder.startTime(TimeUtils.longToTimeString(this.createTimestamp));
        if (isFinished) {
            builder.endTime(TimeUtils.longToTimeString(now));
            builder.totalTime(DebugUtil.getPrettyStringMs(now - this.createTimestamp));
        }
        builder.taskState("FINISHED");
        builder.user(getUserInfo() != null ? getUserInfo().getQualifiedUser() : "N/A");
        builder.defaultDb(getDefaultDb());
        builder.sqlStatement(getOriginStmt().originStmt);
        return builder.build();
    }

    /** Returns the full name of the job's database, or {@code "N/A"} if it no longer exists. */
    private String getDefaultDb() {
        Database db = Env.getCurrentEnv().getInternalCatalog().getDb(this.dbId).orElse(null);
        return db == null ? "N/A" : db.getFullName();
    }

    /**
     * Merges the result of one finished loading task into the job: row counters, tracking URL,
     * commit infos, a bounded number of error tablet infos, and the progress percentage.
     */
    private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
        this.loadingStatus.replaceCounter(LoadEtlTask.DPP_ABNORMAL_ALL, increaseCounter(LoadEtlTask.DPP_ABNORMAL_ALL, attachment.getCounter(LoadEtlTask.DPP_ABNORMAL_ALL)));
        this.loadingStatus.replaceCounter(LoadEtlTask.DPP_NORMAL_ALL, increaseCounter(LoadEtlTask.DPP_NORMAL_ALL, attachment.getCounter(LoadEtlTask.DPP_NORMAL_ALL)));
        this.loadingStatus.replaceCounter(LoadJob.UNSELECTED_ROWS, increaseCounter(LoadJob.UNSELECTED_ROWS, attachment.getCounter(LoadJob.UNSELECTED_ROWS)));
        if (attachment.getTrackingUrl() != null) {
            this.loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
        }
        this.commitInfos.addAll(attachment.getCommitInfoList());
        this.errorTabletInfos.addAll((Collection) attachment.getErrorTabletInfos().stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList()));
        // Divide as double: int / int would truncate to 0 until every task has finished.
        this.progress = (int) ((double) this.finishedTaskIds.size() / this.idToTasks.size() * 100.0d);
        // 100 is never reported from here, even when all tasks have finished.
        if (this.progress == 100) {
            this.progress = 99;
        }
    }

    /**
     * Delegates to the parent and then recomputes progress as loaded bytes over total file size,
     * capped at 99.
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void updateProgress(Long l, TUniqueId tUniqueId, TUniqueId tUniqueId2, long j, long j2, boolean z) {
        super.updateProgress(l, tUniqueId, tUniqueId2, j, j2, z);
        // Floating-point division: no truncation, and a zero total yields NaN/Infinity
        // (cast to 0 / Integer.MAX_VALUE) instead of an integer divide-by-zero.
        this.progress = (int) ((double) this.loadStatistic.getLoadBytes() / this.loadStatistic.totalFileSizeB * 100.0d);
        if (this.progress >= 100) {
            this.progress = 99;
        }
    }

    /**
     * Returns the current value of counter {@code key} in {@code loadingStatus} (0 if absent)
     * plus {@code delta}, as a decimal string.
     *
     * @param key   counter name
     * @param delta decimal increment, or {@code null} for no increment
     * @throws NumberFormatException if the stored value or {@code delta} is not a valid long
     */
    private String increaseCounter(String key, String delta) {
        long total = 0;
        if (this.loadingStatus.getCounters().containsKey(key)) {
            total = Long.parseLong(this.loadingStatus.getCounters().get(key));
        }
        if (delta != null) {
            total += Long.parseLong(delta);
        }
        return String.valueOf(total);
    }

    /** Delegates to the parent and then writes the job profile. */
    @Override // org.apache.doris.load.loadv2.LoadJob, org.apache.doris.transaction.AbstractTxnStateChangeCallback, org.apache.doris.transaction.TxnStateChangeCallback
    public void afterVisible(TransactionState transactionState, boolean z) {
        super.afterVisible(transactionState, z);
        writeProfile();
    }

    /**
     * Returns the resource name for this job: the broker name for BROKER storage; for S3 the
     * configured endpoint property, or {@code "s3_cluster"} when it is missing; otherwise the
     * lower-cased storage type name followed by {@code "_cluster"}.
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    protected String getResourceName() {
        StorageBackend.StorageType storageType = this.brokerDesc.getStorageType();
        if (storageType == StorageBackend.StorageType.BROKER) {
            return this.brokerDesc.getName();
        }
        if (storageType == StorageBackend.StorageType.S3) {
            return Optional.ofNullable(this.brokerDesc.getProperties())
                    .map(props -> (String) props.get(S3Properties.Env.ENDPOINT))
                    .orElse("s3_cluster");
        }
        return storageType.name().toLowerCase().concat("_cluster");
    }
}
