package com.alibaba.otter.canal.instance.manager;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
import com.alibaba.otter.canal.common.alarm.LogAlarmHandler;
import com.alibaba.otter.canal.common.utils.JsonUtils;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.meta.FileMixedMetaManager;
import com.alibaba.otter.canal.meta.MemoryMetaManager;
import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
import com.alibaba.otter.canal.meta.ZooKeeperMetaManager;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.sink.entry.EntryEventSink;
import com.alibaba.otter.canal.sink.entry.group.GroupEventSink;
import com.alibaba.otter.canal.store.AbstractCanalStoreScavenge;
import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
import com.alibaba.otter.canal.store.model.BatchMode;
import java.io.File;
import java.io.FilenameFilter;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.class */
public class CanalInstanceWithManager extends AbstractCanalInstance {
    /** Logger shared by every instance of this class. */
    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithManager.class);
    /** Filter expression handed to the constructor; when non-empty it is wrapped in an AviaterRegexFilter for each parser. */
    protected String filter;
    /** Parameters read from the Canal definition passed to the constructor; every init step reads its settings from here. */
    protected CanalParameter parameters;

    public CanalInstanceWithManager(Canal canal, String str) {
        this.parameters = canal.getCanalParameter();
        this.canalId = canal.getId();
        this.destination = canal.getName();
        this.filter = str;
        logger.info("init CanalInstance for {}-{} with parameters:{}", new Object[]{this.canalId, this.destination, this.parameters});
        initAlarmHandler();
        initMetaManager();
        initEventStore();
        initEventSink();
        initEventParser();
        if (!this.alarmHandler.isStart()) {
            this.alarmHandler.start();
        }
        if (!this.metaManager.isStart()) {
            this.metaManager.start();
        }
        logger.info("init successful....");
    }

    /**
     * Logs the identity and parameters of this instance, then delegates to the
     * superclass start logic.
     */
    public void start() {
        Object[] logArgs = new Object[]{canalId, destination, parameters};
        logger.info("start CannalInstance for {}-{} with parameters:{}", logArgs);
        super.start();
    }

    /**
     * Selects the alarm handler for this instance.
     *
     * <p>When both an alarm handler class name and a plugin directory are configured, every
     * file in that directory whose name ends with {@code .jar} is put on a new
     * {@link URLClassLoader} (parented by this class's loader) and the configured class is
     * instantiated through its no-arg constructor. Otherwise a {@link LogAlarmHandler} is used.
     *
     * @throws CanalException if the plugin directory yields no jar, or the class cannot be
     *                        loaded, instantiated or cast to {@link CanalAlarmHandler}
     */
    protected void initAlarmHandler() {
        logger.info("init alarmHandler begin...");
        final String handlerClassName = parameters.getAlarmHandlerClass();
        final String pluginDir = parameters.getAlarmHandlerPluginDir();

        if (handlerClassName != null && pluginDir != null) {
            try {
                File[] jars = new File(pluginDir).listFiles(new FilenameFilter() {
                    @Override
                    public boolean accept(File file, String str) {
                        return str.endsWith(".jar");
                    }
                });
                // listFiles returns null when the path is not a readable directory.
                if (jars == null || jars.length == 0) {
                    throw new IllegalStateException(String.format("alarmHandlerPluginDir [%s] can't find any name endswith \".jar\" file.", pluginDir));
                }

                URL[] jarUrls = new URL[jars.length];
                int next = 0;
                for (File jar : jars) {
                    jarUrls[next++] = jar.toURI().toURL();
                }

                // The loader is intentionally left open: the handler may load further classes later.
                ClassLoader pluginLoader = new URLClassLoader(jarUrls, CanalInstanceWithManager.class.getClassLoader());
                Class<?> handlerType = pluginLoader.loadClass(handlerClassName);
                alarmHandler = (CanalAlarmHandler) handlerType.newInstance();
                logger.info("init [{}] alarm handler success.", handlerClassName);
            } catch (Throwable th) {
                // Any failure, including the IllegalStateException above, is reported as a CanalException.
                String message = String.format("init alarmHandlerPluginDir [%s] alarm handler [%s] error: %s", pluginDir, handlerClassName, ExceptionUtils.getFullStackTrace(th));
                logger.error(message);
                throw new CanalException(message, th);
            }
        } else {
            alarmHandler = new LogAlarmHandler();
        }

        logger.info("init alarmHandler end! \n\t load CanalAlarmHandler:{} ", alarmHandler.getClass().getName());
    }

    /**
     * Creates the meta manager that matches the configured {@code MetaMode}.
     *
     * <ul>
     *   <li>memory: {@link MemoryMetaManager}</li>
     *   <li>zookeeper: {@link ZooKeeperMetaManager} bound to the shared ZooKeeper client</li>
     *   <li>mixed: {@link PeriodMixedMetaManager} wrapping a ZooKeeper-backed manager</li>
     *   <li>local file: {@link FileMixedMetaManager} using the configured data dir and flush period</li>
     * </ul>
     *
     * <p>Implementation-specific setters are invoked on typed local variables rather than on
     * the inherited {@code metaManager} field, so this method does not depend on the field
     * being declared with a concrete implementation type.
     *
     * @throws CanalException if the configured mode is none of the above
     */
    protected void initMetaManager() {
        logger.info("init metaManager begin...");
        CanalParameter.MetaMode metaMode = this.parameters.getMetaMode();
        if (metaMode.isMemory()) {
            this.metaManager = new MemoryMetaManager();
        } else if (metaMode.isZookeeper()) {
            ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
            this.metaManager = zooKeeperMetaManager;
            zooKeeperMetaManager.setZkClientx(getZkclientx());
        } else if (metaMode.isMixed()) {
            PeriodMixedMetaManager periodMixedMetaManager = new PeriodMixedMetaManager();
            this.metaManager = periodMixedMetaManager;
            // The mixed manager delegates to an embedded ZooKeeper-backed manager.
            ZooKeeperMetaManager embeddedZkMetaManager = new ZooKeeperMetaManager();
            embeddedZkMetaManager.setZkClientx(getZkclientx());
            periodMixedMetaManager.setZooKeeperMetaManager(embeddedZkMetaManager);
        } else if (metaMode.isLocalFile()) {
            FileMixedMetaManager fileMixedMetaManager = new FileMixedMetaManager();
            fileMixedMetaManager.setDataDir(this.parameters.getDataDir());
            fileMixedMetaManager.setPeriod(this.parameters.getMetaFileFlushPeriod().intValue());
            this.metaManager = fileMixedMetaManager;
        } else {
            throw new CanalException("unsupport MetaMode for " + metaMode);
        }
        logger.info("init metaManager end! \n\t load CanalMetaManager:{} ", this.metaManager.getClass().getName());
    }

    /**
     * Creates the event store. Only the in-memory storage mode is implemented; any other
     * mode is rejected.
     *
     * <p>The store is sized and configured from the parameters (buffer size, memory unit,
     * batch mode, DDL isolation, raw-entry flag). If the resulting store is an
     * {@link AbstractCanalStoreScavenge}, its scavenge behaviour is configured as well and
     * it is given the destination and the meta manager, which must therefore already exist.
     *
     * @throws CanalException if the configured storage mode is not the memory mode
     */
    protected void initEventStore() {
        logger.info("init eventStore begin...");
        CanalParameter.StorageMode storageMode = this.parameters.getStorageMode();
        if (!storageMode.isMemory()) {
            // File and mixed modes are not implemented; all non-memory modes fail identically.
            throw new CanalException("unsupport StorageMode for " + storageMode);
        }

        MemoryEventStoreWithBuffer memoryEventStoreWithBuffer = new MemoryEventStoreWithBuffer();
        memoryEventStoreWithBuffer.setBufferSize(this.parameters.getMemoryStorageBufferSize().intValue());
        memoryEventStoreWithBuffer.setBufferMemUnit(this.parameters.getMemoryStorageBufferMemUnit().intValue());
        // The parameter enum and the store enum are distinct types bridged by constant name.
        memoryEventStoreWithBuffer.setBatchMode(BatchMode.valueOf(this.parameters.getStorageBatchMode().name()));
        memoryEventStoreWithBuffer.setDdlIsolation(this.parameters.getDdlIsolation().booleanValue());
        memoryEventStoreWithBuffer.setRaw(this.parameters.getMemoryStorageRawEntry().booleanValue());
        this.eventStore = memoryEventStoreWithBuffer;

        if (this.eventStore instanceof AbstractCanalStoreScavenge) {
            CanalParameter.StorageScavengeMode storageScavengeMode = this.parameters.getStorageScavengeMode();
            AbstractCanalStoreScavenge scavengeStore = (AbstractCanalStoreScavenge) this.eventStore;
            scavengeStore.setDestination(this.destination);
            scavengeStore.setCanalMetaManager(this.metaManager);
            scavengeStore.setOnAck(storageScavengeMode.isOnAck());
            scavengeStore.setOnFull(storageScavengeMode.isOnFull());
            scavengeStore.setOnSchedule(storageScavengeMode.isOnSchedule());
            if (storageScavengeMode.isOnSchedule()) {
                scavengeStore.setScavengeSchedule(this.parameters.getScavengeSchdule());
            }
        }
        logger.info("init eventStore end! \n\t load CanalEventStore:{}", this.eventStore.getClass().getName());
    }

    /**
     * Creates the event sink: a plain {@link EntryEventSink} when there is at most one
     * source group member, otherwise a {@link GroupEventSink} sized to the group.
     *
     * <p>Whenever the created sink is an {@link EntryEventSink}, transaction-entry filtering
     * is switched off and the sink is connected to the event store, which must therefore
     * already have been initialised.
     */
    protected void initEventSink() {
        logger.info("init eventSink begin...");
        int groupSize = getGroupSize();
        if (groupSize <= 1) {
            this.eventSink = new EntryEventSink();
        } else {
            this.eventSink = new GroupEventSink(groupSize);
        }
        if (this.eventSink instanceof EntryEventSink) {
            // Configure through the concrete type; these setters are not reached via the field's own type.
            EntryEventSink entryEventSink = (EntryEventSink) this.eventSink;
            entryEventSink.setFilterTransactionEntry(false);
            entryEventSink.setEventStore(getEventStore());
        }
        logger.info("init eventSink end! \n\t load CanalEventSink:{}", this.eventSink.getClass().getName());
    }

    /**
     * Creates the event parser(s) from the configured database address groups.
     *
     * <p>With no address groups, a single parser is built from the instance-level sourcing
     * type and an empty address list. Otherwise the groups are read column-wise: for each
     * index {@code i} (up to the size of the first group) the {@code i}-th entry of every
     * group contributes one address to one parser. All entries in a column must share the
     * same sourcing type. More than one resulting parser is wrapped in a
     * {@link GroupEventParser}.
     *
     * @throws CanalException if the entries of one column declare different sourcing types
     */
    protected void initEventParser() {
        logger.info("init eventParser begin...");
        CanalParameter.SourcingType sourcingType = this.parameters.getSourcingType();
        List<List<CanalParameter.DataSourcing>> groupDbAddresses = this.parameters.getGroupDbAddresses();
        if (CollectionUtils.isEmpty(groupDbAddresses)) {
            this.eventParser = doInitEventParser(sourcingType, new ArrayList<InetSocketAddress>());
        } else {
            // The first group determines how many parsers are created.
            int size = groupDbAddresses.get(0).size();
            List<CanalEventParser> eventParsers = new ArrayList<CanalEventParser>();
            for (int i = 0; i < size; i++) {
                List<InetSocketAddress> dbAddresses = new ArrayList<InetSocketAddress>();
                CanalParameter.SourcingType columnType = null;
                for (List<CanalParameter.DataSourcing> group : groupDbAddresses) {
                    CanalParameter.DataSourcing dataSourcing = group.get(i);
                    if (columnType != null && !columnType.equals(dataSourcing.getType())) {
                        throw new CanalException(String.format("master/slave Sourcing type is unmatch. %s vs %s", columnType, dataSourcing.getType()));
                    }
                    columnType = dataSourcing.getType();
                    dbAddresses.add(dataSourcing.getDbAddress());
                }
                eventParsers.add(doInitEventParser(columnType, dbAddresses));
            }
            if (eventParsers.size() > 1) {
                GroupEventParser groupEventParser = new GroupEventParser();
                groupEventParser.setEventParsers(eventParsers);
                this.eventParser = groupEventParser;
            } else {
                this.eventParser = eventParsers.get(0);
            }
        }
        logger.info("init eventParser end! \n\t load CanalEventParser:{}", this.eventParser.getClass().getName());
    }

    /**
     * Builds and wires one event parser for the given sourcing type.
     *
     * <p>A MySQL source yields a {@link MysqlEventParser} (or its RDS variant), a local
     * binlog source yields a {@link LocalBinlogEventParser}; every other type is rejected.
     * Afterwards, any {@link AbstractEventParser} receives the transaction size, a log
     * position manager, the alarm handler, the event sink and the optional white/black
     * filters, and any {@link MysqlEventParser} additionally receives an HA controller.
     *
     * @param sourcingType kind of source to read from
     * @param list         source addresses; the first is the master, the second (MySQL only)
     *                     the standby; may be empty
     * @return the configured parser
     * @throws CanalException if the sourcing type is neither MySQL nor local binlog
     */
    private CanalEventParser doInitEventParser(CanalParameter.SourcingType sourcingType, List<InetSocketAddress> list) {
        CanalEventParser eventParser;
        if (sourcingType.isMysql()) {
            eventParser = createMysqlEventParser(list);
        } else if (sourcingType.isLocalBinlog()) {
            eventParser = createLocalBinlogEventParser(list);
        } else {
            // Oracle and any other type are not implemented.
            throw new CanalException("unsupport SourcingType for " + sourcingType);
        }

        if (eventParser instanceof AbstractEventParser) {
            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
            abstractEventParser.setTransactionSize(this.parameters.getTransactionSize().intValue());
            abstractEventParser.setLogPositionManager(initLogPositionManager());
            abstractEventParser.setAlarmHandler(getAlarmHandler());
            abstractEventParser.setEventSink(getEventSink());
            if (StringUtils.isNotEmpty(this.filter)) {
                abstractEventParser.setEventFilter(new AviaterRegexFilter(this.filter));
            }
            if (StringUtils.isNotEmpty(this.parameters.getBlackFilter())) {
                abstractEventParser.setEventBlackFilter(new AviaterRegexFilter(this.parameters.getBlackFilter()));
            }
        }
        if (eventParser instanceof MysqlEventParser) {
            ((MysqlEventParser) eventParser).setHaController(initHaController());
        }
        return eventParser;
    }

    /**
     * Creates a MySQL parser configured from the instance parameters. The RDS variant is
     * chosen only when access key, secret key and instance id are all non-empty.
     */
    private MysqlEventParser createMysqlEventParser(List<InetSocketAddress> list) {
        MysqlEventParser mysqlEventParser;
        if (StringUtils.isNotEmpty(this.parameters.getRdsAccesskey()) && StringUtils.isNotEmpty(this.parameters.getRdsSecretkey()) && StringUtils.isNotEmpty(this.parameters.getRdsInstanceId())) {
            RdsBinlogEventParserProxy rdsEventParser = new RdsBinlogEventParserProxy();
            rdsEventParser.setAccesskey(this.parameters.getRdsAccesskey());
            rdsEventParser.setSecretkey(this.parameters.getRdsSecretkey());
            rdsEventParser.setInstanceId(this.parameters.getRdsInstanceId());
            mysqlEventParser = rdsEventParser;
        } else {
            mysqlEventParser = new MysqlEventParser();
        }
        mysqlEventParser.setDestination(this.destination);
        mysqlEventParser.setConnectionCharset(Charset.forName(this.parameters.getConnectionCharset()));
        mysqlEventParser.setConnectionCharsetNumber(this.parameters.getConnectionCharsetNumber().byteValue());
        mysqlEventParser.setDefaultConnectionTimeoutInSeconds(this.parameters.getDefaultConnectionTimeoutInSeconds().intValue());
        mysqlEventParser.setSendBufferSize(this.parameters.getSendBufferSize().intValue());
        mysqlEventParser.setReceiveBufferSize(this.parameters.getReceiveBufferSize().intValue());
        mysqlEventParser.setDetectingEnable(this.parameters.getDetectingEnable().booleanValue());
        mysqlEventParser.setDetectingSQL(this.parameters.getDetectingSQL());
        mysqlEventParser.setDetectingIntervalInSeconds(this.parameters.getDetectingIntervalInSeconds());
        mysqlEventParser.setSlaveId(this.parameters.getSlaveId().longValue());
        if (!CollectionUtils.isEmpty(list)) {
            // First address is the master, an optional second one the standby.
            mysqlEventParser.setMasterInfo(new AuthenticationInfo(list.get(0), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()));
            if (list.size() > 1) {
                mysqlEventParser.setStandbyInfo(new AuthenticationInfo(list.get(1), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()));
            }
        }
        if (!CollectionUtils.isEmpty(this.parameters.getPositions())) {
            // Positions are stored as JSON strings, in the same master/standby order.
            mysqlEventParser.setMasterPosition((EntryPosition) JsonUtils.unmarshalFromString(this.parameters.getPositions().get(0), EntryPosition.class));
            if (this.parameters.getPositions().size() > 1) {
                mysqlEventParser.setStandbyPosition((EntryPosition) JsonUtils.unmarshalFromString(this.parameters.getPositions().get(1), EntryPosition.class));
            }
        }
        mysqlEventParser.setFallbackIntervalInSeconds(this.parameters.getFallbackIntervalInSeconds().intValue());
        mysqlEventParser.setProfilingEnabled(false);
        mysqlEventParser.setFilterTableError(this.parameters.getFilterTableError().booleanValue());
        mysqlEventParser.setParallel(this.parameters.getParallel().booleanValue());
        mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(this.parameters.getGtidEnable()));
        if (this.parameters.getTsdbSnapshotInterval() != null) {
            mysqlEventParser.setTsdbSnapshotInterval(this.parameters.getTsdbSnapshotInterval().intValue());
        }
        if (this.parameters.getTsdbSnapshotExpire() != null) {
            mysqlEventParser.setTsdbSnapshotExpire(this.parameters.getTsdbSnapshotExpire().intValue());
        }
        boolean tsdbEnabled = BooleanUtils.toBoolean(this.parameters.getTsdbEnable());
        if (tsdbEnabled) {
            mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
                public void destory(String str) {
                    TableMetaTSDBBuilder.destory(str);
                }

                public TableMetaTSDB build(String str, String str2) {
                    // The JDBC settings are published as system properties only for the duration
                    // of the build and blanked afterwards, whether or not the build succeeds.
                    // NOTE(review): presumably the Spring XML resolves these keys as placeholders - confirm.
                    // The second argument is ignored; a fixed Spring XML location is always used.
                    try {
                        System.setProperty("canal.instance.tsdb.url", CanalInstanceWithManager.this.parameters.getTsdbJdbcUrl());
                        System.setProperty("canal.instance.tsdb.dbUsername", CanalInstanceWithManager.this.parameters.getTsdbJdbcUserName());
                        System.setProperty("canal.instance.tsdb.dbPassword", CanalInstanceWithManager.this.parameters.getTsdbJdbcPassword());
                        return TableMetaTSDBBuilder.build(str, "classpath:spring/tsdb/mysql-tsdb.xml");
                    } finally {
                        System.setProperty("canal.instance.tsdb.url", "");
                        System.setProperty("canal.instance.tsdb.dbUsername", "");
                        System.setProperty("canal.instance.tsdb.dbPassword", "");
                    }
                }
            });
            mysqlEventParser.setEnableTsdb(tsdbEnabled);
        }
        return mysqlEventParser;
    }

    /** Creates a parser that reads binlog files from the configured local directory. */
    private LocalBinlogEventParser createLocalBinlogEventParser(List<InetSocketAddress> list) {
        LocalBinlogEventParser localBinlogEventParser = new LocalBinlogEventParser();
        localBinlogEventParser.setDestination(this.destination);
        localBinlogEventParser.setBufferSize(this.parameters.getReceiveBufferSize().intValue());
        localBinlogEventParser.setConnectionCharset(Charset.forName(this.parameters.getConnectionCharset()));
        localBinlogEventParser.setConnectionCharsetNumber(this.parameters.getConnectionCharsetNumber().byteValue());
        localBinlogEventParser.setDirectory(this.parameters.getLocalBinlogDirectory());
        localBinlogEventParser.setProfilingEnabled(false);
        localBinlogEventParser.setDetectingEnable(this.parameters.getDetectingEnable().booleanValue());
        localBinlogEventParser.setDetectingIntervalInSeconds(this.parameters.getDetectingIntervalInSeconds());
        localBinlogEventParser.setFilterTableError(this.parameters.getFilterTableError().booleanValue());
        localBinlogEventParser.setParallel(this.parameters.getParallel().booleanValue());
        if (!CollectionUtils.isEmpty(list)) {
            localBinlogEventParser.setMasterInfo(new AuthenticationInfo(list.get(0), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()));
        }
        return localBinlogEventParser;
    }

    /**
     * Creates the HA controller for a MySQL parser. Only the heartbeat mode is supported.
     *
     * @return a {@link HeartBeatHAController} configured with the retry count and the
     *         switch-enable flag from the parameters
     * @throws CanalException if the configured HA mode is not heartbeat
     */
    protected CanalHAController initHaController() {
        logger.info("init haController begin...");
        final CanalParameter.HAMode mode = parameters.getHaMode();
        if (mode.isHeartBeat()) {
            HeartBeatHAController controller = new HeartBeatHAController();
            controller.setDetectingRetryTimes(parameters.getDetectingRetryTimes().intValue());
            controller.setSwitchEnable(parameters.getHeartbeatHaEnable().booleanValue());
            logger.info("init haController end! \n\t load CanalHAController:{}", controller.getClass().getName());
            return controller;
        }
        throw new CanalException("unsupport HAMode for " + mode);
    }

    /**
     * Creates the log position manager that matches the configured {@code IndexMode}.
     *
     * <ul>
     *   <li>memory: {@link MemoryLogPositionManager}</li>
     *   <li>zookeeper: {@link ZooKeeperLogPositionManager}</li>
     *   <li>mixed: {@link PeriodMixedLogPositionManager} combining memory and ZooKeeper</li>
     *   <li>meta: {@link MetaLogPositionManager} backed by this instance's meta manager</li>
     *   <li>memory-meta-failback: {@link FailbackLogPositionManager} with memory as primary
     *       argument and the meta-backed manager as secondary argument</li>
     * </ul>
     *
     * @return the newly created manager
     * @throws CanalException if the configured mode is none of the above
     */
    protected CanalLogPositionManager initLogPositionManager() {
        logger.info("init logPositionPersistManager begin...");
        CanalParameter.IndexMode indexMode = this.parameters.getIndexMode();
        // Declared with the interface type: the branches create unrelated implementations.
        CanalLogPositionManager logPositionManager;
        if (indexMode.isMemory()) {
            logPositionManager = new MemoryLogPositionManager();
        } else if (indexMode.isZookeeper()) {
            logPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
        } else if (indexMode.isMixed()) {
            // NOTE(review): 1000L is presumably the flush period in milliseconds - confirm against the constructor.
            logPositionManager = new PeriodMixedLogPositionManager(new MemoryLogPositionManager(), new ZooKeeperLogPositionManager(getZkclientx()), 1000L);
        } else if (indexMode.isMeta()) {
            logPositionManager = new MetaLogPositionManager(this.metaManager);
        } else if (indexMode.isMemoryMetaFailback()) {
            logPositionManager = new FailbackLogPositionManager(new MemoryLogPositionManager(), new MetaLogPositionManager(this.metaManager));
        } else {
            throw new CanalException("unsupport indexMode for " + indexMode);
        }
        logger.info("init logPositionManager end! \n\t load CanalLogPositionManager:{}", logPositionManager.getClass().getName());
        return logPositionManager;
    }

    /**
     * Re-applies the current alarm handler to the parser (when it is an
     * {@link AbstractEventParser}) and then delegates to the superclass implementation.
     *
     * @param canalEventParser parser about to be started
     * @param z                flag passed through unchanged to the superclass
     */
    protected void startEventParserInternal(CanalEventParser canalEventParser, boolean z) {
        boolean acceptsAlarmHandler = canalEventParser instanceof AbstractEventParser;
        if (acceptsAlarmHandler) {
            AbstractEventParser parser = (AbstractEventParser) canalEventParser;
            parser.setAlarmHandler(getAlarmHandler());
        }
        super.startEventParserInternal(canalEventParser, z);
    }

    /**
     * Returns the number of entries in the first database address group, or 1 when no
     * groups are configured.
     */
    private int getGroupSize() {
        final List<List<CanalParameter.DataSourcing>> groups = parameters.getGroupDbAddresses();
        return CollectionUtils.isEmpty(groups) ? 1 : groups.get(0).size();
    }

    /**
     * Returns the ZooKeeper client for the configured cluster addresses.
     *
     * <p>The addresses are copied, sorted and joined with {@code ;} before the lookup, so
     * the same set of addresses always produces the same connection string regardless of
     * the order in which it was configured.
     */
    private synchronized ZkClientx getZkclientx() {
        // Sort a copy so the configured list itself is never reordered.
        List<String> zkClusters = new ArrayList<String>(this.parameters.getZkClusters());
        Collections.sort(zkClusters);
        return ZkClientx.getZkClient(StringUtils.join(zkClusters, ";"));
    }

    /**
     * Replaces the alarm handler of this instance.
     *
     * @param canalAlarmHandler handler to use from now on
     */
    public void setAlarmHandler(CanalAlarmHandler canalAlarmHandler) {
        alarmHandler = canalAlarmHandler;
    }
}
