package io.eventuate.messaging.kafka.consumer;

import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.common.EventuateBinaryMessageEncoding;
import io.eventuate.messaging.kafka.common.EventuateKafkaMultiMessageConverter;
import io.eventuate.messaging.partitionmanagement.CommonMessageConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/messaging/kafka/consumer/MessageConsumerKafkaImpl.class */
public class MessageConsumerKafkaImpl implements CommonMessageConsumer {
    private String bootstrapServers;
    private EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private final String id = UUID.randomUUID().toString();
    private List<EventuateKafkaConsumer> consumers = new ArrayList();
    private EventuateKafkaMultiMessageConverter eventuateKafkaMultiMessageConverter = new EventuateKafkaMultiMessageConverter();

    public MessageConsumerKafkaImpl(String str, EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) {
        this.bootstrapServers = str;
        this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties;
    }

    public KafkaSubscription subscribe(String str, Set<String> set, KafkaMessageHandler kafkaMessageHandler) {
        SwimlaneBasedDispatcher swimlaneBasedDispatcher = new SwimlaneBasedDispatcher(str, Executors.newCachedThreadPool());
        EventuateKafkaConsumer eventuateKafkaConsumer = new EventuateKafkaConsumer(str, (consumerRecord, biConsumer) -> {
            return swimlaneBasedDispatcher.dispatch(new RawKafkaMessage((byte[]) consumerRecord.value()), Integer.valueOf(consumerRecord.partition()), rawKafkaMessage -> {
                handle(rawKafkaMessage, biConsumer, kafkaMessageHandler);
            });
        }, new ArrayList(set), this.bootstrapServers, this.eventuateKafkaConsumerConfigurationProperties);
        this.consumers.add(eventuateKafkaConsumer);
        eventuateKafkaConsumer.start();
        return new KafkaSubscription(() -> {
            eventuateKafkaConsumer.stop();
            this.consumers.remove(eventuateKafkaConsumer);
        });
    }

    public void handle(RawKafkaMessage rawKafkaMessage, BiConsumer<Void, Throwable> biConsumer, KafkaMessageHandler kafkaMessageHandler) {
        try {
            if (this.eventuateKafkaMultiMessageConverter.isMultiMessage(rawKafkaMessage.getPayload())) {
                this.eventuateKafkaMultiMessageConverter.convertBytesToMessages(rawKafkaMessage.getPayload()).getMessages().stream().map((v0) -> {
                    return v0.getValue();
                }).map(KafkaMessage::new).forEach(kafkaMessageHandler);
            } else {
                kafkaMessageHandler.accept(new KafkaMessage(EventuateBinaryMessageEncoding.bytesToString(rawKafkaMessage.getPayload())));
            }
            biConsumer.accept(null, null);
        } catch (Throwable th) {
            biConsumer.accept(null, th);
            throw th;
        }
    }

    public void close() {
        this.consumers.forEach((v0) -> {
            v0.stop();
        });
    }

    public String getId() {
        return this.id;
    }
}
