package org.apache.doris.load.sync.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.doris.load.sync.SyncDataReceiver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncDataReceiver.class */
/**
 * Receiver that pulls batches from a Canal connector and hands every non-empty batch
 * to a {@link CanalSyncDataConsumer}.
 *
 * <p>{@link #process()} runs one connector session at a time: connect, subscribe with the
 * current filter, roll back, then fetch batches until the receiver stops running or an
 * error occurs. After an error the session is closed and, following a short pause, a new
 * session is opened.
 */
public class CanalSyncDataReceiver extends SyncDataReceiver {
    private static final Logger LOG = LogManager.getLogger(CanalSyncDataReceiver.class);

    /** Pause, in milliseconds, after a failed session before the next connection attempt. */
    private static final long RECONNECT_BACKOFF_MS = 1000L;

    // Stored from the constructor; not read by any code in this class.
    private final CanalSyncJob syncJob;
    private final CanalConnector connector;
    // Held around every getWithoutAck call. NOTE(review): presumably shared with other
    // users of the same connector - confirm against the code that constructs this receiver.
    private final ReentrantLock getLock;
    private final CanalSyncDataConsumer consumer;
    private final String destination;
    // Not final: may be replaced through setFilter; read each time a session subscribes.
    private String filter;
    /** Pause, in milliseconds, after a fetch that returned no data. */
    private final long sleepTimeMs;

    /**
     * Creates a receiver.
     *
     * @param syncJob       the job this receiver was created for (stored only)
     * @param connector     connector used to connect, subscribe and fetch batches
     * @param destination   destination name; used here only in the start-up log line
     * @param filter        subscription filter passed to {@code connector.subscribe}
     * @param consumer      consumer that receives every non-empty batch
     * @param readBatchSize batch size forwarded to the superclass constructor
     * @param getLock       lock held while a batch is fetched from the connector
     */
    public CanalSyncDataReceiver(CanalSyncJob syncJob, CanalConnector connector, String destination, String filter,
            CanalSyncDataConsumer consumer, int readBatchSize, ReentrantLock getLock) {
        super(readBatchSize);
        this.syncJob = syncJob;
        this.connector = connector;
        this.consumer = consumer;
        this.destination = destination;
        this.filter = filter;
        this.getLock = getLock;
        this.sleepTimeMs = 20L;
    }

    /**
     * Replaces the subscription filter. The new value is used the next time
     * {@link #process()} opens a session and subscribes.
     *
     * @param str the new filter
     */
    public void setFilter(String str) {
        this.filter = str;
    }

    /** Starts the receiver through the superclass and logs its configuration. */
    @Override
    public void start() {
        super.start();
        LOG.info("receiver has been started. destination: {}, filter: {}, batch size: {}",
                this.destination, this.filter, this.readBatchSize);
    }

    /**
     * Main receive loop. While the receiver is running it opens a connector session and
     * fetches batches: empty batches (id {@code -1} or zero entries) cause a short pause,
     * non-empty batches are passed to the consumer. Any error ends the current session,
     * is logged, and is followed by a pause before reconnecting. The connector is
     * disconnected at the end of every session, whether it ended normally or not.
     */
    @Override
    public void process() {
        while (this.running) {
            try {
                this.connector.connect();
                this.connector.subscribe(this.filter);
                this.connector.rollback();
                while (this.running) {
                    Message batch = fetchBatch();
                    int entryCount = batch.isRaw() ? batch.getRawEntries().size() : batch.getEntries().size();
                    if (batch.getId() == -1 || entryCount == 0) {
                        // Nothing to deliver; back off briefly before asking again.
                        pauseQuietly(this.sleepTimeMs);
                    } else {
                        this.consumer.put(batch, entryCount);
                    }
                }
            } catch (Throwable t) {
                // Deliberately broad: any failure ends this session and the outer loop retries.
                // The throwable is passed as the trailing argument so the stack trace is logged.
                LOG.error("Receiver is error. {}", t.getMessage(), t);
                pauseQuietly(RECONNECT_BACKOFF_MS);
            } finally {
                // Single disconnect for every exit path of the session.
                this.connector.disconnect();
            }
        }
    }

    /**
     * Fetches the next batch without acknowledging it. The get lock is held only for the
     * duration of the connector call and is released exactly once, even when the call throws.
     */
    private Message fetchBatch() {
        holdGetLock();
        try {
            return this.connector.getWithoutAck(this.readBatchSize,
                    Long.valueOf(CanalConfigs.getWaitingTimeoutMs), TimeUnit.MILLISECONDS);
        } finally {
            releaseGetLock();
        }
    }

    /** Sleeps for the given time; an interrupt merely ends the pause early. */
    private static void pauseQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Intentionally ignored: the callers' loops re-check the running flag right after.
        }
    }

    /** Acquires the get lock, blocking until it is available. */
    private void holdGetLock() {
        this.getLock.lock();
    }

    /** Releases the get lock; must be called by the thread that currently holds it. */
    private void releaseGetLock() {
        this.getLock.unlock();
    }
}
