package org.apache.doris.load.sync.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.doris.analysis.BinlogDesc;
import org.apache.doris.analysis.ChannelDescription;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.sync.DataSyncJobType;
import org.apache.doris.load.sync.SyncFailMsg;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.persist.Storage;
import org.apache.doris.qe.StmtExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/sync/canal/CanalSyncJob.class */
/**
 * A {@link SyncJob} whose data source is a Canal server.
 *
 * <p>The job keeps the connection settings parsed from the binlog properties
 * (remote address/destination, credentials, batch size, debug flag), lazily
 * builds a {@link SyncCanalClient} plus one {@link CanalSyncChannel} per
 * channel description on first {@link #execute()}, and starts/stops that
 * client as the job moves between states.
 */
public class CanalSyncJob extends SyncJob {
    private static final Logger LOG = LogManager.getLogger(CanalSyncJob.class);

    // Keys looked up in the binlog property map.
    protected static final String CANAL_SERVER_IP = "canal.server.ip";
    protected static final String CANAL_SERVER_PORT = "canal.server.port";
    protected static final String CANAL_DESTINATION = "canal.destination";
    protected static final String CANAL_USERNAME = "canal.username";
    protected static final String CANAL_PASSWORD = "canal.password";
    protected static final String CANAL_BATCH_SIZE = "canal.batchSize";
    protected static final String CANAL_DEBUG = "canal.debug";

    @SerializedName("remote")
    private final CanalDestination remote;

    @SerializedName("username")
    private String username;

    @SerializedName("password")
    private String password;

    @SerializedName("batchSize")
    private int batchSize;

    @SerializedName("debug")
    private boolean debug;

    // Not serialized; rebuilt by init() when the job is executed.
    private transient SyncCanalClient client;

    /**
     * Creates a Canal sync job with an empty remote destination and default settings.
     *
     * <p>The three arguments are forwarded unchanged to the {@link SyncJob} constructor
     * (presumably job id, job name and database id - confirm against {@code SyncJob}).
     */
    public CanalSyncJob(long j, String str, long j2) {
        super(j, str, j2);
        // NOTE(review): the default batch size reuses the numeric value of an unrelated MySQL
        // status-flag constant; this looks like a decompiler substitution for a plain literal.
        // Confirm the intended value and replace it with a dedicated constant.
        this.batchSize = MysqlServerStatusFlag.SERVER_STATUS_IN_TRANS_READONLY;
        this.debug = false;
        this.dataSyncJobType = DataSyncJobType.CANAL;
        this.remote = new CanalDestination("", 0, "");
    }

    /**
     * Builds the connector, the channels and the client for this job.
     *
     * <p>The client field is only assigned after the channels have been registered, so a
     * failure at any step leaves {@link #isInit()} false and the next {@link #execute()}
     * retries the whole initialization.
     *
     * @throws DdlException if the channels cannot be built
     */
    private void init() throws DdlException {
        InetSocketAddress address = new InetSocketAddress(this.remote.getIp(), this.remote.getPort());
        CanalConnector connector = CanalConnectors.newSingleConnector(
                address, this.remote.getDestination(), this.username, this.password);
        initChannels();
        SyncCanalClient newClient = new SyncCanalClient(
                this, this.remote.getDestination(), connector, this.batchSize, this.debug);
        newClient.registerChannels(this.channels);
        // Publish only a fully wired client.
        this.client = newClient;
    }

