/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.http.nio.netty.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;
import software.amazon.awssdk.http.nio.netty.internal.FutureCancelledException;
import software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler;
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics;
import software.amazon.awssdk.http.nio.netty.internal.OneTimeReadTimeoutHandler;
import software.amazon.awssdk.http.nio.netty.internal.RequestAdapter;
import software.amazon.awssdk.http.nio.netty.internal.RequestContext;
import software.amazon.awssdk.http.nio.netty.internal.ResponseHandler;
import software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler;
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2StreamExceptionHandler;
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter;
import software.amazon.awssdk.http.nio.netty.internal.http2.HttpToHttp2OutboundAdapter;
import software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler;
import software.amazon.awssdk.http.nio.netty.internal.nrs.StreamedHttpRequest;
import software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils;
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import software.amazon.awssdk.metrics.MetricCollector;

@SdkInternalApi
public final class NettyRequestExecutor {
    private static final Logger log = LoggerFactory.getLogger(NettyRequestExecutor.class);
    // Shared request adapters, one per protocol; tryConfigurePipeline() picks the one matching the channel.
    private static final RequestAdapter REQUEST_ADAPTER_HTTP2 = new RequestAdapter(Protocol.HTTP2);
    private static final RequestAdapter REQUEST_ADAPTER_HTTP1_1 = new RequestAdapter(Protocol.HTTP1_1);
    // Class-wide counter; each executor instance takes the next value as its executionId.
    private static final AtomicLong EXECUTION_COUNTER = new AtomicLong(0L);
    // Identifier of this execution; stored on the channel and attached to FutureCancelledException.
    private final long executionId = EXECUTION_COUNTER.incrementAndGet();
    // Request state supplied by the caller (channel pool, event loop group, handler, configuration, ...).
    private final RequestContext context;
    // Future handed back from execute(); completed exceptionally by handleFailure().
    private CompletableFuture<Void> executeFuture;
    // Channel obtained from the pool; assigned in makeRequestListener() once acquisition succeeds.
    private Channel channel;
    // Adapter chosen in tryConfigurePipeline() according to the channel's protocol.
    private RequestAdapter requestAdapter;

    /**
     * Creates an executor for a single request.
     *
     * @param context the request context this executor reads its channel pool, event loop group,
     *                response handler and configuration from
     */
    public NettyRequestExecutor(RequestContext context) {
        this.context = context;
    }

    /**
     * Starts the request by asking the channel pool for a channel and registering
     * {@link #makeRequestListener} to continue once the acquisition finishes.
     *
     * @return the future tracking this execution; it is created before the acquire is issued
     */
    public CompletableFuture<Void> execute() {
        Promise acquirePromise = context.eventLoopGroup().next().newPromise();
        executeFuture = createExecutionFuture((Promise<Channel>) acquirePromise);
        context.channelPool().acquire(acquirePromise);
        acquirePromise.addListener(this::makeRequestListener);
        return executeFuture;
    }

    /**
     * Builds the future returned to the caller and wires up the cleanup that runs when it completes.
     *
     * <p>On exceptional completion the failure is first offered to the channel promise. If the promise
     * had already succeeded, a task is submitted to the acquired channel's event loop that either fires
     * a {@link FutureCancelledException} through the pipeline (channel marked in use) or closes the
     * channel and releases it to the pool.
     *
     * @param channelPromise the promise passed to the channel pool's acquire call
     * @return a new, not yet completed future
     */
    private CompletableFuture<Void> createExecutionFuture(Promise<Channel> channelPromise) {
        CompletableFuture<Void> metricsFuture = initiateMetricsCollection();
        CompletableFuture<Void> executionFuture = new CompletableFuture<>();
        executionFuture.whenComplete((ignored, failure) -> {
            verifyMetricsWereCollected(metricsFuture);
            // Nothing to clean up when: the request succeeded, the failure was handed to a still-pending
            // acquire, or the acquire itself had already finished without producing a channel.
            if (failure == null || channelPromise.tryFailure(failure) || !channelPromise.isSuccess()) {
                return;
            }
            Channel acquired = channelPromise.getNow();
            try {
                acquired.eventLoop().submit(() -> {
                    // NOTE(review): unboxing throws if IN_USE was never set on this channel - confirm
                    // configureChannel() is guaranteed to have run before this task executes.
                    Boolean inUse = (Boolean) acquired.attr(ChannelAttributeKey.IN_USE).get();
                    if (inUse.booleanValue()) {
                        acquired.pipeline().fireExceptionCaught(new FutureCancelledException(this.executionId, failure));
                    } else {
                        acquired.close().addListener(closeFuture -> this.context.channelPool().release(acquired));
                    }
                });
            }
            catch (Throwable submitFailure) {
                log.warn("Unable to add a task to cancel the request to channel's EventLoop", submitFailure);
            }
        });
        return executionFuture;
    }

