/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.listener.adapter;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.jspecify.annotations.Nullable;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.listener.Acknowledgement;
import org.springframework.pulsar.listener.PulsarBatchAcknowledgingMessageListener;
import org.springframework.pulsar.listener.adapter.AbstractPulsarMessageToSpringMessageAdapter;

/**
 * Batch listener adapter that receives a list of Pulsar messages and hands them to the
 * handler method through {@link #invoke}.
 *
 * <p>Depending on the flags exposed by the superclass ({@code isPulsarMessageList()},
 * {@code isMessageList()}, {@code isSimpleExtraction()}, {@code isHeaderFound()} and
 * {@code isConsumerRecords()}), the batch is passed on either as a plain record object or
 * wrapped as the payload of a single Spring {@code Message}, optionally carrying the headers
 * of every individual message aggregated per header name.
 *
 * @param <V> the payload type of the Pulsar messages
 */
public class PulsarBatchMessagesToSpringMessageListenerAdapter<V>
extends AbstractPulsarMessageToSpringMessageAdapter<V>
implements PulsarBatchAcknowledgingMessageListener<V> {

    /**
     * Creates the adapter by passing the target bean and method to the superclass.
     *
     * @param bean the object passed to the superclass as the handler bean
     * @param method the method passed to the superclass as the handler method
     */
    public PulsarBatchMessagesToSpringMessageListenerAdapter(Object bean, Method method) {
        super(bean, method);
    }

    /**
     * Converts the received batch into the shape selected by the superclass flags and
     * forwards it to {@link #invoke}.
     *
     * <p>At most one of the two values handed to {@code invoke} is set by the branch chain:
     * either the raw record object or a Spring message wrapping the batch. If
     * {@code isConsumerRecords()} is true the record object is replaced by a
     * {@link Messages} view over {@code msg}, regardless of the branch taken before.
     *
     * @param consumer the Pulsar consumer, forwarded to the handler
     * @param msg the batch of Pulsar messages
     * @param acknowledgement the acknowledgement handle, forwarded as-is; may be {@code null}
     */
    @Override
    public void received(Consumer<V> consumer, final List<Message<V>> msg, @Nullable Acknowledgement acknowledgement) {
        org.springframework.messaging.Message<?> message = null;
        // Declared as Object: it may hold the Pulsar message list, the extracted values
        // or a Messages view, matching the Object parameter of invoke().
        Object theRecord = null;
        if (this.isPulsarMessageList() && !this.isHeaderFound()) {
            theRecord = msg;
        } else if (this.isPulsarMessageList() && this.isHeaderFound()) {
            // The converted Spring messages are only needed here to collect their headers;
            // the payload is a copy of the original Pulsar message list.
            List<org.springframework.messaging.Message<?>> messages = this.toSpringMessages(consumer, msg);
            Map<String, List<Object>> aggregatedHeaders = this.withAggregatedHeaders(messages);
            List<Message<V>> pulsarMessages = new ArrayList<>(msg);
            message = MessageBuilder.withPayload(pulsarMessages).copyHeaders(aggregatedHeaders).build();
        } else if (this.isMessageList() && !this.isHeaderFound()) {
            List<org.springframework.messaging.Message<?>> messages = this.toSpringMessages(consumer, msg);
            message = MessageBuilder.withPayload(messages).build();
        } else if (this.isMessageList() && this.isHeaderFound()) {
            List<org.springframework.messaging.Message<?>> messages = this.toSpringMessages(consumer, msg);
            Map<String, List<Object>> aggregatedHeaders = this.withAggregatedHeaders(messages);
            message = MessageBuilder.withPayload(messages).copyHeaders(aggregatedHeaders).build();
        } else if (this.isSimpleExtraction()) {
            theRecord = this.extractValues(msg);
        } else if (this.isHeaderFound()) {
            List<org.springframework.messaging.Message<?>> messages = this.toSpringMessages(consumer, msg);
            Map<String, List<Object>> aggregatedHeaders = this.withAggregatedHeaders(messages);
            List<V> values = this.extractValues(msg);
            message = MessageBuilder.withPayload(values).copyHeaders(aggregatedHeaders).build();
        }
        if (this.isConsumerRecords()) {
            // Live view over the received list; iteration and size delegate to msg.
            theRecord = new Messages<V>() {
                @Override
                public Iterator<Message<V>> iterator() {
                    return msg.iterator();
                }

                @Override
                public int size() {
                    return msg.size();
                }
            };
        }
        this.invoke(theRecord, consumer, message, acknowledgement);
    }

    /**
     * Collects the headers of all given messages into one map, keyed by header name.
     * Each entry lists the values of that header in the iteration order of {@code messages}.
     *
     * @param messages the Spring messages whose headers are collected
     * @return a new mutable map from header name to the list of values found for it
     */
    private Map<String, List<Object>> withAggregatedHeaders(List<org.springframework.messaging.Message<?>> messages) {
        Map<String, List<Object>> aggregatedHeaders = new HashMap<>();
        for (org.springframework.messaging.Message<?> message : messages) {
            message.getHeaders().forEach((name, value) ->
                    aggregatedHeaders.computeIfAbsent(name, key -> new ArrayList<>()).add(value));
        }
        return aggregatedHeaders;
    }

    /**
     * Converts every Pulsar message of the batch through {@code toMessagingMessage},
     * keeping the order of {@code msg}.
     *
     * @param consumer the Pulsar consumer passed to the conversion
     * @param msg the batch of Pulsar messages
     * @return a new mutable list with one converted message per input message
     */
    private List<org.springframework.messaging.Message<?>> toSpringMessages(Consumer<V> consumer, List<Message<V>> msg) {
        List<org.springframework.messaging.Message<?>> messages = new ArrayList<>(msg.size());
        for (Message<V> pulsarMessage : msg) {
            messages.add(this.toMessagingMessage(pulsarMessage, consumer));
        }
        return messages;
    }

    /**
     * Extracts {@code getValue()} of every Pulsar message, keeping the order of {@code msg}.
     *
     * @param msg the batch of Pulsar messages
     * @return a new mutable list with one value per input message
     */
    private List<V> extractValues(List<Message<V>> msg) {
        List<V> values = new ArrayList<>(msg.size());
        for (Message<V> pulsarMessage : msg) {
            values.add(pulsarMessage.getValue());
        }
        return values;
    }

    /**
     * Delegates to {@code invokeHandler}; note that the message is passed first and the
     * records second, i.e. in a different order than this method's own parameters.
     *
     * @param records the record object prepared by {@link #received}; may be {@code null}
     * @param consumer the Pulsar consumer
     * @param message the Spring message prepared by {@link #received}; may be {@code null}
     * @param acknowledgement the acknowledgement handle; may be {@code null}
     */
    protected void invoke(Object records, Consumer<V> consumer, org.springframework.messaging.Message<?> message, Acknowledgement acknowledgement) {
        this.invokeHandler(message, records, consumer, acknowledgement);
    }
}

