package org.apache.doris.load.loadv2;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.trees.expressions.functions.AggStateFunctionBuilder;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TabletQuorumFailedException;
import org.apache.doris.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/loadv2/SparkLoadJob.class */
public class SparkLoadJob extends BulkLoadJob {
    // Class-wide logger.
    private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
    // Job-private copy of the Spark resource; assigned in setResourceInfo().
    private SparkResource sparkResource;
    // System.currentTimeMillis() at the moment the job entered the ETL state; -1 until then.
    private long etlStartTimestamp;
    // Application id reported by the pending task attachment; empty string until then.
    private String appId;
    // ETL output path reported by the pending task attachment; empty string until then.
    private String etlOutputPath;
    // Tablet meta string (derived from an ETL output file path) -> (file path, file size).
    private Map<String, Pair<String, Long>> tabletMetaToFileInfo;
    // Resource description handed to the constructor; may be null (setResourceInfo() then does nothing).
    private ResourceDesc resourceDesc;
    // Handle of the Spark application; replaced by the one carried in the pending task attachment.
    private SparkLoadAppHandle sparkLoadAppHandle;
    // Initialised to -1 by the constructors.
    private long quorumFinishTimestamp;
    // Table id -> ids of the partitions being loaded, parsed from the keys of tabletMetaToFileInfo.
    private Map<Long, Set<Long>> tableToLoadPartitions;
    // Index id -> lazily built reader parameters (see getPushBrokerReaderParams()).
    private Map<Long, PushBrokerReaderParams> indexToPushBrokerReaderParams;
    // Index id -> schema hash, parsed from the keys of tabletMetaToFileInfo.
    private Map<Long, Integer> indexToSchemaHash;
    // Tablet id -> (replica id -> push task already queued); the task is replaced by null once the replica finished.
    private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask;
    // Ids of the replicas reported through addFinishedReplica().
    private Set<Long> finishedReplicas;
    // Tablets whose number of healthy finished replicas reached (total replica num / 2 + 1).
    private Set<Long> quorumTablets;
    // Subset of the quorum tablets for which every replica finished.
    private Set<Long> fullTablets;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.doris.load.loadv2.SparkLoadJob$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/doris/load/loadv2/SparkLoadJob$1.class */
    /**
     * Compiler-generated lookup tables that map enum ordinals to the dense case numbers used by
     * {@code switch} statements in the enclosing class.
     *
     * <p>Each table is sized to the number of constants of its enum. A {@link NoSuchFieldError}
     * while filling an entry is ignored on purpose: it means the constant does not exist in the
     * enum version loaded at runtime, so its slot stays 0 (no matching case).
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$doris$load$loadv2$JobState;
        static final /* synthetic */ int[] $SwitchMap$org$apache$doris$thrift$TEtlState;

