/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.support.converter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import org.jspecify.annotations.Nullable;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.log.LogMessage;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

/**
 * A {@link BatchMessageConverter} that turns a list of {@link ConsumerRecord}s into a
 * single {@link Message} whose payload is the list of (optionally converted) record values.
 *
 * <p>Per-record metadata (keys, topics, partitions, offsets, timestamps, headers and
 * conversion failures) is collected into lists that are stored in the message headers.
 * Converting a {@link Message} back into producer records is not supported.
 */
public class BatchMessagingMessageConverter
implements BatchMessageConverter {
    /** Logger bound to the runtime class so subclasses log under their own name. */
    protected final LogAccessor logger = new LogAccessor(this.getClass());
    /** Optional converter applied to each individual record; may be {@code null}. */
    private final @Nullable RecordMessageConverter recordConverter;
    private boolean generateMessageId = false;
    private boolean generateTimestamp = false;
    /** Mapper for native Kafka headers; {@code null} when no Jackson version was detected. */
    private @Nullable KafkaHeaderMapper headerMapper;
    private boolean rawRecordHeader;

    /**
     * Create an instance without a record-level converter.
     */
    public BatchMessagingMessageConverter() {
        this(null);
    }

    /**
     * Create an instance that delegates per-record conversion to the given converter.
     * A header mapper is selected based on the Jackson version reported by
     * {@link JacksonPresent}: Jackson 3 is preferred, then Jackson 2; when neither is
     * present no header mapper is configured.
     * @param recordConverter the converter used for individual records, or {@code null}.
     */
    public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) {
        this.recordConverter = recordConverter;
        if (JacksonPresent.isJackson3Present()) {
            this.headerMapper = new JsonKafkaHeaderMapper();
        } else if (JacksonPresent.isJackson2Present()) {
            this.headerMapper = new DefaultKafkaHeaderMapper();
        }
    }

    /**
     * Set the flag passed to {@link KafkaMessageHeaders} controlling message id generation.
     * @param generateMessageId the flag value; defaults to {@code false}.
     */
    public void setGenerateMessageId(boolean generateMessageId) {
        this.generateMessageId = generateMessageId;
    }

    /**
     * Set the flag passed to {@link KafkaMessageHeaders} controlling timestamp generation.
     * @param generateTimestamp the flag value; defaults to {@code false}.
     */
    public void setGenerateTimestamp(boolean generateTimestamp) {
        this.generateTimestamp = generateTimestamp;
    }

    /**
     * Replace the header mapper chosen by the constructor.
     * @param headerMapper the mapper used to convert each record's native headers.
     */
    public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }

    /**
     * Return the record-level converter supplied at construction time.
     * @return the converter, or {@code null} if none was supplied.
     */
    @Override
    public @Nullable RecordMessageConverter getRecordMessageConverter() {
        return this.recordConverter;
    }

    /**
     * Set to {@code true} to add the list of raw {@link ConsumerRecord}s to the message
     * headers under {@code kafka_data}.
     * @param rawRecordHeader whether to expose the raw records as a header.
     */
    public void setRawRecordHeader(boolean rawRecordHeader) {
        this.rawRecordHeader = rawRecordHeader;
    }

    /**
     * Convert a batch of records into a single message.
     * @param records the consumer records to convert.
     * @param acknowledgment the acknowledgment forwarded to {@code commonHeaders}; may be {@code null}.
     * @param consumer the consumer forwarded to {@code commonHeaders}; may be {@code null}.
     * @param type the target type; when it is a parameterized type with one type argument and
     * a record converter is configured, each record is converted to that argument type.
     * @return a message whose payload is the list of per-record payloads.
     */
    @Override
    public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer, Type type) {
        KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);
        Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
        List<Object> payloads = new ArrayList<>();
        List<Object> keys = new ArrayList<>();
        List<String> topics = new ArrayList<>();
        List<Integer> partitions = new ArrayList<>();
        List<Long> offsets = new ArrayList<>();
        List<String> timestampTypes = new ArrayList<>();
        List<Long> timestamps = new ArrayList<>();
        List<Map<String, Object>> convertedHeaders = new ArrayList<>();
        List<Headers> natives = new ArrayList<>();
        List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
        List<ConversionException> conversionFailures = new ArrayList<>();
        // The (still empty) lists are registered in the headers first and filled in below.
        this.addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
        this.commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, timestamps);
        String listenerInfo = null;
        for (ConsumerRecord<?, ?> record : records) {
            this.addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
            Headers recordHeaders = record.headers();
            if (this.headerMapper != null && recordHeaders != null) {
                Map<String, Object> converted = this.convertHeaders(recordHeaders, convertedHeaders);
                Object info = converted.get("kafka_listenerInfo");
                if (info instanceof String) {
                    // The value from the last record that carries the header wins.
                    listenerInfo = (String) info;
                }
            } else {
                natives.add(recordHeaders);
            }
            if (this.rawRecordHeader) {
                raws.add(record);
            }
        }
        if (this.headerMapper == null && !natives.isEmpty()) {
            this.logger.debug(() -> "No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders");
        }
        if (listenerInfo != null) {
            rawHeaders.put("kafka_listenerInfo", listenerInfo);
        }
        return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
    }

    /**
     * Register the per-batch collector lists in the raw header map: converted headers when a
     * header mapper is configured (native headers otherwise), the raw records when enabled,
     * and always the conversion failures.
     */
    private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders, List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
        if (this.headerMapper != null) {
            rawHeaders.put("kafka_batchConvertedHeaders", convertedHeaders);
        } else {
            rawHeaders.put("kafka_nativeHeaders", natives);
        }
        if (this.rawRecordHeader) {
            rawHeaders.put("kafka_data", raws);
        }
        rawHeaders.put("kafka_conversionFailures", conversionFailures);
    }

    /**
     * Append one record's payload and metadata to the collector lists. The timestamp type
     * name is only appended when the record reports a non-null timestamp type.
     */
    private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys, List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes, List<Long> timestamps, List<ConversionException> conversionFailures) {
        payloads.add(this.obtainPayload(type, record, conversionFailures));
        keys.add(record.key());
        topics.add(record.topic());
        partitions.add(record.partition());
        offsets.add(record.offset());
        timestamps.add(record.timestamp());
        TimestampType timestampType = record.timestampType();
        if (timestampType != null) {
            timestampTypes.add(timestampType.name());
        }
    }

    /**
     * Choose between plain value extraction and record-converter based conversion.
     * Conversion is used only when a record converter is configured and the target type is a
     * single-argument parameterized type.
     */
    private @Nullable Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
        if (this.recordConverter == null || !this.containerType(type)) {
            return this.extractAndConvertValue(record, type);
        }
        return this.convert(record, type, conversionFailures);
    }

    /**
     * Map the native headers into a new map (when a header mapper is configured), append that
     * map to {@code convertedHeaders} and return it.
     */
    private Map<String, Object> convertHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
        Map<String, Object> converted = new HashMap<>();
        if (this.headerMapper != null) {
            this.headerMapper.toHeaders(headers, converted);
        }
        convertedHeaders.add(converted);
        return converted;
    }

    /**
     * Not supported by this converter.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
        throw new UnsupportedOperationException();
    }

    /**
     * Extract the record value, substituting {@link KafkaNull#INSTANCE} for a {@code null} value.
     * Subclasses may override to convert the value.
     * @param record the record whose value is extracted.
     * @param type the required type (unused by this implementation).
     * @return the record value or {@link KafkaNull#INSTANCE}.
     */
    protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
        Object value = record.value();
        return value == null ? KafkaNull.INSTANCE : value;
    }

    /**
     * Convert a single record using the configured record converter.
     *
     * <p>On success a {@code null} entry is appended to {@code conversionFailures}. When the
     * converter throws a {@link ConversionException} and the record value is a {@code byte[]},
     * {@link Bytes} or {@link String}, the failure is passed to
     * {@link SerializationUtils#deserializationException}, appended to {@code conversionFailures},
     * logged at WARN, and {@code null} is returned.
     * @param record the record to convert.
     * @param type the container type; must be a {@link ParameterizedType} whose first type
     * argument is the target element type.
     * @param conversionFailures the per-record failure list to append to.
     * @return the converted payload, or {@code null} if no record converter is configured or
     * the conversion failure was reported via {@code conversionFailures}.
     * @throws ConversionException if conversion fails and the record value is not a
     * {@code byte[]}, {@link Bytes} or {@link String}.
     */
    protected @Nullable Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
        try {
            if (this.recordConverter != null) {
                Type elementType = ((ParameterizedType) type).getActualTypeArguments()[0];
                Object payload = this.recordConverter.toMessage(record, null, null, elementType).getPayload();
                // Append a null marker for every successfully converted record.
                conversionFailures.add(null);
                return payload;
            }
            return null;
        }
        catch (ConversionException ex) {
            Object value = record.value();
            byte[] original = null;
            if (value instanceof byte[]) {
                original = (byte[]) value;
            } else if (value instanceof Bytes) {
                original = ((Bytes) value).get();
            } else if (value instanceof String) {
                original = ((String) value).getBytes(StandardCharsets.UTF_8);
            }
            if (original != null) {
                SerializationUtils.deserializationException(record.headers(), original, ex, false);
                conversionFailures.add(ex);
                this.logger.warn(ex, LogMessage.format("Could not convert message for topic=%s, partition=%d, offset=%d", record.topic(), record.partition(), record.offset()));
                return null;
            }
            throw new ConversionException("The batch converter can only report conversion failures to the listener if the record.value() is byte[], Bytes, or String", ex);
        }
    }

    /**
     * Return {@code true} if the type is a parameterized type with exactly one type argument.
     */
    private boolean containerType(Type type) {
        return type instanceof ParameterizedType
                && ((ParameterizedType) type).getActualTypeArguments().length == 1;
    }
}

