package org.apache.doris.load.sync.canal;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SetUserPropertyVar;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.SqlBlockUtil;
import org.apache.doris.load.sync.SyncChannel;
import org.apache.doris.load.sync.SyncChannelCallback;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.nereids.trees.expressions.functions.AggStateFunctionBuilder;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

/* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncChannel.class */
public class CanalSyncChannel extends SyncChannel {
    /** Logger shared by all instances of this channel class. */
    private static final Logger LOG = LogManager.getLogger(CanalSyncChannel.class);
    /** Name of the extra column that beginTxn appends to the channel's column list. */
    private static final String DELETE_COLUMN = "_delete_sign_";
    /** Delete condition that beginTxn sets on the stream load request. */
    private static final String DELETE_CONDITION = "_delete_sign_=1";
    /** Not referenced by name in this file; parseRow writes the same literal for source columns flagged as null. */
    private static final String NULL_VALUE_FOR_LOAD = "\\N";
    /** Value of SyncTaskPool.getNextIndex() taken at construction; passed to every task this channel creates. */
    private final int index;
    /** Timeout handed to the transaction manager in beginTxn; -1 until initTxn stores a value. */
    private long timeoutSecond;
    /** Id of the latest batch accepted by submit; reset to -1 at construction and whenever commitTxn/abortTxn finish or fail. */
    private long lastBatchId;
    /** Rows collected by submit that have not yet been handed to a SendTask or flushed. */
    private Data<InternalService.PDataRow> batchBuffer;
    /** Executor carrying the current transaction; null before initTxn and after clearTxn. */
    private InsertStreamTxnExecutor txnExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.doris.load.sync.canal.CanalSyncChannel$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncChannel$1.class */
    /*
     * Compiler-generated lookup table for the enum switch in parseRow (the decompiler
     * surfaced it as a named class). The array is indexed by CanalEntry.EventType ordinal
     * and holds the case label used by that switch: DELETE -> 1, INSERT -> 2, UPDATE -> 3.
     * Slots that are never assigned keep the int default 0, which matches no case label
     * and therefore lands in the switch's default branch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType = new int[CanalEntry.EventType.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at run time
            // (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.DELETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.INSERT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncChannel$EOFTask.class */
    /**
     * Task whose only job is to tell the channel callback that work is finished.
     * The single call site in this file (submitEOF) creates it with the channel id,
     * the channel's pool index and the channel callback.
     */
    private static final class EOFTask extends SyncTask {
        /**
         * Forwards all three arguments, unchanged, to the SyncTask constructor.
         *
         * @param j first SyncTask constructor argument (the channel id at the call site)
         * @param i second SyncTask constructor argument (the channel's pool index at the call site)
         * @param syncChannelCallback callback invoked by {@link #exec()}
         */
        public EOFTask(long j, int i, SyncChannelCallback syncChannelCallback) {
            super(j, i, syncChannelCallback);
        }

        /**
         * Calls onFinished on the callback with this task's signature.
         */
        @Override
        public void exec() throws Exception {
            callback.onFinished(signature);
        }
    }

    /* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncChannel$SendTask.class */
    /**
     * Task that pushes one buffer of rows through a transaction executor.
     */
    private static final class SendTask extends SyncTask {
        /** Executor whose transaction entry receives the rows. */
        private final InsertStreamTxnExecutor executor;
        /** Buffer of rows captured when the task was created. */
        private final Data<InternalService.PDataRow> rows;

        /**
         * @param j first SyncTask constructor argument (the channel id at the call site in submit)
         * @param i second SyncTask constructor argument (the channel's pool index at the call site)
         * @param syncChannelCallback callback forwarded to SyncTask
         * @param data rows to send when the task runs
         * @param insertStreamTxnExecutor executor used to send the rows
         */
        public SendTask(long j, int i, SyncChannelCallback syncChannelCallback, Data<InternalService.PDataRow> data, InsertStreamTxnExecutor insertStreamTxnExecutor) {
            super(j, i, syncChannelCallback);
            rows = data;
            executor = insertStreamTxnExecutor;
        }

        /**
         * Installs the captured rows on the executor's transaction entry, then sends them.
         */
        @Override
        public void exec() throws Exception {
            executor.getTxnEntry().setDataToSend(rows.getDatas());
            executor.sendData();
        }
    }