        static {
            // The JobState table must be declared and allocated before its entries are written.
            $SwitchMap$org$apache$doris$load$loadv2$JobState = new int[JobState.values().length];
            try {
                $SwitchMap$org$apache$doris$load$loadv2$JobState[JobState.ETL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$doris$load$loadv2$JobState[JobState.LOADING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave the slot at 0
            }
            $SwitchMap$org$apache$doris$thrift$TEtlState = new int[TEtlState.values().length];
            try {
                $SwitchMap$org$apache$doris$thrift$TEtlState[TEtlState.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$doris$thrift$TEtlState[TEtlState.FINISHED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$doris$thrift$TEtlState[TEtlState.CANCELLED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave the slot at 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/doris/load/loadv2/SparkLoadJob$PushBrokerReaderParams.class */
    /**
     * Thrift-level reader parameters for pushing one index: a broker scan range describing how the
     * source file is read, and the descriptor table holding the source and destination tuples.
     */
    public static class PushBrokerReaderParams {
        TBrokerScanRange tBrokerScanRange = new TBrokerScanRange();
        TDescriptorTable tDescriptorTable = null;

        /**
         * Builds both thrift structures for the given columns.
         *
         * @param list       columns of the destination index; one destination slot is created per column
         * @param brokerDesc broker description whose properties are copied into the scan range params
         * @throws UserException if building a cast expression for a destination slot fails
         */
        public void init(List<Column> list, BrokerDesc brokerDesc) throws UserException {
            DescriptorTable descTable = new DescriptorTable();

            // The destination tuple is created first; its slots mirror the index columns.
            TupleDescriptor destTuple = descTable.createTupleDescriptor();
            for (Column destColumn : list) {
                SlotDescriptor destSlot = descTable.addSlotDescriptor(destTuple);
                destSlot.setIsMaterialized(true);
                destSlot.setColumn(destColumn);
                destSlot.setIsNullable(destColumn.isAllowNull());
            }

            initTBrokerScanRange(descTable, destTuple, list, brokerDesc);
            initTDescriptorTable(descTable);
        }

        /**
         * Creates the source tuple (BITMAP columns stay BITMAP, every other column is read as
         * VARCHAR), wires each materialized destination slot to the source slot of the same column
         * name, and fills {@link #tBrokerScanRange} with a single parquet range.
         */
        private void initTBrokerScanRange(DescriptorTable descTable, TupleDescriptor destTuple, List<Column> columns, BrokerDesc brokerDesc) throws AnalysisException {
            TBrokerScanRangeParams scanParams = new TBrokerScanRangeParams();
            scanParams.setStrictMode(false);
            scanParams.setProperties(brokerDesc.getProperties());

            TupleDescriptor srcTuple = descTable.createTupleDescriptor();
            Map<String, SlotDescriptor> srcSlotByColumnName = Maps.newHashMap();
            for (Column column : columns) {
                SlotDescriptor srcSlot = descTable.addSlotDescriptor(srcTuple);
                srcSlot.setIsMaterialized(true);
                srcSlot.setIsNullable(true);
                PrimitiveType srcType = column.getDataType() == PrimitiveType.BITMAP
                        ? PrimitiveType.BITMAP
                        : PrimitiveType.VARCHAR;
                srcSlot.setType(ScalarType.createType(srcType));
                srcSlot.setColumn(new Column(column.getName(), srcType));
                scanParams.addToSrcSlotIds(srcSlot.getId().asInt());
                srcSlotByColumnName.put(column.getName(), srcSlot);
            }

            Map<Integer, Integer> destSidToSrcSid = Maps.newHashMap();
            for (SlotDescriptor destSlot : destTuple.getSlots()) {
                if (!destSlot.isMaterialized()) {
                    continue;
                }
                SlotDescriptor srcSlot = srcSlotByColumnName.get(destSlot.getColumn().getName());
                destSidToSrcSid.put(Integer.valueOf(destSlot.getId().asInt()), Integer.valueOf(srcSlot.getId().asInt()));
                Expr srcExpr = castToSlot(destSlot, new SlotRef(srcSlot));
                scanParams.putToExprOfDestSlot(destSlot.getId().asInt(), srcExpr.treeToThrift());
            }
            scanParams.setDestSidToSrcSidWithoutTrans(destSidToSrcSid);
            scanParams.setSrcTupleId(srcTuple.getId().asInt());
            scanParams.setDestTupleId(destTuple.getId().asInt());
            tBrokerScanRange.setParams(scanParams);

            // Broker addresses are left empty here.
            tBrokerScanRange.setBrokerAddresses(Lists.newArrayList());

            TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
            rangeDesc.setFileType(TFileType.FILE_BROKER);
            rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
            rangeDesc.setSplittable(false);
            rangeDesc.setStartOffset(0L);
            rangeDesc.setSize(-1L);
            tBrokerScanRange.setRanges(Lists.newArrayList(rangeDesc));
        }

        /**
         * Returns {@code expr} converted to the type of {@code slotDescriptor}. VARCHAR to BOOLEAN
         * goes through an intermediate TINYINT cast; equal primitive types need no cast.
         */
        private Expr castToSlot(SlotDescriptor slotDescriptor, Expr expr) throws AnalysisException {
            PrimitiveType destType = slotDescriptor.getType().getPrimitiveType();
            PrimitiveType srcType = expr.getType().getPrimitiveType();
            if (destType == PrimitiveType.BOOLEAN && srcType == PrimitiveType.VARCHAR) {
                return new CastExpr((Type) Type.BOOLEAN, (Expr) new CastExpr((Type) Type.TINYINT, expr));
            }
            if (destType != srcType) {
                return expr.castTo(slotDescriptor.getType());
            }
            return expr;
        }

        /** Finalizes the descriptor table layout and stores its thrift form. */
        private void initTDescriptorTable(DescriptorTable descTable) {
            descTable.computeStatAndMemLayout();
            tDescriptorTable = descTable.toThrift();
        }
    }

    /* loaded from: input_file:org/apache/doris/load/loadv2/SparkLoadJob$SparkLoadJobStateUpdateInfo.class */
    /**
     * State-update record for a Spark load job. On top of the values passed to the parent record it
     * carries the Spark application handle, the ETL start time, the application id, the ETL output
     * path and the tablet-meta to file-info mapping. Field names used for serialization are fixed
     * by the {@code @SerializedName} annotations.
     */
    public static class SparkLoadJobStateUpdateInfo extends LoadJob.LoadJobStateUpdateInfo {

        @SerializedName("sparkLoadAppHandle")
        private SparkLoadAppHandle sparkLoadAppHandle;

        @SerializedName("etlStartTimestamp")
        private long etlStartTimestamp;

        @SerializedName("appId")
        private String appId;

        @SerializedName("etlOutputPath")
        private String etlOutputPath;

        @SerializedName("tabletMetaToFileInfo")
        private Map<String, Pair<String, Long>> tabletMetaToFileInfo;

        /**
         * @param j                  forwarded to the parent constructor (1st argument)
         * @param jobState           forwarded to the parent constructor (2nd argument)
         * @param j2                 forwarded to the parent constructor (3rd argument)
         * @param sparkLoadAppHandle Spark application handle to record
         * @param j3                 ETL start timestamp to record
         * @param str                application id to record
         * @param str2               ETL output path to record
         * @param j4                 forwarded to the parent constructor (4th argument)
         * @param map                tablet-meta to (file path, file size) mapping to record
         */
        public SparkLoadJobStateUpdateInfo(long j, JobState jobState, long j2, SparkLoadAppHandle sparkLoadAppHandle, long j3, String str, String str2, long j4, Map<String, Pair<String, Long>> map) {
            super(j, jobState, j2, j4);
            this.sparkLoadAppHandle = sparkLoadAppHandle;
            etlStartTimestamp = j3;
            appId = str;
            etlOutputPath = str2;
            tabletMetaToFileInfo = map;
        }

        /** @return the recorded Spark application handle */
        public SparkLoadAppHandle getSparkLoadAppHandle() {
            return sparkLoadAppHandle;
        }

        /** @return the recorded ETL start timestamp */
        public long getEtlStartTimestamp() {
            return etlStartTimestamp;
        }

        /** @return the recorded application id */
        public String getAppId() {
            return appId;
        }

        /** @return the recorded ETL output path */
        public String getEtlOutputPath() {
            return etlOutputPath;
        }

        /** @return the recorded tablet-meta to file-info mapping (the stored instance, not a copy) */
        public Map<String, Pair<String, Long>> getTabletMetaToFileInfo() {
            return tabletMetaToFileInfo;
        }
    }

    /**
     * Creates an empty Spark load job: timestamps are -1, ids and paths are empty strings, and
     * every bookkeeping collection starts out empty.
     */
    public SparkLoadJob() {
        super(EtlJobType.SPARK);
        // ETL phase
        etlStartTimestamp = -1L;
        appId = "";
        etlOutputPath = "";
        tabletMetaToFileInfo = new HashMap<>();
        sparkLoadAppHandle = new SparkLoadAppHandle();
        // loading phase
        quorumFinishTimestamp = -1L;
        tableToLoadPartitions = new HashMap<>();
        indexToPushBrokerReaderParams = new HashMap<>();
        indexToSchemaHash = new HashMap<>();
        tabletToSentReplicaPushTask = new HashMap<>();
        finishedReplicas = new HashSet<>();
        quorumTablets = new HashSet<>();
        fullTablets = new HashSet<>();
    }

    /**
     * Creates a Spark load job bound to a resource description. All other state starts out the
     * same as with the no-argument constructor.
     *
     * @param j               forwarded to the parent constructor
     * @param str             forwarded to the parent constructor
     * @param resourceDesc    resource description later resolved by {@code setResourceInfo()}
     * @param originStatement forwarded to the parent constructor
     * @param userIdentity    forwarded to the parent constructor
     * @throws MetaNotFoundException declared by this constructor; nothing in its own body throws it
     */
    public SparkLoadJob(long j, String str, ResourceDesc resourceDesc, OriginStatement originStatement, UserIdentity userIdentity) throws MetaNotFoundException {
        super(EtlJobType.SPARK, j, str, originStatement, userIdentity);
        // ETL phase
        etlStartTimestamp = -1L;
        appId = "";
        etlOutputPath = "";
        tabletMetaToFileInfo = new HashMap<>();
        sparkLoadAppHandle = new SparkLoadAppHandle();
        // loading phase
        quorumFinishTimestamp = -1L;
        tableToLoadPartitions = new HashMap<>();
        indexToPushBrokerReaderParams = new HashMap<>();
        indexToSchemaHash = new HashMap<>();
        tabletToSentReplicaPushTask = new HashMap<>();
        finishedReplicas = new HashSet<>();
        quorumTablets = new HashSet<>();
        fullTablets = new HashSet<>();
        this.resourceDesc = resourceDesc;
    }

    /**
     * Applies the job properties through the parent implementation, then resolves the Spark
     * resource and broker description via {@code setResourceInfo()}.
     *
     * @param map job properties, passed unchanged to the parent implementation
     * @throws DdlException if the parent rejects the properties or the resource cannot be resolved
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void setJobProperties(Map<String, String> map) throws DdlException {
        super.setJobProperties(map);
        setResourceInfo();
    }

    /**
     * Resolves {@code resourceDesc} into a job-private Spark resource and derives the broker
     * description from it. Does nothing when no resource description was supplied.
     *
     * @throws DdlException if no resource with the described name is registered
     */
    private void setResourceInfo() throws DdlException {
        if (resourceDesc != null) {
            String resourceName = resourceDesc.getName();
            Resource registered = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
            if (registered == null) {
                throw new DdlException("Resource does not exist. name: " + resourceName);
            }
            // Work on a copy so that the update below is applied to the copy, not to the registered resource.
            SparkResource copied = ((SparkResource) registered).getCopiedResource();
            sparkResource = copied;
            copied.update(resourceDesc);
            brokerDesc = new BrokerDesc(copied.getBroker(), copied.getBrokerPropertiesWithoutPrefix());
        }
    }

    /**
     * Begins the transaction for this job on the global transaction manager and stores the
     * returned id in {@code transactionId}. The coordinator is this frontend, identified by its
     * local host address.
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException, QuotaExceedException, MetaNotFoundException {
        TransactionState.TxnCoordinator coordinator = new TransactionState.TxnCoordinator(
                TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress());
        transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                dbId,
                Lists.newArrayList(fileGroupAggInfo.getAllTableIds()),
                label,
                null,
                coordinator,
                TransactionState.LoadJobSourceType.FRONTEND,
                id,
                getTimeout());
    }

    /**
     * Begins the transaction, then creates, initialises, registers and submits the pending task
     * of this job.
     *
     * @throws LoadException if beginning the transaction or preparing the pending task fails; the
     *                       original exception is attached as the cause
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    protected void unprotectedExecuteJob() throws LoadException {
        try {
            beginTxn();
            SparkLoadPendingTask sparkLoadPendingTask = new SparkLoadPendingTask(this, this.fileGroupAggInfo.getAggKeyToFileGroups(), this.sparkResource, this.brokerDesc, getPriority());
            sparkLoadPendingTask.init();
            this.idToTasks.put(Long.valueOf(sparkLoadPendingTask.getSignature()), sparkLoadPendingTask);
            Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(sparkLoadPendingTask);
        } catch (UserException e) {
            LOG.warn("failed to begin transaction for spark load job {}", Long.valueOf(this.id), e);
            // Keep the cause so the original stack trace is not lost to callers.
            throw new LoadException(e.getMessage(), e);
        }
    }

    /**
     * Task callback. Only attachments of type {@code SparkPendingTaskAttachment} are handled;
     * anything else is ignored.
     */
    @Override // org.apache.doris.load.loadv2.LoadJob, org.apache.doris.load.loadv2.LoadTaskCallback
    public void onTaskFinished(TaskAttachment taskAttachment) {
        if (!(taskAttachment instanceof SparkPendingTaskAttachment)) {
            return;
        }
        onPendingTaskFinished((SparkPendingTaskAttachment) taskAttachment);
    }

    /**
     * Handles completion of the pending task under the job write lock: records the task as
     * finished, adopts the Spark handle, application id and output path from the attachment,
     * switches the job to ETL and logs the state update. The callback is ignored when the
     * transaction is already done or when the same task id was seen before.
     */
    private void onPendingTaskFinished(SparkPendingTaskAttachment sparkPendingTaskAttachment) {
        writeLock();
        try {
            if (isTxnDone()) {
                String reason = "this task will be ignored when job is: " + state;
                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(id)).add(AggStateFunctionBuilder.STATE, state).add("error_msg", reason).build());
                return;
            }

            Long taskKey = Long.valueOf(sparkPendingTaskAttachment.getTaskId());
            if (finishedTaskIds.contains(taskKey)) {
                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(id)).add("task_id", sparkPendingTaskAttachment.getTaskId()).add("error_msg", "this is a duplicated callback of pending task when broker already has loading task").build());
                return;
            }
            finishedTaskIds.add(taskKey);

            sparkLoadAppHandle = sparkPendingTaskAttachment.getHandle();
            appId = sparkPendingTaskAttachment.getAppId();
            etlOutputPath = sparkPendingTaskAttachment.getOutputPath();

            executeEtl();
            unprotectedLogUpdateStateInfo();
        } finally {
            writeUnlock();
        }
    }

    /** Stamps the ETL start time with the current wall clock and moves the job to the ETL state. */
    private void executeEtl() {
        etlStartTimestamp = System.currentTimeMillis();
        state = JobState.ETL;
        LOG.info("update to {} state success. job id: {}", state, Long.valueOf(id));
    }

    /**
     * Compares the current job state with {@code jobState} under the job read lock.
     *
     * @return {@code true} when the job is currently in exactly that state
     */
    private boolean checkState(JobState jobState) {
        boolean matches;
        readLock();
        try {
            matches = (jobState == state);
        } finally {
            readUnlock();
        }
        return matches;
    }

    /**
     * Polls the Spark ETL job and advances this load job accordingly. Does nothing unless the job
     * is in the ETL state.
     *
     * <ul>
     *   <li>RUNNING: refreshes progress and statistics.</li>
     *   <li>FINISHED: processes the ETL result (which moves the job to LOADING).</li>
     *   <li>CANCELLED: fails with a {@link LoadException}.</li>
     *   <li>anything else: logged and ignored.</li>
     * </ul>
     *
     * <p>Push tasks are submitted only after the job write lock is released.
     * {@code submitPushTasks()} takes the table read locks first and the job write lock second,
     * so calling it while the job write lock is held would acquire the two in the opposite order.
     * {@code submitPushTasks()} re-checks the job state under its own lock.
     *
     * @throws Exception if the ETL job was cancelled, the ETL result fails the data quality check,
     *                   or any step of state handling / push submission fails
     */
    public void updateEtlStatus() throws Exception {
        if (!checkState(JobState.ETL)) {
            return;
        }

        // Query the ETL status without holding the job lock.
        SparkEtlJobHandler sparkEtlJobHandler = new SparkEtlJobHandler();
        EtlStatus etlJobStatus = sparkEtlJobHandler.getEtlJobStatus(this.sparkLoadAppHandle, this.appId, this.id, this.etlOutputPath, this.sparkResource, this.brokerDesc);

        writeLock();
        try {
            switch (etlJobStatus.getState()) {
                case RUNNING:
                    unprotectedUpdateEtlStatusInternal(etlJobStatus);
                    break;
                case FINISHED:
                    unprotectedProcessEtlFinish(etlJobStatus, sparkEtlJobHandler);
                    break;
                case CANCELLED:
                    throw new LoadException("spark etl job failed. msg: " + etlJobStatus.getFailMsg());
                default:
                    LOG.warn("unknown etl state: {}", etlJobStatus.getState().name());
                    break;
            }
        } finally {
            writeUnlock();
        }

        if (checkState(JobState.LOADING)) {
            submitPushTasks();
        }
    }

    /**
     * Copies the ETL status into the job: loading status, progress, the tracking URL (for
     * non-YARN masters the application id is used) and, when a DPP result is present, file
     * statistics, scan progress and row counters. Performs no locking of its own.
     */
    private void unprotectedUpdateEtlStatusInternal(EtlStatus etlStatus) {
        loadingStatus = etlStatus;
        progress = etlStatus.getProgress();
        if (!sparkResource.isYarnMaster()) {
            loadingStatus.setTrackingUrl(appId);
        }

        DppResult dppResult = etlStatus.getDppResult();
        if (dppResult == null) {
            return;
        }

        loadStatistic.fileNum = (int) dppResult.fileNumber;
        loadStatistic.totalFileSizeB = dppResult.fileSize;

        // A single all-zero id stands in for load id, fragment id and backend (-1).
        TUniqueId placeholderId = new TUniqueId(0L, 0L);
        loadStatistic.initLoad(placeholderId, Sets.newHashSet(placeholderId), Lists.newArrayList(-1L));
        loadStatistic.updateLoadProgress(-1L, placeholderId, placeholderId, dppResult.scannedRows, dppResult.scannedBytes, true);

        Map<String, String> counters = loadingStatus.getCounters();
        counters.put(LoadEtlTask.DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
        counters.put(LoadEtlTask.DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows));
        counters.put(LoadJob.UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows));
    }

    /**
     * Finishes the ETL phase: refreshes status from the ETL result, enforces the data quality
     * check, lists the ETL output files, moves the job to LOADING, logs the state update and
     * prepares the loading bookkeeping. Performs no locking of its own.
     *
     * @throws DataQualityException if the data quality check fails
     * @throws Exception            if listing the output files or the state transition fails
     */
    private void unprotectedProcessEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler sparkEtlJobHandler) throws Exception {
        unprotectedUpdateEtlStatusInternal(etlStatus);

        boolean qualityOk = checkDataQuality();
        if (!qualityOk) {
            throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG);
        }

        Map<String, Long> etlFilePaths = sparkEtlJobHandler.getEtlFilePaths(etlOutputPath, brokerDesc);
        unprotectedUpdateToLoadingState(etlStatus, etlFilePaths);
        unprotectedLogUpdateStateInfo();
        unprotectedPrepareLoadingInfos();
    }

