package org.springframework.kafka.requestreply;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchConsumerAwareMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.class */
public class AggregatingReplyingKafkaTemplate<K, V, R> extends ReplyingKafkaTemplate<K, V, Collection<ConsumerRecord<K, R>>> implements BatchConsumerAwareMessageListener<K, Collection<ConsumerRecord<K, R>>> {
    /** Topic name placed on the synthetic record that carries a fully aggregated reply. */
    public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
    /** Topic name placed on the synthetic record that carries partial results after a timeout. */
    public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
    /** Default commit timeout in seconds (the constructor initializes {@code commitTimeout} to 30 seconds). */
    private static final int DEFAULT_COMMIT_TIMEOUT = 30;
    /** Replies collected so far for each correlation key; entries are removed on release or timeout. */
    private final Map<CorrelationKey, Set<RecordHolder<K, R>>> pending;
    /** Next offset to commit (highest released offset + 1) per topic/partition; cleared after each commit. */
    private final Map<TopicPartition, Long> offsets;
    /** Tested with the collected replies and a flag that is {@code false} on arrival and {@code true} on timeout. */
    private final BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy;
    /** Timeout passed to {@code Consumer.commitSync} when committing released offsets. */
    private Duration commitTimeout;
    /** When true, a timed-out request may be completed with whatever replies were collected. */
    private boolean returnPartialOnTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate$RecordHolder.class */
    /**
     * Wraps a reply {@link ConsumerRecord} so that it can be stored in a hash-based set
     * with identity defined by topic, partition and offset rather than by object identity.
     *
     * @param <K> the record key type.
     * @param <R> the record value type.
     */
    public static final class RecordHolder<K, R> {
        private final ConsumerRecord<K, R> record;

        RecordHolder(ConsumerRecord<K, R> consumerRecord) {
            this.record = consumerRecord;
        }

        ConsumerRecord<K, R> getRecord() {
            return this.record;
        }

        /**
         * Hash of topic, partition and (truncated) offset; a holder without a record hashes to 0.
         */
        @Override
        public int hashCode() {
            if (this.record == null) {
                // Null-safe: previously this dereferenced the record unconditionally.
                return 0;
            }
            return 31 + this.record.topic().hashCode() + this.record.partition() + ((int) this.record.offset());
        }

        /**
         * Two holders are equal when their records have the same topic, partition and offset.
         * Holders that both lack a record are considered equal; a holder with a record never
         * equals one without.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            RecordHolder<?, ?> other = (RecordHolder<?, ?>) obj;
            if (this.record == null || other.record == null) {
                // Avoids the NPE the previous implementation threw when only 'other' had no record.
                return this.record == other.record;
            }
            return this.record.topic().equals(other.record.topic())
                    && this.record.partition() == other.record.partition()
                    && this.record.offset() == other.record.offset();
        }
    }

    /**
     * Construct a template that aggregates several replies into a single result.
     *
     * @param producerFactory the producer factory, passed to the superclass.
     * @param genericMessageListenerContainer the reply container, passed to the superclass;
     * its ack mode must be {@code MANUAL} or {@code MANUAL_IMMEDIATE}.
     * @param biPredicate the release strategy; it is tested with the replies collected so far
     * and a flag that is {@code false} when a reply arrives and {@code true} on timeout.
     * @throws IllegalArgumentException presumably (via Spring's {@code Assert}) when the release
     * strategy is null or the container uses another ack mode.
     */
    public AggregatingReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, Collection<ConsumerRecord<K, R>>> genericMessageListenerContainer, BiPredicate<List<ConsumerRecord<K, R>>, Boolean> biPredicate) {
        super(producerFactory, genericMessageListenerContainer);
        this.pending = new HashMap<>();
        this.offsets = new HashMap<>();
        // Use the declared constant instead of repeating the magic number.
        this.commitTimeout = Duration.ofSeconds(DEFAULT_COMMIT_TIMEOUT);
        Assert.notNull(biPredicate, "'releaseStrategy' cannot be null");
        ContainerProperties.AckMode ackMode = genericMessageListenerContainer.getContainerProperties().getAckMode();
        boolean manualAck = ackMode == ContainerProperties.AckMode.MANUAL
                || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE;
        Assert.isTrue(manualAck, "The reply container must have a MANUAL or MANUAL_IMMEDIATE AckMode");
        this.releaseStrategy = biPredicate;
    }

    /**
     * Set the timeout used when synchronously committing the offsets of released replies.
     * Synchronized because the value is read while holding this instance's monitor when
     * offsets are committed, matching {@code setReturnPartialOnTimeout}.
     *
     * @param duration the commit timeout; must not be null.
     */
    public synchronized void setCommitTimeout(Duration duration) {
        Assert.notNull(duration, "'commitTimeout' cannot be null");
        this.commitTimeout = duration;
    }

    /**
     * Set whether a timed-out request may be completed with the replies collected so far.
     * When false, {@code handleTimeout} discards the collected replies and returns false.
     *
     * @param z true to allow partial results on timeout.
     */
    public synchronized void setReturnPartialOnTimeout(boolean z) {
        this.returnPartialOnTimeout = z;
    }

