package io.eventuate.messaging.kafka.consumer;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/messaging/kafka/consumer/SwimlaneBasedDispatcher.class */
public class SwimlaneBasedDispatcher {
    private static Logger logger = LoggerFactory.getLogger(SwimlaneBasedDispatcher.class);
    private final ConcurrentHashMap<Integer, SwimlaneDispatcher> map = new ConcurrentHashMap<>();
    private Executor executor;
    private String subscriberId;

    public SwimlaneBasedDispatcher(String str, Executor executor) {
        this.subscriberId = str;
        this.executor = executor;
    }

    public SwimlaneDispatcherBacklog dispatch(RawKafkaMessage rawKafkaMessage, Integer num, Consumer<RawKafkaMessage> consumer) {
        return getOrCreate(num).dispatch(rawKafkaMessage, consumer);
    }

    private SwimlaneDispatcher getOrCreate(Integer num) {
        SwimlaneDispatcher swimlaneDispatcher = this.map.get(num);
        if (swimlaneDispatcher == null) {
            logger.trace("No dispatcher for {} {}. Attempting to create", this.subscriberId, num);
            swimlaneDispatcher = new SwimlaneDispatcher(this.subscriberId, num, this.executor);
            SwimlaneDispatcher putIfAbsent = this.map.putIfAbsent(num, swimlaneDispatcher);
            if (putIfAbsent != null) {
                logger.trace("Using concurrently created SwimlaneDispatcher for {} {}", this.subscriberId, num);
                swimlaneDispatcher = putIfAbsent;
            } else {
                logger.trace("Using newly created SwimlaneDispatcher for {} {}", this.subscriberId, num);
            }
        }
        return swimlaneDispatcher;
    }
}