    /**
     * Creates one {@link CanalSyncChannel} per channel description and stores them in
     * {@code channels}, replacing whatever the list held before.
     *
     * <p>Channels are collected in a local list while the database write lock is held and
     * are copied into {@code channels} only after every description has been processed.
     * A failure therefore leaves {@code channels} untouched, and calling this method again
     * (for example when initialization is retried) does not produce duplicate channels.
     *
     * @throws DdlException if the database or a target table cannot be resolved, or if a
     *         target table is not a unique-key table with a delete sign
     */
    public void initChannels() throws DdlException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.dbId);
        List<CanalSyncChannel> built = Lists.newArrayList();
        db.writeLock();
        try {
            for (ChannelDescription description : this.channelDescriptions) {
                String targetTable = description.getTargetTable();
                OlapTable table = db.getOlapTableOrDdlException(targetTable);
                if (table.getKeysType() != KeysType.UNIQUE_KEYS || !table.hasDeleteSign()) {
                    throw new DdlException("Table[" + targetTable + "] don't support batch delete.");
                }
                List<String> colNames = description.getColNames();
                if (colNames == null) {
                    // No explicit column list: fall back to the table's base schema columns.
                    colNames = Lists.newArrayList();
                    for (Column column : table.getBaseSchema(false)) {
                        colNames.add(column.getName());
                    }
                }
                CanalSyncChannel channel = new CanalSyncChannel(description.getChannelId(), this, db, table,
                        colNames, description.getSrcDatabase(), description.getSrcTableName());
                if (description.getPartitionNames() != null) {
                    channel.setPartitions(description.getPartitionNames());
                }
                built.add(channel);
            }
        } finally {
            db.writeUnlock();
        }
        if (this.channels == null) {
            this.channels = Lists.newArrayList();
        } else {
            // Drop leftovers from an earlier initialization attempt.
            this.channels.clear();
        }
        this.channels.addAll(built);
    }

    /**
     * Validates the binlog description and copies the Canal settings out of its properties.
     *
     * <p>{@code canal.username} and {@code canal.password} are required;
     * {@code canal.batchSize} and {@code canal.debug} are optional and keep their current
     * values when absent.
     *
     * @param binlogDesc binlog description carrying the property map
     * @throws DdlException if a required property is missing or {@code canal.batchSize}
     *         is not an integer (other failures may come from the superclass check or
     *         from parsing the remote destination)
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public void checkAndSetBinlogInfo(BinlogDesc binlogDesc) throws DdlException {
        super.checkAndSetBinlogInfo(binlogDesc);
        Map<String, String> properties = binlogDesc.getProperties();
        this.remote.parse(properties);

        if (!properties.containsKey(CANAL_USERNAME)) {
            throw new DdlException("Missing canal.username property in binlog properties");
        }
        this.username = properties.get(CANAL_USERNAME);

        if (!properties.containsKey(CANAL_PASSWORD)) {
            throw new DdlException("Missing canal.password property in binlog properties");
        }
        this.password = properties.get(CANAL_PASSWORD);

        if (properties.containsKey(CANAL_BATCH_SIZE)) {
            try {
                this.batchSize = Integer.parseInt(properties.get(CANAL_BATCH_SIZE));
            } catch (NumberFormatException e) {
                throw new DdlException("Property canal.batchSize is not int");
            }
        }

        if (properties.containsKey(CANAL_DEBUG)) {
            this.debug = Boolean.parseBoolean(properties.get(CANAL_DEBUG));
        }
    }

    /**
     * @return {@code true} when both the client and the channel list have been created
     */
    public boolean isInit() {
        return this.client != null && this.channels != null;
    }

    /**
     * @return {@code true} when the job is marked RUNNING but has no initialized client/channels
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public boolean isNeedReschedule() {
        return this.jobState == SyncJob.JobState.RUNNING && !isInit();
    }

    /**
     * Initializes the job if necessary and starts the Canal client.
     *
     * @throws UserException if initialization or the state update fails
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public void execute() throws UserException {
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                .add("remote ip", this.remote.getIp())
                .add("remote port", this.remote.getPort())
                .add("msg", "Try to start canal client.")
                .add("debug", this.debug)
                .build());
        if (!isInit()) {
            init();
        }
        unprotectedStartClient();
    }

    /**
     * Stops the client and records the failure message.
     *
     * <p>SUBMIT_FAIL, RUN_FAIL and UNKNOWN move the job to PAUSED; SCHEDULE_FAIL and
     * USER_CANCEL move it to CANCELLED. If stopping fails with a {@link UserException}
     * the error is logged and the failure message is not recorded.
     *
     * @param msgType reason category for the cancellation
     * @param str     error message stored in the fail message
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public void cancel(SyncFailMsg.MsgType msgType, String str) {
        try {
            switch (msgType) {
                case SUBMIT_FAIL:
                case RUN_FAIL:
                case UNKNOWN:
                    unprotectedStopClient(SyncJob.JobState.PAUSED);
                    break;
                case SCHEDULE_FAIL:
                case USER_CANCEL:
                    unprotectedStopClient(SyncJob.JobState.CANCELLED);
                    break;
                default:
                    Preconditions.checkState(false, "unknown msg type: " + msgType.name());
                    break;
            }
            this.failMsg = new SyncFailMsg(msgType, str);
            LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                    .add("MsgType", msgType.name())
                    .add("msg", "Cancel canal sync job.")
                    .add("errMsg", str)
                    .build());
        } catch (UserException e) {
            LOG.warn(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                    .add("msg", "Failed to cancel canal sync job.")
                    .build(), e);
        }
    }

    /**
     * Stops the client and moves the job to PAUSED.
     *
     * @throws UserException if the state update fails
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public void pause() throws UserException {
        unprotectedStopClient(SyncJob.JobState.PAUSED);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                .add("remote ip", this.remote.getIp())
                .add("remote port", this.remote.getPort())
                .add("msg", "Pause canal sync job.")
                .add("debug", this.debug)
                .build());
    }

    /**
     * Moves the job back to PENDING; the client itself is not started here.
     *
     * @throws UserException if the state update fails
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public void resume() throws UserException {
        updateState(SyncJob.JobState.PENDING, false);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                .add("remote ip", this.remote.getIp())
                .add("remote port", this.remote.getPort())
                .add("msg", "Resume canal sync job.")
                .add("debug", this.debug)
                .build());
    }

    /**
     * Starts the client and moves the job to RUNNING.
     *
     * <p>The client must already exist; {@link #execute()} guarantees this by calling
     * {@code init()} first.
     *
     * @throws UserException if the state update fails
     */
    public void unprotectedStartClient() throws UserException {
        this.client.startup();
        updateState(SyncJob.JobState.RUNNING, false);
        // NOTE(review): Storage.NODE_NAME is used here purely as a log key for the job name;
        // likely a decompiler substitution for a string literal - confirm.
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                .add(Storage.NODE_NAME, this.jobName)
                .add("msg", "Client has been started.")
                .build());
    }

    /**
     * Shuts the client down (if one exists) and moves the job to the given state.
     *
     * <p>Only CANCELLED and PAUSED are acted on; any other state makes this a no-op.
     *
     * @param jobState target state
     * @throws UserException if the state update fails
     */
    public void unprotectedStopClient(SyncJob.JobState jobState) throws UserException {
        if (jobState != SyncJob.JobState.CANCELLED && jobState != SyncJob.JobState.PAUSED) {
            return;
        }
        if (this.client != null) {
            this.client.shutdown(true);
        }
        updateState(jobState, false);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                .add(Storage.NODE_NAME, this.jobName)
                .add("msg", "Client has been stopped.")
                .build());
    }

    /**
     * Re-applies a persisted state change: copies the timestamps and fail message from
     * the update record, then replays the state transition.
     *
     * <p>A state other than PENDING, RUNNING, PAUSED or CANCELLED is reported through the
     * error log rather than propagated.
     *
     * @param syncJobUpdateStateInfo persisted update record
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public void replayUpdateSyncJobState(SyncJob.SyncJobUpdateStateInfo syncJobUpdateStateInfo) {
        this.lastStartTimeMs = syncJobUpdateStateInfo.getLastStartTimeMs();
        this.lastStopTimeMs = syncJobUpdateStateInfo.getLastStopTimeMs();
        this.finishTimeMs = syncJobUpdateStateInfo.getFinishTimeMs();
        this.failMsg = syncJobUpdateStateInfo.getFailMsg();
        try {
            SyncJob.JobState jobState = syncJobUpdateStateInfo.getJobState();
            switch (jobState) {
                case PENDING:
                case RUNNING:
                case PAUSED:
                case CANCELLED:
                    // Every replayable state is applied the same way.
                    updateState(jobState, true);
                    break;
                default:
                    throw new UserException("job state is invalid: " + jobState);
            }
        } catch (UserException e) {
            LOG.error(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(this.id))
                    .add("desired_state", syncJobUpdateStateInfo.getJobState())
                    .add("msg", "replay update state error.")
                    .add("reason", e.getMessage())
                    .build(), e);
        }
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, Long.valueOf(syncJobUpdateStateInfo.getId()))
                .add("desired_state:", syncJobUpdateStateInfo.getJobState())
                .add("msg", "replay update sync job state")
                .build());
    }

    /**
     * @return the client's position info, or a placeholder when no client exists
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public String getStatus() {
        if (this.client == null) {
            // NOTE(review): placeholder borrowed from StmtExecutor; likely a decompiler
            // substitution for a string literal - confirm.
            return StmtExecutor.NULL_VALUE_FOR_LOAD;
        }
        return this.client.getPositionInfo();
    }

    /**
     * @return a one-line summary of the remote address, destination and batch size
     *         (credentials are not included)
     */
    @Override // org.apache.doris.load.sync.SyncJob
    public String getJobConfig() {
        StringBuilder sb = new StringBuilder();
        // NOTE(review): ClusterNamespace.CLUSTER_DELIMITER separates ip and port here;
        // likely a decompiler substitution for a literal separator - confirm.
        sb.append("address:").append(this.remote.getIp())
                .append(ClusterNamespace.CLUSTER_DELIMITER).append(this.remote.getPort());
        sb.append(",").append("destination:").append(this.remote.getDestination());
        sb.append(",").append("batchSize:").append(this.batchSize);
        return sb.toString();
    }

    /**
     * @return the remote Canal destination this job connects to
     */
    public CanalDestination getRemote() {
        return this.remote;
    }

    @Override
    public String toString() {
        return "SyncJob [jobId=" + this.id
                + ", jobName=" + this.jobName
                + ", dbId=" + this.dbId
                + ", state=" + this.jobState
                + ", createTimeMs=" + TimeUtils.longToTimeString(this.createTimeMs)
                + ", lastStartTimeMs=" + TimeUtils.longToTimeString(this.lastStartTimeMs)
                + ", lastStopTimeMs=" + TimeUtils.longToTimeString(this.lastStopTimeMs)
                + ", finishTimeMs=" + TimeUtils.longToTimeString(this.finishTimeMs) + "]";
    }
}
