/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.transfer.s3.internal;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.DemandIgnoringSubscription;
import software.amazon.awssdk.utils.async.StoringSubscriber;

@SdkInternalApi
public class AsyncBufferingSubscriber<T>
implements Subscriber<T> {
    private static final Logger log = Logger.loggerFor(AsyncBufferingSubscriber.class);
    private final CompletableFuture<?> returnFuture;
    private final Function<T, CompletableFuture<?>> consumer;
    private final int maxConcurrentExecutions;
    private final AtomicInteger numRequestsInFlight;
    private final AtomicBoolean isDelivering = new AtomicBoolean(false);
    private volatile boolean isStreamingDone;
    private Subscription subscription;
    private final StoringSubscriber<T> storingSubscriber;

    public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> consumer, CompletableFuture<Void> returnFuture, int maxConcurrentExecutions) {
        this.returnFuture = returnFuture;
        this.consumer = consumer;
        this.maxConcurrentExecutions = maxConcurrentExecutions;
        this.numRequestsInFlight = new AtomicInteger(0);
        this.storingSubscriber = new StoringSubscriber(Integer.MAX_VALUE);
    }

    public void onSubscribe(Subscription subscription) {
        Validate.paramNotNull((Object)subscription, (String)"subscription");
        if (this.subscription != null) {
            log.warn(() -> "The subscriber has already been subscribed. Cancelling the incoming subscription");
            subscription.cancel();
            return;
        }
        this.storingSubscriber.onSubscribe((Subscription)new DemandIgnoringSubscription(subscription));
        this.subscription = subscription;
        subscription.request((long)this.maxConcurrentExecutions);
    }

    public void onNext(T item) {
        this.storingSubscriber.onNext(item);
        this.flushBufferIfNeeded();
    }

    private void flushBufferIfNeeded() {
        if (this.isDelivering.compareAndSet(false, true)) {
            try {
                Optional next = this.storingSubscriber.peek();
                while (this.numRequestsInFlight.get() < this.maxConcurrentExecutions) {
                    if (!next.isPresent()) {
                        this.subscription.request(1L);
                        break;
                    }
                    switch (((StoringSubscriber.Event)next.get()).type()) {
                        case ON_COMPLETE: {
                            this.handleCompleteEvent();
                            break;
                        }
                        case ON_ERROR: {
                            this.handleError(((StoringSubscriber.Event)next.get()).runtimeError());
                            break;
                        }
                        case ON_NEXT: {
                            this.handleOnNext(((StoringSubscriber.Event)next.get()).value());
                            break;
                        }
                        default: {
                            this.handleError(new IllegalStateException("Unknown stored type: " + ((StoringSubscriber.Event)next.get()).type()));
                        }
                    }
                    next = this.storingSubscriber.peek();
                }
            }
            finally {
                this.isDelivering.set(false);
            }
        }
    }

    private void handleOnNext(T item) {
        this.storingSubscriber.poll();
        int numberOfRequestInFlight = this.numRequestsInFlight.incrementAndGet();
        log.debug(() -> "Delivering next item, numRequestInFlight=" + numberOfRequestInFlight);
        this.consumer.apply(item).whenComplete((r, t) -> {
            this.numRequestsInFlight.decrementAndGet();
            if (!this.isStreamingDone) {
                this.subscription.request(1L);
            } else {
                this.flushBufferIfNeeded();
            }
        });
    }

    private void handleCompleteEvent() {
        if (this.numRequestsInFlight.get() == 0) {
            this.returnFuture.complete(null);
            this.storingSubscriber.poll();
        }
    }

    public void onError(Throwable t) {
        this.handleError(t);
        this.storingSubscriber.onError(t);
    }

    private void handleError(Throwable t) {
        this.returnFuture.completeExceptionally(t);
        this.storingSubscriber.poll();
    }

    public void onComplete() {
        this.isStreamingDone = true;
        this.storingSubscriber.onComplete();
        this.flushBufferIfNeeded();
    }

    public int numRequestsInFlight() {
        return this.numRequestsInFlight.get();
    }
}