    /**
     * Kicks off channel-pool metric collection when metrics are enabled for this request.
     *
     * @return the future returned by the channel pool, or {@code null} when metrics are disabled
     */
    private CompletableFuture<Void> initiateMetricsCollection() {
        MetricCollector collector = context.metricCollector();
        return NettyRequestMetrics.metricsAreEnabled(collector)
            ? context.channelPool().collectChannelPoolMetrics(collector)
            : null;
    }

    /**
     * Checks the outcome of metric collection once the request itself has completed.
     * An unfinished collection is cancelled; a failed one is logged at debug level.
     *
     * @param metricsFuture the future from {@link #initiateMetricsCollection()}, may be {@code null}
     */
    private void verifyMetricsWereCollected(CompletableFuture<Void> metricsFuture) {
        if (metricsFuture == null) {
            return;
        }
        if (metricsFuture.isDone()) {
            metricsFuture.exceptionally(failure -> {
                log.debug("HTTP request metric collection failed, so results may be incomplete.", failure);
                return null;
            });
        } else {
            log.debug("HTTP request metric collection did not finish in time, so results may be incomplete.");
            metricsFuture.cancel(false);
        }
    }

    /**
     * Callback for the channel acquisition. On failure the error is reported through
     * {@link #handleFailure}; on success the channel is stored and, on its event loop,
     * configured and used to send the request.
     *
     * @param channelFuture the completed acquisition future
     */
    private void makeRequestListener(Future<Channel> channelFuture) {
        if (!channelFuture.isSuccess()) {
            handleFailure(() -> "Failed to create connection to " + endpoint(), channelFuture.cause());
            return;
        }
        channel = channelFuture.getNow();
        NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
            configureChannel();
            if (tryConfigurePipeline()) {
                makeRequest();
            }
        });
    }

    /**
     * Stores the per-request state on the acquired channel's attributes, marks the channel
     * as in use and switches off auto-read.
     */
    private void configureChannel() {
        Channel ch = this.channel;
        ch.attr(ChannelAttributeKey.EXECUTION_ID_KEY).set(executionId);
        ch.attr(ChannelAttributeKey.EXECUTE_FUTURE_KEY).set(executeFuture);
        ch.attr(ChannelAttributeKey.REQUEST_CONTEXT_KEY).set(context);
        ch.attr(ChannelAttributeKey.RESPONSE_COMPLETE_KEY).set(false);
        ch.attr(ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
        ch.attr(ChannelAttributeKey.IN_USE).set(true);
        ch.config().setOption(ChannelOption.AUTO_READ, false);
    }

    /**
     * Installs the request/response handlers on the channel pipeline and selects the request
     * adapter matching the channel's protocol.
     *
     * <p>If the protocol is not recognised, or the channel is no longer active after the handlers
     * were added, the channel is closed and released and the failure is reported.
     *
     * @return {@code true} when the pipeline is ready for the request to be written
     */
    private boolean tryConfigurePipeline() {
        Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
        ChannelPipeline pipeline = channel.pipeline();
        switch (protocol) {
            case HTTP2:
                // HTTP/2 streams get adapters translating to and from the HTTP/1.1 object model.
                pipeline.addLast(new Http2ToHttpInboundAdapter());
                pipeline.addLast(new HttpToHttp2OutboundAdapter());
                pipeline.addLast(Http2StreamExceptionHandler.create());
                requestAdapter = REQUEST_ADAPTER_HTTP2;
                break;
            case HTTP1_1:
                requestAdapter = REQUEST_ADAPTER_HTTP1_1;
                break;
            default:
                String unknownProtocolMessage = "Unknown protocol: " + protocol;
                closeAndRelease(channel);
                handleFailure(() -> unknownProtocolMessage, new RuntimeException(unknownProtocolMessage));
                return false;
        }

        pipeline.addLast(LastHttpContentHandler.create());
        if (protocol == Protocol.HTTP2) {
            pipeline.addLast(FlushOnReadHandler.getInstance());
        }
        pipeline.addLast(new HttpStreamsClientHandler());
        pipeline.addLast(ResponseHandler.getInstance());

        if (channel.isActive()) {
            return true;
        }
        String closedMessage = "Channel was closed before it could be written to.";
        closeAndRelease(channel);
        handleFailure(() -> closedMessage, new IOException(closedMessage));
        return false;
    }

    /**
     * Converts the SDK request with the selected adapter and writes it to the channel.
     */
    private void makeRequest() {
        writeRequest(requestAdapter.adapt(context.executeRequest().request()));
    }

    /**
     * Writes the request (headers plus streamed body) to the channel under a write timeout.
     *
     * <p>When the write finishes, the write-timeout handler is removed. A successful write of a
     * request that is not full duplex installs a read timeout and triggers a read; a failed write
     * closes and releases the channel and reports the failure. For full-duplex requests and requests
     * expecting {@code 100-continue}, a read is additionally triggered right away, without waiting
     * for the write to finish.
     *
     * @param request the adapted Netty request
     */
    private void writeRequest(HttpRequest request) {
        long writeTimeoutMillis = context.configuration().writeTimeoutMillis();
        channel.pipeline().addFirst(new WriteTimeoutHandler(writeTimeoutMillis, TimeUnit.MILLISECONDS));

        StreamedRequest streamedRequest =
            new StreamedRequest(request, (Publisher<ByteBuffer>) context.executeRequest().requestContentPublisher());

        channel.writeAndFlush(streamedRequest).addListener(writeResult -> {
            // The write is over either way, so the write timeout no longer applies.
            ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class);
            if (!writeResult.isSuccess()) {
                closeAndRelease(channel);
                handleFailure(() -> "Failed to make request to " + endpoint(), writeResult.cause());
                return;
            }
            NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel);
            if (!context.executeRequest().fullDuplex()) {
                long readTimeoutMillis = context.configuration().readTimeoutMillis();
                channel.pipeline().addFirst(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
                channel.read();
            }
        });

        if (shouldExplicitlyTriggerRead()) {
            ChannelHandler readTimeoutHandler;
            if (is100ContinueExpected()) {
                readTimeoutHandler =
                    new OneTimeReadTimeoutHandler(Duration.ofMillis(context.configuration().readTimeoutMillis()));
            } else {
                long readTimeoutMillis = context.configuration().readTimeoutMillis();
                readTimeoutHandler = new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            channel.pipeline().addFirst(readTimeoutHandler);
            channel.read();
        }
    }

    /**
     * @return {@code true} when the request is full duplex or expects a {@code 100-continue} response
     */
    private boolean shouldExplicitlyTriggerRead() {
        if (context.executeRequest().fullDuplex()) {
            return true;
        }
        return is100ContinueExpected();
    }

    /**
     * @return {@code true} when the first {@code Expect} header of the request equals
     *         {@code 100-continue}, ignoring case
     */
    private boolean is100ContinueExpected() {
        return context.executeRequest()
                      .request()
                      .firstMatchingHeader("Expect")
                      .filter("100-continue"::equalsIgnoreCase)
                      .isPresent();
    }

    /**
     * @return the URI of the request being executed; used in failure messages
     */
    private URI endpoint() {
        return context.executeRequest().request().getUri();
    }

    /**
     * Reports a failed execution: logs it at debug level, passes the decorated exception to the
     * response handler and completes the execution future exceptionally with it.
     *
     * @param msg   lazily built log message; only evaluated when debug logging is enabled
     * @param cause the original failure, logged as-is and decorated before being propagated
     */
    private void handleFailure(Supplier<String> msg, Throwable cause) {
        // The message is supplied lazily, so avoid building it when it would be discarded.
        if (log.isDebugEnabled()) {
            log.debug(msg.get(), cause);
        }
        Throwable decorated = this.decorateException(cause);
        this.context.handler().onError(decorated);
        this.executeFuture.completeExceptionally(decorated);
    }

    /**
     * Wraps well-known low-level failures in an exception carrying a more helpful message.
     * The original exception is always kept as the cause; unrecognised failures are returned unchanged.
     *
     * @param originalCause the failure to decorate
     * @return the wrapping exception, or {@code originalCause} itself when no rule matches
     */
    private Throwable decorateException(Throwable originalCause) {
        Throwable decorated;
        // NOTE(review): the two pool-related cases wrap in a bare Throwable; callers may depend on
        // that exact type, so confirm before narrowing it.
        if (isAcquireTimeoutException(originalCause)) {
            decorated = new Throwable(getMessageForAcquireTimeoutException(), originalCause);
        } else if (isTooManyPendingAcquiresException(originalCause)) {
            decorated = new Throwable(getMessageForTooManyAcquireOperationsError(), originalCause);
        } else if (originalCause instanceof ReadTimeoutException) {
            decorated = new IOException("Read timed out", originalCause);
        } else if (originalCause instanceof WriteTimeoutException) {
            decorated = new IOException("Write timed out", originalCause);
        } else if (originalCause instanceof ClosedChannelException) {
            decorated = new IOException("The channel was closed. This may have been done by the client (e.g. because the request was aborted), by the service (e.g. because there was a handshake error, the request took too long, or the client tried to write on a read-only socket), or by an intermediary party (e.g. because the channel was idle for too long).", originalCause);
        } else {
            decorated = originalCause;
        }
        return decorated;
    }

    /**
     * @param originalCause the failure to inspect
     * @return {@code true} for a {@link TimeoutException} whose message mentions that the acquire
     *         operation took too long
     */
    private boolean isAcquireTimeoutException(Throwable originalCause) {
        String message = originalCause.getMessage();
        if (!(originalCause instanceof TimeoutException) || message == null) {
            return false;
        }
        return message.contains("Acquire operation took longer");
    }

    /**
     * @param originalCause the failure to inspect
     * @return {@code true} for an {@link IllegalStateException} whose message mentions too many
     *         outstanding acquire operations
     */
    private boolean isTooManyPendingAcquiresException(Throwable originalCause) {
        String message = originalCause.getMessage();
        if (!(originalCause instanceof IllegalStateException) || message == null) {
            return false;
        }
        return message.contains("Too many outstanding acquire operations");
    }

    /**
     * @return the explanatory message used when acquiring a connection from the pool timed out
     */
    private String getMessageForAcquireTimeoutException() {
        // One literal per line of the final message; the pieces are joined at compile time.
        return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate.\n"
            + "Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate.\n"
            + "Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout.\n"
            + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests.";
    }

    /**
     * @return the explanatory message used when too many connection acquisitions are pending
     */
    private String getMessageForTooManyAcquireOperationsError() {
        // One literal per line of the final message; the pieces are joined at compile time.
        return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n"
            + "Consider taking any of the following actions to mitigate the issue: increase max connections, increase max pending acquire count, decrease pool lease timeout, or slowing the request rate.\n"
            + "Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the pending acquire count allows extra requests to be buffered by the client, but can cause additional request latency and higher memory usage. If your request latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total number of failed requests.\n"
            + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests.";
    }

    /**
     * Marks the channel as not keep-alive, closes it and hands it back to the channel pool.
     *
     * @param channel the channel to dispose of
     */
    private void closeAndRelease(Channel channel) {
        String channelId = channel.id().asLongText();
        log.trace("closing and releasing channel {}", channelId);
        channel.attr(ChannelAttributeKey.KEEP_ALIVE).set(false);
        channel.close();
        context.channelPool().release(channel);
    }

    private static class StreamedRequest
    extends DelegateHttpRequest
    implements StreamedHttpRequest {
        private final Publisher<ByteBuffer> publisher;
        private final Optional<Long> requestContentLength;
        private long written = 0L;
        private boolean done;
        private Subscription subscription;

        StreamedRequest(HttpRequest request, Publisher<ByteBuffer> publisher) {
            super(request);
            this.publisher = publisher;
            this.requestContentLength = StreamedRequest.contentLength(request);
        }

        public void subscribe(final Subscriber<? super HttpContent> subscriber) {
            this.publisher.subscribe((Subscriber)new Subscriber<ByteBuffer>(){

                public void onSubscribe(Subscription subscription) {
                    subscription = subscription;
                    subscriber.onSubscribe(subscription);
                }

                public void onNext(ByteBuffer contentBytes) {
                    if (done) {
                        return;
                    }
                    try {
                        int newLimit = this.clampedBufferLimit(contentBytes.remaining());
                        contentBytes.limit(newLimit);
                        ByteBuf contentByteBuf = Unpooled.wrappedBuffer((ByteBuffer)contentBytes);
                        DefaultHttpContent content = new DefaultHttpContent(contentByteBuf);
                        subscriber.onNext((Object)content);
                        written = written + (long)newLimit;
                        if (!this.shouldContinuePublishing()) {
                            done = true;
                            subscription.cancel();
                            subscriber.onComplete();
                        }
                    }
                    catch (Throwable t) {
                        this.onError(t);
                    }
                }

                public void onError(Throwable t) {
                    if (!done) {
                        done = true;
                        subscription.cancel();
                        subscriber.onError(t);
                    }
                }

                public void onComplete() {
                    if (!done) {
                        Long expectedContentLength = requestContentLength.orElse(null);
                        if (expectedContentLength != null && written < expectedContentLength) {
                            this.onError(new IllegalStateException("Request content was only " + written + " bytes, but the specified content-length was " + expectedContentLength + " bytes."));
                        } else {
                            done = true;
                            subscriber.onComplete();
                        }
                    }
                }
            });
        }

        private int clampedBufferLimit(int bufLen) {
            return this.requestContentLength.map(cl -> (int)Math.min(cl - this.written, (long)bufLen)).orElse(bufLen);
        }

        private boolean shouldContinuePublishing() {
            return this.requestContentLength.map(cl -> this.written < cl).orElse(true);
        }

        private static Optional<Long> contentLength(HttpRequest request) {
            String value = request.headers().get("Content-Length");
            if (value != null) {
                try {
                    return Optional.of(Long.parseLong(value));
                }
                catch (NumberFormatException e) {
                    log.warn("Unable  to parse 'Content-Length' header. Treating it as non existent.");
                }
            }
            return Optional.empty();
        }
    }

    static class DelegateHttpRequest
    implements HttpRequest {
        protected final HttpRequest request;

        DelegateHttpRequest(HttpRequest request) {
            this.request = request;
        }

        public HttpRequest setMethod(HttpMethod method) {
            this.request.setMethod(method);
            return this;
        }

        public HttpRequest setUri(String uri) {
            this.request.setUri(uri);
            return this;
        }

        public HttpMethod getMethod() {
            return this.request.method();
        }

        public HttpMethod method() {
            return this.request.method();
        }

        public String getUri() {
            return this.request.uri();
        }

        public String uri() {
            return this.request.uri();
        }

        public HttpVersion getProtocolVersion() {
            return this.request.protocolVersion();
        }

        public HttpVersion protocolVersion() {
            return this.request.protocolVersion();
        }

        public HttpRequest setProtocolVersion(HttpVersion version) {
            this.request.setProtocolVersion(version);
            return this;
        }

        public HttpHeaders headers() {
            return this.request.headers();
        }

        public DecoderResult getDecoderResult() {
            return this.request.decoderResult();
        }

        public DecoderResult decoderResult() {
            return this.request.decoderResult();
        }

        public void setDecoderResult(DecoderResult result) {
            this.request.setDecoderResult(result);
        }

        public String toString() {
            return this.getClass().getName() + "(" + this.request.toString() + ")";
        }
    }
}

