/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.topic.config;

import com.icthh.xm.commons.logging.trace.TraceWrapper;
import com.icthh.xm.commons.logging.util.MdcUtils;
import com.icthh.xm.commons.topic.config.MessageListener;
import com.icthh.xm.commons.topic.domain.NotRetryableException;
import com.icthh.xm.commons.topic.domain.TopicConfig;
import com.icthh.xm.commons.topic.message.MessageHandler;
import com.icthh.xm.commons.topic.util.MessageRetryDetailsUtils;
import com.icthh.xm.commons.topic.util.MessageRetryUtils;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

public class MessageListenerContainerBuilder {
    private static final Logger log = LoggerFactory.getLogger(MessageListenerContainerBuilder.class);
    private final KafkaProperties kafkaProperties;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public AbstractMessageListenerContainer build(String tenantKey, TopicConfig topicConfig, MessageHandler messageHandler, TraceWrapper traceWrapper) {
        Map<String, Object> consumerConfig = this.buildConsumerConfig(topicConfig);
        DefaultKafkaConsumerFactory kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerConfig, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(new String[]{topicConfig.getTopicName()});
        containerProperties.setObservationEnabled(true);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        if (topicConfig.getConsumeMessagePerSecondLimit() != null && topicConfig.getConsumeMessagePerSecondLimit() > 0) {
            containerProperties.setIdleBetweenPolls((long)Math.divideExact(1000, topicConfig.getConsumeMessagePerSecondLimit()));
        }
        containerProperties.setMessageListener((Object)new MessageListener(topicConfig, messageHandler, tenantKey, traceWrapper));
        ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer((ConsumerFactory)kafkaConsumerFactory, containerProperties);
        container.setCommonErrorHandler(this.buildErrorHandler(tenantKey, topicConfig));
        if (topicConfig.getConcurrency() != null) {
            container.setConcurrency(topicConfig.getConcurrency().intValue());
        }
        return container;
    }

    private CommonErrorHandler buildErrorHandler(String tenantKey, TopicConfig topicConfig) {
        Long backOffPeriod;
        FixedBackOff fixedBackOff = new FixedBackOff();
        Integer retriesCount = topicConfig.getRetriesCount();
        if (retriesCount != null && retriesCount > 0) {
            fixedBackOff.setMaxAttempts((long)retriesCount.intValue());
        }
        if ((backOffPeriod = topicConfig.getBackOffPeriod()) != null) {
            fixedBackOff.setInterval(backOffPeriod.longValue());
        }
        DefaultErrorHandler defaultErrorHandler = StringUtils.isNotEmpty((CharSequence)topicConfig.getDeadLetterQueue()) ? new DefaultErrorHandler(this.getDeadLetterPublishingRecoverer(tenantKey, topicConfig), (BackOff)fixedBackOff) : new DefaultErrorHandler((BackOff)fixedBackOff);
        defaultErrorHandler.addNotRetryableExceptions(new Class[]{NotRetryableException.class});
        defaultErrorHandler.setCommitRecovered(true);
        return defaultErrorHandler;
    }

    private ConsumerRecordRecoverer getDeadLetterPublishingRecoverer(String tenantKey, TopicConfig topicConfig) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(this.kafkaTemplate, (r, ex) -> this.recover((ConsumerRecord<?, ?>)r, tenantKey, topicConfig));
        recoverer.addHeadersFunction((record, ex) -> {
            Throwable e = ex;
            if (e instanceof ListenerExecutionFailedException && e.getCause() != null) {
                e = e.getCause();
            }
            RecordHeaders additional = new RecordHeaders();
            additional.add((Header)new RecordHeader("xm_exception_message", e.toString().getBytes(StandardCharsets.UTF_8)));
            additional.add((Header)new RecordHeader("xm_exception_stack_trace", ExceptionUtils.getStackTrace((Throwable)e).getBytes(StandardCharsets.UTF_8)));
            return additional;
        });
        recoverer.addNotRetryableExceptions(new Class[]{NotRetryableException.class});
        return recoverer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TopicPartition recover(ConsumerRecord<?, ?> record, String tenantKey, TopicConfig topicConfig) {
        String rawBody = String.valueOf(record.value());
        String deadLetterQueue = topicConfig.getDeadLetterQueue();
        try {
            MessageRetryUtils.putRid(record, tenantKey, topicConfig.getTopicName());
            log.warn("send message to dead-letter [{}] due to retry count exceeded [{}], total processing time = {} ms, body = [{}]", new Object[]{deadLetterQueue, MessageRetryDetailsUtils.getRetryCounter(record), MessageRetryDetailsUtils.getTotalProcessingTime(record), rawBody});
            TopicPartition topicPartition = new TopicPartition(deadLetterQueue, record.partition());
            return topicPartition;
        }
        finally {
            MdcUtils.clear();
        }
    }

    private Map<String, Object> buildConsumerConfig(TopicConfig topicConfig) {
        Map props = this.kafkaProperties.buildConsumerProperties(null);
        String groupIdFromConf = topicConfig.getGroupId();
        String groupId = StringUtils.isEmpty((CharSequence)groupIdFromConf) ? UUID.randomUUID().toString() : groupIdFromConf;
        props.put("group.id", groupId);
        props.put("enable.auto.commit", false);
        if (StringUtils.isNotBlank((CharSequence)topicConfig.getAutoOffsetReset())) {
            props.put("auto.offset.reset", topicConfig.getAutoOffsetReset());
        }
        if (StringUtils.isNotBlank((CharSequence)topicConfig.getMetadataMaxAge())) {
            props.put("metadata.max.age.ms", topicConfig.getMetadataMaxAge());
        }
        if (StringUtils.isNotBlank((CharSequence)topicConfig.getIsolationLevel())) {
            props.put("isolation.level", topicConfig.getIsolationLevel());
        }
        if (topicConfig.getMaxPollInterval() != null) {
            props.put("max.poll.interval.ms", topicConfig.getMaxPollInterval());
        }
        if (topicConfig.getConsumeMessagePerSecondLimit() != null) {
            props.put("max.poll.records", 1);
        }
        return Collections.unmodifiableMap(props);
    }

    public MessageListenerContainerBuilder(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTemplate = kafkaTemplate;
    }
}