    /**
     * Creates a channel with an empty row buffer and no transaction executor.
     * All arguments are forwarded unchanged, in the same order, to the SyncChannel
     * constructor; their meaning is defined there and is not visible in this class.
     * The timeout and the last batch id both start at -1, and the channel takes
     * its task index from SyncTaskPool.getNextIndex().
     */
    public CanalSyncChannel(long j, SyncJob syncJob, Database database, OlapTable olapTable, List<String> list, String str, String str2) {
        super(j, syncJob, database, olapTable, list, str, str2);
        this.timeoutSecond = -1L;
        this.lastBatchId = -1L;
        this.batchBuffer = new Data<>();
        this.index = SyncTaskPool.getNextIndex();
    }

    /**
     * Begins a transaction for this channel unless one is already running.
     *
     * <p>The method builds a label from the job, channel, database, table and batch ids plus
     * the current time, refuses to continue when the database already runs at least as many
     * transactions as its quota, registers the transaction with the global transaction
     * manager, and finally asks the executor to begin it with a CSV stream-load MERGE
     * request. The request's column list is the channel columns followed by the
     * delete-sign column.
     *
     * <p>The executor is dereferenced without a null check, so initTxn must have run first.
     *
     * @param j id of the batch that triggers the transaction; used only in the label and in logs
     * @throws UserException when the running-transaction quota is reached (thrown as
     *         BeginTransactionException) or when registering the transaction fails
     * @throws TException rethrown from the executor's beginTransaction call
     * @throws TimeoutException rethrown from the executor's beginTransaction call
     * @throws InterruptedException rethrown from the executor's beginTransaction call
     * @throws ExecutionException rethrown from the executor's beginTransaction call
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public void beginTxn(long j) throws UserException, TException, TimeoutException, InterruptedException, ExecutionException {
        if (isTxnBegin()) {
            // A transaction is already in flight for this channel.
            return;
        }

        final String label = "label_job" + this.jobId
                + "_channel" + this.id
                + "_db" + this.db.getId()
                + "_tbl" + this.tbl.getId()
                + "_batch" + j
                + AggStateFunctionBuilder.COMBINATOR_LINKER + System.currentTimeMillis();
        final String targetColumns = Joiner.on(",").join(this.columns) + "," + DELETE_COLUMN;

        final GlobalTransactionMgr globalTxnMgr = Env.getCurrentGlobalTransactionMgr();
        final DatabaseTransactionMgr dbTxnMgr = globalTxnMgr.getDatabaseTransactionMgr(this.db.getId());
        final long txnLimit = this.db.getTransactionQuotaSize();
        if (dbTxnMgr.getRunningTxnNums() >= txnLimit) {
            final String quotaMsg = "current running txns on db " + this.db.getId() + " is " + dbTxnMgr.getRunningTxnNums() + ", larger than limit " + txnLimit;
            LOG.warn(quotaMsg);
            throw new BeginTransactionException(quotaMsg);
        }

        final TransactionEntry txnEntry = this.txnExecutor.getTxnEntry();
        final TTxnParams txnConf = txnEntry.getTxnConf();
        TStreamLoadPutRequest request = null;
        try {
            final long txnId = globalTxnMgr.beginTransaction(
                    this.db.getId(),
                    Lists.newArrayList(this.tbl.getId()),
                    label,
                    new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
                    TransactionState.LoadJobSourceType.INSERT_STREAMING,
                    this.timeoutSecond);
            final String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
            request = new TStreamLoadPutRequest()
                    .setTxnId(txnId)
                    .setDb(txnConf.getDb())
                    .setTbl(txnConf.getTbl())
                    .setFileType(TFileType.FILE_STREAM)
                    .setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                    .setThriftRpcTimeoutMs(5000L)
                    .setLoadId(this.txnExecutor.getLoadId())
                    .setMergeType(TMergeType.MERGE)
                    .setDeleteCondition(DELETE_CONDITION)
                    .setColumns(targetColumns);
            txnConf.setTxnId(txnId).setToken(token);
            txnEntry.setLabel(label);
            this.txnExecutor.setTxnId(txnId);
        } catch (AnalysisException | BeginTransactionException e) {
            LOG.warn("encounter an error when beginning txn in channel {}, table: {}", this.id, this.targetTable);
            throw e;
        } catch (DuplicatedRequestException e) {
            // Adopt the transaction id reported by the exception instead of failing.
            // NOTE(review): `request` is still null on this path if the exception came from
            // beginTransaction, yet it is passed to the executor below whenever the adopted
            // txn id is not -1 -- confirm the executor tolerates a null request.
            LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}", this.id, e.getDuplicatedRequestId(), e.getTxnId(), this.targetTable);
            this.txnExecutor.setTxnId(e.getTxnId());
        } catch (LabelAlreadyUsedException e) {
            // Logged and tolerated: the method returns without beginning a transaction.
            LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", this.id, label, this.targetTable, j);
            return;
        } catch (UserException e) {
            LOG.warn("encounter an error when creating plan in channel {}, table: {}", this.id, this.targetTable);
            throw e;
        }

        try {
            if (this.txnExecutor.getTxnId() == -1) {
                // No transaction id was assigned above, so there is nothing to begin.
                return;
            }
            this.txnExecutor.beginTransaction(request);
            LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", this.id, this.targetTable, label, this.txnExecutor.getTxnId());
        } catch (TException e) {
            LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", this.id, this.targetTable, this.txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}", this.id, this.targetTable, this.txnExecutor.getTxnId(), e.getMessage());
            throw e;
        }
    }

    /**
     * Aborts the running transaction, if any.
     *
     * <p>When no transaction has begun, a warning is logged and nothing else changes.
     * Otherwise the executor is asked to abort; whether that succeeds or throws, the row
     * buffer is replaced by an empty one and the last batch id is reset to -1.
     *
     * @param str reason for the abort; only written to the log
     * @throws TException rethrown from the executor after logging
     * @throws TimeoutException rethrown from the executor after logging
     * @throws InterruptedException rethrown from the executor after logging
     * @throws ExecutionException rethrown from the executor after logging
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public void abortTxn(String str) throws TException, TimeoutException, InterruptedException, ExecutionException {
        if (!isTxnBegin()) {
            LOG.warn("No transaction to abort in channel {}, table: {}", this.id, this.targetTable);
            return;
        }
        try {
            this.txnExecutor.abortTransaction();
            LOG.info("abort txn in channel {}, table: {}, txn id: {}, last batch: {}, reason: {}", this.id, this.targetTable, this.txnExecutor.getTxnId(), this.lastBatchId, str);
        } catch (TException e) {
            LOG.warn("Failed to abort txn in channel {}, table: {}, txn: {}, msg:{}", this.id, this.targetTable, this.txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Error occur while waiting abort txn response in channel {}, table: {}, txn: {}, msg:{}", this.id, this.targetTable, this.txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } finally {
            // Buffered rows belong to the aborted transaction; drop them on every outcome.
            this.batchBuffer = new Data<>();
            updateBatchId(-1L);
        }
    }

    /**
     * Flushes any buffered rows and commits the running transaction, if any.
     *
     * <p>When no transaction has begun, a warning is logged and nothing else changes.
     * Otherwise {@link #flushData()} runs first, then the executor commits; whether that
     * succeeds or throws, the row buffer is replaced by an empty one and the last batch
     * id is reset to -1.
     *
     * @throws TException rethrown from flushing or committing after logging
     * @throws TimeoutException rethrown from flushing or committing after logging
     * @throws InterruptedException rethrown from flushing or committing after logging
     * @throws ExecutionException rethrown from flushing or committing after logging
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException {
        if (!isTxnBegin()) {
            LOG.warn("No transaction to commit in channel {}, table: {}", this.id, this.targetTable);
            return;
        }
        try {
            flushData();
            this.txnExecutor.commitTransaction();
            LOG.info("commit txn in channel {}, table: {}, txn id: {}, last batch: {}", this.id, this.targetTable, this.txnExecutor.getTxnId(), this.lastBatchId);
        } catch (TException e) {
            LOG.warn("Failed to commit txn in channel {}, table: {}, txn: {}, msg:{}", this.id, this.targetTable, this.txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Error occur while waiting commit txn return in channel {}, table: {}, txn: {}, msg:{}", this.id, this.targetTable, this.txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } finally {
            // Start from a clean buffer and batch id on every outcome.
            this.batchBuffer = new Data<>();
            updateBatchId(-1L);
        }
    }

    /**
     * Prepares the transaction executor for this channel; does nothing if one already exists.
     *
     * <p>The new executor wraps a transaction entry for this channel's database and table,
     * starts with transaction id -1 (which {@link #isTxnBegin()} treats as "not begun"),
     * and is given a load id derived from a random UUID.
     *
     * @param j timeout stored in timeoutSecond and later passed to the transaction manager by beginTxn
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public void initTxn(long j) {
        if (isTxnInit()) {
            return;
        }
        final UUID uuid = UUID.randomUUID();
        final TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        this.timeoutSecond = j;

        final TTxnParams txnParams = new TTxnParams()
                .setNeedTxn(true)
                .setEnablePipelineTxnLoad(Config.enable_pipeline_load)
                .setThriftRpcTimeoutMs(5000L)
                .setTxnId(-1L)
                .setDb(this.db.getFullName())
                .setTbl(this.tbl.getName())
                .setDbId(this.db.getId());
        this.txnExecutor = new InsertStreamTxnExecutor(new TransactionEntry(txnParams, this.db, this.tbl));
        this.txnExecutor.setTxnId(-1L);
        this.txnExecutor.setLoadId(loadId);
    }

    /**
     * Drops the transaction executor, so {@link #isTxnInit()} reports false
     * until initTxn is called again.
     */
    public void clearTxn() {
        txnExecutor = null;
    }

