package org.apache.doris.load.routineload;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.SetUserPropertyVar;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.KafkaUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.nereids.trees.expressions.functions.AggStateFunctionBuilder;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/routineload/KafkaRoutineLoadJob.class */
public class KafkaRoutineLoadJob extends RoutineLoadJob {
    private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);
    public static final String KAFKA_FILE_CATALOG = "kafka";
    public static final String PROP_GROUP_ID = "group.id";
    private String brokerList;
    private String topic;
    private List<Integer> customKafkaPartitions;
    private List<Integer> currentKafkaPartitions;
    private String kafkaDefaultOffSet;
    private Map<String, String> customProperties;
    private Map<String, String> convertedCustomProperties;
    private Map<Integer, Long> cachedPartitionWithLatestOffsets;
    private List<Integer> newCurrentKafkaPartition;

    public KafkaRoutineLoadJob() {
        super(-1L, LoadDataSourceType.KAFKA);
        this.customKafkaPartitions = Lists.newArrayList();
        this.currentKafkaPartitions = Lists.newArrayList();
        this.kafkaDefaultOffSet = "";
        this.customProperties = Maps.newHashMap();
        this.convertedCustomProperties = Maps.newHashMap();
        this.cachedPartitionWithLatestOffsets = Maps.newConcurrentMap();
        this.newCurrentKafkaPartition = Lists.newArrayList();
    }

    public KafkaRoutineLoadJob(Long l, String str, String str2, long j, long j2, String str3, String str4, UserIdentity userIdentity) {
        super(l, str, str2, j, j2, LoadDataSourceType.KAFKA, userIdentity);
        this.customKafkaPartitions = Lists.newArrayList();
        this.currentKafkaPartitions = Lists.newArrayList();
        this.kafkaDefaultOffSet = "";
        this.customProperties = Maps.newHashMap();
        this.convertedCustomProperties = Maps.newHashMap();
        this.cachedPartitionWithLatestOffsets = Maps.newConcurrentMap();
        this.newCurrentKafkaPartition = Lists.newArrayList();
        this.brokerList = str3;
        this.topic = str4;
        this.progress = new KafkaProgress();
    }

    public KafkaRoutineLoadJob(Long l, String str, String str2, long j, String str3, String str4, UserIdentity userIdentity, boolean z) {
        super(l, str, str2, j, LoadDataSourceType.KAFKA, userIdentity);
        this.customKafkaPartitions = Lists.newArrayList();
        this.currentKafkaPartitions = Lists.newArrayList();
        this.kafkaDefaultOffSet = "";
        this.customProperties = Maps.newHashMap();
        this.convertedCustomProperties = Maps.newHashMap();
        this.cachedPartitionWithLatestOffsets = Maps.newConcurrentMap();
        this.newCurrentKafkaPartition = Lists.newArrayList();
        this.brokerList = str3;
        this.topic = str4;
        this.progress = new KafkaProgress();
        setMultiTable(z);
    }

    public String getTopic() {
        return this.topic;
    }

    public String getBrokerList() {
        return this.brokerList;
    }

    public Map<String, String> getConvertedCustomProperties() {
        return this.convertedCustomProperties;
    }

    private boolean isOffsetForTimes() {
        return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet) != -1;
    }

    private long convertedDefaultOffsetToTimestamp() {
        return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet, TimeUtils.getOrSystemTimeZone(getTimezone()));
    }

    private long convertedDefaultOffsetToLong() {
        if (this.kafkaDefaultOffSet.isEmpty()) {
            return -1L;
        }
        if (isOffsetForTimes()) {
            return convertedDefaultOffsetToTimestamp();
        }
        if (this.kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
            return -2L;
        }
        return this.kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END) ? -1L : -1L;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void prepare() throws UserException {
        super.prepare();
        convertCustomProperties(true);
    }

    private void convertCustomProperties(boolean z) throws DdlException {
        if (this.customProperties.isEmpty()) {
            return;
        }
        if (z || this.convertedCustomProperties.isEmpty()) {
            if (z) {
                this.convertedCustomProperties.clear();
            }
            SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
            for (Map.Entry<String, String> entry : this.customProperties.entrySet()) {
                if (entry.getValue().startsWith("FILE:")) {
                    SmallFileMgr.SmallFile smallFile = smallFileMgr.getSmallFile(this.dbId, KAFKA_FILE_CATALOG, entry.getValue().substring(entry.getValue().indexOf(ClusterNamespace.CLUSTER_DELIMITER) + 1), true);
                    this.convertedCustomProperties.put(entry.getKey(), "FILE:" + smallFile.id + ClusterNamespace.CLUSTER_DELIMITER + smallFile.md5);
                } else {
                    this.convertedCustomProperties.put(entry.getKey(), entry.getValue());
                }
            }
            if (this.convertedCustomProperties.containsKey(KafkaConfiguration.KAFKA_ORIGIN_DEFAULT_OFFSETS.getName())) {
                this.kafkaDefaultOffSet = this.convertedCustomProperties.remove(KafkaConfiguration.KAFKA_ORIGIN_DEFAULT_OFFSETS.getName());
            } else if (this.convertedCustomProperties.containsKey(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName())) {
                this.kafkaDefaultOffSet = this.convertedCustomProperties.remove(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName());
            }
        }
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void divideRoutineLoadJob(int i) throws UserException {
        ArrayList arrayList = new ArrayList();
        writeLock();
        try {
            if (this.state == RoutineLoadJob.JobState.NEED_SCHEDULE) {
                for (int i2 = 0; i2 < i; i2++) {
                    HashMap newHashMap = Maps.newHashMap();
                    int i3 = i2;
                    while (i3 < this.currentKafkaPartitions.size()) {
                        int intValue = this.currentKafkaPartitions.get(i3).intValue();
                        newHashMap.put(Integer.valueOf(intValue), ((KafkaProgress) this.progress).getOffsetByPartition(intValue));
                        i3 += i;
                    }
                    KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), this.id, this.clusterName, this.maxBatchIntervalS * 2 * 1000, newHashMap, isMultiTable());
                    this.routineLoadTaskInfoList.add(kafkaTaskInfo);
                    arrayList.add(kafkaTaskInfo);
                }
                if (arrayList.size() != 0) {
                    unprotectUpdateState(RoutineLoadJob.JobState.RUNNING, null, false);
                }
            } else {
                LOG.debug("Ignore to divide routine load job while job state {}", this.state);
            }
            Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTasksInQueue(arrayList);
            writeUnlock();
        } catch (Throwable th) {
            writeUnlock();
            throw th;
        }
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public int calculateCurrentConcurrentTaskNum() {
        int size = this.currentKafkaPartitions.size();
        if (this.desireTaskConcurrentNum == 0) {
            this.desireTaskConcurrentNum = Config.max_routine_load_task_concurrent_num;
        }
        LOG.debug("current concurrent task number is min(partition num: {}, desire task concurrent num: {} config: {})", Integer.valueOf(size), Integer.valueOf(this.desireTaskConcurrentNum), Integer.valueOf(Config.max_routine_load_task_concurrent_num));
        this.currentTaskConcurrentNum = Math.min(size, Math.min(this.desireTaskConcurrentNum, Config.max_routine_load_task_concurrent_num));
        return this.currentTaskConcurrentNum;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rLTaskTxnCommitAttachment, TransactionState transactionState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
            return true;
        }
        LOG.debug("no need to update the progress of kafka routine load. txn status: {}, txnStatusChangeReason: {}, task: {}, job: {}", transactionState.getTransactionStatus(), txnStatusChangeReason, DebugUtil.printId(rLTaskTxnCommitAttachment.getTaskId()), Long.valueOf(this.id));
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void updateProgress(RLTaskTxnCommitAttachment rLTaskTxnCommitAttachment) throws UserException {
        super.updateProgress(rLTaskTxnCommitAttachment);
        this.progress.update(rLTaskTxnCommitAttachment);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void replayUpdateProgress(RLTaskTxnCommitAttachment rLTaskTxnCommitAttachment) {
        super.replayUpdateProgress(rLTaskTxnCommitAttachment);
        this.progress.update(rLTaskTxnCommitAttachment);
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
        KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
        KafkaTaskInfo kafkaTaskInfo2 = new KafkaTaskInfo(kafkaTaskInfo, ((KafkaProgress) this.progress).getPartitionIdToOffset(kafkaTaskInfo.getPartitions()), isMultiTable());
        this.routineLoadTaskInfoList.remove(routineLoadTaskInfo);
        this.routineLoadTaskInfoList.add(kafkaTaskInfo2);
        return kafkaTaskInfo2;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected void unprotectUpdateProgress() throws UserException {
        updateNewPartitionProgress();
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected void preCheckNeedSchedule() throws UserException {
        if (this.state == RoutineLoadJob.JobState.RUNNING || this.state == RoutineLoadJob.JobState.NEED_SCHEDULE) {
            if (this.customKafkaPartitions == null || this.customKafkaPartitions.isEmpty()) {
                updateKafkaPartitions();
            }
        }
    }

    private void updateKafkaPartitions() throws UserException {
        try {
            this.newCurrentKafkaPartition = getAllKafkaPartitions();
        } catch (Exception e) {
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(this.id)).add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()).build(), e);
            if (this.state == RoutineLoadJob.JobState.NEED_SCHEDULE) {
                unprotectUpdateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(InternalErrorCode.PARTITIONS_ERR, "Job failed to fetch all current partition with error " + e.getMessage()), false);
            }
        }
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected boolean unprotectNeedReschedule() throws UserException {
        if (this.state != RoutineLoadJob.JobState.RUNNING && this.state != RoutineLoadJob.JobState.NEED_SCHEDULE) {
            if (this.state == RoutineLoadJob.JobState.PAUSED) {
                return ScheduleRule.isNeedAutoSchedule(this);
            }
            return false;
        }
        if (CollectionUtils.isNotEmpty(this.customKafkaPartitions)) {
            this.currentKafkaPartitions = this.customKafkaPartitions;
            return false;
        }
        Preconditions.checkNotNull(this.newCurrentKafkaPartition);
        if (!new HashSet(this.currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
            this.currentKafkaPartitions = this.newCurrentKafkaPartition;
            if (!LOG.isDebugEnabled()) {
                return true;
            }
            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(this.id)).add("current_kafka_partitions", Joiner.on(",").join(this.currentKafkaPartitions)).add("msg", "current kafka partitions has been change").build());
            return true;
        }
        if (this.currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) {
            this.currentKafkaPartitions = this.newCurrentKafkaPartition;
            if (!LOG.isDebugEnabled()) {
                return true;
            }
            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(this.id)).add("current_kafka_partitions", Joiner.on(",").join(this.currentKafkaPartitions)).add("msg", "current kafka partitions has been change").build());
            return true;
        }
        Iterator<Integer> it = this.currentKafkaPartitions.iterator();
        while (it.hasNext()) {
            if (!((KafkaProgress) this.progress).containsPartition(it.next())) {
                return true;
            }
        }
        return false;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected String getStatistic() {
        return new GsonBuilder().disableHtmlEscaping().create().toJson(this.jobStatistic.summary());
    }

    private List<Integer> getAllKafkaPartitions() throws UserException {
        convertCustomProperties(false);
        return KafkaUtil.getAllKafkaPartitions(this.brokerList, this.topic, this.convertedCustomProperties);
    }

    public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt createRoutineLoadStmt) throws UserException {
        KafkaRoutineLoadJob kafkaRoutineLoadJob;
        Database dbOrDdlException = Env.getCurrentInternalCatalog().getDbOrDdlException(createRoutineLoadStmt.getDBName());
        long nextId = Env.getCurrentEnv().getNextId();
        KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) createRoutineLoadStmt.getDataSourceProperties();
        if (kafkaDataSourceProperties.isMultiTable()) {
            kafkaRoutineLoadJob = new KafkaRoutineLoadJob(Long.valueOf(nextId), createRoutineLoadStmt.getName(), dbOrDdlException.getClusterName(), dbOrDdlException.getId(), kafkaDataSourceProperties.getBrokerList(), kafkaDataSourceProperties.getTopic(), createRoutineLoadStmt.getUserInfo(), true);
        } else {
            OlapTable olapTableOrDdlException = dbOrDdlException.getOlapTableOrDdlException(createRoutineLoadStmt.getTableName());
            checkMeta(olapTableOrDdlException, createRoutineLoadStmt.getRoutineLoadDesc());
            kafkaRoutineLoadJob = new KafkaRoutineLoadJob(Long.valueOf(nextId), createRoutineLoadStmt.getName(), dbOrDdlException.getClusterName(), dbOrDdlException.getId(), olapTableOrDdlException.getId(), kafkaDataSourceProperties.getBrokerList(), kafkaDataSourceProperties.getTopic(), createRoutineLoadStmt.getUserInfo());
        }
        kafkaRoutineLoadJob.setOptional(createRoutineLoadStmt);
        kafkaRoutineLoadJob.checkCustomProperties();
        kafkaRoutineLoadJob.checkCustomPartition();
        return kafkaRoutineLoadJob;
    }

    private void checkCustomPartition() throws UserException {
        if (this.customKafkaPartitions.isEmpty()) {
            return;
        }
        List<Integer> allKafkaPartitions = getAllKafkaPartitions();
        for (Integer num : this.customKafkaPartitions) {
            if (!allKafkaPartitions.contains(num)) {
                throw new LoadException("there is a custom kafka partition " + num + " which is invalid for topic " + this.topic);
            }
        }
    }

    private void checkCustomProperties() throws DdlException {
        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
        for (Map.Entry<String, String> entry : this.customProperties.entrySet()) {
            if (entry.getValue().startsWith("FILE:")) {
                String substring = entry.getValue().substring(entry.getValue().indexOf(ClusterNamespace.CLUSTER_DELIMITER) + 1);
                if (!smallFileMgr.containsFile(this.dbId, KAFKA_FILE_CATALOG, substring)) {
                    throw new DdlException("File " + substring + " does not exist in db " + this.dbId + " with catalog: " + KAFKA_FILE_CATALOG);
                }
            }
        }
    }

    private void updateNewPartitionProgress() throws UserException {
        try {
            for (Integer num : this.currentKafkaPartitions) {
                if (!((KafkaProgress) this.progress).containsPartition(num)) {
                    ArrayList newArrayList = Lists.newArrayList();
                    newArrayList.add(num);
                    List<Pair<Integer, Long>> newPartitionOffsetsFromDefaultOffset = getNewPartitionOffsetsFromDefaultOffset(newArrayList);
                    Preconditions.checkState(newPartitionOffsetsFromDefaultOffset.size() == 1);
                    for (Pair<Integer, Long> pair : newPartitionOffsetsFromDefaultOffset) {
                        ((KafkaProgress) this.progress).addPartitionOffset(pair);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(this.id)).add("kafka_partition_id", pair.first).add("begin_offset", pair.second).add("msg", "The new partition has been added in job"));
                        }
                    }
                }
            }
        } catch (UserException e) {
            unprotectUpdateState(RoutineLoadJob.JobState.PAUSED, new ErrorReason(InternalErrorCode.PARTITIONS_ERR, e.getMessage()), false);
            throw e;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private List<Pair<Integer, Long>> getNewPartitionOffsetsFromDefaultOffset(List<Integer> list) throws UserException {
        List newArrayList = Lists.newArrayList();
        long convertedDefaultOffsetToLong = convertedDefaultOffsetToLong();
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            newArrayList.add(Pair.of(it.next(), Long.valueOf(convertedDefaultOffsetToLong)));
        }
        if (isOffsetForTimes()) {
            try {
                newArrayList = KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, this.convertedCustomProperties, newArrayList);
            } catch (LoadException e) {
                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, Long.valueOf(this.id)).add("partition:timestamp", Joiner.on(",").join(newArrayList)).add("error_msg", "Job failed to fetch current offsets from times with error " + e.getMessage()).build(), e);
                throw new UserException(e);
            }
        }
        return newArrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void setOptional(CreateRoutineLoadStmt createRoutineLoadStmt) throws UserException {
        super.setOptional(createRoutineLoadStmt);
        KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) createRoutineLoadStmt.getDataSourceProperties();
        if (CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets())) {
            setCustomKafkaPartitions(kafkaDataSourceProperties);
        }
        if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
            setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
        }
        this.customProperties.putIfAbsent(PROP_GROUP_ID, this.name + AggStateFunctionBuilder.COMBINATOR_LINKER + UUID.randomUUID());
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void setCustomKafkaPartitions(KafkaDataSourceProperties kafkaDataSourceProperties) throws LoadException {
        List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets();
        if (kafkaDataSourceProperties.isOffsetsForTimes()) {
            kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(), kafkaDataSourceProperties.getTopic(), this.convertedCustomProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets());
        }
        for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
            this.customKafkaPartitions.add(pair.first);
            ((KafkaProgress) this.progress).addPartitionOffset(pair);
        }
    }

    private void setCustomKafkaProperties(Map<String, String> map) {
        this.customProperties = map;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected String dataSourcePropertiesJsonToString() {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("brokerList", this.brokerList);
        newHashMap.put("topic", this.topic);
        ArrayList newArrayList = Lists.newArrayList(this.currentKafkaPartitions);
        Collections.sort(newArrayList);
        newHashMap.put("currentKafkaPartitions", Joiner.on(",").join(newArrayList));
        return new GsonBuilder().disableHtmlEscaping().create().toJson(newHashMap);
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected String customPropertiesJsonToString() {
        return new GsonBuilder().disableHtmlEscaping().create().toJson(this.customProperties);
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected Map<String, String> getDataSourceProperties() {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("kafka_broker_list", this.brokerList);
        newHashMap.put("kafka_topic", this.topic);
        return newHashMap;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected Map<String, String> getCustomProperties() {
        HashMap hashMap = new HashMap();
        this.customProperties.forEach((str, str2) -> {
        });
        return hashMap;
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void write(DataOutput dataOutput) throws IOException {
        super.write(dataOutput);
        Text.writeString(dataOutput, this.brokerList);
        Text.writeString(dataOutput, this.topic);
        dataOutput.writeInt(this.customKafkaPartitions.size());
        Iterator<Integer> it = this.customKafkaPartitions.iterator();
        while (it.hasNext()) {
            dataOutput.writeInt(it.next().intValue());
        }
        dataOutput.writeInt(this.customProperties.size());
        for (Map.Entry<String, String> entry : this.customProperties.entrySet()) {
            Text.writeString(dataOutput, "property." + entry.getKey());
            Text.writeString(dataOutput, entry.getValue());
        }
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void readFields(DataInput dataInput) throws IOException {
        super.readFields(dataInput);
        this.brokerList = Text.readString(dataInput);
        this.topic = Text.readString(dataInput);
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            this.customKafkaPartitions.add(Integer.valueOf(dataInput.readInt()));
        }
        int readInt2 = dataInput.readInt();
        for (int i2 = 0; i2 < readInt2; i2++) {
            String readString = Text.readString(dataInput);
            String readString2 = Text.readString(dataInput);
            if (readString.startsWith("property.")) {
                this.customProperties.put(readString.substring(readString.indexOf(SetUserPropertyVar.DOT_SEPARATOR) + 1), readString2);
            }
        }
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void modifyProperties(AlterRoutineLoadStmt alterRoutineLoadStmt) throws UserException {
        Map<String, String> analyzedJobProperties = alterRoutineLoadStmt.getAnalyzedJobProperties();
        KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) alterRoutineLoadStmt.getDataSourceProperties();
        if (null != kafkaDataSourceProperties && kafkaDataSourceProperties.isOffsetsForTimes()) {
            convertTimestampToOffset(kafkaDataSourceProperties);
        }
        writeLock();
        try {
            if (getState() != RoutineLoadJob.JobState.PAUSED) {
                throw new DdlException("Only supports modification of PAUSED jobs");
            }
            modifyPropertiesInternal(analyzedJobProperties, kafkaDataSourceProperties);
            Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(new AlterRoutineLoadJobOperationLog(this.id, analyzedJobProperties, kafkaDataSourceProperties));
            writeUnlock();
        } catch (Throwable th) {
            writeUnlock();
            throw th;
        }
    }

    private void convertTimestampToOffset(KafkaDataSourceProperties kafkaDataSourceProperties) throws UserException {
        List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets();
        if (kafkaPartitionOffsets.isEmpty()) {
            return;
        }
        kafkaDataSourceProperties.setKafkaPartitionOffsets(KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, this.convertedCustomProperties, kafkaPartitionOffsets));
    }

    private void modifyPropertiesInternal(Map<String, String> map, KafkaDataSourceProperties kafkaDataSourceProperties) throws DdlException {
        if (null != kafkaDataSourceProperties) {
            List<Pair<Integer, Long>> newArrayList = Lists.newArrayList();
            Map<String, String> newHashMap = Maps.newHashMap();
            if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getOriginalDataSourceProperties())) {
                newArrayList = kafkaDataSourceProperties.getKafkaPartitionOffsets();
                newHashMap = kafkaDataSourceProperties.getCustomKafkaProperties();
            }
            if (!newArrayList.isEmpty()) {
                ((KafkaProgress) this.progress).modifyOffset(newArrayList);
            }
            if (!newHashMap.isEmpty()) {
                this.customProperties.putAll(newHashMap);
                convertCustomProperties(true);
            }
            if (!Strings.isNullOrEmpty(kafkaDataSourceProperties.getBrokerList())) {
                this.brokerList = kafkaDataSourceProperties.getBrokerList();
            }
            if (!Strings.isNullOrEmpty(kafkaDataSourceProperties.getTopic())) {
                this.topic = kafkaDataSourceProperties.getTopic();
            }
        }
        if (!map.isEmpty()) {
            HashMap newHashMap2 = Maps.newHashMap(map);
            modifyCommonJobProperties(newHashMap2);
            this.jobProperties.putAll(newHashMap2);
            if (map.containsKey("partial_columns")) {
                this.isPartialUpdate = BooleanUtils.toBoolean(map.get("partial_columns"));
            }
        }
        LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", Long.valueOf(this.id), map, kafkaDataSourceProperties);
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    public void replayModifyProperties(AlterRoutineLoadJobOperationLog alterRoutineLoadJobOperationLog) {
        try {
            modifyPropertiesInternal(alterRoutineLoadJobOperationLog.getJobProperties(), (KafkaDataSourceProperties) alterRoutineLoadJobOperationLog.getDataSourceProperties());
        } catch (DdlException e) {
            LOG.error("failed to replay modify kafka routine load job: {}", Long.valueOf(this.id), e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public boolean hasMoreDataToConsume(UUID uuid, Map<Integer, Long> map) {
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            if (this.cachedPartitionWithLatestOffsets.containsKey(entry.getKey()) && entry.getValue().longValue() < this.cachedPartitionWithLatestOffsets.get(entry.getKey()).longValue()) {
                LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", map, this.cachedPartitionWithLatestOffsets, uuid, Long.valueOf(this.id));
                return true;
            }
        }
        try {
            for (Pair<Integer, Long> pair : KafkaUtil.getLatestOffsets(this.id, uuid, getBrokerList(), getTopic(), getConvertedCustomProperties(), Lists.newArrayList(map.keySet()))) {
                this.cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
            }
            for (Map.Entry<Integer, Long> entry2 : map.entrySet()) {
                if (this.cachedPartitionWithLatestOffsets.containsKey(entry2.getKey()) && entry2.getValue().longValue() < this.cachedPartitionWithLatestOffsets.get(entry2.getKey()).longValue()) {
                    LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", map, this.cachedPartitionWithLatestOffsets, uuid, Long.valueOf(this.id));
                    return true;
                }
            }
            LOG.debug("no more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", map, this.cachedPartitionWithLatestOffsets, uuid, Long.valueOf(this.id));
            return false;
        } catch (Exception e) {
            LOG.warn("failed to get latest partition offset. {}", e.getMessage(), e);
            return false;
        }
    }

    @Override // org.apache.doris.load.routineload.RoutineLoadJob
    protected String getLag() {
        return new Gson().toJson(((KafkaProgress) this.progress).getLag(this.cachedPartitionWithLatestOffsets));
    }

    @Override // org.apache.doris.task.LoadTaskInfo
    public TFileCompressType getCompressType() {
        return TFileCompressType.PLAIN;
    }

    @Override // org.apache.doris.task.LoadTaskInfo
    public double getMaxFilterRatio() {
        return this.maxFilterRatio;
    }
}
