/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.jetty.reactive.client.internal;

import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.eclipse.jetty.reactive.client.internal.AbstractSingleProcessor;
import org.eclipse.jetty.reactive.client.internal.QueuedSinglePublisher;
import org.eclipse.jetty.util.thread.AutoLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A reactive-streams processor that is also a Jetty {@link Response.Listener}.
 *
 * <p>The HTTP request is sent lazily, the first time downstream demand reaches
 * {@link #onRequest(Subscriber, long)}. When the response headers arrive, the
 * configured content function is applied to the reactive response and to a
 * publisher of response content chunks; this processor then subscribes itself
 * to the publisher returned by that function and relays every item it receives
 * downstream.</p>
 *
 * @param <T> the type of the items emitted downstream
 */
public class ResponseListenerProcessor<T>
extends AbstractSingleProcessor<T, T>
implements Response.Listener {
    private static final Logger logger = LoggerFactory.getLogger(ResponseListenerProcessor.class);
    /** Publishes the response content chunks to the publisher built by {@link #contentFn}. */
    private final ContentPublisher content = new ContentPublisher();
    private final ReactiveRequest request;
    private final BiFunction<ReactiveResponse, Publisher<Content.Chunk>, Publisher<T>> contentFn;
    /** Whether {@link #cancel()} must also abort the underlying HTTP request. */
    private final boolean abortOnCancel;
    /** Set (under the lock) once the request has been sent, so that it is sent only once. */
    private boolean requestSent;
    /** Set when the response headers have been received. */
    private boolean responseReceived;

    /**
     * Creates a processor for the given reactive request.
     *
     * @param request the reactive request to send on first demand
     * @param contentFn the function that turns the reactive response and the publisher of
     * content chunks into the publisher this processor subscribes to
     * @param abortOnCancel whether cancelling this processor aborts the HTTP request
     */
    public ResponseListenerProcessor(ReactiveRequest request, BiFunction<ReactiveResponse, Publisher<Content.Chunk>, Publisher<T>> contentFn, boolean abortOnCancel) {
        this.request = request;
        this.contentFn = contentFn;
        this.abortOnCancel = abortOnCancel;
    }

    /** No-op: nothing to do when the response begins. */
    public void onBegin(Response response) {
    }

    /**
     * Accepts every response header.
     *
     * @return always {@code true}
     */
    public boolean onHeader(Response response, HttpField field) {
        return true;
    }

    /**
     * Records that the response was received, applies the content function and
     * subscribes this processor to the publisher it returns.
     */
    public void onHeaders(Response response) {
        if (logger.isDebugEnabled()) {
            logger.debug("received response headers {} on {}", response, this);
        }
        responseReceived = true;
        Publisher<T> publisher = contentFn.apply(request.getReactiveResponse(), content);
        publisher.subscribe(this);
    }

    /** No-op: content is consumed through {@link #onContentSource(Response, Content.Source)}. */
    public void onContent(Response response, ByteBuffer content) {
    }

    /** No-op: content is consumed through {@link #onContentSource(Response, Content.Source)}. */
    public void onContent(Response response, Content.Chunk chunk, Runnable demander) {
    }

    /** Hands the response content source over to the content publisher. */
    public void onContentSource(Response response, Content.Source source) {
        if (logger.isDebugEnabled()) {
            logger.debug("received response content source {} {} on {}", response, source, this);
        }
        content.accept(source);
    }

    /** Only logs; completion is handled in {@link #onComplete(Result)}. */
    public void onSuccess(Response response) {
        if (logger.isDebugEnabled()) {
            logger.debug("response complete {} on {}", response, this);
        }
    }

    /** Only logs; failures are handled in {@link #onComplete(Result)}. */
    public void onFailure(Response response, Throwable failure) {
        if (logger.isDebugEnabled()) {
            // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
            logger.debug("response failure {} on {}", response, this, failure);
        }
    }

    /**
     * Terminates the content publisher according to the outcome of the exchange.
     *
     * <p>On success the content publisher is completed. On failure the content
     * publisher is failed; if that call returns {@code false} and the response
     * headers were never received, the failure is reported through
     * {@code onError(Throwable)} instead.</p>
     */
    public void onComplete(Result result) {
        if (result.isSucceeded()) {
            content.complete();
            return;
        }
        Throwable failure = result.getFailure();
        // Without response headers this processor never subscribed to a content-derived
        // publisher, so nothing upstream would deliver the failure to it.
        if (!content.fail(failure) && !responseReceived) {
            onError(failure);
        }
    }

    /**
     * Sends the request the first time demand is signalled, then delegates to the superclass.
     */
    @Override
    protected void onRequest(Subscriber<? super T> subscriber, long n) {
        boolean send;
        try (AutoLock ignored = lock()) {
            send = !requestSent;
            requestSent = true;
        }
        // Send outside the lock.
        if (send) {
            send();
        }
        super.onRequest(subscriber, n);
    }

    /** Relays the item downstream unchanged. */
    public void onNext(T t) {
        downStreamOnNext(t);
    }

    /** Sends the HTTP request with this processor registered as the response listener. */
    private void send() {
        if (logger.isDebugEnabled()) {
            logger.debug("sending request {} from {}", request, this);
        }
        request.getRequest().send(this);
    }

    /**
     * Cancels this processor, first aborting the HTTP request with a
     * {@link CancellationException} when {@code abortOnCancel} was requested.
     */
    @Override
    public void cancel() {
        if (abortOnCancel) {
            request.getRequest().abort(new CancellationException());
        }
        super.cancel();
    }

    @Override
    public String toString() {
        return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), request);
    }

    /**
     * Publisher of response content chunks read from a {@link Content.Source}.
     *
     * <p>The content source and the first demand may arrive in either order:
     * whichever arrives second triggers the first read.</p>
     */
    private static class ContentPublisher
    extends QueuedSinglePublisher<Content.Chunk> {
        /** Set when demand was signalled before the content source was available. */
        private boolean initialDemand;
        private Content.Source source;

        private ContentPublisher() {
        }

        /**
         * Stores the content source and, if demand was already signalled, starts reading.
         *
         * @param contentSource the source of the response content
         */
        private void accept(Content.Source contentSource) {
            boolean demand;
            try (AutoLock ignored = lock()) {
                source = contentSource;
                demand = initialDemand;
            }
            // Read outside the lock.
            if (demand) {
                read(contentSource);
            }
        }

        /**
         * Delegates to the superclass, then reads from the content source if it is already
         * available; otherwise remembers the demand so that {@code accept} starts the read.
         */
        @Override
        protected void onRequest(Subscriber<? super Content.Chunk> subscriber, long n) {
            if (logger.isDebugEnabled()) {
                logger.debug("demand {} on {}", n, this);
            }
            super.onRequest(subscriber, n);
            Content.Source current;
            try (AutoLock ignored = lock()) {
                current = source;
                if (current == null) {
                    initialDemand = true;
                }
            }
            if (current != null) {
                read(current);
            }
        }

        /**
         * Reads one chunk from the source and publishes it.
         *
         * <ul>
         * <li>no chunk available: registers a demand callback that reads again;</li>
         * <li>failure chunk: fails this publisher with the chunk's failure;</li>
         * <li>chunk with bytes: offers it to this publisher, releasing it and failing
         * this publisher if the offer throws;</li>
         * <li>chunk without bytes: releases it.</li>
         * </ul>
         *
         * @param source the content source to read from
         */
        private void read(Content.Source source) {
            Content.Chunk chunk = source.read();
            if (logger.isDebugEnabled()) {
                logger.debug("read {} from {} on {}", chunk, source, this);
            }
            if (chunk == null) {
                source.demand(() -> read(source));
                return;
            }
            if (Content.Chunk.isFailure(chunk)) {
                fail(chunk.getFailure());
                return;
            }
            if (chunk.hasRemaining()) {
                try {
                    // Presumably the chunk is now released by whoever consumes it
                    // downstream - confirm against QueuedSinglePublisher.
                    offer(chunk);
                }
                catch (Throwable x) {
                    // The chunk was not handed over, so it must be released here.
                    chunk.release();
                    fail(x);
                }
            } else {
                // An empty chunk is never handed to the subscriber, so this reader is its
                // only owner and must release it, or a retained buffer would leak.
                // NOTE(review): an empty, non-last chunk does not trigger another read
                // here; verify that this cannot stall a subscriber with pending demand.
                chunk.release();
            }
        }
    }
}

