package org.apache.doris.load.loadv2;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Pending-phase task of a Spark load job.
 *
 * <p>{@link #init()} builds the {@link EtlJobConfig} describing the target tables and
 * source file groups; {@code executeTask()} then submits the ETL job through
 * {@code SparkEtlJobHandler}.
 *
 * <p>Decompiled from org/apache/doris/load/loadv2/SparkLoadPendingTask.class.
 */
public class SparkLoadPendingTask extends LoadTask {
    private static final Logger LOG = LogManager.getLogger(SparkLoadPendingTask.class);
    // Source file groups of this load, grouped by aggregation key; the key exposes the
    // target table id and an optional set of partition ids.
    private final Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups;
    // Spark resource; its working dir is used to derive the ETL output path.
    private final SparkResource resource;
    // Broker description handed to the ETL job handler on submission.
    private final BrokerDesc brokerDesc;
    // Identity of the owning load job, copied from the SparkLoadJob in the constructor.
    private final long dbId;
    private final String loadLabel;
    private final long loadJobId;
    private final long transactionId;
    // Built by createEtlJobConf() (called from init()); read when the ETL job is submitted.
    private EtlJobConfig etlJobConfig;
    // Handle obtained from the owning job and passed to the ETL job handler.
    private SparkLoadAppHandle sparkLoadAppHandle;

    /**
     * Creates the pending task of a Spark load job.
     *
     * <p>Note: the column names of every import column descriptor reachable through
     * {@code map} are lower-cased in place as part of construction.
     *
     * @param sparkLoadJob  owning job; supplies db id, job id, label, transaction id and app handle
     * @param map           file groups to load, grouped by aggregation key
     * @param sparkResource Spark resource used for the ETL job
     * @param brokerDesc    broker description passed on to the ETL job handler
     * @param priority      scheduling priority of this task
     */
    public SparkLoadPendingTask(SparkLoadJob sparkLoadJob, Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> map, SparkResource sparkResource, BrokerDesc brokerDesc, LoadTask.Priority priority) {
        super(sparkLoadJob, LoadTask.TaskType.PENDING, priority);

        // Identity and state taken over from the owning job.
        this.dbId = sparkLoadJob.getDbId();
        this.loadJobId = sparkLoadJob.getId();
        this.loadLabel = sparkLoadJob.getLabel();
        this.transactionId = sparkLoadJob.getTransactionId();
        this.sparkLoadAppHandle = sparkLoadJob.getHandle();

        // Inputs of the ETL job.
        this.aggKeyToBrokerFileGroups = map;
        this.resource = sparkResource;
        this.brokerDesc = brokerDesc;

        // Settings inherited from LoadTask.
        this.retryTime = 3;
        this.attachment = new SparkPendingTaskAttachment(this.signature);
        this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL);

        // Must run after aggKeyToBrokerFileGroups has been assigned.
        toLowCaseForFileGroups();
    }

    /**
     * Lower-cases (using {@link Locale#ROOT}) the column name of every import column
     * descriptor of every file group held by this task. The descriptors are modified in place.
     */
    void toLowCaseForFileGroups() {
        for (List<BrokerFileGroup> fileGroups : this.aggKeyToBrokerFileGroups.values()) {
            for (BrokerFileGroup fileGroup : fileGroups) {
                for (ImportColumnDesc columnDesc : fileGroup.getColumnExprList()) {
                    String lowered = columnDesc.getColumnName().toLowerCase(Locale.ROOT);
                    columnDesc.setColumnName(lowered);
                }
            }
        }
    }

    /**
     * Runs the task: logs the start and submits the Spark ETL job.
     *
     * @throws UserException if submitting the ETL job fails
     */
    @Override // org.apache.doris.load.loadv2.LoadTask
    void executeTask() throws UserException {
        final long jobId = this.loadJobId;
        LOG.info("begin to execute spark pending task. load job id: {}", jobId);
        submitEtlJob();
    }

    /**
     * Computes the ETL output path, records it on both the job config and the task
     * attachment, and submits the job through a {@code SparkEtlJobHandler}.
     *
     * @throws LoadException if the handler fails to submit the job
     */
    private void submitEtlJob() throws LoadException {
        SparkPendingTaskAttachment pendingAttachment = (SparkPendingTaskAttachment) this.attachment;

        // The output path is derived from working dir, db id, label and this task's signature.
        this.etlJobConfig.outputPath = EtlJobConfig.getOutputPath(
                this.resource.getWorkingDir(), this.dbId, this.loadLabel, this.signature);
        pendingAttachment.setOutputPath(this.etlJobConfig.outputPath);

        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        handler.submitEtlJob(this.loadJobId, this.loadLabel, this.etlJobConfig, this.resource,
                this.brokerDesc, this.sparkLoadAppHandle, pendingAttachment);

        LOG.info("submit spark etl job success. load job id: {}, attachment: {}", this.loadJobId, pendingAttachment);
    }

    /**
     * Prepares the task by building the ETL job configuration.
     *
     * @throws LoadException if the configuration cannot be built
     */
    @Override // org.apache.doris.load.loadv2.LoadTask
    public void init() throws LoadException {
        createEtlJobConf();
    }

    /**
     * Builds {@link #etlJobConfig} from the file groups of this task.
     *
     * <p>For every target table one {@code EtlTable} (indexes + partition info) is created the
     * first time the table is seen, and the table's indexes are registered on the load
     * transaction. Every file group is then attached to its table's {@code EtlTable}.
     *
     * <p>All tables touched by the load are read-locked (in the order returned by
     * {@code getTablesOnIdOrderOrThrowException}) while their metadata is read.
     *
     * @throws LoadException if the database, a table or the transaction cannot be found, or
     *                       if any table/file-group definition is not supported
     */
    private void createEtlJobConf() throws LoadException {
        Database db = Env.getCurrentInternalCatalog().getDbOrException(this.dbId,
                id -> new LoadException("db does not exist. id: " + id));

        Map<Long, EtlJobConfig.EtlTable> etlTables = Maps.newHashMap();
        Map<Long, Set<Long>> tableIdToPartitionIds = Maps.newHashMap();
        Set<Long> allPartitionsTableIds = Sets.newHashSet();
        prepareTablePartitionInfos(db, tableIdToPartitionIds, allPartitionsTableIds);

        try {
            // Lock every table that takes part in the load. tableIdToPartitionIds has an entry
            // for each of them, whereas allPartitionsTableIds only holds the tables loaded
            // without an explicit partition list; locking just those would leave the remaining
            // tables' metadata being read below without a lock.
            List<Table> lockedTables = db.getTablesOnIdOrderOrThrowException(
                    Lists.newArrayList(tableIdToPartitionIds.keySet()));
            MetaLockUtils.readLockTables(lockedTables);
            try {
                for (Map.Entry<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> entry
                        : this.aggKeyToBrokerFileGroups.entrySet()) {
                    long tableId = entry.getKey().getTableId();
                    OlapTable olapTable = (OlapTable) db.getTableOrException(tableId,
                            id -> new LoadException("table does not exist. id: " + id));
                    Set<Long> partitionIds = tableIdToPartitionIds.get(tableId);

                    EtlJobConfig.EtlTable etlTable = etlTables.get(tableId);
                    if (etlTable == null) {
                        // First file group for this table: describe the table once.
                        etlTable = new EtlJobConfig.EtlTable(createEtlIndexes(olapTable),
                                createEtlPartitionInfo(olapTable, partitionIds));
                        etlTables.put(tableId, etlTable);

                        TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
                                .getTransactionState(this.dbId, this.transactionId);
                        if (transactionState == null) {
                            throw new LoadException("txn does not exist. id: " + this.transactionId);
                        }
                        transactionState.addTableIndexes(olapTable);
                    }

                    for (BrokerFileGroup fileGroup : entry.getValue()) {
                        etlTable.addFileGroup(createEtlFileGroup(fileGroup, partitionIds, db, olapTable));
                    }
                }

                String outputFilePattern = EtlJobConfig.getOutputFilePattern(this.loadLabel,
                        EtlJobConfig.FilePatternVersion.V1);
                EtlJobConfig.EtlJobProperty jobProperty = new EtlJobConfig.EtlJobProperty();
                LoadJob loadJob = (LoadJob) this.callback;
                jobProperty.strictMode = loadJob.isStrictMode();
                jobProperty.timezone = loadJob.getTimeZone();
                this.etlJobConfig = new EtlJobConfig(etlTables, outputFilePattern, this.loadLabel, jobProperty);
            } finally {
                MetaLockUtils.readUnlockTables(lockedTables);
            }
        } catch (MetaNotFoundException e) {
            // Only the message survives in the LoadException, so keep the full cause in the log.
            LOG.warn("failed to create etl job config. load job id: {}", this.loadJobId, e);
            throw new LoadException(e.getMessage());
        }
    }

    /**
     * Collects, per target table, the ids of the partitions this load writes to.
     *
     * <p>A file group key without partition ids selects every partition of its table; such
     * tables are additionally recorded in {@code set} and are skipped when met again. Each
     * table is read-locked while its partitions are inspected.
     *
     * @param database database that owns the target tables
     * @param map      output: table id to the partition ids to load
     * @param set      output: ids of the tables for which all partitions are loaded
     * @throws LoadException if a target table does not exist
     */
    private void prepareTablePartitionInfos(Database database, Map<Long, Set<Long>> map, Set<Long> set) throws LoadException {
        for (BrokerFileGroupAggInfo.FileGroupAggKey aggKey : this.aggKeyToBrokerFileGroups.keySet()) {
            long tableId = aggKey.getTableId();
            if (set.contains(tableId)) {
                // All partitions of this table are already selected.
                continue;
            }

            OlapTable table = (OlapTable) database.getTableOrException(tableId,
                    id -> new LoadException("table does not exist. id: " + id));
            table.readLock();
            try {
                Set<Long> selectedPartitionIds = map.computeIfAbsent(tableId, ignored -> Sets.newHashSet());
                Set<Long> requestedPartitionIds = aggKey.getPartitionIds();
                boolean partitionsSpecified = requestedPartitionIds != null && !requestedPartitionIds.isEmpty();
                if (partitionsSpecified) {
                    selectedPartitionIds.addAll(requestedPartitionIds);
                } else {
                    for (Partition partition : table.getPartitions()) {
                        selectedPartitionIds.add(partition.getId());
                    }
                    set.add(tableId);
                }
            } finally {
                table.readUnlock();
            }
        }
    }

    /**
     * Describes every index of {@code olapTable} (columns, schema hash, keys type and
     * whether it is the base index) for the ETL job.
     *
     * @param olapTable target table
     * @return one {@code EtlIndex} per entry of the table's index-id-to-schema map
     * @throws LoadException if the table is not hash-distributed or an index has an
     *                       unknown keys type
     */
    private List<EtlJobConfig.EtlIndex> createEtlIndexes(OlapTable olapTable) throws LoadException {
        List<EtlJobConfig.EtlIndex> etlIndexes = Lists.newArrayList();
        for (Map.Entry<Long, List<Column>> indexEntry : olapTable.getIndexIdToSchema().entrySet()) {
            long indexId = indexEntry.getKey();
            int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
            KeysType keysType = olapTable.getKeysTypeByIndexId(indexId);

            // Value columns of merge-on-write unique-key indexes get special aggregation handling.
            boolean uniqueMergeOnWrite = keysType.equals(KeysType.UNIQUE_KEYS)
                    && olapTable.getTableProperty().getEnableUniqueKeyMergeOnWrite();

            List<EtlJobConfig.EtlColumn> etlColumns = Lists.newArrayList();
            for (Column column : indexEntry.getValue()) {
                etlColumns.add(createEtlColumn(column, uniqueMergeOnWrite));
            }

            DistributionInfo distribution = olapTable.getDefaultDistributionInfo();
            if (distribution.getType() != DistributionInfo.DistributionInfoType.HASH) {
                String message = "Unsupported distribution type. type: " + distribution.getType().name();
                LOG.warn(message);
                throw new LoadException(message);
            }

            String indexType = etlIndexTypeOf(keysType);
            boolean isBaseIndex = indexId == olapTable.getBaseIndexId();
            etlIndexes.add(new EtlJobConfig.EtlIndex(indexId, etlColumns, schemaHash, indexType, isBaseIndex));
        }
        return etlIndexes;
    }

    /** Maps a keys type to the index-type string used in the ETL config; logs and throws on unknown types. */
    private String etlIndexTypeOf(KeysType keysType) throws LoadException {
        switch (keysType) {
            case DUP_KEYS:
                return "DUPLICATE";
            case AGG_KEYS:
                return "AGGREGATE";
            case UNIQUE_KEYS:
                return "UNIQUE";
            default:
                String message = "unknown keys type. type: " + keysType.name();
                LOG.warn(message);
                throw new LoadException(message);
        }
    }

    /**
     * Converts a catalog column into its ETL description.
     *
     * @param column catalog column
     * @param z      when {@code true}, non-key columns that declare an aggregation type are
     *               reported with {@code REPLACE} instead of their declared aggregation
     * @return ETL column with lower-cased name, type name, nullability, key flag,
     *         aggregation, default value, string length, precision and scale
     */
    private EtlJobConfig.EtlColumn createEtlColumn(Column column, boolean z) {
        String name = column.getName().toLowerCase(Locale.ROOT);
        PrimitiveType type = column.getDataType();
        String typeName = type.toString();
        boolean nullable = column.isAllowNull();
        boolean key = column.isKey();

        // Aggregation: only reported when the column declares one.
        String aggregation = null;
        AggregateType declaredAggregation = column.getAggregationType();
        if (declaredAggregation != null) {
            if (z && !key) {
                aggregation = AggregateType.REPLACE.toSql();
            } else {
                aggregation = declaredAggregation.toString();
            }
        }

        // Default value: nullable columns without a default use the load-time NULL marker.
        String defaultValue = column.getDefaultValue();
        if (nullable && defaultValue == null) {
            defaultValue = StmtExecutor.NULL_VALUE_FOR_LOAD;
        }

        // Length only applies to string types; precision/scale only to decimal types.
        int stringLength = type.isStringType() ? column.getStrLen() : 0;
        boolean decimal = type.isDecimalV2Type() || type.isDecimalV3Type();
        int precision = decimal ? column.getPrecision() : 0;
        int scale = decimal ? column.getScale() : 0;

        return new EtlJobConfig.EtlColumn(name, typeName, nullable, key, aggregation, defaultValue,
                stringLength, precision, scale);
    }

    /**
     * Describes the partitioning and distribution of {@code olapTable} for the ETL job,
     * restricted to the partitions whose ids are in {@code set}.
     *
     * <p>Range-partitioned tables contribute their partition columns and, per selected
     * partition, the lower/upper key values; unpartitioned tables must have exactly one
     * selected partition. Any other partition type is rejected.
     *
     * @param olapTable target table
     * @param set       ids of the partitions to load
     * @return partition info holding partition columns, distribution columns and partitions
     * @throws LoadException if a selected partition does not exist or the partition type is
     *                       neither RANGE nor UNPARTITIONED
     */
    private EtlJobConfig.EtlPartitionInfo createEtlPartitionInfo(OlapTable olapTable, Set<Long> set) throws LoadException {
        PartitionType partitionType = olapTable.getPartitionInfo().getType();
        List<String> partitionColumnRefs = Lists.newArrayList();
        List<EtlJobConfig.EtlPartition> etlPartitions = Lists.newArrayList();

        if (partitionType == PartitionType.RANGE) {
            RangePartitionInfo rangeInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
            for (Column partitionColumn : rangeInfo.getPartitionColumns()) {
                partitionColumnRefs.add(partitionColumn.getName());
            }

            for (Map.Entry<Long, PartitionItem> itemEntry : rangeInfo.getAllPartitionItemEntryList(true)) {
                long partitionId = itemEntry.getKey();
                if (!set.contains(partitionId)) {
                    continue;
                }
                Partition partition = olapTable.getPartition(partitionId);
                if (partition == null) {
                    throw new LoadException("partition does not exist. id: " + partitionId);
                }
                int bucketNum = partition.getDistributionInfo().getBucketNum();

                @SuppressWarnings("unchecked") // range partition items carry a Range of PartitionKey
                Range<PartitionKey> range = (Range<PartitionKey>) itemEntry.getValue().getItems();
                boolean isMaxPartition = range.upperEndpoint().isMaxValue();
                List<Object> startKeys = realValuesOf(range.lowerEndpoint());
                // An unbounded upper end is expressed by the flag, not by end keys.
                List<Object> endKeys = isMaxPartition
                        ? Lists.newArrayList()
                        : realValuesOf(range.upperEndpoint());

                etlPartitions.add(new EtlJobConfig.EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum));
            }
        } else if (partitionType == PartitionType.UNPARTITIONED) {
            Preconditions.checkState(set.size() == 1);
            for (Long partitionId : set) {
                Partition partition = olapTable.getPartition(partitionId.longValue());
                if (partition == null) {
                    throw new LoadException("partition does not exist. id: " + partitionId);
                }
                int bucketNum = partition.getDistributionInfo().getBucketNum();
                etlPartitions.add(new EtlJobConfig.EtlPartition(partitionId.longValue(),
                        Lists.newArrayList(), Lists.newArrayList(), true, bucketNum));
            }
        } else {
            throw new LoadException("Spark Load does not support list partition yet");
        }

        DistributionInfo distribution = olapTable.getDefaultDistributionInfo();
        Preconditions.checkState(distribution.getType() == DistributionInfo.DistributionInfoType.HASH);
        List<String> distributionColumnRefs = Lists.newArrayList();
        for (Column distributionColumn : ((HashDistributionInfo) distribution).getDistributionColumns()) {
            distributionColumnRefs.add(distributionColumn.getName());
        }

        return new EtlJobConfig.EtlPartitionInfo(partitionType.typeString, partitionColumnRefs,
                distributionColumnRefs, etlPartitions);
    }

    /** Returns the real values of all key literals of {@code partitionKey}, in key order. */
    private static List<Object> realValuesOf(PartitionKey partitionKey) {
        List<Object> values = Lists.newArrayList();
        for (LiteralExpr keyLiteral : partitionKey.getKeys()) {
            values.add(keyLiteral.getRealValue());
        }
        return values;
    }

    /**
     * Converts one broker file group into its ETL description.
     *
     * <p>Steps: initialise the column descriptors against the table, add shadow columns
     * reported by {@code Load.getSchemaChangeShadowColumnDesc}, validate negative loads
     * (all value columns must be SUM), resolve the file field names (defaulting to the base
     * schema), build the column mappings, resolve partitions and the where clause, and check
     * that HLL/BITMAP columns are mapped through a supported function.
     *
     * @param brokerFileGroup source file group
     * @param set             partition ids used when the file group names none itself
     * @param database        database used to look up the Hive source table, if any
     * @param olapTable       target table
     * @return a HIVE file group when loading from a table, otherwise a FILE file group
     * @throws LoadException if validation fails, the Hive source table does not exist, or
     *                       column initialisation fails
     */
    private EtlJobConfig.EtlFileGroup createEtlFileGroup(BrokerFileGroup brokerFileGroup, Set<Long> set, Database database, OlapTable olapTable) throws LoadException {
        List<ImportColumnDesc> columnDescs = Lists.newArrayList(brokerFileGroup.getColumnExprList());

        // Expressions of derived (non plain) columns, looked up case-insensitively by column name.
        Map<String, Expr> exprByColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
        for (ImportColumnDesc columnDesc : columnDescs) {
            if (!columnDesc.isColumn()) {
                exprByColumn.put(columnDesc.getColumnName(), columnDesc.getExpr());
            }
        }

        try {
            Load.initColumns(olapTable, columnDescs, brokerFileGroup.getColumnToHadoopFunction());
            for (ImportColumnDesc shadowDesc : Load.getSchemaChangeShadowColumnDesc(olapTable, exprByColumn)) {
                columnDescs.add(shadowDesc);
                exprByColumn.put(shadowDesc.getColumnName(), shadowDesc.getExpr());
            }

            // A negative load is only accepted when every value column aggregates with SUM.
            if (brokerFileGroup.isNegative()) {
                for (Column column : olapTable.getBaseSchema()) {
                    if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) {
                        throw new LoadException("Column is not SUM AggreateType. column:" + column.getName());
                    }
                }
            }

            // Without explicit file field names the file is assumed to follow the base schema.
            List<String> fileFieldNames = brokerFileGroup.getFileFieldNames();
            if (fileFieldNames == null || fileFieldNames.isEmpty()) {
                fileFieldNames = Lists.newArrayList();
                for (Column column : olapTable.getBaseSchema()) {
                    fileFieldNames.add(column.getName());
                }
            }

            // Hadoop-function mappings take precedence over expression mappings for the same column.
            Map<String, EtlJobConfig.EtlColumnMapping> columnMappings = Maps.newHashMap();
            Map<String, Pair<String, List<String>>> columnToHadoopFunction = brokerFileGroup.getColumnToHadoopFunction();
            if (columnToHadoopFunction != null) {
                for (Map.Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
                    Pair<String, List<String>> function = entry.getValue();
                    columnMappings.put(entry.getKey(), new EtlJobConfig.EtlColumnMapping(function.first, function.second));
                }
            }
            for (ImportColumnDesc columnDesc : columnDescs) {
                if (!columnDesc.isColumn() && !columnMappings.containsKey(columnDesc.getColumnName())) {
                    columnMappings.put(columnDesc.getColumnName(),
                            new EtlJobConfig.EtlColumnMapping(columnDesc.getExpr().toSql()));
                }
            }

            List<Long> partitionIds = brokerFileGroup.getPartitionIds();
            if (partitionIds == null || partitionIds.isEmpty()) {
                partitionIds = Lists.newArrayList(set);
            }

            String where = brokerFileGroup.getWhereExpr() != null ? brokerFileGroup.getWhereExpr().toSql() : "";

            String hiveDbTable = "";
            Map<String, String> hiveProperties = Maps.newHashMap();
            if (brokerFileGroup.isLoadFromTable()) {
                HiveTable hiveTable = (HiveTable) database.getTableOrException(brokerFileGroup.getSrcTableId(),
                        id -> new LoadException("table does not exist. id: " + id));
                hiveDbTable = hiveTable.getHiveDbTable();
                hiveProperties.putAll(hiveTable.getHiveProperties());
            }

            // HLL and BITMAP columns must be produced by one of the supported functions.
            for (Column column : olapTable.getBaseSchema()) {
                String columnName = column.getName();
                PrimitiveType type = column.getDataType();
                Expr expr = exprByColumn.get(columnName);
                if (type == PrimitiveType.HLL) {
                    checkHllMapping(columnName, expr);
                }
                if (type == PrimitiveType.BITMAP) {
                    checkBitmapMapping(columnName, expr, brokerFileGroup.isLoadFromTable());
                }
            }

            if (brokerFileGroup.isLoadFromTable()) {
                return new EtlJobConfig.EtlFileGroup(EtlJobConfig.SourceType.HIVE, hiveDbTable, hiveProperties,
                        brokerFileGroup.isNegative(), columnMappings, where, partitionIds);
            }
            return new EtlJobConfig.EtlFileGroup(EtlJobConfig.SourceType.FILE, brokerFileGroup.getFilePaths(),
                    fileFieldNames, brokerFileGroup.getColumnNamesFromPath(), brokerFileGroup.getColumnSeparator(),
                    brokerFileGroup.getLineDelimiter(), brokerFileGroup.isNegative(), brokerFileGroup.getFileFormat(),
                    columnMappings, where, partitionIds);
        } catch (LoadException e) {
            // Already the right type: propagate unchanged so the original stack trace is kept.
            throw e;
        } catch (UserException e) {
            // Only the message is carried over, so record the full cause before converting.
            LOG.warn("failed to create etl file group. load job id: {}", this.loadJobId, e);
            throw new LoadException(e.getMessage());
        }
    }

    /**
     * Verifies that an HLL column is mapped through {@code hll_hash} or {@code hll_empty}
     * (function names are compared case-insensitively).
     *
     * @param str  column name, used in error messages
     * @param expr mapping expression of the column, may be {@code null}
     * @throws LoadException if no expression is assigned or it is not one of the accepted
     *                       function calls
     */
    private void checkHllMapping(String str, Expr expr) throws LoadException {
        if (expr == null) {
            throw new LoadException("HLL column func is not assigned. column:" + str);
        }

        boolean acceptable = false;
        if (expr instanceof FunctionCallExpr) {
            String functionName = ((FunctionCallExpr) expr).getFnName().getFunction();
            acceptable = functionName.equalsIgnoreCase(FunctionSet.HLL_HASH)
                    || functionName.equalsIgnoreCase("hll_empty");
        }
        if (!acceptable) {
            throw new LoadException("HLL column must use hll function, like " + str + "=hll_hash(xxx) or " + str + "=hll_empty()");
        }
    }

    /**
     * Verifies that a BITMAP column is mapped through {@code to_bitmap}, {@code bitmap_hash},
     * {@code bitmap_dict} or {@code binary_bitmap} (compared case-insensitively), and that
     * {@code bitmap_dict} is only used when {@code z} is {@code true}.
     *
     * @param str  column name, used in error messages
     * @param expr mapping expression of the column, may be {@code null}
     * @param z    whether the data is loaded from a (Hive) table
     * @throws LoadException if no expression is assigned, the function is not accepted, or
     *                       {@code bitmap_dict} is used while {@code z} is {@code false}
     */
    private void checkBitmapMapping(String str, Expr expr, boolean z) throws LoadException {
        if (expr == null) {
            throw new LoadException("BITMAP column func is not assigned. column:" + str);
        }

        String usageHint = "BITMAP column must use bitmap function, like " + str + "=to_bitmap(xxx) or " + str + "=bitmap_hash() or " + str + "=bitmap_dict()";
        if (!(expr instanceof FunctionCallExpr)) {
            throw new LoadException(usageHint);
        }

        String functionName = ((FunctionCallExpr) expr).getFnName().getFunction();
        boolean globalDict = functionName.equalsIgnoreCase("bitmap_dict");
        boolean supported = functionName.equalsIgnoreCase(FunctionSet.TO_BITMAP)
                || functionName.equalsIgnoreCase(FunctionSet.BITMAP_HASH)
                || globalDict
                || functionName.equalsIgnoreCase("binary_bitmap");
        if (!supported) {
            throw new LoadException(usageHint);
        }
        if (globalDict && !z) {
            throw new LoadException("Bitmap global dict should load data from hive table");
        }
    }
}