    /**
     * Records every ETL output file whose path ends with the parquet format constant under its
     * tablet meta string, resets progress to 0 and switches the job to LOADING.
     *
     * @param etlStatus status stored as the new loading status
     * @param map       ETL output file path to file size
     * @throws LoadException wrapping any exception raised while doing so
     */
    private void unprotectedUpdateToLoadingState(EtlStatus etlStatus, Map<String, Long> map) throws LoadException {
        try {
            for (Map.Entry<String, Long> fileEntry : map.entrySet()) {
                String filePath = fileEntry.getKey();
                if (!filePath.endsWith(FileFormatConstants.FORMAT_PARQUET)) {
                    // not an ETL data file
                    continue;
                }
                String tabletMeta = EtlJobConfig.getTabletMetaStr(filePath);
                tabletMetaToFileInfo.put(tabletMeta, Pair.of(filePath, fileEntry.getValue()));
            }
            loadingStatus = etlStatus;
            progress = 0;
            unprotectedUpdateState(JobState.LOADING);
            LOG.info("update to {} state success. job id: {}", state, Long.valueOf(id));
        } catch (Exception e) {
            LOG.warn("update to {} state failed. job id: {}", state, Long.valueOf(id), e);
            throw new LoadException(e.getMessage(), e);
        }
    }

