package io.gravitee.gateway.jupiter.handlers.api.adapter.invoker;

import io.gravitee.gateway.api.buffer.Buffer;
import io.gravitee.gateway.api.handler.Handler;
import io.gravitee.gateway.api.proxy.ProxyConnection;
import io.gravitee.gateway.api.proxy.ProxyResponse;
import io.gravitee.gateway.jupiter.api.context.RequestExecutionContext;
import io.reactivex.Flowable;
import io.reactivex.internal.subscriptions.EmptySubscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/gateway/jupiter/handlers/api/adapter/invoker/FlowableProxyResponse.class */
public class FlowableProxyResponse extends Flowable<Buffer> {
    private final Logger log = LoggerFactory.getLogger(FlowableProxyResponse.class);
    private final AtomicLong demand = new AtomicLong(0);
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private ProxyResponse proxyResponse;
    private RequestExecutionContext ctx;
    private ProxyConnection connection;
    private Subscription subscription;
    private Subscriber<? super Buffer> subscriber;

    /* loaded from: input_file:io/gravitee/gateway/jupiter/handlers/api/adapter/invoker/FlowableProxyResponse$ProxyResponseSubscription.class */
    final class ProxyResponseSubscription implements Subscription {
        ProxyResponseSubscription() {
        }

        public void request(long j) {
            if (FlowableProxyResponse.this.demand.addAndGet(j) > 0) {
                FlowableProxyResponse.this.resumeProxyResponse();
            }
        }

        public void cancel() {
            FlowableProxyResponse.this.cancelProxyResponse();
        }
    }

    public void initialize(RequestExecutionContext requestExecutionContext, ProxyConnection proxyConnection, ProxyResponse proxyResponse) {
        this.ctx = requestExecutionContext;
        this.connection = proxyConnection;
        this.proxyResponse = proxyResponse;
        pauseProxyResponse();
    }

    private void release() {
        this.proxyResponse.endHandler((Handler) null);
        this.proxyResponse.bodyHandler((Handler) null);
    }

    protected void subscribeActual(Subscriber<? super Buffer> subscriber) {
        if (this.subscription != null) {
            EmptySubscription.error(new IllegalStateException("This processor allows only a single Subscriber"), subscriber);
            return;
        }
        this.log.debug("Subscribing to proxy response");
        this.subscriber = subscriber;
        this.subscription = new ProxyResponseSubscription();
        this.proxyResponse.bodyHandler(this::handleChunk);
        this.proxyResponse.endHandler(r3 -> {
            handleEnd();
        });
        subscriber.onSubscribe(this.subscription);
    }

    private void handleChunk(Buffer buffer) {
        try {
            if (this.ctx.response().ended()) {
                cancelProxyResponse();
                this.subscriber.onComplete();
            } else {
                this.subscriber.onNext(buffer);
                if (this.demand.decrementAndGet() == 0) {
                    pauseProxyResponse();
                }
            }
        } catch (Throwable th) {
            this.subscriber.onError(th);
            cancelProxyResponse();
        }
    }

    private void handleEnd() {
        release();
        this.subscriber.onComplete();
    }

    private void pauseProxyResponse() {
        this.proxyResponse.pause();
    }

    private void resumeProxyResponse() {
        this.proxyResponse.resume();
    }

    private void cancelProxyResponse() {
        try {
            if (this.cancelled.compareAndSet(false, true)) {
                this.log.debug("Cancelling proxy response");
                this.proxyResponse.cancel();
                this.connection.cancel();
            }
        } catch (Exception e) {
            this.log.warn("Unable to properly cancel the proxy response.", e);
        }
    }
}
