package com.alibaba.otter.node.etl.select.selector.canal;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.extend.communication.CanalConfigClient;
import com.alibaba.otter.canal.extend.ha.MediaHAController;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.node.etl.OtterContextLocator;
import com.alibaba.otter.node.etl.select.exceptions.SelectException;
import com.alibaba.otter.node.etl.select.selector.Message;
import com.alibaba.otter.node.etl.select.selector.MessageDumper;
import com.alibaba.otter.node.etl.select.selector.MessageParser;
import com.alibaba.otter.node.etl.select.selector.OtterSelector;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.etl.model.EventData;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/node/etl/select/selector/canal/CanalEmbedSelector.class */
public class CanalEmbedSelector implements OtterSelector {
    private static final Logger logger = LoggerFactory.getLogger(CanalEmbedSelector.class);
    private static final String SEP = SystemUtils.LINE_SEPARATOR;
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final int maxEmptyTimes = 10;
    private Long pipelineId;
    private ClientIdentity clientIdentity;
    private MessageParser messageParser;
    private ConfigClientService configClientService;
    private OtterDownStreamHandler handler;
    private String destination;
    private String filter;
    private CanalConfigClient canalConfigClient;
    private int logSplitSize = 50;
    private boolean dump = true;
    private boolean dumpDetail = true;
    private int batchSize = 10000;
    private long batchTimeout = -1;
    private boolean ddlSync = true;
    private boolean filterTableError = false;
    private volatile boolean running = false;
    private volatile long lastEntryTime = 0;
    private CanalServerWithEmbedded canalServer = new CanalServerWithEmbedded();

