/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.http.nio.netty.internal;

import com.typesafe.netty.http.StreamedHttpResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKeys;
import software.amazon.awssdk.http.nio.netty.internal.RequestContext;
import software.amazon.awssdk.utils.FunctionalUtils;

/**
 * Inbound channel handler that forwards HTTP response objects read from a Netty channel to the
 * response handler of the {@link RequestContext} stored on that channel.
 *
 * <p>The handler itself has no instance fields; all per-request state (request context, keep-alive
 * flag, subscriber, completion flag) is kept in channel attributes.
 */
@ChannelHandler.Sharable
class ResponseHandler extends SimpleChannelInboundHandler<HttpObject> {
    private static final Logger log = LoggerFactory.getLogger(ResponseHandler.class);

    /**
     * Per-channel flag recorded when the response headers arrive; read once the response is complete
     * to decide between returning the channel to the pool and closing it.
     */
    private static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance("KeepAlive");

    ResponseHandler() {
    }

    /**
     * Dispatches one inbound HTTP object.
     *
     * <p>For any {@link HttpResponse} the status and headers are converted and passed to
     * {@code headersReceived}. A {@link StreamedHttpResponse} body is then exposed through a
     * {@link PublisherAdapter}; a {@link FullHttpResponse} body is copied and published in one piece.
     *
     * @param channelContext context of the channel the object was read from
     * @param msg the inbound HTTP object
     * @throws Exception any failure is propagated to the pipeline
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelContext, HttpObject msg) throws Exception {
        RequestContext requestContext = (RequestContext) channelContext.channel().attr(ChannelAttributeKeys.REQUEST_CONTEXT_KEY).get();
        if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse) msg;
            SdkHttpResponse sdkResponse = SdkHttpFullResponse.builder()
                    .headers(fromNettyHeaders(response.headers()))
                    .statusCode(response.status().code())
                    .statusText(response.status().reasonPhrase())
                    .build();
            // Remember the keep-alive decision now; finalizeRequest reads it after the body is done.
            channelContext.channel().attr(KEEP_ALIVE).set(HttpUtil.isKeepAlive(response));
            requestContext.handler().headersReceived(sdkResponse);
        }
        if (msg instanceof StreamedHttpResponse) {
            requestContext.handler().onStream(new PublisherAdapter((StreamedHttpResponse) msg, channelContext, requestContext));
        } else if (msg instanceof FullHttpResponse) {
            handleFullResponse((FullHttpResponse) msg, channelContext, requestContext);
        }
    }

    /**
     * Publishes the body of a fully aggregated response in one piece, signals completion and
     * finalizes the request.
     *
     * @throws RuntimeException rethrown after the subscriber and the response handler were notified
     */
    private static void handleFullResponse(FullHttpResponse fullResponse,
                                           ChannelHandlerContext channelContext,
                                           RequestContext requestContext) {
        channelContext.read();
        ByteBuf fullContent = fullResponse.content();
        ByteBuffer bodyCopy = copyToByteBuffer(fullContent);
        // NOTE(review): SimpleChannelInboundHandler releases the message after channelRead0 when
        // auto-release is enabled (the default constructor used here) - confirm that this explicit
        // release cannot result in a double release for pooled buffers.
        fullContent.release();
        requestContext.handler().onStream(new FullResponseContentPublisher(channelContext, bodyCopy));

        // Null when nothing subscribed to the publisher while onStream was running.
        Subscriber<?> subscriber = (Subscriber<?>) channelContext.channel().attr(ChannelAttributeKeys.SUBSCRIBER_KEY).get();
        try {
            if (subscriber != null) {
                subscriber.onComplete();
            }
            requestContext.handler().complete();
        } catch (RuntimeException e) {
            if (subscriber != null) {
                // Best effort: a failing subscriber must not prevent the handler from being notified.
                runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber),
                        () -> subscriber.onError(e));
            }
            requestContext.handler().exceptionOccurred(e);
            throw e;
        } finally {
            finalizeRequest(requestContext, channelContext);
        }
    }

    /**
     * Marks the response as complete and either returns the channel to the pool (keep-alive) or
     * closes it first. A missing keep-alive flag is treated as "not keep-alive".
     */
    private static void finalizeRequest(RequestContext requestContext, ChannelHandlerContext channelContext) {
        channelContext.channel().attr(ChannelAttributeKeys.RESPONSE_COMPLETE_KEY).set(true);
        // Boolean.TRUE.equals avoids a NullPointerException when the attribute was never set.
        boolean keepAlive = Boolean.TRUE.equals(channelContext.channel().attr(KEEP_ALIVE).get());
        if (keepAlive) {
            requestContext.channelPool().release(channelContext.channel());
        } else {
            closeAndRelease(channelContext);
        }
    }

    /**
     * Logs the failure, reports it to the response handler and closes the channel before releasing
     * it. Both follow-up steps are best effort: their own failures are logged, not rethrown.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        RequestContext requestContext = (RequestContext) ctx.channel().attr(ChannelAttributeKeys.REQUEST_CONTEXT_KEY).get();
        log.error("Exception processing request: {}", requestContext.sdkRequest(), cause);
        runAndLogError("SdkHttpResponseHandler threw an exception", () -> requestContext.handler().exceptionOccurred(cause));
        runAndLogError("Could not release channel back to the pool", () -> closeAndRelease(ctx));
    }

    /**
     * Reports an {@link IOException} to the response handler and releases the channel when the
     * channel becomes inactive before the response was marked complete.
     */
    @Override
    public void channelInactive(ChannelHandlerContext handlerCtx) throws Exception {
        RequestContext requestCtx = (RequestContext) handlerCtx.channel().attr(ChannelAttributeKeys.REQUEST_CONTEXT_KEY).get();
        // An attribute that was never set yields null; treat that as "not completed" instead of
        // failing on unboxing.
        boolean responseCompleted = Boolean.TRUE.equals(handlerCtx.channel().attr(ChannelAttributeKeys.RESPONSE_COMPLETE_KEY).get());
        if (responseCompleted || requestCtx == null) {
            // Either the response finished normally or there is no request to notify.
            return;
        }
        runAndLogError("SdkHttpResponseHandler threw an exception when calling exceptionOccurred",
                () -> requestCtx.handler().exceptionOccurred(new IOException("Server failed to send complete response")));
        runAndLogError("Could not release channel", () -> requestCtx.channelPool().release(handlerCtx.channel()));
    }

    /** Closes the channel and releases it to the request's channel pool once the close future completes. */
    private static void closeAndRelease(ChannelHandlerContext ctx) {
        RequestContext requestContext = (RequestContext) ctx.channel().attr(ChannelAttributeKeys.REQUEST_CONTEXT_KEY).get();
        ctx.channel().close().addListener(channelFuture -> requestContext.channelPool().release(ctx.channel()));
    }

    /**
     * Runs {@code runnable} and logs, rather than propagates, any {@link Exception} it throws.
     *
     * @param errorMsg message logged together with the exception
     * @param runnable the action to attempt
     */
    private static void runAndLogError(String errorMsg, FunctionalUtils.UnsafeRunnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            log.error(errorMsg, e);
        }
    }

    /**
     * Groups Netty header entries by header name, keeping all values of a name in encounter order.
     * Names are grouped by exact string equality.
     */
    private static Map<String, List<String>> fromNettyHeaders(HttpHeaders headers) {
        return headers.entries().stream()
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    /**
     * Copies the readable bytes of {@code byteBuf} into a new heap {@link ByteBuffer} that is ready
     * for reading. The reader index of {@code byteBuf} is left untouched.
     */
    private static ByteBuffer copyToByteBuffer(ByteBuf byteBuf) {
        ByteBuffer bb = ByteBuffer.allocate(byteBuf.readableBytes());
        byteBuf.getBytes(byteBuf.readerIndex(), bb);
        bb.flip();
        return bb;
    }

    /**
     * Publisher for a response body that is already fully in memory.
     *
     * <p>The single buffer is delivered eagerly during {@link #subscribe}: the caller signals
     * completion right after {@code onStream} returns, so delivery does not wait for demand.
     * Completion itself is signalled by the caller through the subscriber stored under
     * {@code SUBSCRIBER_KEY}.
     */
    private static class FullResponseContentPublisher implements Publisher<ByteBuffer> {
        private final ChannelHandlerContext channelContext;
        private final ByteBuffer fullContent;
        /** Set when the subscriber cancels from within {@code onSubscribe}. */
        private volatile boolean cancelled;

        FullResponseContentPublisher(ChannelHandlerContext channelContext, ByteBuffer fullContent) {
            this.channelContext = channelContext;
            this.fullContent = fullContent;
        }

        @Override
        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            // Reactive Streams requires onSubscribe before any other signal; without it a subscriber
            // that keeps its Subscription would find it null when onNext arrives.
            subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    // Nothing to do: the only element is pushed eagerly below.
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
            if (!cancelled) {
                subscriber.onNext(this.fullContent);
            }
            this.channelContext.channel().attr(ChannelAttributeKeys.SUBSCRIBER_KEY).set(subscriber);
        }
    }

    /**
     * Adapts the {@link HttpContent} stream of a {@link StreamedHttpResponse} to a publisher of
     * {@link ByteBuffer}s and drives request completion from the stream's terminal signals.
     */
    private static class PublisherAdapter implements Publisher<ByteBuffer> {
        private final StreamedHttpResponse response;
        private final ChannelHandlerContext channelContext;
        private final RequestContext requestContext;

        private PublisherAdapter(StreamedHttpResponse response, ChannelHandlerContext channelContext, RequestContext requestContext) {
            this.response = response;
            this.channelContext = channelContext;
            this.requestContext = requestContext;
        }

        @Override
        public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
            this.response.subscribe(new Subscriber<HttpContent>() {

                @Override
                public void onSubscribe(Subscription subscription) {
                    subscriber.onSubscribe(subscription);
                }

                @Override
                public void onNext(HttpContent httpContent) {
                    ByteBuffer copy;
                    try {
                        copy = copyToByteBuffer(httpContent.content());
                    } finally {
                        // Release even if the copy fails so the chunk is not leaked.
                        httpContent.release();
                    }
                    subscriber.onNext(copy);
                    channelContext.read();
                }

                @Override
                public void onError(Throwable t) {
                    runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber),
                            () -> subscriber.onError(t));
                    // NOTE(review): the channel is neither closed nor released on this path;
                    // presumably the failure also reaches exceptionCaught - confirm.
                    requestContext.handler().exceptionOccurred(t);
                }

                @Override
                public void onComplete() {
                    try {
                        runAndLogError(String.format("Subscriber %s threw an exception in onComplete.", subscriber),
                                subscriber::onComplete);
                        requestContext.handler().complete();
                    } finally {
                        finalizeRequest(requestContext, channelContext);
                    }
                }
            });
        }
    }
}

