/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.eventhub.impl;

import com.azure.spring.cloud.context.core.util.Tuple;
import com.azure.spring.integration.core.api.PartitionSupplier;
import com.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.azure.spring.integration.eventhub.api.EventHubRxOperation;
import com.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate;
import com.azure.spring.integration.eventhub.impl.EventHubProcessor;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.Message;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Subscriber;
import rx.subscriptions.Subscriptions;

public class EventHubRxTemplate
extends AbstractEventHubTemplate
implements EventHubRxOperation {
    private final ConcurrentHashMap<Tuple<String, String>, Observable<Message<?>>> subjectByNameAndGroup = new ConcurrentHashMap();

    public EventHubRxTemplate(EventHubClientFactory clientFactory) {
        super(clientFactory);
    }

    private static <T> Observable<T> toObservable(Mono<T> mono) {
        return Observable.create(subscriber -> mono.toFuture().whenComplete((result, error) -> {
            if (error != null) {
                subscriber.onError(error);
            } else {
                subscriber.onNext(result);
                subscriber.onCompleted();
            }
        }));
    }

    public <T> Observable<Void> sendRx(String destination, Message<T> message, PartitionSupplier partitionSupplier) {
        return EventHubRxTemplate.toObservable(this.sendAsync(destination, message, partitionSupplier));
    }

    public Observable<Message<?>> subscribe(String destination, String consumerGroup, Class<?> messagePayloadType) {
        Tuple nameAndConsumerGroup = Tuple.of((Object)destination, (Object)consumerGroup);
        this.subjectByNameAndGroup.computeIfAbsent((Tuple<String, String>)nameAndConsumerGroup, k -> Observable.create(subscriber -> {
            EventHubProcessor eventHubProcessor = new EventHubProcessor(arg_0 -> ((Subscriber)subscriber).onNext(arg_0), messagePayloadType, this.getCheckpointConfig(), this.getMessageConverter());
            this.createEventProcessorClient(destination, consumerGroup, eventHubProcessor);
            this.startEventProcessorClient(destination, consumerGroup);
            subscriber.add(Subscriptions.create(() -> this.stopEventProcessorClient(destination, consumerGroup)));
        }).share());
        return this.subjectByNameAndGroup.get(nameAndConsumerGroup);
    }
}

