package com.alibaba.otter.canal.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.MQMessageUtils;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/canal/kafka/CanalKafkaProducer.class */
public class CanalKafkaProducer implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
    private Producer<String, Message> producer;
    private Producer<String, String> producer2;
    private MQProperties kafkaProperties;

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void init(MQProperties mQProperties) {
        this.kafkaProperties = mQProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", mQProperties.getServers());
        properties.put("acks", mQProperties.getAcks());
        properties.put("compression.type", mQProperties.getCompressionType());
        properties.put("batch.size", Integer.valueOf(mQProperties.getBatchSize()));
        properties.put("linger.ms", Integer.valueOf(mQProperties.getLingerMs()));
        properties.put("max.request.size", Integer.valueOf(mQProperties.getMaxRequestSize()));
        properties.put("buffer.memory", Long.valueOf(mQProperties.getBufferMemory()));
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("max.in.flight.requests.per.connection", 1);
        if (!mQProperties.getProperties().isEmpty()) {
            properties.putAll(mQProperties.getProperties());
        }
        properties.put("retries", Integer.valueOf(mQProperties.getRetries()));
        if (mQProperties.isKerberosEnable()) {
            File file = new File(mQProperties.getKerberosKrb5FilePath());
            File file2 = new File(mQProperties.getKerberosJaasFilePath());
            if (!file.exists() || !file2.exists()) {
                logger.error("ERROR # The kafka kerberos configuration file does not exist! please check it");
                throw new RuntimeException("ERROR # The kafka kerberos configuration file does not exist! please check it");
            }
            System.setProperty("java.security.krb5.conf", file.getAbsolutePath());
            System.setProperty("java.security.auth.login.config", file2.getAbsolutePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");
        }
        if (mQProperties.getFlatMessage()) {
            properties.put("value.serializer", StringSerializer.class.getName());
            this.producer2 = new KafkaProducer(properties);
        } else {
            properties.put("value.serializer", MessageSerializer.class.getName());
            this.producer = new KafkaProducer(properties);
        }
    }

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void stop() {
        try {
            try {
                logger.info("## stop the kafka producer");
                if (this.producer != null) {
                    this.producer.close();
                }
                if (this.producer2 != null) {
                    this.producer2.close();
                }
                logger.info("## kafka producer is down.");
            } catch (Throwable th) {
                logger.warn("##something goes wrong when stopping kafka producer:", th);
                logger.info("## kafka producer is down.");
            }
        } catch (Throwable th2) {
            logger.info("## kafka producer is down.");
            throw th2;
        }
    }

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void send(MQProperties.CanalDestination canalDestination, Message message, CanalMQProducer.Callback callback) {
        try {
            if (StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                send(canalDestination, canalDestination.getTopic(), message);
            } else {
                for (Map.Entry<String, Message> entry : MQMessageUtils.messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic()).entrySet()) {
                    String key = entry.getKey();
                    Message value = entry.getValue();
                    if (logger.isDebugEnabled()) {
                        logger.debug("## Send message to kafka topic: " + key);
                    }
                    send(canalDestination, key, value);
                }
            }
            callback.commit();
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
            callback.rollback();
        }
    }

    private void send(MQProperties.CanalDestination canalDestination, String str, Message message) throws Exception {
        if (!this.kafkaProperties.getFlatMessage()) {
            ArrayList arrayList = new ArrayList();
            if (canalDestination.getPartitionHash() == null || canalDestination.getPartitionHash().isEmpty()) {
                arrayList.add(new ProducerRecord(str, Integer.valueOf(canalDestination.getPartition() != null ? canalDestination.getPartition().intValue() : 0), (Object) null, message));
            } else {
                Message[] messagePartition = MQMessageUtils.messagePartition(message, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                int length = messagePartition.length;
                for (int i = 0; i < length; i++) {
                    Message message2 = messagePartition[i];
                    if (message2 != null) {
                        arrayList.add(new ProducerRecord(str, Integer.valueOf(i), (Object) null, message2));
                    }
                }
            }
            produce(str, arrayList, false);
            return;
        }
        List<FlatMessage> messageConverter = MQMessageUtils.messageConverter(message);
        ArrayList arrayList2 = new ArrayList();
        if (messageConverter != null) {
            for (FlatMessage flatMessage : messageConverter) {
                if (canalDestination.getPartitionHash() == null || canalDestination.getPartitionHash().isEmpty()) {
                    arrayList2.add(new ProducerRecord(str, Integer.valueOf(canalDestination.getPartition() != null ? canalDestination.getPartition().intValue() : 0), (Object) null, JSON.toJSONString(flatMessage, new SerializerFeature[]{SerializerFeature.WriteMapNullValue})));
                } else {
                    FlatMessage[] messagePartition2 = MQMessageUtils.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                    int length2 = messagePartition2.length;
                    for (int i2 = 0; i2 < length2; i2++) {
                        FlatMessage flatMessage2 = messagePartition2[i2];
                        if (flatMessage2 != null) {
                            arrayList2.add(new ProducerRecord(str, Integer.valueOf(i2), (Object) null, JSON.toJSONString(flatMessage2, new SerializerFeature[]{SerializerFeature.WriteMapNullValue})));
                        }
                    }
                }
                produce(str, arrayList2, true);
                arrayList2.clear();
            }
        }
    }

    private void produce(String str, List<ProducerRecord> list, boolean z) {
        Producer<String, String> producer = z ? this.producer2 : this.producer;
        ArrayList arrayList = new ArrayList();
        try {
            Iterator<ProducerRecord> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(producer.send(it.next()));
            }
            if (logger.isDebugEnabled()) {
                Iterator<ProducerRecord> it2 = list.iterator();
                while (it2.hasNext()) {
                    logger.debug("Send  message to kafka topic: [{}], packet: {}", str, it2.next().toString());
                }
            }
            producer.flush();
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                try {
                    ((Future) it3.next()).get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        } catch (Throwable th) {
            if (logger.isDebugEnabled()) {
                Iterator<ProducerRecord> it4 = list.iterator();
                while (it4.hasNext()) {
                    logger.debug("Send  message to kafka topic: [{}], packet: {}", str, it4.next().toString());
                }
            }
            producer.flush();
            Iterator it5 = arrayList.iterator();
            while (it5.hasNext()) {
                try {
                    ((Future) it5.next()).get();
                } catch (InterruptedException | ExecutionException e2) {
                    throw new RuntimeException(e2);
                }
            }
            throw th;
        }
    }
}
