/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubBufferedPartitionProducer;
import com.azure.messaging.eventhubs.EventHubBufferedProducerClientBuilder;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.PartitionProperties;
import com.azure.messaging.eventhubs.PartitionResolver;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

@ServiceClient(builder=EventHubBufferedProducerClientBuilder.class, isAsync=true)
public final class EventHubBufferedProducerAsyncClient
implements Closeable {
    private static final SendOptions ROUND_ROBIN_SEND_OPTIONS = new SendOptions();
    private final ClientLogger logger = new ClientLogger(EventHubBufferedProducerAsyncClient.class);
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final EventHubProducerAsyncClient client;
    private final BufferedProducerClientOptions clientOptions;
    private final PartitionResolver partitionResolver;
    private final Mono<Void> initialisationMono;
    private final Mono<String[]> partitionIdsMono;
    private final ConcurrentHashMap<String, EventHubBufferedPartitionProducer> partitionProducers = new ConcurrentHashMap();
    private final AmqpRetryOptions retryOptions;
    private final Tracer tracer;

    EventHubBufferedProducerAsyncClient(EventHubClientBuilder builder, BufferedProducerClientOptions clientOptions, PartitionResolver partitionResolver, AmqpRetryOptions retryOptions, Tracer tracer) {
        this.client = builder.buildAsyncProducerClient();
        this.clientOptions = clientOptions;
        this.partitionResolver = partitionResolver;
        this.retryOptions = retryOptions;
        Mono partitionProducerFluxes = this.client.getEventHubProperties().flatMapMany(property -> {
            Object[] as = (String[])property.getPartitionIds().stream().toArray(String[]::new);
            return Flux.fromArray((Object[])as);
        }).map(partitionId -> this.partitionProducers.computeIfAbsent((String)partitionId, key -> this.createPartitionProducer((String)key))).then();
        this.initialisationMono = partitionProducerFluxes.cache();
        this.partitionIdsMono = this.initialisationMono.then(Mono.fromCallable(() -> new ArrayList(this.partitionProducers.keySet()).toArray(new String[0]))).cache();
        this.tracer = tracer;
    }

    public String getFullyQualifiedNamespace() {
        return this.client.getFullyQualifiedNamespace();
    }

    public String getEventHubName() {
        return this.client.getEventHubName();
    }

    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<EventHubProperties> getEventHubProperties() {
        return this.initialisationMono.then(Mono.defer(() -> this.client.getEventHubProperties()));
    }

    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<String> getPartitionIds() {
        return this.partitionIdsMono.flatMapMany(ids -> Flux.fromArray((Object[])ids));
    }

    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
        if (Objects.isNull(partitionId)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'partitionId' cannot be null."));
        }
        if (CoreUtils.isNullOrEmpty((CharSequence)partitionId)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException("'partitionId' cannot be empty."));
        }
        return this.client.getPartitionProperties(partitionId);
    }

    public int getBufferedEventCount() {
        return this.partitionProducers.values().parallelStream().mapToInt(producer -> producer.getBufferedEventCount()).sum();
    }

    public int getBufferedEventCount(String partitionId) {
        EventHubBufferedPartitionProducer producer = this.partitionProducers.get(partitionId);
        return producer != null ? producer.getBufferedEventCount() : 0;
    }

    public Mono<Integer> enqueueEvent(EventData eventData) {
        return this.enqueueEvent(eventData, ROUND_ROBIN_SEND_OPTIONS);
    }

    public Mono<Integer> enqueueEvent(EventData eventData, SendOptions options) {
        if (eventData == null) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'eventData' cannot be null."));
        }
        if (options == null) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!CoreUtils.isNullOrEmpty((CharSequence)options.getPartitionId())) {
            if (!this.partitionProducers.containsKey(options.getPartitionId())) {
                return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException("partitionId is not valid. Available ones: " + String.join((CharSequence)",", this.partitionProducers.keySet())));
            }
            EventHubBufferedPartitionProducer producer = this.partitionProducers.computeIfAbsent(options.getPartitionId(), key -> this.createPartitionProducer((String)key));
            return producer.enqueueEvent(eventData).thenReturn((Object)this.getBufferedEventCount());
        }
        if (options.getPartitionKey() != null) {
            return this.partitionIdsMono.flatMap(ids -> {
                String partitionId = this.partitionResolver.assignForPartitionKey(options.getPartitionKey(), (String[])ids);
                EventHubBufferedPartitionProducer producer = this.partitionProducers.get(partitionId);
                if (producer == null) {
                    return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException(String.format("Unable to find EventHubBufferedPartitionProducer for partitionId: %s when mapping partitionKey: %s to available partitions.", partitionId, options.getPartitionKey())));
                }
                eventData.setPartitionKeyAnnotation(options.getPartitionKey());
                return producer.enqueueEvent(eventData).thenReturn((Object)this.getBufferedEventCount());
            });
        }
        return this.partitionIdsMono.flatMap(ids -> {
            String partitionId = this.partitionResolver.assignRoundRobin((String[])ids);
            EventHubBufferedPartitionProducer producer = this.partitionProducers.computeIfAbsent(partitionId, key -> this.createPartitionProducer((String)key));
            eventData.setPartitionKeyAnnotation(options.getPartitionKey());
            return producer.enqueueEvent(eventData).thenReturn((Object)this.getBufferedEventCount());
        });
    }

    public Mono<Integer> enqueueEvents(Iterable<EventData> events) {
        return this.enqueueEvents(events, ROUND_ROBIN_SEND_OPTIONS);
    }

    public Mono<Integer> enqueueEvents(Iterable<EventData> events, SendOptions options) {
        if (events == null) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'eventData' cannot be null."));
        }
        if (options == null) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        List enqueued = StreamSupport.stream(events.spliterator(), false).map(event -> this.enqueueEvent((EventData)((Object)event), options)).collect(Collectors.toList());
        return Flux.concat(enqueued).last();
    }

    public Mono<Void> flush() {
        List flushOperations = this.partitionProducers.values().stream().map(value -> value.flush()).collect(Collectors.toList());
        return Flux.merge(flushOperations).then();
    }

    @Override
    public void close() {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        this.partitionProducers.values().forEach(partitionProducer -> partitionProducer.close());
        this.client.close();
    }

    private EventHubBufferedPartitionProducer createPartitionProducer(String partitionId) {
        Supplier queueSupplier = Queues.get((int)this.clientOptions.getMaxEventBufferLengthPerPartition());
        Queue eventQueue = (Queue)queueSupplier.get();
        Sinks.Many eventSink = Sinks.many().unicast().onBackpressureBuffer(eventQueue);
        return new EventHubBufferedPartitionProducer(this.client, partitionId, this.clientOptions, this.retryOptions, (Sinks.Many<EventData>)eventSink, eventQueue, this.tracer);
    }

    static class BufferedProducerClientOptions {
        private boolean enableIdempotentRetries = false;
        private int maxConcurrentSendsPerPartition = 1;
        private int maxEventBufferLengthPerPartition = 1500;
        private Duration maxWaitTime = Duration.ofSeconds(30L);
        private Consumer<SendBatchFailedContext> sendFailedContext;
        private Consumer<SendBatchSucceededContext> sendSucceededContext;
        private int maxConcurrentSends = 1;

        BufferedProducerClientOptions() {
        }

        boolean enableIdempotentRetries() {
            return this.enableIdempotentRetries;
        }

        void setEnableIdempotentRetries(boolean enableIdempotentRetries) {
            this.enableIdempotentRetries = enableIdempotentRetries;
        }

        int getMaxConcurrentSends() {
            return this.maxConcurrentSends;
        }

        void setMaxConcurrentSends(int maxConcurrentSends) {
            this.maxConcurrentSends = maxConcurrentSends;
        }

        int getMaxConcurrentSendsPerPartition() {
            return this.maxConcurrentSendsPerPartition;
        }

        void setMaxConcurrentSendsPerPartition(int maxConcurrentSendsPerPartition) {
            this.maxConcurrentSendsPerPartition = maxConcurrentSendsPerPartition;
        }

        int getMaxEventBufferLengthPerPartition() {
            return this.maxEventBufferLengthPerPartition;
        }

        void maxEventBufferLengthPerPartition(int maxPendingEventCount) {
            this.maxEventBufferLengthPerPartition = maxPendingEventCount;
        }

        Duration getMaxWaitTime() {
            return this.maxWaitTime;
        }

        void setMaxWaitTime(Duration maxWaitTime) {
            this.maxWaitTime = maxWaitTime;
        }

        Consumer<SendBatchFailedContext> getSendFailedContext() {
            return this.sendFailedContext;
        }

        void setSendFailedContext(Consumer<SendBatchFailedContext> sendFailedContext) {
            this.sendFailedContext = sendFailedContext;
        }

        Consumer<SendBatchSucceededContext> getSendSucceededContext() {
            return this.sendSucceededContext;
        }

        void setSendSucceededContext(Consumer<SendBatchSucceededContext> sendSucceededContext) {
            this.sendSucceededContext = sendSucceededContext;
        }
    }
}