    /**
     * Derives the loading bookkeeping from the keys of {@code tabletMetaToFileInfo}. Each key must
     * consist of exactly five dot-separated parts; part 0 is used as table id, part 1 as partition
     * id, part 2 as index id and part 4 as schema hash (part 3 is not read here).
     */
    private void unprotectedPrepareLoadingInfos() {
        for (String tabletMeta : tabletMetaToFileInfo.keySet()) {
            String[] parts = tabletMeta.split("\\.");
            Preconditions.checkState(parts.length == 5);

            long tableId = Long.parseLong(parts[0]);
            long partitionId = Long.parseLong(parts[1]);
            long indexId = Long.parseLong(parts[2]);
            int schemaHash = Integer.parseInt(parts[4]);

            tableToLoadPartitions
                    .computeIfAbsent(Long.valueOf(tableId), unused -> Sets.newHashSet())
                    .add(Long.valueOf(partitionId));
            indexToSchemaHash.put(Long.valueOf(indexId), Integer.valueOf(schemaHash));
        }
    }

    /**
     * Returns the reader parameters for an index, building and caching them on first use.
     *
     * <p>The parameters are built from copies of the index schema columns whose names are
     * lower-cased with {@link Locale#ROOT}; the table's own column objects are not modified.
     *
     * @param olapTable table owning the index
     * @param j         index id
     * @return the cached (or freshly created) reader parameters for the index
     * @throws UserException if initialising the reader parameters fails; nothing is cached then
     */
    private PushBrokerReaderParams getPushBrokerReaderParams(OlapTable olapTable, long j) throws UserException {
        Long indexKey = Long.valueOf(j);
        PushBrokerReaderParams readerParams = this.indexToPushBrokerReaderParams.get(indexKey);
        if (readerParams == null) {
            List<Column> lowerCasedColumns = new ArrayList<>();
            for (Column schemaColumn : olapTable.getSchemaByIndexId(indexKey)) {
                Column copy = new Column(schemaColumn);
                copy.setName(schemaColumn.getName().toLowerCase(Locale.ROOT));
                lowerCasedColumns.add(copy);
            }
            readerParams = new PushBrokerReaderParams();
            readerParams.init(lowerCasedColumns, this.brokerDesc);
            this.indexToPushBrokerReaderParams.put(indexKey, readerParams);
        }
        return readerParams;
    }

