package com.alibaba.otter.canal.connector.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.Callback;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SPI(RabbitMQConstants.ROOT)
/* loaded from: input_file:com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.class */
public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CanalRabbitMQProducer.class);
    private Connection connect;
    private Channel channel;

    public void init(Properties properties) {
        RabbitMQProducerConfig rabbitMQProducerConfig = new RabbitMQProducerConfig();
        this.mqProperties = rabbitMQProducerConfig;
        super.init(properties);
        loadRabbitMQProperties(properties);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        String host = rabbitMQProducerConfig.getHost();
        if (host.contains(":")) {
            String[] split = host.split(":");
            connectionFactory.setHost(split[0]);
            connectionFactory.setPort(Integer.parseInt(split[1]));
        } else {
            connectionFactory.setHost(host);
        }
        if (this.mqProperties.getAliyunAccessKey().length() <= 0 || this.mqProperties.getAliyunSecretKey().length() <= 0 || this.mqProperties.getAliyunUid() <= 0) {
            connectionFactory.setUsername(rabbitMQProducerConfig.getUsername());
            connectionFactory.setPassword(rabbitMQProducerConfig.getPassword());
        } else {
            connectionFactory.setCredentialsProvider(new AliyunCredentialsProvider(this.mqProperties.getAliyunAccessKey(), this.mqProperties.getAliyunSecretKey(), this.mqProperties.getAliyunUid()));
        }
        connectionFactory.setVirtualHost(rabbitMQProducerConfig.getVirtualHost());
        try {
            this.connect = connectionFactory.newConnection();
            this.channel = this.connect.createChannel();
        } catch (IOException | TimeoutException e) {
            throw new CanalException("Start RabbitMQ producer error", e);
        }
    }

    private void loadRabbitMQProperties(Properties properties) {
        RabbitMQProducerConfig rabbitMQProducerConfig = (RabbitMQProducerConfig) this.mqProperties;
        doMoreCompatibleConvert("canal.mq.servers", RabbitMQConstants.RABBITMQ_HOST, properties);
        String property = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_HOST);
        if (!StringUtils.isEmpty(property)) {
            rabbitMQProducerConfig.setHost(property);
        }
        String property2 = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_VIRTUAL_HOST);
        if (!StringUtils.isEmpty(property2)) {
            rabbitMQProducerConfig.setVirtualHost(property2);
        }
        String property3 = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_EXCHANGE);
        if (!StringUtils.isEmpty(property3)) {
            rabbitMQProducerConfig.setExchange(property3);
        }
        String property4 = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
        if (!StringUtils.isEmpty(property4)) {
            rabbitMQProducerConfig.setUsername(property4);
        }
        String property5 = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
        if (StringUtils.isEmpty(property5)) {
            return;
        }
        rabbitMQProducerConfig.setPassword(property5);
    }

    public void send(MQDestination mQDestination, Message message, Callback callback) {
        ExecutorTemplate executorTemplate = new ExecutorTemplate(this.sendExecutor);
        try {
            try {
                if (StringUtils.isEmpty(mQDestination.getDynamicTopic())) {
                    send(mQDestination, mQDestination.getTopic(), message);
                } else {
                    for (Map.Entry entry : MQMessageUtils.messageTopics(message, mQDestination.getTopic(), mQDestination.getDynamicTopic()).entrySet()) {
                        String replace = ((String) entry.getKey()).replace('.', '_');
                        Message message2 = (Message) entry.getValue();
                        executorTemplate.submit(() -> {
                            send(mQDestination, replace, message2);
                        });
                    }
                    executorTemplate.waitForResult();
                }
                callback.commit();
                executorTemplate.clear();
            } catch (Throwable th) {
                logger.error(th.getMessage(), th);
                callback.rollback();
                executorTemplate.clear();
            }
        } catch (Throwable th2) {
            executorTemplate.clear();
            throw th2;
        }
    }

    private void send(MQDestination mQDestination, String str, Message message) {
        if (!this.mqProperties.isFlatMessage()) {
            byte[] serializer = CanalMessageSerializerUtil.serializer(message, this.mqProperties.isFilterTransactionEntry());
            if (logger.isDebugEnabled()) {
                logger.debug("send message:{} to destination:{}", serializer, mQDestination.getCanalDestination());
            }
            sendMessage(str, serializer);
            return;
        }
        Iterator it = MQMessageUtils.messageConverter(MQMessageUtils.buildMessageData(message, this.buildExecutor), message.getId()).iterator();
        while (it.hasNext()) {
            byte[] jSONBytes = JSON.toJSONBytes((FlatMessage) it.next(), new SerializerFeature[]{SerializerFeature.WriteMapNullValue});
            if (logger.isDebugEnabled()) {
                logger.debug("send message:{} to destination:{}", jSONBytes, mQDestination.getCanalDestination());
            }
            sendMessage(str, jSONBytes);
        }
    }

    private void sendMessage(String str, byte[] bArr) {
        try {
            this.channel.basicPublish(((RabbitMQProducerConfig) this.mqProperties).getExchange(), str, null, bArr);
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    public void stop() {
        logger.info("## Stop RabbitMQ producer##");
        try {
            this.connect.close();
            this.channel.close();
            super.stop();
        } catch (AlreadyClosedException e) {
            logger.error("Connection is already closed", (Throwable) e);
        } catch (IOException | TimeoutException e2) {
            throw new CanalException("Stop RabbitMQ producer error", e2);
        }
        super.stop();
    }
}
