/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.webclient;

import io.helidon.common.http.DataChunk;
import io.helidon.webclient.NettyChannel;
import io.helidon.webclient.WebClientException;
import io.helidon.webclient.WebClientRequestBuilderImpl;
import io.helidon.webclient.WebClientResponse;
import io.helidon.webclient.WebClientServiceRequest;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.logging.Logger;

class RequestContentSubscriber
implements Flow.Subscriber<DataChunk> {
    private static final Logger LOGGER = Logger.getLogger(RequestContentSubscriber.class.getName());
    private static final LastHttpContent LAST_HTTP_CONTENT = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
    private static final Set<HttpMethod> EMPTY_CONTENT_LENGTH = Set.of(HttpMethod.PUT, HttpMethod.POST);
    private final CompletableFuture<WebClientResponse> responseFuture;
    private final CompletableFuture<WebClientServiceRequest> sent;
    private final DefaultHttpRequest request;
    private final NettyChannel channel;
    private final long requestId;
    private final boolean allowChunkedEncoding;
    private volatile Flow.Subscription subscription;
    private volatile DataChunk firstDataChunk;
    private volatile boolean lengthOptimization = true;

    RequestContentSubscriber(DefaultHttpRequest request, Channel channel, CompletableFuture<WebClientResponse> responseFuture, CompletableFuture<WebClientServiceRequest> sent, boolean allowChunkedEncoding) {
        this.request = request;
        this.channel = new NettyChannel(channel);
        this.responseFuture = responseFuture;
        this.sent = sent;
        this.requestId = (Long)channel.attr(WebClientRequestBuilderImpl.REQUEST_ID).get();
        this.allowChunkedEncoding = allowChunkedEncoding;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
        LOGGER.finest(() -> "(client reqID: " + this.requestId + ") Writing sending request and its content to the server.");
    }

    @Override
    public void onNext(DataChunk data) {
        if (data.isFlushChunk()) {
            this.channel.flush();
            return;
        }
        if (this.lengthOptimization && this.firstDataChunk == null) {
            this.firstDataChunk = data.isReadOnly() ? data : data.duplicate();
            this.subscription.request(1L);
            return;
        }
        if (null != this.firstDataChunk) {
            this.lengthOptimization = false;
            if (HttpUtil.isContentLengthSet((HttpMessage)this.request)) {
                if (this.allowChunkedEncoding) {
                    this.request.headers().set((CharSequence)HttpHeaderNames.TRANSFER_ENCODING, (Object)HttpHeaderValues.CHUNKED);
                }
            } else if (this.allowChunkedEncoding) {
                HttpUtil.setTransferEncodingChunked((HttpMessage)this.request, (boolean)true);
            } else if (HttpUtil.isKeepAlive((HttpMessage)this.request)) {
                throw new WebClientException("Chunked Transfer-Encoding is disabled. Content-Length or Connection: close, has to be set.");
            }
            this.channel.write(true, this.request);
            this.sendData(this.firstDataChunk);
            this.firstDataChunk = null;
        }
        this.sendData(data);
    }

    @Override
    public void onError(Throwable throwable) {
        this.responseFuture.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        if (this.lengthOptimization) {
            LOGGER.finest(() -> "(client reqID: " + this.requestId + ") Message body contains only one data chunk. Setting chunked encoding to false.");
            HttpUtil.setTransferEncodingChunked((HttpMessage)this.request, (boolean)false);
            if (!HttpUtil.isContentLengthSet((HttpMessage)this.request)) {
                if (this.firstDataChunk != null) {
                    HttpUtil.setContentLength((HttpMessage)this.request, (long)this.firstDataChunk.remaining());
                } else if (EMPTY_CONTENT_LENGTH.contains(this.request.method())) {
                    HttpUtil.setContentLength((HttpMessage)this.request, (long)0L);
                }
            } else if (HttpUtil.getContentLength((HttpMessage)this.request) == 0L && this.firstDataChunk != null) {
                HttpUtil.setContentLength((HttpMessage)this.request, (long)this.firstDataChunk.remaining());
            }
            this.channel.write(true, this.request);
            if (this.firstDataChunk != null) {
                this.sendData(this.firstDataChunk);
            }
        }
        WebClientServiceRequest serviceRequest = this.channel.serviceRequest();
        LOGGER.finest(() -> "(client reqID: " + this.requestId + ") Sending last http content");
        this.channel.write(true, LAST_HTTP_CONTENT, f -> f.addListener(this.completeOnFailureListener("(client reqID: " + this.requestId + ") An exception occurred when writing last http content.")).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE).addListener(future -> {
            if (future.isSuccess()) {
                this.sent.complete(serviceRequest);
                LOGGER.finest(() -> "(client reqID: " + this.requestId + ") Request sent");
            }
        }));
    }

    private void sendData(DataChunk data) {
        LOGGER.finest(() -> "(client reqID: " + this.requestId + ") Sending data chunk");
        DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer((ByteBuffer[])data.data()));
        this.channel.write(true, httpContent, f -> f.addListener(future -> {
            data.release();
            this.subscription.request(1L);
            LOGGER.finest(() -> "(client reqID: " + this.requestId + ") Data chunk sent with result: " + future.isSuccess());
        }).addListener(this.completeOnFailureListener("(client reqID: " + this.requestId + ") Failure when sending a content!")).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE));
    }

    private GenericFutureListener<Future<? super Void>> completeOnFailureListener(String message) {
        return future -> {
            if (!future.isSuccess()) {
                Throwable cause = future.cause();
                if (this.channel.isConnectionReset()) {
                    this.completeRequestFuture(new IllegalStateException("(client reqID: " + this.requestId + ") Connection reset by the host", cause));
                } else {
                    this.completeRequestFuture(new IllegalStateException(message, cause));
                }
            }
        };
    }

    private void completeRequestFuture(Throwable throwable) {
        if (throwable != null) {
            this.responseFuture.completeExceptionally(throwable);
        }
    }
}

