/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.eventhub.support;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.spring.integration.core.api.PartitionSupplier;
import com.azure.spring.integration.core.api.StartPosition;
import com.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.azure.spring.integration.eventhub.impl.EventHubProcessor;
import com.azure.spring.integration.eventhub.impl.EventHubTemplate;
import com.azure.spring.integration.eventhub.support.EventHubProcessorSupport;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import reactor.core.publisher.Mono;

/**
 * In-memory variant of {@link EventHubTemplate} intended for tests.
 *
 * <p>Sent messages are converted to {@link EventData}, stored per event hub name, and
 * handed directly to every processor currently registered for that event hub. No
 * call to the client factory is made by the methods overridden here.
 *
 * <p>NOTE(review): the event store is a plain {@code ArrayListMultimap} and
 * {@link #sendAsync} is not synchronized; presumably this class is only driven from a
 * single test thread - confirm before using it concurrently.
 */
public class EventHubTestOperation
extends EventHubTemplate {
    /** Every event sent so far, keyed by event hub name, in send order. */
    private final Multimap<String, EventData> eventHubsByName = ArrayListMultimap.create();
    /** Registered processors: event hub name -> (consumer group -> processor). */
    private final Map<String, Map<String, EventHubProcessor>> processorsByNameAndGroup = new ConcurrentHashMap<>();
    /** Produces the {@link EventContext} passed to a processor with each delivered event. */
    private final Supplier<EventContext> eventContextSupplier;

    /**
     * Creates the test operation.
     *
     * @param clientFactory forwarded unchanged to the {@link EventHubTemplate} constructor
     * @param eventContextSupplier called once per delivered event to obtain its context
     */
    public EventHubTestOperation(EventHubClientFactory clientFactory, Supplier<EventContext> eventContextSupplier) {
        super(clientFactory);
        this.eventContextSupplier = eventContextSupplier;
    }

    /**
     * Records the message for {@code eventHubName} and delivers it synchronously to every
     * processor registered for that event hub, across all consumer groups.
     *
     * @param eventHubName name of the target event hub
     * @param message message to convert and deliver
     * @param partitionSupplier not used by this implementation
     * @param <U> payload type of the message
     * @return an empty {@link Mono}; delivery has already happened when it is returned
     */
    public <U> Mono<Void> sendAsync(String eventHubName, @NonNull Message<U> message, PartitionSupplier partitionSupplier) {
        EventData azureMessage = (EventData)this.getMessageConverter().fromMessage(message, EventData.class);
        this.eventHubsByName.put(eventHubName, azureMessage);
        // An (empty) per-hub processor map is created on first send, as before.
        for (EventHubProcessor processor : this.processorsFor(eventHubName).values()) {
            this.deliver(processor, azureMessage);
        }
        return Mono.empty();
    }

    /**
     * Registers {@code eventHubProcessor} for the given event hub and consumer group.
     * An already registered processor for the same pair is kept, not replaced.
     */
    @Override
    protected synchronized void createEventProcessorClient(String name, String consumerGroup, EventHubProcessor eventHubProcessor) {
        this.processorsFor(name).putIfAbsent(consumerGroup, eventHubProcessor);
    }

    /**
     * When the start position is {@link StartPosition#EARLIEST}, replays every stored
     * event of {@code name} to the processors registered for that event hub. Does
     * nothing for other start positions or when no processor is registered.
     */
    @Override
    protected void startEventProcessorClient(String name, String consumerGroup) {
        if (this.getStartPosition() != StartPosition.EARLIEST) {
            return;
        }
        Map<String, EventHubProcessor> processors = this.processorsByNameAndGroup.get(name);
        if (processors == null) {
            // Nothing was ever registered for this event hub; previously this threw a NullPointerException.
            return;
        }
        // NOTE(review): the replay targets every consumer group of this event hub, not only
        // consumerGroup, so processors started earlier see the stored events again - confirm
        // whether that is intended.
        for (EventHubProcessor processor : processors.values()) {
            for (EventData event : this.eventHubsByName.get(name)) {
                this.deliver(processor, event);
            }
        }
    }

    /**
     * Unregisters the processor of {@code consumerGroup} for the event hub {@code name}.
     * A name or consumer group that was never registered is ignored.
     */
    @Override
    protected void stopEventProcessorClient(String name, String consumerGroup) {
        Map<String, EventHubProcessor> processors = this.processorsByNameAndGroup.get(name);
        if (processors != null) {
            processors.remove(consumerGroup);
        }
    }

    /**
     * Builds an {@link EventHubProcessorSupport} around {@code consumer}, using this
     * template's checkpoint configuration and message converter.
     */
    @Override
    public EventHubProcessor createEventProcessor(Consumer<Message<?>> consumer, Class<?> messagePayloadType) {
        return new EventHubProcessorSupport(consumer, messagePayloadType, this.getCheckpointConfig(), this.getMessageConverter());
    }

    /** Returns the consumer-group map of {@code name}, creating it atomically if it is missing. */
    private Map<String, EventHubProcessor> processorsFor(String name) {
        return this.processorsByNameAndGroup.computeIfAbsent(name, key -> new ConcurrentHashMap<>());
    }

    /** Hands one event, with a freshly supplied context, to a processor. */
    private void deliver(EventHubProcessor processor, EventData event) {
        // The two-argument onEvent is invoked through EventHubProcessorSupport, as in the original code.
        EventHubProcessorSupport support = (EventHubProcessorSupport)processor;
        support.onEvent(this.eventContextSupplier.get(), event);
    }
}

