package io.eventuate.messaging.kafka.consumer;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/messaging/kafka/consumer/SwimlaneDispatcher.class */
/**
 * Queues messages for one subscriber/swimlane pair and drains the queue on the supplied
 * {@link Executor}.
 *
 * <p>{@link #dispatch} appends to a FIFO queue and schedules a drain task only when the
 * {@code running} flag flips from {@code false} to {@code true}. The drain task
 * ({@link #processQueuedMessage}) clears the flag again once it finds the queue empty.
 * Both the "enqueue and maybe schedule" step and the "queue is empty, stop running" step
 * are performed while holding the queue's monitor, so a message cannot be enqueued in the
 * gap between the final empty poll and the flag being cleared.
 */
public class SwimlaneDispatcher {
    private static final Logger logger = LoggerFactory.getLogger(SwimlaneDispatcher.class);

    private final String subscriberId;
    private final Integer swimlane;
    private final Executor executor;

    /** Pending messages; this object's monitor also guards the running-flag transitions. */
    private final LinkedBlockingQueue<QueuedMessage> queue = new LinkedBlockingQueue<>();

    /** True from the moment a drain task is scheduled until that task observes an empty queue. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Backlog view over {@link #queue}; the same instance is returned from every dispatch call. */
    private final SwimlaneDispatcherBacklog consumerStatus = new SwimlaneDispatcherBacklog(this.queue);

    /** A message paired with the handler that should be invoked for it. */
    public class QueuedMessage {
        RawKafkaMessage message;
        Consumer<RawKafkaMessage> messageConsumer;

        /**
         * @param rawKafkaMessage the message to hand to the handler
         * @param consumer the handler to invoke with the message
         */
        public QueuedMessage(RawKafkaMessage rawKafkaMessage, Consumer<RawKafkaMessage> consumer) {
            this.message = rawKafkaMessage;
            this.messageConsumer = consumer;
        }
    }

    /**
     * Creates a dispatcher that is initially not running and has an empty queue.
     *
     * @param str the subscriber id (used in log output)
     * @param num the swimlane number (used in log output)
     * @param executor the executor on which queued messages are processed
     */
    public SwimlaneDispatcher(String str, Integer num, Executor executor) {
        this.subscriberId = str;
        this.swimlane = num;
        this.executor = executor;
    }

    /**
     * @return the current value of the running flag
     */
    public boolean getRunning() {
        return this.running.get();
    }

    /**
     * Enqueues a message and, if no drain task is currently marked as running, schedules one.
     *
     * @param rawKafkaMessage the message to enqueue
     * @param consumer the handler to invoke when the message is dequeued
     * @return the backlog object backed by this dispatcher's queue
     */
    public SwimlaneDispatcherBacklog dispatch(RawKafkaMessage rawKafkaMessage, Consumer<RawKafkaMessage> consumer) {
        // Held so that the add + flag check cannot interleave with the
        // "queue empty -> running = false" step in getNextMessage().
        synchronized (this.queue) {
            this.queue.add(new QueuedMessage(rawKafkaMessage, consumer));
            logger.trace("added message to queue: {} {} {}", this.subscriberId, this.swimlane, rawKafkaMessage);
            if (this.running.compareAndSet(false, true)) {
                logger.trace("Stopped - attempting to process newly queued message: {} {}", this.subscriberId, this.swimlane);
                processNextQueuedMessage();
            } else {
                logger.trace("Running - Not attempting to process newly queued message: {} {}", this.subscriberId, this.swimlane);
            }
        }
        return this.consumerStatus;
    }

    /** Submits a drain task to the executor. */
    private void processNextQueuedMessage() {
        this.executor.execute(this::processQueuedMessage);
    }

    /**
     * Drains the queue, invoking each message's handler in turn, and returns when the queue
     * is empty or a handler throws.
     *
     * <p>If a handler throws a {@link RuntimeException} the exception is logged and this
     * method returns immediately without clearing the running flag. As a result, later
     * {@link #dispatch} calls still enqueue their messages but do not schedule a new drain
     * task.
     */
    public void processQueuedMessage() {
        while (true) {
            QueuedMessage nextMessage = getNextMessage();
            if (nextMessage == null) {
                logger.trace("No queued message for {} {}", this.subscriberId, this.swimlane);
                return;
            }
            logger.trace("Invoking handler for message for {} {} {}", this.subscriberId, this.swimlane, nextMessage.message);
            try {
                nextMessage.messageConsumer.accept(nextMessage.message);
            } catch (RuntimeException e) {
                // The running flag is intentionally left untouched here; see the method Javadoc.
                logger.error("Exception handling message - terminating", e);
                return;
            }
        }
    }

    /**
     * Removes and returns the head of the queue, or marks the dispatcher as not running and
     * returns {@code null} when the queue is empty.
     *
     * @return the next queued message, or {@code null} if there is none
     */
    private QueuedMessage getNextMessage() {
        // Fast path: no lock needed when a message is already available.
        QueuedMessage next = this.queue.poll();
        if (next != null) {
            return next;
        }
        // Slow path: re-check under the queue's monitor so that clearing the flag cannot
        // race with a dispatch() call that is enqueueing a message.
        synchronized (this.queue) {
            next = this.queue.poll();
            if (next == null) {
                this.running.compareAndSet(true, false);
            }
        }
        return next;
    }
}
