/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.reactive.HotObservable;
import io.micronaut.http.netty.stream.DelegateStreamedHttpRequest;
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.FormDataHttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentProcessorAsReactiveProcessor;
import io.micronaut.http.server.netty.body.ByteBody;
import io.micronaut.http.server.netty.body.HttpBody;
import io.micronaut.http.server.netty.body.HttpBodyReused;
import io.micronaut.http.server.netty.body.ImmediateByteBody;
import io.micronaut.http.server.netty.body.ManagedBody;
import io.micronaut.http.server.netty.body.MultiObjectBody;
import io.micronaut.http.server.netty.body.StreamingMultiObjectBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Internal
public final class StreamingByteBody
extends ManagedBody<Publisher<HttpContent>>
implements ByteBody {
    private final long advertisedLength;

    StreamingByteBody(Publisher<HttpContent> publisher, long advertisedLength) {
        super(publisher);
        this.advertisedLength = advertisedLength;
    }

    @Override
    public MultiObjectBody processMulti(FormDataHttpContentProcessor processor) {
        return this.next(new StreamingMultiObjectBody(HttpContentProcessorAsReactiveProcessor.asPublisher(processor, (Publisher)this.prepareClaim())));
    }

    @Override
    public MultiObjectBody rawContent(HttpServerConfiguration configuration) {
        ImmediateByteBody.checkLength(configuration, this.advertisedLength);
        return this.next(new StreamingMultiObjectBody(new LengthCheckPublisher(configuration, (Publisher)this.prepareClaim())));
    }

    @Override
    public ExecutionFlow<ImmediateByteBody> buffer(ByteBufAllocator alloc) {
        IntermediateBuffering intermediateBuffering = new IntermediateBuffering(alloc);
        ((Publisher)this.prepareClaim()).subscribe(intermediateBuffering);
        this.next(intermediateBuffering);
        return intermediateBuffering.completion;
    }

    @Override
    public HttpRequest claimForReuse(HttpRequest request) {
        Publisher publisher = (Publisher)this.prepareClaim();
        this.next(new HttpBodyReused());
        return new DelegateStreamedHttpRequest(request, publisher);
    }

    @Override
    void release(Publisher<HttpContent> value) {
        if (value instanceof HotObservable) {
            HotObservable hot = (HotObservable)value;
            hot.closeIfNoSubscriber();
        }
    }

    private static final class LengthCheckPublisher
    implements Publisher<ByteBuf>,
    Subscriber<HttpContent> {
        private final HttpServerConfiguration configuration;
        private final Publisher<HttpContent> upstream;
        private Subscriber<? super ByteBuf> downstream;
        private Subscription subscription;
        private long received = 0L;
        private boolean exceeded = false;

        LengthCheckPublisher(HttpServerConfiguration configuration, Publisher<HttpContent> upstream) {
            this.configuration = configuration;
            this.upstream = upstream;
        }

        @Override
        public void subscribe(Subscriber<? super ByteBuf> s2) {
            this.downstream = s2;
            this.upstream.subscribe(this);
        }

        @Override
        public void onSubscribe(Subscription s2) {
            this.subscription = s2;
            this.downstream.onSubscribe(s2);
        }

        @Override
        public void onNext(HttpContent httpContent) {
            if (this.exceeded) {
                httpContent.release();
                return;
            }
            ByteBuf buf = httpContent.content();
            this.received += (long)buf.readableBytes();
            try {
                ImmediateByteBody.checkLength(this.configuration, this.received);
            }
            catch (ContentLengthExceededException fail) {
                this.exceeded = true;
                httpContent.release();
                this.downstream.onError(fail);
                this.subscription.cancel();
                return;
            }
            this.downstream.onNext(buf);
        }

        @Override
        public void onError(Throwable t2) {
            if (!this.exceeded) {
                this.downstream.onError(t2);
            }
        }

        @Override
        public void onComplete() {
            if (!this.exceeded) {
                this.downstream.onComplete();
            }
        }
    }

    private static final class IntermediateBuffering
    implements Subscriber<HttpContent>,
    HttpBody {
        private final DelayedExecutionFlow<ImmediateByteBody> completion = DelayedExecutionFlow.create();
        private final Lock lock = new ReentrantLock();
        private final ByteBufAllocator alloc;
        private Subscription subscription;
        private boolean discarded = false;
        private CompositeByteBuf composite;
        private ByteBuf single;
        private ImmediateByteBody next;

        private IntermediateBuffering(ByteBufAllocator alloc) {
            this.alloc = alloc;
        }

        @Override
        public void onSubscribe(Subscription s2) {
            s2.request(Long.MAX_VALUE);
            this.subscription = s2;
        }

        @Override
        public void onNext(HttpContent httpContent) {
            this.lock.lock();
            try {
                if (this.discarded) {
                    httpContent.release();
                    return;
                }
                if (this.composite != null) {
                    this.composite.addComponent(true, httpContent.content());
                } else if (this.single == null) {
                    this.single = httpContent.content();
                } else {
                    this.composite = this.alloc.compositeBuffer();
                    this.composite.addComponent(true, this.single);
                    this.composite.addComponent(true, httpContent.content());
                    this.single = null;
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public void onError(Throwable t2) {
            this.discard();
            try {
                this.completion.completeExceptionally(t2);
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }

        @Override
        public void onComplete() {
            this.lock.lock();
            try {
                this.discarded = true;
                this.next = new ImmediateByteBody(this.composite == null ? this.single : this.composite);
                this.single = null;
                this.composite = null;
            }
            finally {
                this.lock.unlock();
            }
            this.completion.complete(this.next);
        }

        private void discard() {
            this.lock.lock();
            try {
                this.discarded = true;
                if (this.composite != null) {
                    this.composite.release();
                    this.composite = null;
                }
                if (this.single != null) {
                    this.single.release();
                    this.single = null;
                }
            }
            finally {
                this.lock.unlock();
            }
            if (this.next != null) {
                this.next.release();
            }
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }

        @Override
        public void release() {
            this.discard();
        }

        @Override
        @Nullable
        public HttpBody next() {
            return this.next;
        }
    }
}

