/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.topic.config;

import com.icthh.xm.commons.logging.trace.TraceWrapper;
import com.icthh.xm.commons.logging.util.MdcUtils;
import com.icthh.xm.commons.topic.domain.TopicConfig;
import com.icthh.xm.commons.topic.message.MessageHandler;
import com.icthh.xm.commons.topic.util.MessageRetryDetailsUtils;
import java.math.BigInteger;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class MessageListener
implements AcknowledgingMessageListener<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
    private final TopicConfig topicConfig;
    private final MessageHandler messageHandler;
    private final String tenantKey;
    private final TraceWrapper traceWrapper;

    public MessageListener(TopicConfig topicConfig, MessageHandler messageHandler, String tenantKey, TraceWrapper traceWrapper) {
        this.topicConfig = topicConfig;
        this.messageHandler = messageHandler;
        this.tenantKey = tenantKey.toUpperCase();
        this.traceWrapper = traceWrapper;
    }

    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        this.traceWrapper.runWithSpan(record, () -> this.processMessage(record, acknowledgment));
    }

    private void processMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        MessageRetryDetailsUtils.MessageRetryDetails retryDetails = MessageRetryDetailsUtils.getUpdatedOrGenerateRetryDetails(record);
        this.putRid(retryDetails.getRetryCount(), retryDetails.getRid());
        StopWatch stopWatch = StopWatch.createStarted();
        String rawBody = (String)record.value();
        log.info("start processing message, size = {}, body = [{}]", (Object)rawBody.length(), (Object)this.formatBody(rawBody));
        try {
            Map<String, byte[]> headers = StreamSupport.stream(record.headers().spliterator(), false).collect(Collectors.toMap(Header::key, Header::value));
            this.messageHandler.onMessage(rawBody, this.tenantKey, this.topicConfig, headers);
            acknowledgment.acknowledge();
            MessageRetryDetailsUtils.delete(record);
            log.info("stop processing message, time = {} ms.", (Object)stopWatch.getTime());
        }
        catch (Exception ex) {
            log.error("error processing message, retry number: {}, time = {} ms.", (Object)retryDetails.getRetryCount(), (Object)stopWatch.getTime());
            throw ex;
        }
        finally {
            MdcUtils.clear();
        }
    }

    private void putRid(BigInteger retryCount, String rid) {
        MdcUtils.putRid((String)new StringJoiner(":").add(this.tenantKey).add(this.topicConfig.getTopicName()).add(rid).add(retryCount.toString()).toString());
    }

    private String formatBody(String rawBody) {
        return this.topicConfig.getLogBody() != false ? rawBody : "***";
    }
}