    /**
     * Creates and submits push tasks for every replica that has not been sent one yet, and
     * refreshes the quorum / full tablet sets from the replicas already reported as finished.
     *
     * <p>Locking: the table read locks are taken first, then the job write lock. Each lock is
     * released exactly once, in reverse order, through nested {@code finally} blocks, including
     * on the early return taken when the job is no longer in the LOADING state.
     *
     * @return ids of all tablets visited; empty when the job is not in the LOADING state
     * @throws LoadException         if none of the partitions to load exists any more
     * @throws MetaNotFoundException if required metadata (database or tables) cannot be found
     * @throws UserException         if building reader parameters for an index fails
     */
    private Set<Long> submitPushTasks() throws UserException {
        try {
            Database db = getDb();
            AgentBatchTask batchTask = new AgentBatchTask();
            boolean hasLoadPartitions = false;
            Set<Long> totalTablets = Sets.newHashSet();
            List<Table> tables = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(this.tableToLoadPartitions.keySet()));
            MetaLockUtils.readLockTables(tables);
            try {
                writeLock();
                try {
                    // The state may have changed since the caller checked it.
                    if (this.state != JobState.LOADING) {
                        LOG.warn("job state is not loading. job id: {}, state: {}", Long.valueOf(this.id), this.state);
                        return totalTablets;
                    }

                    for (Table table : tables) {
                        Set<Long> partitionIds = this.tableToLoadPartitions.get(Long.valueOf(table.getId()));
                        OlapTable olapTable = (OlapTable) table;
                        for (long partitionId : partitionIds) {
                            Partition partition = olapTable.getPartition(partitionId);
                            if (partition == null) {
                                LOG.warn("partition does not exist. id: {}", Long.valueOf(partitionId));
                                continue;
                            }
                            hasLoadPartitions = true;
                            int quorumReplicaNum = (olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum() / 2) + 1;

                            for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
                                long indexId = index.getId();
                                // NOTE(review): throws NullPointerException when the index has no entry in
                                // indexToSchemaHash - confirm every index returned for IndexExtState.ALL is covered.
                                int schemaHash = this.indexToSchemaHash.get(Long.valueOf(indexId)).intValue();

                                // Thrift column descriptions with lower-cased names.
                                List<TColumn> columnsDesc = new ArrayList<>();
                                for (Column column : olapTable.getSchemaByIndexId(Long.valueOf(indexId))) {
                                    TColumn tColumn = column.toThrift();
                                    tColumn.setColumnName(tColumn.getColumnName().toLowerCase(Locale.ROOT));
                                    columnsDesc.add(tColumn);
                                }

                                // Position of the tablet inside the index; part of the tablet meta key.
                                int nextBucket = 0;
                                for (Tablet tablet : index.getTablets()) {
                                    long tabletId = tablet.getId();
                                    totalTablets.add(Long.valueOf(tabletId));
                                    int bucket = nextBucket++;
                                    String tabletMetaStr = String.format("%d.%d.%d.%d.%d", Long.valueOf(olapTable.getId()), Long.valueOf(partitionId), Long.valueOf(indexId), Integer.valueOf(bucket), Integer.valueOf(schemaHash));

                                    Set<Long> tabletAllReplicas = Sets.newHashSet();
                                    Set<Long> tabletFinishedReplicas = Sets.newHashSet();
                                    for (Replica replica : tablet.getReplicas()) {
                                        long replicaId = replica.getId();
                                        tabletAllReplicas.add(Long.valueOf(replicaId));

                                        Map<Long, PushTask> sentTasks = this.tabletToSentReplicaPushTask.get(Long.valueOf(tabletId));
                                        boolean alreadySent = this.tabletToSentReplicaPushTask.containsKey(Long.valueOf(tabletId))
                                                && sentTasks.containsKey(Long.valueOf(replicaId));
                                        if (!alreadySent) {
                                            long backendId = replica.getBackendId();
                                            long taskSignature = Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();

                                            PushBrokerReaderParams readerParams = getPushBrokerReaderParams(olapTable, indexId);
                                            // Copy the shared template before filling in per-replica values.
                                            TBrokerScanRange scanRange = new TBrokerScanRange(readerParams.tBrokerScanRange);
                                            TBrokerRangeDesc rangeDesc = scanRange.getRanges().get(0);
                                            // Empty path and size -1 are used when no ETL file exists for this tablet.
                                            rangeDesc.setPath("");
                                            rangeDesc.setFileSize(-1L);
                                            if (this.tabletMetaToFileInfo.containsKey(tabletMetaStr)) {
                                                Pair<String, Long> fileInfo = this.tabletMetaToFileInfo.get(tabletMetaStr);
                                                rangeDesc.setPath(fileInfo.first);
                                                rangeDesc.setFileSize(fileInfo.second.longValue());
                                            }

                                            // Pick the broker by the host of the backend that holds the replica.
                                            FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getBroker(this.brokerDesc.getName(), Env.getCurrentSystemInfo().getBackend(backendId).getHost());
                                            scanRange.getBrokerAddresses().add(new TNetworkAddress(broker.host, broker.port));
                                            LOG.debug("push task for replica {}, broker {}:{}, backendId {}, filePath {}, fileSize {}", Long.valueOf(replicaId), broker.host, Integer.valueOf(broker.port), Long.valueOf(backendId), rangeDesc.path, Long.valueOf(rangeDesc.file_size));

                                            PushTask pushTask = new PushTask(backendId, this.dbId, olapTable.getId(), partitionId, indexId, tabletId, replicaId, schemaHash, 0, this.id, TPushType.LOAD_V2, TPriority.NORMAL, this.transactionId, taskSignature, scanRange, readerParams.tDescriptorTable, columnsDesc);
                                            if (AgentTaskQueue.addTask(pushTask)) {
                                                batchTask.addTask(pushTask);
                                                if (!this.tabletToSentReplicaPushTask.containsKey(Long.valueOf(tabletId))) {
                                                    this.tabletToSentReplicaPushTask.put(Long.valueOf(tabletId), Maps.newHashMap());
                                                }
                                                this.tabletToSentReplicaPushTask.get(Long.valueOf(tabletId)).put(Long.valueOf(replicaId), pushTask);
                                            }
                                        }

                                        // Only finished replicas without a failed version count towards the quorum.
                                        if (this.finishedReplicas.contains(Long.valueOf(replicaId)) && replica.getLastFailedVersion() < 0) {
                                            tabletFinishedReplicas.add(Long.valueOf(replicaId));
                                        }
                                    }

                                    if (tabletAllReplicas.size() == 0) {
                                        LOG.error("invalid situation. tablet is empty. id: {}", Long.valueOf(tabletId));
                                    }
                                    if (tabletFinishedReplicas.size() >= quorumReplicaNum) {
                                        this.quorumTablets.add(Long.valueOf(tabletId));
                                        if (tabletFinishedReplicas.size() == tabletAllReplicas.size()) {
                                            this.fullTablets.add(Long.valueOf(tabletId));
                                        }
                                    }
                                }
                            }
                        }
                    }

                    if (batchTask.getTaskNum() > 0) {
                        AgentTaskExecutor.submit(batchTask);
                    }
                    if (!hasLoadPartitions) {
                        throw new LoadException(new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(this.id)).add("database_id", this.dbId).add(ExportStmt.LABEL, this.label).add("error_msg", "all partitions have no load data").build());
                    }
                    return totalTablets;
                } finally {
                    writeUnlock();
                }
            } finally {
                MetaLockUtils.readUnlockTables(tables);
            }
        } catch (MetaNotFoundException e) {
            throw new MetaNotFoundException(new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(this.id)).add("database_id", this.dbId).add(ExportStmt.LABEL, this.label).add("error_msg", "db has been deleted when job is loading").build());
        }
    }

    /**
     * Records a finished replica under the job write lock.
     *
     * <p>The first time an id is seen, a {@code TabletCommitInfo(j2, j3)} is appended to
     * {@code commitInfos} and, if a push task was recorded for that tablet/replica pair,
     * its entry in {@code tabletToSentReplicaPushTask} is overwritten with {@code null}
     * (the key is kept). Repeated calls with the same {@code j} are no-ops.
     *
     * @param j  id stored in {@code finishedReplicas}; also the inner key of
     *           {@code tabletToSentReplicaPushTask}
     * @param j2 outer key of {@code tabletToSentReplicaPushTask} (tablet id) and first
     *           argument of the commit info
     * @param j3 second argument of the commit info (presumably the backend id; confirm
     *           against {@code TabletCommitInfo})
     */
    public void addFinishedReplica(long j, long j2, long j3) {
        final Long replicaKey = Long.valueOf(j);
        final Long tabletKey = Long.valueOf(j2);
        writeLock();
        try {
            // Set.add returns false when the replica was already recorded.
            if (!this.finishedReplicas.add(replicaKey)) {
                return;
            }
            this.commitInfos.add(new TabletCommitInfo(j2, j3));

            Map<Long, PushTask> sentTasks = this.tabletToSentReplicaPushTask.get(tabletKey);
            if (sentTasks == null || !sentTasks.containsKey(replicaKey)) {
                return;
            }
            // Keep the key but drop the task reference.
            sentTasks.put(replicaKey, null);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Advances a job that is in the {@code LOADING} state: submits push tasks, refreshes
     * {@code progress}, and commits the transaction once enough tablets have finished.
     *
     * <p>Steps:
     * <ol>
     *   <li>Return immediately unless {@code checkState(JobState.LOADING)} is true.</li>
     *   <li>Call {@code submitPushTasks()}; if the returned tablet set is empty, log a
     *       warning and return.</li>
     *   <li>Under the write lock, set {@code progress} to the percentage of tablets in
     *       {@code fullTablets}, capped at 99, and stamp {@code quorumFinishTimestamp} the
     *       first time {@code quorumTablets} covers every tablet.</li>
     *   <li>Commit when every tablet is in {@code fullTablets}, or when quorum was reached
     *       more than 300000 ms ago.</li>
     * </ol>
     *
     * <p>{@code tryCommitJob()} is invoked after the write lock has been released.
     *
     * @throws org.apache.doris.common.UserException propagated from {@code submitPushTasks()}
     *         or {@code tryCommitJob()}
     */
    public void updateLoadingStatus() throws org.apache.doris.common.UserException {
        if (!checkState(JobState.LOADING)) {
            return;
        }

        Set<Long> totalTablets = submitPushTasks();
        if (totalTablets.isEmpty()) {
            LOG.warn("total tablets set is empty. job id: {}, state: {}", Long.valueOf(this.id), this.state);
            return;
        }

        boolean canCommitJob = false;
        writeLock();
        try {
            // totalTablets is non-empty here, so the division cannot be by zero.
            this.progress = this.fullTablets.size() * 100 / totalTablets.size();
            // Never report 100 from this method; presumably 100 is reserved for a job that
            // has actually finished. TODO confirm against where progress is set to 100.
            if (this.progress == 100) {
                this.progress = 99;
            }

            // Record the moment quorum was first reached (a negative value means "not yet").
            if (this.quorumFinishTimestamp < 0 && this.quorumTablets.containsAll(totalTablets)) {
                this.quorumFinishTimestamp = System.currentTimeMillis();
            }

            // Constant taken verbatim from the compiled code.
            // NOTE(review): looks like a folded "straggler wait" setting (300 s); confirm
            // whether it should come from configuration instead.
            final long stragglerTimeoutMs = 300000L;
            boolean quorumWaitExpired = this.quorumFinishTimestamp > 0
                    && System.currentTimeMillis() - this.quorumFinishTimestamp > stragglerTimeoutMs;
            if (quorumWaitExpired || this.fullTablets.containsAll(totalTablets)) {
                canCommitJob = true;
            }
        } finally {
            writeUnlock();
        }

        if (canCommitJob) {
            tryCommitJob();
        }
    }

    /**
     * Tries to commit this job's transaction while holding the write locks of every table
     * in {@code tableToLoadPartitions}.
     *
     * <p>The table locks are released exactly once in a {@code finally} block, whether the
     * commit succeeds, fails with {@code TabletQuorumFailedException}, or throws anything else.
     *
     * @throws UserException if the tables cannot be resolved or locked, or if the commit
     *         fails for a reason other than {@code TabletQuorumFailedException}
     */
    private void tryCommitJob() throws UserException {
        LOG.info(new LogBuilder(LogKey.LOAD_JOB, Long.valueOf(this.id)).add("txn_id", this.transactionId).add("msg", "Load job try to commit txn").build());
        List<Table> tables = getDb().getTablesOnIdOrderOrThrowException(Lists.newArrayList(this.tableToLoadPartitions.keySet()));
        MetaLockUtils.writeLockTablesOrMetaException(tables);
        try {
            Env.getCurrentGlobalTransactionMgr().commitTransaction(
                    this.dbId,
                    tables,
                    this.transactionId,
                    this.commitInfos,
                    new LoadJobFinalOperation(this.id, this.loadingStatus, this.progress, this.loadStartTimestamp, this.finishTimestamp, this.state, this.failMsg));
        } catch (TabletQuorumFailedException ignored) {
            // Deliberately not propagated: the job is left as it is.
            // NOTE(review): presumably the commit is retried by a later updateLoadingStatus()
            // call; confirm with the scheduler that drives this job.
        } finally {
            MetaLockUtils.writeUnlockTables(tables);
        }
    }

    /**
     * Releases everything a finished or cancelled job no longer needs.
     *
     * <p>In order:
     * <ol>
     *   <li>Asserts the job state is {@code FINISHED} or {@code CANCELLED}.</li>
     *   <li>For a cancelled job that has either a non-empty {@code appId} on a YARN master
     *       or a live app handle, asks {@code SparkEtlJobHandler} to kill the ETL job.</li>
     *   <li>If {@code etlOutputPath} is set, deletes the path obtained by cutting it at its
     *       last {@code S3URI.PATH_DELIM}.</li>
     *   <li>Under the write lock, removes every still-recorded push task from
     *       {@code AgentTaskQueue} and resets the in-memory loading state.</li>
     * </ol>
     *
     * <p>Failures in steps 2 and 3 are logged and do not stop the cleanup.
     *
     * @throws IllegalStateException if the job is in any other state
     */
    private void clearJob() {
        Preconditions.checkState(this.state == JobState.FINISHED || this.state == JobState.CANCELLED);
        LOG.debug("kill etl job and delete etl files. id: {}, state: {}", Long.valueOf(this.id), this.state);
        SparkEtlJobHandler sparkEtlJobHandler = new SparkEtlJobHandler();

        // Best effort: kill the ETL job of a cancelled load.
        if (this.state == JobState.CANCELLED
                && ((!Strings.isNullOrEmpty(this.appId) && this.sparkResource.isYarnMaster()) || this.sparkLoadAppHandle != null)) {
            try {
                sparkEtlJobHandler.killEtlJob(this.sparkLoadAppHandle, this.appId, this.id, this.sparkResource);
            } catch (Exception killFailure) {
                LOG.warn("kill etl job failed. id: {}, state: {}", Long.valueOf(this.id), this.state, killFailure);
            }
        }

        // Best effort: delete the ETL output.
        if (!Strings.isNullOrEmpty(this.etlOutputPath)) {
            try {
                // Drop the last path segment. If the delimiter is absent, substring throws
                // and the failure is logged by the catch below.
                String etlOutputParent = this.etlOutputPath.substring(0, this.etlOutputPath.lastIndexOf(S3URI.PATH_DELIM));
                sparkEtlJobHandler.deleteEtlOutputPath(etlOutputParent, this.brokerDesc);
            } catch (Exception deleteFailure) {
                LOG.warn("delete etl files failed. id: {}, state: {}", Long.valueOf(this.id), this.state, deleteFailure);
            }
        }

        LOG.debug("clear push tasks and infos that not persist. id: {}, state: {}", Long.valueOf(this.id), this.state);
        writeLock();
        try {
            for (Map<Long, PushTask> sentTasks : this.tabletToSentReplicaPushTask.values()) {
                for (PushTask pushTask : sentTasks.values()) {
                    // addFinishedReplica() replaces finished tasks with null; skip those.
                    if (pushTask != null) {
                        AgentTaskQueue.removeTask(pushTask.getBackendId(), pushTask.getTaskType(), pushTask.getSignature());
                    }
                }
            }
            this.sparkLoadAppHandle = null;
            this.resourceDesc = null;
            this.etlOutputPath = "";
            this.appId = "";
            this.tableToLoadPartitions.clear();
            this.indexToPushBrokerReaderParams.clear();
            this.indexToSchemaHash.clear();
            this.tabletToSentReplicaPushTask.clear();
            this.finishedReplicas.clear();
            this.quorumTablets.clear();
            this.fullTablets.clear();
        } finally {
            // Released exactly once on every path.
            writeUnlock();
        }
    }

    /**
     * Runs the superclass {@code afterVisible} handling, then {@code clearJob()}.
     *
     * <p>{@code clearJob()} asserts that the job state is {@code FINISHED} or
     * {@code CANCELLED}, so the superclass call must have left the job in one of them.
     *
     * @param transactionState forwarded unchanged to the superclass
     * @param z                forwarded unchanged to the superclass
     */
    @Override // org.apache.doris.load.loadv2.LoadJob, org.apache.doris.transaction.AbstractTxnStateChangeCallback, org.apache.doris.transaction.TxnStateChangeCallback
    public void afterVisible(TransactionState transactionState, boolean z) {
        super.afterVisible(transactionState, z);
        clearJob();
    }

    /**
     * Runs the superclass {@code afterAborted} handling, then {@code clearJob()}.
     *
     * <p>{@code clearJob()} asserts that the job state is {@code FINISHED} or
     * {@code CANCELLED}, so the superclass call must have left the job in one of them.
     *
     * @param transactionState forwarded unchanged to the superclass
     * @param z                forwarded unchanged to the superclass
     * @param str              forwarded unchanged to the superclass
     * @throws UserException propagated from the superclass implementation
     */
    @Override // org.apache.doris.load.loadv2.LoadJob, org.apache.doris.transaction.AbstractTxnStateChangeCallback, org.apache.doris.transaction.TxnStateChangeCallback
    public void afterAborted(TransactionState transactionState, boolean z, String str) throws UserException {
        super.afterAborted(transactionState, z, str);
        clearJob();
    }

    /**
     * Delegates the cancellation to the superclass, then calls {@code clearJob()} to kill
     * the ETL job, delete its output and drop the in-memory loading state.
     *
     * @param failMsg forwarded unchanged to the superclass
     * @param z       forwarded unchanged to the superclass
     * @param z2      forwarded unchanged to the superclass
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void cancelJobWithoutCheck(FailMsg failMsg, boolean z, boolean z2) {
        super.cancelJobWithoutCheck(failMsg, z, z2);
        clearJob();
    }

    /**
     * Delegates the cancellation to the superclass, then calls {@code clearJob()} to kill
     * the ETL job, delete its output and drop the in-memory loading state.
     *
     * @param failMsg forwarded unchanged to the superclass
     * @throws DdlException propagated from the superclass implementation; in that case
     *         {@code clearJob()} is not reached
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void cancelJob(FailMsg failMsg) throws DdlException {
        super.cancelJob(failMsg);
        clearJob();
    }

    /**
     * Returns the name of this job's {@code sparkResource}.
     *
     * @return {@code sparkResource.getName()}
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    protected String getResourceName() {
        return this.sparkResource.getName();
    }

    /**
     * Returns the stored ETL start timestamp.
     *
     * @return the current value of {@code etlStartTimestamp}
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    protected long getEtlStartTimestamp() {
        return this.etlStartTimestamp;
    }

    /**
     * Returns the Spark app handle held by this job.
     *
     * @return the current {@code sparkLoadAppHandle}; {@code null} once {@code clearJob()}
     *         has run, since it resets the field
     */
    public SparkLoadAppHandle getHandle() {
        return this.sparkLoadAppHandle;
    }

    /**
     * Deletes the log file referenced by the job's Spark app handle, if there is one.
     *
     * <p>Does nothing when the job has no handle, the handle has no log path, or the file
     * does not exist. A deletion that fails is logged as a warning instead of being
     * silently dropped.
     */
    public void clearSparkLauncherLog() {
        // Read the field once: clearJob() can reset it to null, and a second read between
        // the null check and the use could otherwise fail.
        SparkLoadAppHandle handle = this.sparkLoadAppHandle;
        if (handle == null) {
            return;
        }
        String logPath = handle.getLogPath();
        if (Strings.isNullOrEmpty(logPath)) {
            return;
        }
        File logFile = new File(logPath);
        // File.delete() signals failure through its return value, not an exception.
        if (logFile.exists() && !logFile.delete()) {
            LOG.warn("delete spark launcher log failed. id: {}, path: {}", Long.valueOf(this.id), logPath);
        }
    }

    /**
     * Serializes this job to {@code dataOutput}.
     *
     * <p>Layout after the superclass data: spark resource, app handle, ETL start timestamp,
     * app id, ETL output path, the entry count of {@code tabletMetaToFileInfo}, and then for
     * each entry its key followed by the pair's {@code first} (string) and {@code second}
     * (long). {@code readFields} consumes the same sequence.
     *
     * @param dataOutput sink to write to
     * @throws IOException if any underlying write fails
     */
    @Override // org.apache.doris.load.loadv2.BulkLoadJob, org.apache.doris.load.loadv2.LoadJob
    public void write(DataOutput dataOutput) throws IOException {
        super.write(dataOutput);
        this.sparkResource.write(dataOutput);
        // NOTE(review): clearJob() sets sparkLoadAppHandle to null; if write() can run after
        // that, this line throws NullPointerException. Confirm against the callers.
        this.sparkLoadAppHandle.write(dataOutput);
        dataOutput.writeLong(this.etlStartTimestamp);
        Text.writeString(dataOutput, this.appId);
        Text.writeString(dataOutput, this.etlOutputPath);

        dataOutput.writeInt(this.tabletMetaToFileInfo.size());
        for (Map.Entry<String, Pair<String, Long>> fileInfoEntry : this.tabletMetaToFileInfo.entrySet()) {
            Pair<String, Long> fileInfo = fileInfoEntry.getValue();
            Text.writeString(dataOutput, fileInfoEntry.getKey());
            Text.writeString(dataOutput, fileInfo.first);
            dataOutput.writeLong(fileInfo.second);
        }
    }

    /**
     * Restores this job from {@code dataInput}, reading the fields in the order that
     * {@code write} emits them.
     *
     * <p>Entries are added to the existing {@code tabletMetaToFileInfo} map; the map is not
     * cleared first.
     *
     * @param dataInput source to read from
     * @throws IOException if any underlying read fails
     */
    @Override // org.apache.doris.load.loadv2.BulkLoadJob, org.apache.doris.load.loadv2.LoadJob
    public void readFields(DataInput dataInput) throws IOException {
        super.readFields(dataInput);
        this.sparkResource = (SparkResource) Resource.read(dataInput);
        this.sparkLoadAppHandle = SparkLoadAppHandle.read(dataInput);
        this.etlStartTimestamp = dataInput.readLong();
        this.appId = Text.readString(dataInput);
        this.etlOutputPath = Text.readString(dataInput);

        int remainingEntries = dataInput.readInt();
        while (remainingEntries-- > 0) {
            // The read order must stay key, string, long to match write().
            String tabletMeta = Text.readString(dataInput);
            String infoFirst = Text.readString(dataInput);
            long infoSecond = dataInput.readLong();
            this.tabletMetaToFileInfo.put(tabletMeta, Pair.of(infoFirst, Long.valueOf(infoSecond)));
        }
    }

    /**
     * Writes the job's current state to the edit log.
     *
     * <p>Builds a {@code SparkLoadJobStateUpdateInfo} from the id, state, transaction id,
     * app handle, ETL start timestamp, app id, ETL output path, load start timestamp and
     * {@code tabletMetaToFileInfo}, and passes it to {@code logUpdateLoadJob}.
     * {@code replayUpdateStateInfo} reads the Spark-specific fields back.
     *
     * <p>NOTE(review): the method takes no lock itself; the "unprotected" prefix suggests
     * the caller is expected to hold the job lock. Confirm at the call sites.
     */
    private void unprotectedLogUpdateStateInfo() {
        Env.getCurrentEnv().getEditLog().logUpdateLoadJob(new SparkLoadJobStateUpdateInfo(this.id, this.state, this.transactionId, this.sparkLoadAppHandle, this.etlStartTimestamp, this.appId, this.etlOutputPath, this.loadStartTimestamp, this.tabletMetaToFileInfo));
    }

    /**
     * Applies a replayed state update to this job.
     *
     * <p>After the superclass has applied its part, the Spark-specific fields (app handle,
     * ETL start timestamp, app id, ETL output path, tablet file info) are copied from the
     * update. If the resulting state is {@code LOADING}, the loading infos are rebuilt via
     * {@code unprotectedPrepareLoadingInfos()}; {@code ETL} needs nothing more; any other
     * state is logged as a warning.
     *
     * @param loadJobStateUpdateInfo the update to apply; must be a
     *        {@code SparkLoadJobStateUpdateInfo}, otherwise the cast throws
     *        {@code ClassCastException}
     */
    @Override // org.apache.doris.load.loadv2.LoadJob
    public void replayUpdateStateInfo(LoadJob.LoadJobStateUpdateInfo loadJobStateUpdateInfo) {
        super.replayUpdateStateInfo(loadJobStateUpdateInfo);

        SparkLoadJobStateUpdateInfo sparkUpdate = (SparkLoadJobStateUpdateInfo) loadJobStateUpdateInfo;
        sparkLoadAppHandle = sparkUpdate.getSparkLoadAppHandle();
        etlStartTimestamp = sparkUpdate.getEtlStartTimestamp();
        appId = sparkUpdate.getAppId();
        etlOutputPath = sparkUpdate.getEtlOutputPath();
        tabletMetaToFileInfo = sparkUpdate.getTabletMetaToFileInfo();

        switch (state) {
            case LOADING:
                unprotectedPrepareLoadingInfos();
                break;
            case ETL:
                // Nothing further to rebuild for this state.
                break;
            default:
                LOG.warn("replay update load job state info failed. error: wrong state. job id: {}, state: {}", Long.valueOf(id), state);
                break;
        }
    }
}
