package io.eventuate.messaging.kafka.basic.consumer;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/messaging/kafka/basic/consumer/KafkaMessageProcessor.class */
/**
 * Hands consumed Kafka records to an {@link EventuateKafkaConsumerMessageHandler} and keeps
 * track of which offsets have been fully processed and may therefore be committed.
 *
 * <p>Each record is registered as unprocessed with the {@link OffsetTracker} before it is given
 * to the handler. When the handler's completion callback reports success, the record is placed
 * on an internal queue; {@link #offsetsToCommit()} later drains that queue into the tracker.
 * When the callback reports a failure, the failure is remembered and rethrown by
 * {@link #throwFailureException()} (and therefore by every later {@link #process} call).
 *
 * <p>NOTE(review): the queue and the {@link AtomicReference} suggest the completion callback may
 * run on a different thread than the one calling {@code process}/{@code offsetsToCommit}; the
 * offset tracker and the backlog set are plain, unsynchronized objects - confirm that they are
 * only touched from a single thread.
 */
public class KafkaMessageProcessor {
    private final String subscriberId;
    private final EventuateKafkaConsumerMessageHandler handler;
    // Instance logger named after the runtime class, so a subclass logs under its own name.
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final OffsetTracker offsetTracker = new OffsetTracker();
    // Records whose handler callback completed successfully but which have not yet been
    // reported to the offset tracker.
    private final BlockingQueue<ConsumerRecord<String, byte[]>> processedRecords = new LinkedBlockingQueue<>();
    // Most recently reported handler failure; never reset once set.
    private final AtomicReference<KafkaMessageProcessorFailedException> failed = new AtomicReference<>();
    // Backlogs returned by the handler; summed up by backlog().
    private final Set<MessageConsumerBacklog> consumerBacklogs = new HashSet<>();

    /**
     * Creates a processor for one subscriber.
     *
     * @param subscriberId identifier of the subscriber, used in log messages only
     * @param handler      handler that every record passed to {@link #process} is delegated to
     */
    public KafkaMessageProcessor(String subscriberId, EventuateKafkaConsumerMessageHandler handler) {
        this.subscriberId = subscriberId;
        this.handler = handler;
    }

    /**
     * Registers the record as unprocessed and passes it to the handler together with a
     * completion callback.
     *
     * @param consumerRecord the record to process
     * @throws KafkaMessageProcessorFailedException if a previously processed record was reported
     *         as failed by the handler's callback
     */
    public void process(ConsumerRecord<String, byte[]> consumerRecord) {
        throwFailureException();
        this.offsetTracker.noteUnprocessed(topicPartitionOf(consumerRecord), consumerRecord.offset());
        MessageConsumerBacklog backlog = this.handler.apply(consumerRecord, (result, cause) -> {
            if (cause != null) {
                this.logger.error("Got exception: ", cause);
                this.failed.set(new KafkaMessageProcessorFailedException(cause));
            } else {
                this.logger.debug("Adding processed record to queue {} {}", this.subscriberId, consumerRecord.offset());
                this.processedRecords.add(consumerRecord);
            }
        });
        // The handler may return no backlog; only non-null backlogs are remembered.
        if (backlog != null) {
            this.consumerBacklogs.add(backlog);
        }
    }

    /**
     * Rethrows the failure recorded by a handler callback, if there is one.
     *
     * @throws KafkaMessageProcessorFailedException if a handler callback reported a failure
     */
    public void throwFailureException() {
        // Read the reference once so the null check and the throw see the same value.
        KafkaMessageProcessorFailedException failure = this.failed.get();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Moves every record currently waiting in the processed-records queue into the offset
     * tracker and returns the offsets the tracker considers ready to commit.
     *
     * @return the result of {@link OffsetTracker#offsetsToCommit()} after the queue was drained
     */
    public Map<TopicPartition, OffsetAndMetadata> offsetsToCommit() {
        int drained = 0;
        ConsumerRecord<String, byte[]> processed;
        while ((processed = this.processedRecords.poll()) != null) {
            drained++;
            this.offsetTracker.noteProcessed(topicPartitionOf(processed), processed.offset());
        }
        this.logger.trace("removed {} {} processed records from queue", this.subscriberId, drained);
        return this.offsetTracker.offsetsToCommit();
    }

    /**
     * Tells the offset tracker that the given offsets have been committed.
     *
     * @param map the committed offsets, keyed by topic partition
     */
    public void noteOffsetsCommitted(Map<TopicPartition, OffsetAndMetadata> map) {
        this.offsetTracker.noteOffsetsCommitted(map);
    }

    /**
     * Exposes the offset tracker used by this processor.
     *
     * @return the internal (live, not copied) {@link OffsetTracker}
     */
    public OffsetTracker getPending() {
        return this.offsetTracker;
    }

    /**
     * Sums the sizes of all backlogs the handler has returned so far.
     *
     * @return the total of {@code size()} over the remembered backlogs; 0 when there are none
     */
    public int backlog() {
        return this.consumerBacklogs.stream().mapToInt(MessageConsumerBacklog::size).sum();
    }

    /** Builds the topic/partition key that the offset tracker uses for the given record. */
    private static TopicPartition topicPartitionOf(ConsumerRecord<String, byte[]> consumerRecord) {
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }
}