    /**
     * Collect each incoming reply under its correlation id and, once the release strategy
     * accepts the collected replies, hand a single aggregated record to the superclass.
     * <p>
     * Replies without a correlation id header are logged and skipped; replies whose
     * correlation id is not pending are reported through {@code logLateArrival}. When a group
     * is released its pending entry is removed and its offsets are recorded for commit.
     *
     * @param list the batch of reply records. NOTE(review): each element is stored as an
     * individual reply via the raw-typed {@code addToCollection}, so the declared value type
     * appears to be nominal only - confirm against the container configuration.
     * @param consumer the consumer used to commit offsets of released replies.
     */
    @Override // org.springframework.kafka.listener.BatchConsumerAwareMessageListener
    public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> list, Consumer<?, ?> consumer) {
        List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> completed = new ArrayList<>();
        for (ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> reply : list) {
            Header correlationHeader = reply.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
            if (correlationHeader == null) {
                this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(reply) + " - to use request/reply semantics, the responding server must return the correlation id  in the 'kafka_correlationId' header");
                continue;
            }
            CorrelationKey correlationKey = new CorrelationKey(correlationHeader.value());
            // pending/offsets are also touched by handleTimeout, which holds the same monitor.
            synchronized (this) {
                if (!isPending(correlationKey)) {
                    logLateArrival(reply, correlationKey);
                    continue;
                }
                List<ConsumerRecord<K, R>> collected = addToCollection(reply, correlationKey).stream()
                        .map(RecordHolder::getRecord)
                        .collect(Collectors.toList());
                if (this.releaseStrategy.test(collected, false)) {
                    // Distinct name: the decompiled code re-declared the lambda parameter here.
                    ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> aggregated =
                            new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, collected);
                    aggregated.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationKey.getCorrelationId()));
                    this.pending.remove(correlationKey);
                    checkOffsetsAndCommitIfNecessary(collected, consumer);
                    completed.add(aggregated);
                }
            }
        }
        if (!completed.isEmpty()) {
            super.onMessage(completed);
        }
    }

    /**
     * Handle a request timeout: drop the replies collected for the correlation key and,
     * if partial results are enabled and the release strategy accepts them (tested with the
     * timeout flag set to true), complete the future with those replies.
     *
     * @param correlationKey the correlation key of the timed-out request.
     * @param requestReplyFuture the future to complete with partial results.
     * @return true if the future was completed with partial results, false otherwise.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaTemplate
    protected synchronized boolean handleTimeout(CorrelationKey correlationKey, RequestReplyFuture<K, V, Collection<ConsumerRecord<K, R>>> requestReplyFuture) {
        // The pending entry is always removed, whether or not partial results are returned.
        Set<RecordHolder<K, R>> holders = this.pending.remove(correlationKey);
        if (holders != null && this.returnPartialOnTimeout) {
            List<ConsumerRecord<K, R>> partial = new ArrayList<>();
            for (RecordHolder<K, R> holder : holders) {
                partial.add(holder.getRecord());
            }
            if (this.releaseStrategy.test(partial, true)) {
                ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> result =
                        new ConsumerRecord<>(PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC, 0, 0L, null, partial);
                requestReplyFuture.set(result);
                return true;
            }
        }
        return false;
    }

    /**
     * Record, per topic/partition, the highest "next offset" (record offset + 1) among the
     * released replies, then commit all recorded offsets synchronously once no request is
     * pending any more. Nothing is committed while requests are pending or when there are
     * no recorded offsets.
     *
     * @param list the replies that have just been released.
     * @param consumer the consumer on which the offsets are committed.
     */
    private void checkOffsetsAndCommitIfNecessary(List<ConsumerRecord<K, R>> list, Consumer<?, ?> consumer) {
        for (ConsumerRecord<K, R> released : list) {
            TopicPartition partition = new TopicPartition(released.topic(), released.partition());
            // Keep the larger of the stored value and this record's next offset.
            this.offsets.merge(partition, released.offset() + 1, Long::max);
        }
        if (this.pending.isEmpty() && !this.offsets.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> next : this.offsets.entrySet()) {
                toCommit.put(next.getKey(), new OffsetAndMetadata(next.getValue()));
            }
            consumer.commitSync(toCommit, this.commitTimeout);
            this.offsets.clear();
        }
    }

    /**
     * Add a reply to the set of replies collected for a correlation key, creating the
     * (insertion-ordered) set on first use.
     *
     * @param consumerRecord the reply record; raw-typed, so the wrapping is unchecked.
     * @param correlationKey the correlation key the reply belongs to.
     * @return the set of holders collected for the key, including the new reply.
     */
    private Set<RecordHolder<K, R>> addToCollection(ConsumerRecord consumerRecord, CorrelationKey correlationKey) {
        Set<RecordHolder<K, R>> holders = this.pending.get(correlationKey);
        if (holders == null) {
            holders = new LinkedHashSet<>();
            this.pending.put(correlationKey, holders);
        }
        holders.add(new RecordHolder<>(consumerRecord));
        return holders;
    }

    /**
     * Bridge overload that casts its arguments and delegates to
     * {@code onMessage(List, Consumer)}.
     * NOTE(review): the bridge/synthetic markers indicate this was emitted by the decompiler
     * from a compiler-generated method; it presumably has no counterpart in hand-written
     * source - confirm before keeping it in a source tree.
     */
    @Override // org.springframework.kafka.listener.GenericMessageListener
    public /* bridge */ /* synthetic */ void onMessage(Object obj, Consumer consumer) {
        onMessage((List) obj, (Consumer<?, ?>) consumer);
    }
}
