package com.alibaba.otter.canal.connector.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.core.util.MessageUtil;
import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
import com.alibaba.otter.canal.connector.rabbitmq.producer.AliyunCredentialsProvider;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.collect.Lists;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RabbitMQ-backed implementation of {@link CanalMsgConsumer}.
 *
 * <p>Deliveries received from the broker are decoded into {@link CommonMessage} batches and handed
 * over to the caller through a bounded in-memory queue. The RabbitMQ delivery is only acknowledged
 * after the caller has called {@link #ack()} for the corresponding batch; {@link #rollback()} (or a
 * processing timeout) causes the delivery to be negatively acknowledged and requeued.
 *
 * <p>Only one batch may be outstanding at a time: {@link #getMessage(Long, TimeUnit)} refuses to
 * hand out a new batch until the previous one has been acked or rolled back.
 */
@SPI(RabbitMQConstants.ROOT)
public class CanalRabbitMQConsumer implements CanalMsgConsumer {
    private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);

    /** Capacity of the hand-over queue between the RabbitMQ delivery thread and the caller. */
    private static final int MESSAGE_QUEUE_CAPACITY = 1024;

    private String nameServer;
    private String vhost;
    private String queueName;
    private String accessKey;
    private String secretKey;
    private Long resourceOwnerId;
    private String username;
    private String password;
    private boolean flatMessage;
    private Connection connect;
    private Channel channel;
    private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
    /** Maximum time, in milliseconds, the delivery thread waits for a batch to be acked or rolled back. */
    private long batchProcessTimeout = 60000;
    /** Batch most recently returned by {@link #getMessage(Long, TimeUnit)} and not yet acked/rolled back. */
    private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;

    /**
     * Reads the connection settings from {@code properties} and prepares the hand-over queue.
     *
     * <p>The resource owner id and the flat-message flag are accepted either as typed values
     * ({@link Number} / {@link Boolean}) or as their string representation. When the flat-message
     * flag is absent, {@code flatMessage} keeps its current value ({@code false} on a fresh instance).
     *
     * @param properties consumer configuration
     * @param str name of the queue to consume from
     * @param str2 not used by this implementation
     * @throws CanalClientException if the resource owner id is present but not a valid number
     */
    public void init(Properties properties, String str, String str2) {
        this.nameServer = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_HOST);
        this.vhost = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_VIRTUAL_HOST);
        this.queueName = str;
        this.accessKey = PropertiesUtils.getProperty(properties, "canal.aliyun.accessKey");
        this.secretKey = PropertiesUtils.getProperty(properties, "canal.aliyun.secretKey");
        this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
        this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);

        Long ownerId = readLong(properties, RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);
        if (ownerId != null) {
            this.resourceOwnerId = ownerId;
        }
        Boolean flat = readBoolean(properties, "canal.mq.flatMessage");
        if (flat != null) {
            this.flatMessage = flat;
        }
        this.messageBlockingQueue = new LinkedBlockingQueue<>(MESSAGE_QUEUE_CAPACITY);
    }

    /**
     * Opens the RabbitMQ connection and channel and starts consuming from the configured queue with
     * manual acknowledgement.
     *
     * <p>Aliyun credentials are used when both access key and secret key are non-empty; otherwise
     * the plain username/password are used. If any step fails, the partially opened connection is
     * aborted so it does not leak.
     *
     * @throws CanalClientException if the Aliyun resource owner id is missing, or if the
     *     connection, channel or consumer cannot be set up
     */
    public void connect() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        if (isNotEmpty(this.accessKey) && isNotEmpty(this.secretKey)) {
            if (this.resourceOwnerId == null) {
                throw new CanalClientException("Aliyun accessKey/secretKey are configured but "
                    + RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID + " is missing");
            }
            connectionFactory.setCredentialsProvider(
                new AliyunCredentialsProvider(this.accessKey, this.secretKey, this.resourceOwnerId));
        } else {
            connectionFactory.setUsername(this.username);
            connectionFactory.setPassword(this.password);
        }

        if (this.nameServer != null && this.nameServer.contains(":")) {
            String[] hostAndPort = this.nameServer.split(":");
            connectionFactory.setHost(hostAndPort[0]);
            // "host:" splits into a single element; keep the factory's default port in that case.
            if (hostAndPort.length > 1) {
                connectionFactory.setPort(Integer.parseInt(hostAndPort[1].trim()));
            }
        } else {
            connectionFactory.setHost(this.nameServer);
        }
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(5000);
        connectionFactory.setVirtualHost(this.vhost);

        Connection newConnection = null;
        try {
            newConnection = connectionFactory.newConnection();
            Channel newChannel = newConnection.createChannel();
            if (newChannel == null) {
                throw new IOException("RabbitMQ connection has no channel available");
            }
            this.connect = newConnection;
            this.channel = newChannel;
            // autoAck=false: deliveries are acknowledged explicitly once the batch has been processed.
            newChannel.basicConsume(this.queueName, false, new DefaultConsumer(newChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
                    if (body == null) {
                        return;
                    }
                    long deliveryTag = envelope.getDeliveryTag();
                    // The second argument of basicAck/basicNack is "multiple", not a success flag:
                    // always settle exactly this delivery, and requeue it when processing failed.
                    if (process(body)) {
                        getChannel().basicAck(deliveryTag, false);
                    } else {
                        getChannel().basicNack(deliveryTag, false, true);
                    }
                }
            });
        } catch (IOException | TimeoutException e) {
            this.channel = null;
            this.connect = null;
            if (newConnection != null) {
                // abort() closes the connection and its channels, discarding close-time errors.
                newConnection.abort();
            }
            throw new CanalClientException("Start RabbitMQ consumer error", e);
        }
    }

    /**
     * Decodes one delivery body, queues it for {@link #getMessage(Long, TimeUnit)} and blocks until
     * the batch is finished or {@code batchProcessTimeout} elapses.
     *
     * @param bArr raw delivery body; JSON when {@code flatMessage} is set, otherwise the binary form
     *     understood by {@link CanalMessageSerializerUtil#deserializer}
     * @return {@code true} only if the batch finished within the timeout and was marked successful
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *     interrupted while queueing or waiting (the interrupt flag is restored first)
     */
    public boolean process(byte[] bArr) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", new String(bArr, StandardCharsets.UTF_8));
        }
        List<CommonMessage> messages = new ArrayList<>();
        if (this.flatMessage) {
            CommonMessage flat = JSON.parseObject(bArr, CommonMessage.class);
            messages.add(flat);
        } else {
            messages.addAll(MessageUtil.convert(CanalMessageSerializerUtil.deserializer(bArr)));
        }

        ConsumerBatchMessage<CommonMessage> batch = new ConsumerBatchMessage<>(messages);
        try {
            this.messageBlockingQueue.put(batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        try {
            return batch.waitFinish(this.batchProcessTimeout) && batch.isSuccess();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Waits up to the given timeout for the next decoded batch.
     *
     * @param l maximum time to wait
     * @param timeUnit unit of {@code l}
     * @return the messages of the next batch, or {@code null} if none arrived within the timeout
     * @throws CanalClientException if the previous batch has not been acked or rolled back yet, or
     *     if the calling thread is interrupted while waiting
     */
    public List<CommonMessage> getMessage(Long l, TimeUnit timeUnit) {
        if (this.lastGetBatchMessage != null) {
            throw new CanalClientException("mq get/ack not support concurrent & async ack");
        }
        try {
            ConsumerBatchMessage<CommonMessage> batch = this.messageBlockingQueue.poll(l.longValue(), timeUnit);
            if (batch == null) {
                return null;
            }
            this.lastGetBatchMessage = batch;
            return batch.getData();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for message", e);
            throw new CanalClientException("Failed to fetch the data after: " + l, e);
        }
    }

    /** Marks the outstanding batch, if any, as failed and clears it. */
    public void rollback() {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /**
     * Marks the outstanding batch, if any, as successfully processed and clears it. If marking it
     * fails, the error is logged and the batch is marked as failed instead.
     */
    public void ack() {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.ack();
            }
        } catch (Throwable th) {
            logger.warn("Ack batch message failed, marking the batch as failed", th);
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /**
     * Closes the channel and then the connection. Both are always attempted, even if closing the
     * channel fails; the first failure is rethrown with any later one attached as suppressed.
     *
     * @throws CanalClientException if closing the channel or the connection fails
     */
    public void disconnect() {
        Channel openChannel = this.channel;
        Connection openConnection = this.connect;
        this.channel = null;
        this.connect = null;

        CanalClientException failure = null;
        if (openChannel != null) {
            try {
                openChannel.close();
            } catch (IOException | TimeoutException e) {
                failure = new CanalClientException("stop channel error", e);
            }
        }
        if (openConnection != null) {
            try {
                openConnection.close();
            } catch (IOException e) {
                CanalClientException connectFailure = new CanalClientException("stop connect error", e);
                if (failure == null) {
                    failure = connectFailure;
                } else {
                    failure.addSuppressed(connectFailure);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns whether {@code value} is neither {@code null} nor empty. */
    private static boolean isNotEmpty(String value) {
        return value != null && !value.isEmpty();
    }

    /** Looks up {@code key} as a raw object first, then as a string property (which honours defaults). */
    private static Object lookup(Properties properties, String key) {
        Object raw = properties.get(key);
        return raw != null ? raw : properties.getProperty(key);
    }

    /** Reads {@code key} as a long from a Number or numeric string; returns {@code null} if absent or blank. */
    private static Long readLong(Properties properties, String key) {
        Object raw = lookup(properties, key);
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            return ((Number) raw).longValue();
        }
        String text = raw.toString().trim();
        if (text.isEmpty()) {
            return null;
        }
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException e) {
            throw new CanalClientException("Invalid numeric value for " + key + ": " + text, e);
        }
    }

    /** Reads {@code key} as a boolean from a Boolean or string; returns {@code null} if absent or blank. */
    private static Boolean readBoolean(Properties properties, String key) {
        Object raw = lookup(properties, key);
        if (raw == null) {
            return null;
        }
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        String text = raw.toString().trim();
        return text.isEmpty() ? null : Boolean.valueOf(text);
    }
}