    public CanalEmbedSelector(Long l) {
        this.pipelineId = l;
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public boolean isStart() {
        return this.running;
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public void start() {
        if (this.running) {
            return;
        }
        Pipeline findPipeline = this.configClientService.findPipeline(this.pipelineId);
        this.filter = CanalFilterSupport.makeFilterExpression(findPipeline);
        this.destination = findPipeline.getParameters().getDestinationName();
        this.batchSize = findPipeline.getParameters().getMainstemBatchsize().intValue();
        this.batchTimeout = findPipeline.getParameters().getBatchTimeout().longValue();
        this.ddlSync = findPipeline.getParameters().getDdlSync().booleanValue();
        final boolean z = findPipeline.getParameters().getSyncMode().isRow() || findPipeline.getParameters().isEnableRemedy().booleanValue();
        this.filterTableError = findPipeline.getParameters().getSkipSelectException().booleanValue();
        if (findPipeline.getParameters().getDumpSelector() != null) {
            this.dump = findPipeline.getParameters().getDumpSelector().booleanValue();
        }
        if (findPipeline.getParameters().getDumpSelectorDetail() != null) {
            this.dumpDetail = findPipeline.getParameters().getDumpSelectorDetail().booleanValue();
        }
        this.canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { // from class: com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector.1
            public CanalInstance generate(String str) {
                Canal findCanal = CanalEmbedSelector.this.canalConfigClient.findCanal(str);
                OtterAlarmHandler otterAlarmHandler = new OtterAlarmHandler();
                otterAlarmHandler.setPipelineId(CanalEmbedSelector.this.pipelineId);
                OtterContextLocator.autowire(otterAlarmHandler);
                long j = 10000;
                if (findCanal.getCanalParameter().getSlaveId() != null) {
                    j = findCanal.getCanalParameter().getSlaveId().longValue();
                }
                findCanal.getCanalParameter().setSlaveId(Long.valueOf(j + CanalEmbedSelector.this.pipelineId.longValue()));
                findCanal.getCanalParameter().setDdlIsolation(Boolean.valueOf(CanalEmbedSelector.this.ddlSync));
                findCanal.getCanalParameter().setFilterTableError(Boolean.valueOf(CanalEmbedSelector.this.filterTableError));
                findCanal.getCanalParameter().setMemoryStorageRawEntry(false);
                CanalInstanceWithManager canalInstanceWithManager = new CanalInstanceWithManager(findCanal, CanalEmbedSelector.this.filter) { // from class: com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector.1.1
                    protected CanalHAController initHaController() {
                        return this.parameters.getHaMode().isMedia() ? new MediaHAController(this.parameters.getMediaGroup(), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()) : super.initHaController();
                    }

                    protected void startEventParserInternal(CanalEventParser canalEventParser, boolean z2) {
                        super.startEventParserInternal(canalEventParser, z2);
                        if (this.eventParser instanceof MysqlEventParser) {
                            this.eventParser.setSupportBinlogFormats("ROW");
                            if (z) {
                                this.eventParser.setSupportBinlogImages("FULL");
                            } else {
                                this.eventParser.setSupportBinlogImages("FULL,MINIMAL");
                            }
                            MysqlEventParser mysqlEventParser = this.eventParser;
                            mysqlEventParser.setParallel(false);
                            MediaHAController haController = mysqlEventParser.getHaController();
                            if (haController instanceof MediaHAController) {
                                if (z2) {
                                    throw new CanalException("not support group database use media HA");
                                }
                                haController.setCanalHASwitchable(mysqlEventParser);
                            }
                            if (!haController.isStart()) {
                                haController.start();
                            }
                            if (haController instanceof MediaHAController) {
                                this.eventParser.setMasterInfo(haController.getAvailableAuthenticationInfo());
                            }
                        }
                    }
                };
                canalInstanceWithManager.setAlarmHandler(otterAlarmHandler);
                AbstractCanalEventSink eventSink = canalInstanceWithManager.getEventSink();
                if (eventSink instanceof AbstractCanalEventSink) {
                    CanalEmbedSelector.this.handler = new OtterDownStreamHandler();
                    CanalEmbedSelector.this.handler.setPipelineId(CanalEmbedSelector.this.pipelineId);
                    CanalEmbedSelector.this.handler.setDetectingIntervalInSeconds(findCanal.getCanalParameter().getDetectingIntervalInSeconds());
                    OtterContextLocator.autowire(CanalEmbedSelector.this.handler);
                    eventSink.addHandler(CanalEmbedSelector.this.handler, 0);
                    CanalEmbedSelector.this.handler.start();
                }
                return canalInstanceWithManager;
            }
        });
        this.canalServer.start();
        this.canalServer.start(this.destination);
        this.clientIdentity = new ClientIdentity(this.destination, findPipeline.getParameters().getMainstemClientId().shortValue(), this.filter);
        this.canalServer.subscribe(this.clientIdentity);
        this.running = true;
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public void stop() {
        if (this.running) {
            this.running = false;
            try {
                this.handler.stop();
            } catch (Exception e) {
                logger.warn("failed destory handler", e);
            }
            this.handler = null;
            this.canalServer.stop(this.destination);
            this.canalServer.stop();
        }
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public Message<EventData> selector() throws InterruptedException {
        List<CanalEntry.Entry> entries;
        int i = 0;
        com.alibaba.otter.canal.protocol.Message message = null;
        if (this.batchTimeout < 0) {
            while (this.running) {
                message = this.canalServer.getWithoutAck(this.clientIdentity, this.batchSize);
                if (message != null && message.getId() != -1) {
                    break;
                }
                int i2 = i;
                i++;
                applyWait(i2);
            }
            if (!this.running) {
                throw new InterruptedException();
            }
        } else {
            while (this.running) {
                message = this.canalServer.getWithoutAck(this.clientIdentity, this.batchSize, Long.valueOf(this.batchTimeout), TimeUnit.MILLISECONDS);
                if (message != null && message.getId() != -1) {
                    break;
                }
            }
            if (!this.running) {
                throw new InterruptedException();
            }
        }
        if (message.isRaw()) {
            entries = new ArrayList(message.getRawEntries().size());
            Iterator it = message.getRawEntries().iterator();
            while (it.hasNext()) {
                try {
                    entries.add(CanalEntry.Entry.parseFrom((ByteString) it.next()));
                } catch (InvalidProtocolBufferException e) {
                    throw new SelectException((Throwable) e);
                }
            }
        } else {
            entries = message.getEntries();
        }
        Message<EventData> message2 = new Message<>(Long.valueOf(message.getId()), this.messageParser.parse(this.pipelineId, entries));
        if (!CollectionUtils.isEmpty(entries)) {
            long executeTime = entries.get(entries.size() - 1).getHeader().getExecuteTime();
            if (executeTime > 0) {
                this.lastEntryTime = executeTime;
            }
        }
        if (this.dump && logger.isInfoEnabled()) {
            String str = null;
            String str2 = null;
            if (!CollectionUtils.isEmpty(entries)) {
                str = buildPositionForDump(entries.get(0));
                str2 = buildPositionForDump(entries.get(entries.size() - 1));
            }
            dumpMessages(message2, str, str2, entries.size());
        }
        return message2;
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public void rollback(Long l) {
        this.canalServer.rollback(this.clientIdentity, l);
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public void rollback() {
        this.canalServer.rollback(this.clientIdentity);
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public void ack(Long l) {
        this.canalServer.ack(this.clientIdentity, l.longValue());
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public List<Long> unAckBatchs() {
        return this.canalServer.listBatchIds(this.clientIdentity);
    }

    @Override // com.alibaba.otter.node.etl.select.selector.OtterSelector
    public Long lastEntryTime() {
        return Long.valueOf(this.lastEntryTime);
    }

    private synchronized void dumpMessages(Message message, String str, String str2, int i) {
        try {
            MDC.put(OtterConstants.splitPipelineSelectLogFileKey, String.valueOf(this.pipelineId));
            logger.info(SEP + "****************************************************" + SEP);
            logger.info(MessageDumper.dumpMessageInfo(message, str, str2, i));
            logger.info("****************************************************" + SEP);
            if (this.dumpDetail) {
                dumpEventDatas(message.getDatas());
                logger.info("****************************************************" + SEP);
            }
        } finally {
            MDC.remove(OtterConstants.splitPipelineSelectLogFileKey);
        }
    }

    private void dumpEventDatas(List<EventData> list) {
        int size = list.size();
        int i = 0;
        do {
            if (i + this.logSplitSize >= size) {
                logger.info(MessageDumper.dumpEventDatas(list.subList(i, size)));
            } else {
                logger.info(MessageDumper.dumpEventDatas(list.subList(i, i + this.logSplitSize)));
            }
            i += this.logSplitSize;
        } while (i < size);
    }

    private void applyWait(int i) {
        int i2 = i > 10 ? 10 : i;
        if (i <= 3) {
            Thread.yield();
        } else {
            LockSupport.parkNanos(1000000 * i2);
        }
    }

    private String buildPositionForDump(CanalEntry.Entry entry) {
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(entry.getHeader().getExecuteTime())) + ")";
    }

    public void setMessageParser(MessageParser messageParser) {
        this.messageParser = messageParser;
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }

    public void setCanalConfigClient(CanalConfigClient canalConfigClient) {
        this.canalConfigClient = canalConfigClient;
    }

    public void setDump(boolean z) {
        this.dump = z;
    }
}
