/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.http.crt.internal;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongUnaryOperator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
public final class AwsCrtResponseBodyPublisher
implements Publisher<ByteBuffer> {
    private static final Logger log = Logger.loggerFor(AwsCrtResponseBodyPublisher.class);
    private static final LongUnaryOperator DECREMENT_IF_GREATER_THAN_ZERO = x -> x > 0L ? x - 1L : x;
    private final HttpClientConnection connection;
    private final HttpStream stream;
    private final CompletableFuture<Void> responseComplete;
    private final AtomicLong outstandingRequests = new AtomicLong(0L);
    private final int windowSize;
    private final AtomicBoolean isCancelled = new AtomicBoolean(false);
    private final AtomicBoolean areNativeResourcesReleased = new AtomicBoolean(false);
    private final AtomicBoolean isSubscriptionComplete = new AtomicBoolean(false);
    private final AtomicBoolean queueComplete = new AtomicBoolean(false);
    private final AtomicInteger mutualRecursionDepth = new AtomicInteger(0);
    private final AtomicInteger queuedBytes = new AtomicInteger(0);
    private final AtomicReference<Subscriber<? super ByteBuffer>> subscriberRef = new AtomicReference<Object>(null);
    private final Queue<byte[]> queuedBuffers = new ConcurrentLinkedQueue<byte[]>();
    private final AtomicReference<Throwable> error = new AtomicReference<Object>(null);

    public AwsCrtResponseBodyPublisher(HttpClientConnection connection, HttpStream stream, CompletableFuture<Void> responseComplete, int windowSize) {
        this.connection = (HttpClientConnection)Validate.notNull((Object)connection, (String)"HttpConnection must not be null", (Object[])new Object[0]);
        this.stream = (HttpStream)Validate.notNull((Object)stream, (String)"Stream must not be null", (Object[])new Object[0]);
        this.responseComplete = (CompletableFuture)Validate.notNull(responseComplete, (String)"ResponseComplete future must not be null", (Object[])new Object[0]);
        this.windowSize = Validate.isPositive((int)windowSize, (String)"windowSize must be > 0");
    }

    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        Validate.notNull(subscriber, (String)"Subscriber must not be null", (Object[])new Object[0]);
        boolean wasFirstSubscriber = this.subscriberRef.compareAndSet(null, subscriber);
        if (!wasFirstSubscriber) {
            log.error(() -> "Only one subscriber allowed");
            subscriber.onSubscribe(new Subscription(){

                public void request(long n) {
                }

                public void cancel() {
                }
            });
            subscriber.onError((Throwable)new IllegalStateException("Only one subscriber allowed"));
        } else {
            subscriber.onSubscribe((Subscription)new AwsCrtResponseBodySubscription(this));
        }
    }

    public void queueBuffer(byte[] buffer) {
        Validate.notNull((Object)buffer, (String)"ByteBuffer must not be null", (Object[])new Object[0]);
        if (this.isCancelled.get()) {
            this.stream.incrementWindow(buffer.length);
            return;
        }
        this.queuedBuffers.add(buffer);
        int totalBytesQueued = this.queuedBytes.addAndGet(buffer.length);
        if (totalBytesQueued > this.windowSize) {
            throw new IllegalStateException("Queued more than Window Size: queued=" + totalBytesQueued + ", window=" + this.windowSize);
        }
    }

    protected void request(long n) {
        long outstandingReqs;
        Validate.inclusiveBetween((long)1L, (long)Long.MAX_VALUE, (long)n, (String)"request");
        if (n > Long.MAX_VALUE - this.outstandingRequests.get()) {
            this.outstandingRequests.set(Long.MAX_VALUE);
            outstandingReqs = Long.MAX_VALUE;
        } else {
            outstandingReqs = this.outstandingRequests.addAndGet(n);
        }
        this.publishToSubscribers();
        log.trace(() -> "Subscriber Requested more Buffers. Outstanding Requests: " + outstandingReqs);
    }

    public void setError(Throwable t) {
        log.error(() -> "Error processing Response Body", t);
        this.error.compareAndSet(null, t);
    }

    protected void setCancelled() {
        this.isCancelled.set(true);
        this.subscriberRef.set(null);
    }

    private synchronized void releaseNativeResources() {
        boolean alreadyReleased = this.areNativeResourcesReleased.getAndSet(true);
        if (!alreadyReleased) {
            this.stream.close();
            this.connection.close();
        }
    }

    public void setQueueComplete() {
        log.trace(() -> "Response Body Publisher queue marked as completed.");
        this.queueComplete.set(true);
        this.releaseNativeResources();
    }

    protected void completeSubscriptionExactlyOnce() {
        boolean alreadyComplete = this.isSubscriptionComplete.getAndSet(true);
        if (alreadyComplete) {
            return;
        }
        Optional<Object> subscriber = Optional.ofNullable(this.subscriberRef.getAndSet(null));
        Throwable throwable = this.error.get();
        this.releaseNativeResources();
        if (throwable != null) {
            log.error(() -> "Error before ResponseBodyPublisher could complete: " + throwable.getMessage());
            try {
                subscriber.ifPresent(s -> s.onError(throwable));
            }
            catch (Exception e) {
                log.warn(() -> "Failed to exceptionally complete subscriber future with: " + throwable.getMessage());
            }
            this.responseComplete.completeExceptionally(throwable);
        } else {
            log.debug(() -> "ResponseBodyPublisher Completed Successfully");
            try {
                subscriber.ifPresent(Subscriber::onComplete);
            }
            catch (Exception e) {
                log.warn(() -> "Failed to successfully complete subscriber future");
            }
            this.responseComplete.complete(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void publishToSubscribers() {
        boolean shouldComplete = true;
        AwsCrtResponseBodyPublisher awsCrtResponseBodyPublisher = this;
        synchronized (awsCrtResponseBodyPublisher) {
            if (this.error.get() == null) {
                if (this.isSubscriptionComplete.get() || this.isCancelled.get()) {
                    log.debug(() -> "Subscription already completed or cancelled, can't publish updates to Subscribers.");
                    return;
                }
                if (this.mutualRecursionDepth.get() > 0) {
                    return;
                }
                int totalAmountTransferred = 0;
                while (this.outstandingRequests.get() > 0L && !this.queuedBuffers.isEmpty()) {
                    byte[] buffer = this.queuedBuffers.poll();
                    this.outstandingRequests.getAndUpdate(DECREMENT_IF_GREATER_THAN_ZERO);
                    int amount = buffer.length;
                    this.publishWithoutMutualRecursion(this.subscriberRef.get(), ByteBuffer.wrap(buffer));
                    totalAmountTransferred += amount;
                }
                if (totalAmountTransferred > 0) {
                    this.queuedBytes.addAndGet(-totalAmountTransferred);
                    if (!this.areNativeResourcesReleased.get()) {
                        this.stream.incrementWindow(totalAmountTransferred);
                    }
                }
                shouldComplete = this.queueComplete.get() && this.queuedBuffers.isEmpty();
            } else {
                shouldComplete = true;
            }
        }
        if (shouldComplete) {
            this.completeSubscriptionExactlyOnce();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void publishWithoutMutualRecursion(Subscriber<? super ByteBuffer> subscriber, ByteBuffer buffer) {
        try {
            int depth = this.mutualRecursionDepth.getAndIncrement();
            if (depth == 0) {
                subscriber.onNext((Object)buffer);
            }
        }
        finally {
            this.mutualRecursionDepth.decrementAndGet();
        }
    }

    static class AwsCrtResponseBodySubscription
    implements Subscription {
        private final AwsCrtResponseBodyPublisher publisher;

        AwsCrtResponseBodySubscription(AwsCrtResponseBodyPublisher publisher) {
            this.publisher = publisher;
        }

        public void request(long n) {
            if (n <= 0L) {
                this.publisher.setError(new IllegalArgumentException("Request is for <= 0 elements: " + n));
                this.publisher.publishToSubscribers();
                return;
            }
            this.publisher.request(n);
            this.publisher.publishToSubscribers();
        }

        public void cancel() {
            this.publisher.setCancelled();
        }
    }
}

