package org.apache.doris.load.sync.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.analysis.SetUserPropertyVar;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.load.sync.SyncChannelHandle;
import org.apache.doris.load.sync.SyncDataConsumer;
import org.apache.doris.load.sync.model.Events;
import org.apache.doris.load.sync.position.EntryPosition;
import org.apache.doris.load.sync.position.PositionMeta;
import org.apache.doris.load.sync.position.PositionRange;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncDataConsumer.class */
public class CanalSyncDataConsumer extends SyncDataConsumer {
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private CanalSyncJob syncJob;
    private CanalConnector connector;
    private Map<Long, CanalSyncChannel> idToChannels;
    private Set<Long> ackBatches;
    private PositionMeta<EntryPosition> positionMeta;
    private LinkedBlockingQueue<Events<CanalEntry.Entry, EntryPosition>> dataBlockingQueue;
    private SyncChannelHandle handle;
    private ReentrantLock getLock;
    private int sleepTimeMs;
    private long commitIntervalSecond;
    private static Logger logger = LogManager.getLogger(CanalSyncDataConsumer.class);
    private static final long MIN_COMMIT_EVENT_SIZE = Config.min_sync_commit_size;
    private static final long MIN_COMMIT_MEM_SIZE = Config.min_bytes_sync_commit;
    private static final long MAX_COMMIT_MEM_SIZE = Config.max_bytes_sync_commit;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.doris.load.sync.canal.CanalSyncDataConsumer$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncDataConsumer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType = new int[CanalEntry.EventType.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.CREATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.ALTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.TRUNCATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.ERASE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.QUERY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.RENAME.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.CINDEX.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[CanalEntry.EventType.DINDEX.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    public void setChannels(Map<Long, CanalSyncChannel> map) {
        for (CanalSyncChannel canalSyncChannel : map.values()) {
            this.positionMeta.setCommitPosition(canalSyncChannel.getId(), EntryPosition.MIN_POS);
            canalSyncChannel.setCallback(this.handle);
        }
        this.idToChannels = map;
    }

    public CanalSyncDataConsumer(CanalSyncJob canalSyncJob, CanalConnector canalConnector, ReentrantLock reentrantLock, boolean z) {
        super(z);
        this.syncJob = canalSyncJob;
        this.connector = canalConnector;
        this.dataBlockingQueue = Queues.newLinkedBlockingQueue(MysqlServerStatusFlag.SERVER_STATUS_METADATA_CHANGED);
        this.ackBatches = Sets.newHashSet();
        this.positionMeta = new PositionMeta<>();
        this.getLock = reentrantLock;
        this.handle = new SyncChannelHandle();
        this.commitIntervalSecond = Config.sync_commit_interval_second;
        this.sleepTimeMs = StatisticConstants.FETCH_INTERVAL_IN_MS;
    }

    public void stop(boolean z) {
        super.stop();
        if (z) {
            cleanUp();
        }
    }

    @Override // org.apache.doris.load.sync.SyncDataConsumer
    public void beginForTxn() {
        this.handle.reset(this.idToChannels.size());
        for (CanalSyncChannel canalSyncChannel : this.idToChannels.values()) {
            canalSyncChannel.initTxn(Config.max_stream_load_timeout_second);
            this.handle.mark(canalSyncChannel);
        }
    }

    @Override // org.apache.doris.load.sync.SyncDataConsumer
    public void abortForTxn(String str) {
        abortForTxn(this.idToChannels.values(), str);
    }

    public void abortForTxn(Collection<CanalSyncChannel> collection, String str) {
        logger.info("client is aborting transactions. JobId: {}, reason: {}", Long.valueOf(this.syncJob.getId()), str);
        for (CanalSyncChannel canalSyncChannel : collection) {
            if (canalSyncChannel.isTxnBegin()) {
                try {
                    canalSyncChannel.abortTxn(str);
                } catch (Exception e) {
                    logger.warn("Abort channel failed. jobId: {}，channel: {}, target: {}, msg: {}", Long.valueOf(this.syncJob.getId()), Long.valueOf(canalSyncChannel.getId()), canalSyncChannel.getTargetTable(), e.getMessage());
                }
            }
        }
        rollback();
    }

    @Override // org.apache.doris.load.sync.SyncDataConsumer
    public void commitForTxn() {
        logger.info("client is committing transactions. JobId: {}", Long.valueOf(this.syncJob.getId()));
        boolean z = true;
        EntryPosition latestPosition = this.positionMeta.getLatestPosition();
        for (CanalSyncChannel canalSyncChannel : this.idToChannels.values()) {
            if (canalSyncChannel.isTxnBegin()) {
                try {
                    canalSyncChannel.commitTxn();
                    this.positionMeta.setCommitPosition(canalSyncChannel.getId(), latestPosition);
                } catch (Exception e) {
                    logger.warn("Commit channel failed. JobId: {}, channel: {}, target: {}, msg: {}", Long.valueOf(this.syncJob.getId()), Long.valueOf(canalSyncChannel.getId()), canalSyncChannel.getTargetTable(), e.getMessage());
                    try {
                        canalSyncChannel.abortTxn(e.getMessage());
                    } catch (Exception e2) {
                        logger.warn("Abort channel failed. JobId: {}，channel: {}, target: {}, msg: {}", Long.valueOf(this.syncJob.getId()), Long.valueOf(canalSyncChannel.getId()), canalSyncChannel.getTargetTable(), e2.getMessage());
                    }
                    z = false;
                }
            }
        }
        if (z) {
            ack();
        } else {
            rollback();
        }
    }

    public Status waitForTxn() {
        Iterator<CanalSyncChannel> it = this.idToChannels.values().iterator();
        while (it.hasNext()) {
            it.next().submitEOF();
        }
        Status status = Status.CANCELLED;
        try {
            this.handle.join();
            status = this.handle.getStatus();
        } catch (InterruptedException e) {
            logger.warn("InterruptedException: ", e);
        }
        return status;
    }

    public void cleanForTxn() {
        for (CanalSyncChannel canalSyncChannel : this.idToChannels.values()) {
            if (canalSyncChannel.isTxnInit()) {
                canalSyncChannel.clearTxn();
            }
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:20:0x00d0 A[LOOP:1: B:5:0x0014->B:20:0x00d0, LOOP_END] */
    /* JADX WARN: Removed duplicated region for block: B:21:0x00cd A[SYNTHETIC] */
    @Override // org.apache.doris.load.sync.SyncDataConsumer, org.apache.doris.load.sync.SyncLifeCycle
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void process() {
        /*
            Method dump skipped, instructions count: 337
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.doris.load.sync.canal.CanalSyncDataConsumer.process():void");
    }

    public void put(Message message, int i) {
        List<CanalEntry.Entry> entries;
        if (message.isRaw()) {
            entries = new ArrayList(message.getRawEntries().size());
            Iterator it = message.getRawEntries().iterator();
            while (it.hasNext()) {
                try {
                    entries.add(CanalEntry.Entry.parseFrom((ByteString) it.next()));
                } catch (InvalidProtocolBufferException e) {
                    throw new CanalException(e);
                }
            }
        } else {
            entries = message.getEntries();
        }
        int i2 = 0;
        EntryPosition ackPosition = this.positionMeta.getAckPosition();
        if (ackPosition != null) {
            EntryPosition createPosition = EntryPosition.createPosition(entries.get(0));
            if (EntryPosition.min(createPosition, ackPosition).equals(createPosition)) {
                for (int i3 = 0; i3 <= entries.size() - 1; i3++) {
                    i2++;
                    if (EntryPosition.checkPosition(entries.get(i3), ackPosition)) {
                        break;
                    }
                }
            }
        }
        if (i2 <= i - 1) {
            Events<CanalEntry.Entry, EntryPosition> events = new Events<>(Long.valueOf(message.getId()));
            PositionRange<EntryPosition> positionRange = new PositionRange<>();
            events.setPositionRange(positionRange);
            positionRange.setStart(EntryPosition.createPosition(entries.get(i2)));
            positionRange.setEnd(EntryPosition.createPosition(entries.get(i - 1)));
            events.setDatas(entries);
            long j = 0;
            Iterator<CanalEntry.Entry> it2 = entries.iterator();
            while (it2.hasNext()) {
                j += it2.next().getHeader().getEventLength();
            }
            events.setMemSize(Long.valueOf(j));
            try {
                this.dataBlockingQueue.put(events);
            } catch (InterruptedException e2) {
                logger.error("put message to executor error:", e2);
                throw new CanalException(e2);
            }
        }
    }

    private void executeOneBatch(Events<CanalEntry.Entry, EntryPosition> events) throws UserException {
        long longValue = events.getId().longValue();
        HashMap newHashMap = Maps.newHashMap();
        HashMap newHashMap2 = Maps.newHashMap();
        EntryPosition start = events.getPositionRange().getStart();
        EntryPosition end = events.getPositionRange().getEnd();
        for (CanalSyncChannel canalSyncChannel : this.idToChannels.values()) {
            String fullName = CanalUtils.getFullName(canalSyncChannel.getSrcDataBase(), canalSyncChannel.getSrcTable());
            EntryPosition commitPosition = this.positionMeta.getCommitPosition(canalSyncChannel.getId());
            if (commitPosition == null) {
                newHashMap.put(fullName, canalSyncChannel);
            } else if (commitPosition.compareTo(start) < 0) {
                newHashMap.put(fullName, canalSyncChannel);
            } else if (commitPosition.compareTo(end) < 0) {
                newHashMap2.put(fullName, canalSyncChannel);
            }
        }
        for (CanalEntry.Entry entry : events.getDatas()) {
            try {
                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    try {
                        CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        CanalEntry.Header header = entry.getHeader();
                        CanalEntry.EventType eventType = parseFrom.getEventType();
                        if (!CanalUtils.isDML(eventType) || parseFrom.getIsDdl()) {
                            processDDL(header, eventType, parseFrom.getSql());
                            return;
                        }
                        String fullName2 = CanalUtils.getFullName(header.getSchemaName(), header.getTableName());
                        if (newHashMap.containsKey(fullName2)) {
                            ((CanalSyncChannel) newHashMap.get(fullName2)).submit(longValue, eventType, parseFrom);
                        } else if (newHashMap2.containsKey(fullName2)) {
                            CanalSyncChannel canalSyncChannel2 = (CanalSyncChannel) newHashMap2.get(fullName2);
                            if (EntryPosition.createPosition(entry).compareTo(this.positionMeta.getCommitPosition(canalSyncChannel2.getId())) > 0) {
                                canalSyncChannel2.submit(longValue, eventType, parseFrom);
                            }
                        }
                        if (this.debug) {
                            CanalUtils.printRow(parseFrom, header);
                        }
                    } catch (InvalidProtocolBufferException e) {
                        throw new CanalException("parse event has an error , data:" + entry.toString(), e);
                    }
                }
            } catch (Exception e2) {
                logger.error("execute event has an error, data: {}, msg: {}", entry.toString(), e2);
                throw new UserException("execute batch failed, batchId: " + longValue + " ,msg: " + e2.getMessage());
            }
        }
    }

    private void processDDL(CanalEntry.Header header, CanalEntry.EventType eventType, String str) {
        String str2 = header.getSchemaName() + SetUserPropertyVar.DOT_SEPARATOR + header.getTableName();
        switch (AnonymousClass1.$SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EventType[eventType.ordinal()]) {
            case 1:
                logger.warn("parse create table event, table: {}, sql: {}", str2, str);
                return;
            case 2:
                logger.warn("parse alter table event, table: {}, sql: {}", str2, str);
                return;
            case 3:
                logger.warn("parse truncate table event, table: {}, sql: {}", str2, str);
                return;
            case 4:
            case 5:
                logger.warn("parse event : {}, sql: {} . ignored!", eventType.name(), str);
                return;
            case 6:
                logger.warn("parse rename table event, table: {}, sql: {}", str2, str);
                return;
            case 7:
                logger.warn("parse create index event, table: {}, sql: {}", str2, str);
                return;
            case 8:
                logger.warn("parse delete index event, table: {}, sql: {}", str2, str);
                return;
            default:
                logger.warn("parse unknown event: {}, table: {}, sql: {}", eventType.name(), str2, str);
                return;
        }
    }

    private void ack() {
        if (this.ackBatches.isEmpty()) {
            return;
        }
        logger.info("client ack batches: {}", this.ackBatches);
        while (!this.ackBatches.isEmpty()) {
            long longValue = ((Long) Collections.min(this.ackBatches)).longValue();
            this.connector.ack(longValue);
            this.ackBatches.remove(Long.valueOf(longValue));
            this.positionMeta.setAckPosition(this.positionMeta.removeBatch(longValue).getEnd());
            this.positionMeta.setAckTime(System.currentTimeMillis());
        }
    }

    private void rollback() {
        holdGetLock();
        try {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            if (!this.ackBatches.isEmpty()) {
                this.connector.rollback();
            }
            this.dataBlockingQueue.clear();
            this.ackBatches.clear();
            this.positionMeta.clearAllBatch();
        } finally {
            releaseGetLock();
        }
    }

    public String getPositionInfo() {
        EntryPosition ackPosition = this.positionMeta.getAckPosition();
        long ackTime = this.positionMeta.getAckTime();
        StringBuilder sb = new StringBuilder();
        if (ackPosition != null) {
            DateTimeFormatter withZone = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
            sb.append("position:").append(ackPosition).append(", executeTime:[").append(withZone.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.systemDefault()))).append("], ").append("delay:").append(ackTime - ackPosition.getExecuteTime().longValue()).append("ms");
            if (StringUtils.isNotEmpty(ackPosition.getGtid())) {
                sb.append(", gtid(").append(ackPosition.getGtid()).append(") ");
            }
        } else {
            sb.append("position:").append("N/A");
        }
        return sb.toString();
    }

    private void cleanUp() {
        this.dataBlockingQueue.clear();
        this.ackBatches.clear();
        this.positionMeta.cleanUp();
    }

    private void holdGetLock() {
        this.getLock.lock();
    }

    private void releaseGetLock() {
        this.getLock.unlock();
    }
}