    /**
     * Converts every row of a Canal row change into load rows and buffers them.
     *
     * <p>For each row: when {@code j} is greater than the last batch id seen, the channel
     * either begins a transaction (if none is running) or hands the current buffer to a
     * SendTask submitted to SyncTaskPool and starts a new buffer; the last batch id is then
     * updated. The parsed rows are appended to the buffer in every case.
     *
     * @param j id of the batch the row change belongs to
     * @param eventType Canal event type used to parse each row
     * @param rowChange row change whose row data list is iterated
     * @throws CanalException wrapping any exception raised after parsing, including the
     *         failed state check when initTxn has not been called
     */
    public void submit(long j, CanalEntry.EventType eventType, CanalEntry.RowChange rowChange) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            // Parsing happens outside the try block, so its failures are not wrapped.
            final List<InternalService.PDataRow> parsedRows = parseRow(eventType, rowData);
            try {
                Preconditions.checkState(isTxnInit());
                if (j > this.lastBatchId) {
                    if (!isTxnBegin()) {
                        beginTxn(j);
                    } else {
                        // A newer batch arrived: ship what was buffered so far.
                        SyncTaskPool.submit(new SendTask(this.id, this.index, this.callback, this.batchBuffer, this.txnExecutor));
                        this.batchBuffer = new Data<>();
                    }
                    updateBatchId(j);
                }
                this.batchBuffer.addRows(parsedRows);
            } catch (Exception e) {
                final String errMsg = "encounter exception when submit in channel " + this.id + ", table: " + this.targetTable + ", batch: " + j;
                LOG.error(errMsg, e);
                throw new CanalException(errMsg, e);
            }
        }
    }

    /**
     * Submits an EOFTask for this channel to SyncTaskPool; when executed, the task
     * calls onFinished on the channel callback.
     */
    public void submitEOF() {
        final EOFTask eofTask = new EOFTask(this.id, this.index, this.callback);
        SyncTaskPool.submit(eofTask);
    }

    /**
     * Translates one Canal row into the load rows that represent it.
     *
     * <ul>
     *   <li>DELETE: one delete row built from the before-image columns.</li>
     *   <li>INSERT: one insert row built from the after-image columns.</li>
     *   <li>UPDATE: a delete row from the before-image followed by an insert row from the
     *       after-image.</li>
     *   <li>Any other event type: a warning is logged and no rows are produced.</li>
     * </ul>
     *
     * @param eventType Canal event type of the row
     * @param rowData Canal row carrying the before/after column lists
     * @return a new mutable list with zero, one or two rows
     */
    private List<InternalService.PDataRow> parseRow(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        // Typed list instead of a raw ArrayList, and a direct enum switch instead of the
        // hand-kept ordinal lookup table.
        final List<InternalService.PDataRow> rows = new ArrayList<>();
        switch (eventType) {
            case DELETE:
                rows.add(parseRow(CanalEntry.EventType.DELETE, rowData.getBeforeColumnsList()));
                break;
            case INSERT:
                rows.add(parseRow(CanalEntry.EventType.INSERT, rowData.getAfterColumnsList()));
                break;
            case UPDATE:
                // Delete the old image first, then insert the new one.
                rows.add(parseRow(CanalEntry.EventType.DELETE, rowData.getBeforeColumnsList()));
                rows.add(parseRow(CanalEntry.EventType.INSERT, rowData.getAfterColumnsList()));
                break;
            default:
                LOG.warn("ignore event, channel: {}, schema: {}, table: {}", Long.valueOf(this.id), this.srcDataBase, this.srcTable);
                break;
        }
        return rows;
    }

    /**
     * Builds a single load row from a list of Canal columns.
     *
     * <p>One cell is added per column, in list order: the literal {@code \N} when the column
     * is flagged as null, otherwise the column's value. A final cell is appended after the
     * data cells: "1" for DELETE events, SqlBlockUtil.LONG_DEFAULT for everything else.
     * This trailing cell lines up with the delete-sign column that beginTxn appends to the
     * column list.
     *
     * @param eventType event type that decides the trailing cell
     * @param list Canal columns to convert
     * @return the built row
     */
    private InternalService.PDataRow parseRow(CanalEntry.EventType eventType, List<CanalEntry.Column> list) {
        final InternalService.PDataRow.Builder rowBuilder = InternalService.PDataRow.newBuilder();
        for (CanalEntry.Column col : list) {
            rowBuilder.addColBuilder().setValue(col.getIsNull() ? "\\N" : col.getValue());
        }
        // NOTE(review): SqlBlockUtil.LONG_DEFAULT is an unrelated constant; presumably "0" -- confirm.
        rowBuilder.addColBuilder().setValue(eventType == CanalEntry.EventType.DELETE ? "1" : SqlBlockUtil.LONG_DEFAULT);
        return rowBuilder.build();
    }

    /**
     * Sends the buffered rows through the executor and starts a new empty buffer.
     * Does nothing when the buffer reports that it holds no rows.
     *
     * @throws TException propagated from the executor's sendData call
     * @throws TimeoutException propagated from the executor's sendData call
     * @throws InterruptedException propagated from the executor's sendData call
     * @throws ExecutionException propagated from the executor's sendData call
     */
    public void flushData() throws TException, TimeoutException, InterruptedException, ExecutionException {
        if (!batchBuffer.isNotEmpty()) {
            return;
        }
        txnExecutor.getTxnEntry().setDataToSend(batchBuffer.getDatas());
        txnExecutor.sendData();
        batchBuffer = new Data<>();
    }

    /**
     * @return true when an executor exists and its transaction id is not -1
     */
    public boolean isTxnBegin() {
        if (!isTxnInit()) {
            return false;
        }
        return txnExecutor.getTxnId() != -1;
    }

    /**
     * @return true when a transaction executor is present (set by initTxn, removed by clearTxn)
     */
    public boolean isTxnInit() {
        return null != txnExecutor;
    }

    /**
     * Records the id of the most recently handled batch.
     *
     * @param j new value for lastBatchId; -1 is used by commitTxn/abortTxn to reset it
     */
    private void updateBatchId(long j) {
        lastBatchId = j;
    }

    /**
     * Describes the channel as source database, SetUserPropertyVar.DOT_SEPARATOR,
     * source table, the arrow {@code ->}, and the target table, in that order.
     *
     * @return the description string
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public String getInfo() {
        // NOTE(review): DOT_SEPARATOR comes from an unrelated class; presumably "." -- confirm.
        final String source = this.srcDataBase + SetUserPropertyVar.DOT_SEPARATOR + this.srcTable;
        return source + "->" + this.targetTable;
    }

    /**
     * @return the channel id held in the inherited {@code id} field
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public long getId() {
        return id;
    }

    /**
     * @return the source table name held in the inherited {@code srcTable} field
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public String getSrcTable() {
        return srcTable;
    }

    /**
     * @return the source database name held in the inherited {@code srcDataBase} field
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public String getSrcDataBase() {
        return srcDataBase;
    }

    /**
     * @return the target table name held in the inherited {@code targetTable} field
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public String getTargetTable() {
        return targetTable;
    }

    /**
     * Replaces the callback that is handed to the tasks this channel creates.
     *
     * @param syncChannelCallback new callback
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public void setCallback(SyncChannelCallback syncChannelCallback) {
        callback = syncChannelCallback;
    }

    /**
     * Stores the given partition names in the inherited {@code partitionNames} field.
     *
     * @param partitionNames partition names to store
     */
    @Override // org.apache.doris.load.sync.SyncChannel
    public void setPartitions(PartitionNames partitionNames) {
        this.partitionNames = partitionNames;
    }
}
