package io.eventuate.messaging.kafka.basic.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/messaging/kafka/basic/consumer/EventuateKafkaConsumer.class */
/**
 * Subscribes a {@link KafkaConsumer} to a fixed list of topics and, on a dedicated thread named
 * {@code Eventuate-subscriber-<subscriberId>}, repeatedly polls records, hands each one to a
 * {@link KafkaMessageProcessor}, commits the offsets the processor reports as ready, and
 * pauses/resumes partitions according to a {@link BackPressureManager}.
 *
 * <p>Lifecycle as visible through {@link #getState()}: {@code CREATED} after construction,
 * {@code STARTED} once {@link #start()} has subscribed, {@code FAILED_TO_START} when
 * {@link #start()} throws, and then one of {@code STOPPED}, {@code MESSAGE_HANDLING_FAILED} or
 * {@code FAILED} when the polling thread ends.
 */
public class EventuateKafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(EventuateKafkaConsumer.class);

    /** Upper bound on how long closing the consumer may block on a failure path. */
    private static final Duration FAILURE_CLOSE_TIMEOUT = Duration.of(200L, ChronoUnit.MILLIS);

    private final String subscriberId;
    private final EventuateKafkaConsumerMessageHandler handler;
    private final List<String> topics;
    private final BackPressureConfig backPressureConfig;
    /** Poll timeout in milliseconds, taken from the configuration properties. */
    private final long pollTimeout;
    private final Properties consumerProperties;
    private final AtomicBoolean stopFlag = new AtomicBoolean(false);
    private volatile EventuateKafkaConsumerState state = EventuateKafkaConsumerState.CREATED;
    private volatile boolean closeConsumerOnStop = true;
    private Optional<ConsumerCallbacks> consumerCallbacks = Optional.empty();

    /**
     * Creates a consumer that is not yet connected; call {@link #start()} to begin polling.
     *
     * @param str identifier of this subscriber; used in log output, in the polling thread name,
     *     and passed to {@code ConsumerPropertiesFactory.makeDefaultConsumerProperties}
     * @param eventuateKafkaConsumerMessageHandler handler given to the {@link KafkaMessageProcessor}
     * @param list topics to verify and subscribe to
     * @param str2 first argument of {@code ConsumerPropertiesFactory.makeDefaultConsumerProperties}
     *     (NOTE(review): presumably the Kafka bootstrap servers - confirm against the factory)
     * @param eventuateKafkaConsumerConfigurationProperties supplies extra consumer properties
     *     (applied on top of the defaults), the back-pressure configuration and the poll timeout
     */
    public EventuateKafkaConsumer(String str, EventuateKafkaConsumerMessageHandler eventuateKafkaConsumerMessageHandler, List<String> list, String str2, EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) {
        this.subscriberId = str;
        this.handler = eventuateKafkaConsumerMessageHandler;
        this.topics = list;
        this.consumerProperties = ConsumerPropertiesFactory.makeDefaultConsumerProperties(str2, str);
        // Explicitly configured properties override the defaults built above.
        this.consumerProperties.putAll(eventuateKafkaConsumerConfigurationProperties.getProperties());
        this.backPressureConfig = eventuateKafkaConsumerConfigurationProperties.getBackPressure();
        this.pollTimeout = eventuateKafkaConsumerConfigurationProperties.getPollTimeout();
    }

    /**
     * Sets the optional callbacks notified around offset commits.
     *
     * @param optional callbacks to notify, or an empty {@code Optional} for none
     */
    public void setConsumerCallbacks(Optional<ConsumerCallbacks> optional) {
        this.consumerCallbacks = optional;
    }

    /**
     * @return whether the Kafka consumer is closed when the polling loop ends because of
     *     {@link #stop()}
     */
    public boolean isCloseConsumerOnStop() {
        return this.closeConsumerOnStop;
    }

    /**
     * @param z {@code true} to close the Kafka consumer when the polling loop ends because of
     *     {@link #stop()}; failure paths close the consumer regardless of this flag
     */
    public void setCloseConsumerOnStop(boolean z) {
        this.closeConsumerOnStop = z;
    }

    /**
     * Looks up the partitions of a topic via {@link KafkaConsumer#partitionsFor(String)}.
     *
     * @param kafkaConsumer consumer used for the lookup
     * @param str topic name
     * @return the partition list returned by the consumer
     * @throws RuntimeException wrapping anything thrown by the lookup
     */
    public static List<PartitionInfo> verifyTopicExistsBeforeSubscribing(KafkaConsumer<String, byte[]> kafkaConsumer, String str) {
        try {
            logger.debug("Verifying Topic {}", str);
            List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(str);
            logger.debug("Got these partitions {} for Topic {}", partitions, str);
            return partitions;
        } catch (Throwable th) {
            logger.error("Got exception: ", th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Synchronously commits whatever offsets the processor reports as ready, notifying the
     * callbacks before and after. Does nothing when there is nothing to commit.
     */
    private void maybeCommitOffsets(KafkaConsumer<String, byte[]> consumer, KafkaMessageProcessor processor) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = processor.offsetsToCommit();
        if (offsetsToCommit.isEmpty()) {
            return;
        }
        this.consumerCallbacks.ifPresent(ConsumerCallbacks::onTryCommitCallback);
        logger.debug("Committing offsets {} {}", this.subscriberId, offsetsToCommit);
        consumer.commitSync(offsetsToCommit);
        logger.debug("Committed offsets {}", this.subscriberId);
        processor.noteOffsetsCommitted(offsetsToCommit);
        this.consumerCallbacks.ifPresent(ConsumerCallbacks::onCommitedCallback);
    }

    /**
     * Creates the Kafka consumer, verifies and subscribes to the topics, and starts the polling
     * thread.
     *
     * @throws RuntimeException wrapping any exception raised while creating the consumer,
     *     verifying topics or subscribing; the state is then {@code FAILED_TO_START} and the
     *     consumer, if it was created, has been closed
     */
    public void start() {
        KafkaConsumer<String, byte[]> consumer = null;
        try {
            consumer = new KafkaConsumer<>(this.consumerProperties);
            subscribeAndStartPolling(consumer);
        } catch (Exception e) {
            logger.error("Error subscribing", e);
            this.state = EventuateKafkaConsumerState.FAILED_TO_START;
            // The polling thread is started as the very last step, so on this path nothing else
            // owns the consumer and it must be released here to avoid leaking its resources.
            closeAfterFailedStart(consumer);
            throw new RuntimeException(e);
        }
    }

    /** Verifies every topic, subscribes, then launches the polling thread as the final step. */
    private void subscribeAndStartPolling(KafkaConsumer<String, byte[]> consumer) {
        KafkaMessageProcessor processor = new KafkaMessageProcessor(this.subscriberId, this.handler);
        BackPressureManager backPressureManager = new BackPressureManager(this.backPressureConfig);
        for (String topic : this.topics) {
            verifyTopicExistsBeforeSubscribing(consumer, topic);
        }
        logger.debug("Subscribing to {} {}", this.subscriberId, this.topics);
        consumer.subscribe(new ArrayList<>(this.topics));
        logger.debug("Subscribed to {} {}", this.subscriberId, this.topics);
        Thread pollingThread = new Thread(
                () -> pollUntilStopped(consumer, processor, backPressureManager),
                "Eventuate-subscriber-" + this.subscriberId);
        // Publish STARTED before the thread runs: written afterwards, it could overwrite a
        // terminal state (STOPPED / FAILED / MESSAGE_HANDLING_FAILED) the thread already set.
        this.state = EventuateKafkaConsumerState.STARTED;
        pollingThread.start();
    }

    /** Best-effort close used when start() fails; never masks the original failure. */
    private void closeAfterFailedStart(KafkaConsumer<String, byte[]> consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close(FAILURE_CLOSE_TIMEOUT);
        } catch (Exception closeFailure) {
            logger.warn("Error closing consumer after failed start", closeFailure);
        }
    }

    /**
     * Body of the polling thread: runs the loop until stopped, performs a final commit, and
     * records the terminal state. On failure the consumer is closed with a short timeout.
     */
    private void pollUntilStopped(KafkaConsumer<String, byte[]> consumer, KafkaMessageProcessor processor, BackPressureManager backPressureManager) {
        try {
            runPollingLoop(consumer, processor, backPressureManager);
            maybeCommitOffsets(consumer, processor);
            this.state = EventuateKafkaConsumerState.STOPPED;
            if (this.closeConsumerOnStop) {
                consumer.close();
            }
        } catch (KafkaMessageProcessorFailedException e) {
            logger.trace("Terminating since KafkaMessageProcessorFailedException");
            this.state = EventuateKafkaConsumerState.MESSAGE_HANDLING_FAILED;
            consumer.close(FAILURE_CLOSE_TIMEOUT);
        } catch (Throwable th) {
            logger.error("Got exception: ", th);
            this.state = EventuateKafkaConsumerState.FAILED;
            consumer.close(FAILURE_CLOSE_TIMEOUT);
            throw new RuntimeException(th);
        }
        logger.trace("Stopped in state {}", this.state);
    }

    /**
     * Polls until {@link #stop()} is called. Each iteration processes the polled records, tries
     * to commit ready offsets (a commit failure is logged and reported to the callbacks but does
     * not end the loop), and applies the pause/resume actions computed by the back-pressure
     * manager from the partitions seen in this poll and the processor's backlog.
     */
    private void runPollingLoop(KafkaConsumer<String, byte[]> kafkaConsumer, KafkaMessageProcessor kafkaMessageProcessor, BackPressureManager backPressureManager) {
        while (!this.stopFlag.get()) {
            // Honour the configured poll timeout rather than a hard-coded value.
            ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.of(this.pollTimeout, ChronoUnit.MILLIS));
            boolean gotRecords = !records.isEmpty();

            if (gotRecords) {
                logger.debug("Got {} {} records", this.subscriberId, records.count());
                for (ConsumerRecord<String, byte[]> record : records) {
                    logger.debug("processing record {} {} {}", this.subscriberId, record.offset(), record.value());
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("EventuateKafkaAggregateSubscriptions subscriber = %s, offset = %d, key = %s, value = %s", this.subscriberId, record.offset(), record.key(), record.value()));
                    }
                    kafkaMessageProcessor.process(record);
                }
                logger.debug("Processed {} {} records", this.subscriberId, records.count());
            } else {
                // NOTE(review): presumably rethrows a failure recorded by the message handler so
                // that an idle consumer still terminates - confirm in KafkaMessageProcessor.
                kafkaMessageProcessor.throwFailureException();
            }

            try {
                maybeCommitOffsets(kafkaConsumer, kafkaMessageProcessor);
            } catch (Exception e) {
                logger.error("Cannot commit offsets", e);
                this.consumerCallbacks.ifPresent(ConsumerCallbacks::onCommitFailedCallback);
            }
            if (gotRecords) {
                logger.debug("To commit {} {}", this.subscriberId, kafkaMessageProcessor.getPending());
            }

            int backlog = kafkaMessageProcessor.backlog();
            HashSet<TopicPartition> polledPartitions = new HashSet<>();
            for (ConsumerRecord<String, byte[]> record : records) {
                polledPartitions.add(new TopicPartition(record.topic(), record.partition()));
            }
            BackPressureActions actions = backPressureManager.update(polledPartitions, backlog);
            if (!actions.pause.isEmpty()) {
                logger.info("Subscriber {} pausing {} due to backlog {} > {}", this.subscriberId, actions.pause, backlog, this.backPressureConfig.getHigh());
                kafkaConsumer.pause(actions.pause);
            }
            if (!actions.resume.isEmpty()) {
                logger.info("Subscriber {} resuming {} due to backlog {} <= {}", this.subscriberId, actions.resume, backlog, this.backPressureConfig.getLow());
                kafkaConsumer.resume(actions.resume);
            }
        }
    }

    /**
     * Requests that the polling loop end; the loop checks the flag at the top of each iteration,
     * so the thread finishes its current iteration first.
     */
    public void stop() {
        this.stopFlag.set(true);
    }

    /** @return the current lifecycle state */
    public EventuateKafkaConsumerState getState() {
        return this.state;
    }
}
