/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty;

import io.micronaut.buffer.netty.NettyByteBufferFactory;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.DelayedSubscriber;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.io.buffer.ByteBuffer;
import io.micronaut.core.io.buffer.ByteBufferFactory;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.core.propagation.PropagatedContextElement;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.MutableHeaders;
import io.micronaut.http.ByteBodyHttpResponse;
import io.micronaut.http.ByteBodyHttpResponseWrapper;
import io.micronaut.http.HttpAttributes;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.MediaTypeProvider;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyWriter;
import io.micronaut.http.body.ResponseBodyWriter;
import io.micronaut.http.body.ResponseBodyWriterWrapper;
import io.micronaut.http.codec.CodecException;
import io.micronaut.http.context.ServerHttpRequestContext;
import io.micronaut.http.context.event.HttpRequestReceivedEvent;
import io.micronaut.http.context.event.HttpRequestTerminatedEvent;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.http.netty.EventLoopFlow;
import io.micronaut.http.netty.NettyHttpResponseBuilder;
import io.micronaut.http.netty.NettyMutableHttpResponse;
import io.micronaut.http.netty.body.AvailableNettyByteBody;
import io.micronaut.http.netty.body.NettyBodyAdapter;
import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.http.netty.stream.JsonSubscriber;
import io.micronaut.http.netty.stream.StreamedHttpResponse;
import io.micronaut.http.server.RouteExecutor;
import io.micronaut.http.server.binding.RequestArgumentSatisfier;
import io.micronaut.http.server.netty.NettyEmbeddedServices;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyRequestLifecycle;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.http.server.netty.handler.OutboundAccess;
import io.micronaut.http.server.netty.handler.RequestHandler;
import io.micronaut.web.router.DefaultUrlRouteInfo;
import io.micronaut.web.router.RouteInfo;
import io.micronaut.web.router.resource.StaticResourceResolver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.compression.DecompressionException;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.OrderedEventExecutor;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Internal
@ChannelHandler.Sharable
public final class RoutingInBoundHandler
implements RequestHandler {
    /** Logger for this handler. */
    private static final Logger LOG = LoggerFactory.getLogger(RoutingInBoundHandler.class);
    /**
     * Matches exception messages that mention a reset/closed/aborted/broken connection or a broken pipe.
     * The flag value {@code 2} is {@link Pattern#CASE_INSENSITIVE}. Consulted by {@link #isIgnorable(Throwable)}.
     */
    private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile("^.*(?:connection (?:reset|closed|abort|broken)|broken pipe).*$", 2);
    /** Static resource resolver obtained from the embedded services in the constructor. */
    final StaticResourceResolver staticResourceResolver;
    /** Server configuration; read here for URL validation and keep-alive-on-error decisions. */
    final NettyHttpServerConfiguration serverConfiguration;
    /** Request argument satisfier obtained from the embedded services in the constructor. */
    final RequestArgumentSatisfier requestArgumentSatisfier;
    /** Source of the I/O executor; invoked at most once per successful initialisation by {@link #getIoExecutor()}. */
    final Supplier<ExecutorService> ioExecutorSupplier;
    /** {@code true} unless the multipart configuration explicitly reports {@code false}. */
    final boolean multipartEnabled;
    /** Registry used to look up {@link MessageBodyWriter}s when encoding response bodies. */
    final MessageBodyHandlerRegistry messageBodyHandlerRegistry;
    /**
     * Lazily initialised executor for blocking body writers; populated by {@link #getIoExecutor()}.
     * NOTE(review): the field is read outside the lock in {@code getIoExecutor()} but is not
     * {@code volatile} - confirm that this is intentional.
     */
    ExecutorService ioExecutor;
    /** Publisher for {@link HttpRequestTerminatedEvent}s fired when a request is cleaned up. */
    final ApplicationEventPublisher<HttpRequestTerminatedEvent> terminateEventPublisher;
    /** Publisher for {@link HttpRequestReceivedEvent}s fired when a request is accepted. */
    final ApplicationEventPublisher<HttpRequestReceivedEvent> receivedPublisher;
    /** Route executor; used here to build default error responses and resolve default content types. */
    final RouteExecutor routeExecutor;
    /** Conversion service handed to new requests and used to convert bodies to publishers. */
    final ConversionService conversionService;
    /**
     * When {@code true}, {@code accept} stores the request as a channel attribute if a handler named
     * {@code "http-access-logger"} is present in the pipeline.
     */
    boolean supportLoggingHandler = false;

    /**
     * Creates the handler from the server configuration and the shared embedded services.
     *
     * @param serverConfiguration     Netty HTTP server configuration
     * @param embeddedServerContext   provider of the static resource resolver, body handler registry,
     *                                request argument satisfier and route executor
     * @param ioExecutor              supplier of the executor for blocking work; not invoked here
     * @param terminateEventPublisher publisher for request-terminated events
     * @param receivedPublisher       publisher for request-received events
     * @param conversionService       conversion service used for requests and body conversion
     */
    RoutingInBoundHandler(NettyHttpServerConfiguration serverConfiguration, NettyEmbeddedServices embeddedServerContext, Supplier<ExecutorService> ioExecutor, ApplicationEventPublisher<HttpRequestTerminatedEvent> terminateEventPublisher, ApplicationEventPublisher<HttpRequestReceivedEvent> receivedPublisher, ConversionService conversionService) {
        // Collaborators looked up from the embedded services.
        this.staticResourceResolver = embeddedServerContext.getStaticResourceResolver();
        this.messageBodyHandlerRegistry = embeddedServerContext.getMessageBodyHandlerRegistry();
        this.requestArgumentSatisfier = embeddedServerContext.getRequestArgumentSatisfier();

        // Multipart handling stays on unless the configuration explicitly switches it off.
        Optional<Boolean> multipartFlag = serverConfiguration.getMultipart().getEnabled();
        this.multipartEnabled = multipartFlag.orElse(Boolean.TRUE);

        this.routeExecutor = embeddedServerContext.getRouteExecutor();

        // Plain constructor arguments.
        this.serverConfiguration = serverConfiguration;
        this.ioExecutorSupplier = ioExecutor;
        this.terminateEventPublisher = terminateEventPublisher;
        this.receivedPublisher = receivedPublisher;
        this.conversionService = conversionService;
    }

    /**
     * Releases the request and then, regardless of whether the release succeeded, publishes a
     * {@link HttpRequestTerminatedEvent} for it.
     *
     * @param request the request whose processing has finished
     */
    private void cleanupRequest(NettyHttpRequest<?> request) {
        try {
            request.release();
        } finally {
            this.publishTerminatedEvent(request);
        }
    }

    /**
     * Publishes the request-terminated event if the publisher is not empty. A failure while
     * publishing is logged at error level and never propagated.
     *
     * @param request the terminated request
     */
    private void publishTerminatedEvent(NettyHttpRequest<?> request) {
        if (this.terminateEventPublisher.isEmpty()) {
            return;
        }
        try {
            this.terminateEventPublisher.publishEvent(new HttpRequestTerminatedEvent(request));
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Error publishing request terminated event: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Callback invoked after a response has been written; cleans up the request that was attached
     * to the outbound access, if there is one.
     *
     * @param attachment the attached {@link NettyHttpRequest}, or {@code null} if none was attached
     */
    @Override
    public void responseWritten(Object attachment) {
        if (attachment == null) {
            return;
        }
        NettyHttpRequest<?> finishedRequest = (NettyHttpRequest<?>) attachment;
        this.cleanupRequest(finishedRequest);
    }

    /**
     * Logs an error that occurred without any request state attached.
     *
     * <p>Errors recognised by {@link #isIgnorable(Throwable)} and SSL/decompression failures are
     * logged at debug level only; everything else is logged at error level.
     *
     * @param cause the error to report
     */
    @Override
    public void handleUnboundError(Throwable cause) {
        if (this.isIgnorable(cause)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Swallowed an IOException caused by client connectivity: {}", cause.getMessage(), cause);
            }
            return;
        }

        final String template = "Micronaut Server Error - No request state present. Cause: {}";
        boolean protocolLevelFailure = cause instanceof SSLException
                || cause.getCause() instanceof SSLException
                || cause instanceof DecompressionException;
        if (protocolLevelFailure) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(template, cause.getMessage(), cause);
            }
            return;
        }
        if (LOG.isErrorEnabled()) {
            LOG.error(template, cause.getMessage(), cause);
        }
    }

    /**
     * Handles a newly received request: wraps it in a {@link NettyHttpRequest}, publishes the
     * request-received event, optionally validates the URI and then starts the request lifecycle
     * with the request propagated through the {@link PropagatedContext}.
     *
     * @param ctx            channel handler context of the connection
     * @param request        the Netty request head
     * @param body           the request body
     * @param outboundAccess sink for the response; the request is stored on it as attachment
     */
    @Override
    public void accept(ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpRequest request, CloseableByteBody body, OutboundAccess outboundAccess) {
        NettyHttpRequest<?> mnRequest = new NettyHttpRequest<>(request, body, ctx, this.conversionService, this.serverConfiguration);
        if (this.receivedPublisher != ApplicationEventPublisher.NO_OP) {
            this.receivedPublisher.publishEvent(new HttpRequestReceivedEvent(mnRequest));
        }

        if (this.serverConfiguration.isValidateUrl()) {
            try {
                // Only called for its side effect of failing on an unparsable URI.
                mnRequest.getUri();
            } catch (IllegalArgumentException invalidUri) {
                this.respondToInvalidUri(ctx, request, body, outboundAccess, invalidUri);
                return;
            }
        }

        boolean exposeToAccessLogger = this.supportLoggingHandler && ctx.pipeline().get("http-access-logger") != null;
        if (exposeToAccessLogger) {
            AttributeKey<NettyHttpRequest<?>> requestKey = AttributeKey.valueOf(NettyHttpRequest.class.getSimpleName());
            ctx.channel().attr(requestKey).set(mnRequest);
        }

        outboundAccess.attachment(mnRequest);
        try (PropagatedContext.Scope ignore = this.propagateRequest(mnRequest)) {
            new NettyRequestLifecycle(this, outboundAccess).handleNormal(mnRequest);
        }
    }

    /**
     * Closes the body of a request whose URI failed validation and routes the failure through the
     * exception handling of the lifecycle, using a substitute request that targets {@code "/"}.
     *
     * @param ctx            channel handler context of the connection
     * @param request        the original Netty request head (protocol version and method are reused)
     * @param body           the original request body, closed here
     * @param outboundAccess sink for the response; receives the substitute request as attachment
     * @param invalidUri     the failure raised while parsing the URI
     */
    private void respondToInvalidUri(ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpRequest request, CloseableByteBody body, OutboundAccess outboundAccess, IllegalArgumentException invalidUri) {
        body.close();
        NettyHttpRequest<?> errorRequest = new NettyHttpRequest<>(
                new DefaultHttpRequest(request.protocolVersion(), request.method(), "/"),
                AvailableNettyByteBody.empty(),
                ctx,
                this.conversionService,
                this.serverConfiguration);
        outboundAccess.attachment(errorRequest);
        try (PropagatedContext.Scope ignore = this.propagateRequest(errorRequest)) {
            Throwable reported = invalidUri.getCause() == null ? invalidUri : invalidUri.getCause();
            new NettyRequestLifecycle(this, outboundAccess).handleException(errorRequest, reported);
        }
    }

    /**
     * Adds a {@link ServerHttpRequestContext} for the request to the current (or empty) propagated
     * context and propagates it.
     *
     * @param request the request to expose
     * @return the scope that must be closed to end the propagation
     */
    private PropagatedContext.Scope propagateRequest(NettyHttpRequest<?> request) {
        return PropagatedContext.getOrEmpty().plus(new ServerHttpRequestContext(request)).propagate();
    }

    /**
     * Encodes a response and writes it through {@code outboundAccess}.
     *
     * <p>If {@code throwable} is non-null, the given response is replaced by the default error
     * response created by the route executor. If no response remains ({@code response == null}),
     * nothing is written.
     *
     * @param outboundAccess   sink the encoded response is written to
     * @param nettyHttpRequest the request being answered
     * @param response         the response to send; may be {@code null}
     * @param throwable        a failure to report instead of {@code response}; may be {@code null}
     */
    public void writeResponse(OutboundAccess outboundAccess, NettyHttpRequest<?> nettyHttpRequest, HttpResponse<?> response, Throwable throwable) {
        if (throwable != null) {
            response = this.routeExecutor.createDefaultErrorResponse(nettyHttpRequest, throwable);
        }
        if (response != null) {
            ExecutionFlow<ByteBodyHttpResponse<?>> finalResponse;
            try {
                // First attempt: encode the response as given.
                finalResponse = this.encodeHttpResponse(nettyHttpRequest, (HttpResponse<?>)response, response.body());
            }
            catch (Throwable e) {
                try {
                    // Encoding failed: retry once with the default error response for that failure.
                    response = this.routeExecutor.createDefaultErrorResponse(nettyHttpRequest, e);
                    finalResponse = this.encodeHttpResponse(nettyHttpRequest, (HttpResponse<?>)response, response.body());
                }
                catch (Throwable f) {
                    // The error response could not be encoded either: write a bare 500 with an empty
                    // body and close the connection after the write.
                    // NOTE(review): finalResponse becomes an error flow here, so the onComplete
                    // callback below also attempts to write a server error - confirm that this
                    // second write is intended or harmless.
                    f.addSuppressed(e);
                    finalResponse = ExecutionFlow.error((Throwable)f);
                    try {
                        outboundAccess.closeAfterWrite();
                        outboundAccess.write((io.netty.handler.codec.http.HttpResponse)new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR), (ByteBody)AvailableNettyByteBody.empty());
                    }
                    catch (Throwable g) {
                        f.addSuppressed(g);
                    }
                    LOG.warn("Failed to encode error response", f);
                }
            }
            finalResponse.onComplete((r, t) -> {
                // A failed flow is answered with an empty-bodied server error response.
                ByteBodyHttpResponse encodedResponse = t != null ? ByteBodyHttpResponseWrapper.wrap((HttpResponse)HttpResponse.serverError(), (CloseableByteBody)AvailableNettyByteBody.empty()) : r;
                // try-with-resources closes the encoded response when this block exits.
                try (ByteBodyHttpResponse byteBodyHttpResponse = encodedResponse;){
                    this.closeConnectionIfError((HttpResponse<?>)encodedResponse, nettyHttpRequest, outboundAccess);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Response {} - {} {}", new Object[]{encodedResponse.code(), nettyHttpRequest.getMethodName(), nettyHttpRequest.getUri()});
                    }
                    io.netty.handler.codec.http.HttpResponse noBodyResponse = NettyMutableHttpResponse.toNoBodyResponse((HttpResponse)encodedResponse);
                    if (nettyHttpRequest.getMethod() == HttpMethod.HEAD) {
                        // HEAD requests receive the status line and headers only.
                        outboundAccess.writeHeadResponse((io.netty.handler.codec.http.HttpResponse)new DefaultHttpResponse(noBodyResponse.protocolVersion(), noBodyResponse.status(), noBodyResponse.headers()));
                    } else {
                        outboundAccess.write(noBodyResponse, encodedResponse.byteBody());
                    }
                }
                catch (Throwable u) {
                    // Report the write failure, keeping the original flow failure as suppressed.
                    if (t != null) {
                        u.addSuppressed((Throwable)t);
                    }
                    t = u;
                }
                if (t != null) {
                    LOG.warn("Failed to build error response", t);
                }
            });
        }
    }

    /**
     * Returns the executor used for blocking work, obtaining it from the supplier on first use
     * and caching it in {@code ioExecutor} afterwards. Initialisation is guarded by the monitor
     * of this handler.
     *
     * @return the cached or newly obtained executor
     */
    ExecutorService getIoExecutor() {
        ExecutorService current = this.ioExecutor;
        if (current != null) {
            return current;
        }
        synchronized (this) {
            // Re-read under the lock: another thread may have initialised the field meanwhile.
            current = this.ioExecutor;
            if (current == null) {
                current = this.ioExecutorSupplier.get();
                this.ioExecutor = current;
            }
            return current;
        }
    }

    /**
     * Turns a response and its body into an encoded {@link ByteBodyHttpResponse} flow.
     *
     * <ul>
     *   <li>HEAD requests and {@code null} bodies: the body is cleared and the response is written
     *       via {@code writeFinalNettyResponse}.</li>
     *   <li>Bodies convertible to a publisher: the body is cleared and the items are streamed.</li>
     *   <li>Anything else: a {@link MessageBodyWriter} is selected (the one set on the response,
     *       else one found in the registry, else one looked up for the runtime type of the body)
     *       and the body is encoded through {@code buildFinalResponse}.</li>
     * </ul>
     *
     * @param nettyRequest the request being answered
     * @param httpResponse the response to encode
     * @param body         the response body; may be {@code null}
     * @return a flow completing with the encoded response
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private ExecutionFlow<ByteBodyHttpResponse<?>> encodeHttpResponse(NettyHttpRequest<?> nettyRequest, HttpResponse<?> httpResponse, Object body) {
        MutableHttpResponse<?> response = httpResponse.toMutableResponse();
        if (nettyRequest.getMethod() == HttpMethod.HEAD || body == null) {
            response.body(null);
            return this.writeFinalNettyResponse(response, nettyRequest);
        }

        // Resolve the route info in BOTH branches. The instanceof test against the concrete class
        // is kept from the original; presumably it is a fast path for the common implementation -
        // TODO confirm.
        Object routeInfoO = response.getAttribute(HttpAttributes.ROUTE_INFO).orElse(null);
        RouteInfo<Object> routeInfo;
        if (routeInfoO instanceof DefaultUrlRouteInfo) {
            routeInfo = (RouteInfo<Object>) (DefaultUrlRouteInfo) routeInfoO;
        } else {
            routeInfo = (RouteInfo<Object>) routeInfoO;
        }

        if (Publishers.isConvertibleToPublisher(body)) {
            response.body(null);
            return this.writeStreamedWithErrorHandling(nettyRequest, response, this.mapToHttpContent(nettyRequest, response, body, routeInfo, nettyRequest.getChannelHandlerContext()));
        }

        // Prefer a writer that was already attached to the response.
        Object configuredWriter = response.getBodyWriter().orElse(null);
        MessageBodyWriter<Object> messageBodyWriter;
        if (configuredWriter instanceof NettyJsonHandler) {
            messageBodyWriter = (MessageBodyWriter<Object>) (NettyJsonHandler) configuredWriter;
        } else {
            messageBodyWriter = (MessageBodyWriter<Object>) configuredWriter;
        }

        MediaType responseMediaType = response.getContentType().orElse(null);
        Argument<Object> responseBodyType;
        if (routeInfo != null) {
            responseBodyType = (Argument<Object>) routeInfo.getResponseBodyType();
        } else {
            responseBodyType = Argument.of((Class<Object>) body.getClass());
        }

        if (responseMediaType == null) {
            // No explicit content type: ask the body, then the route, then fall back to JSON.
            if (!(body instanceof String) && !(body instanceof byte[]) && body instanceof MediaTypeProvider) {
                responseMediaType = ((MediaTypeProvider) body).getMediaType();
            } else if (routeInfo != null) {
                responseMediaType = this.routeExecutor.resolveDefaultResponseContentType(nettyRequest, routeInfo);
            } else {
                responseMediaType = MediaType.APPLICATION_JSON_TYPE;
            }
        }

        if (messageBodyWriter == null) {
            messageBodyWriter = this.messageBodyHandlerRegistry.findWriter(responseBodyType, Collections.singletonList(responseMediaType)).orElse(null);
        }
        if (messageBodyWriter == null || !responseBodyType.isInstance(body) || !messageBodyWriter.isWriteable(responseBodyType, responseMediaType)) {
            // The declared type does not fit the actual body: look the writer up by runtime type.
            responseBodyType = Argument.ofInstance(body);
            messageBodyWriter = this.messageBodyHandlerRegistry.getWriter(responseBodyType, List.of(responseMediaType));
        }
        return this.buildFinalResponse(nettyRequest, (MutableHttpResponse<Object>) response, responseBodyType, responseMediaType, body, messageBodyWriter, false);
    }

    /**
     * Encodes {@code body} with {@code messageBodyWriter} into a {@link ByteBodyHttpResponse}.
     *
     * <p>A blocking writer is re-dispatched to the I/O executor unless this call already runs
     * there. If encoding fails with a {@link CodecException}, the default error response for that
     * failure is encoded instead, again hopping to the I/O executor when its writer is blocking.
     *
     * @param nettyRequest      the request being answered
     * @param response          the response that carries status and headers
     * @param responseBodyType  the type the body is written as
     * @param mediaType         the media type to write
     * @param body              the body to encode
     * @param messageBodyWriter the writer to use
     * @param onIoExecutor      {@code true} if this call already runs on the I/O executor
     * @param <T>               the body type
     * @return a flow completing with the encoded response
     */
    @SuppressWarnings("unchecked")
    private <T> ExecutionFlow<ByteBodyHttpResponse<?>> buildFinalResponse(NettyHttpRequest<?> nettyRequest, MutableHttpResponse<T> response, Argument<T> responseBodyType, MediaType mediaType, T body, MessageBodyWriter<T> messageBodyWriter, boolean onIoExecutor) {
        if (!onIoExecutor && messageBodyWriter.isBlocking()) {
            Executor blockingExecutor = this.getIoExecutor();
            return ExecutionFlow.async(blockingExecutor, () -> this.buildFinalResponse(nettyRequest, response, responseBodyType, mediaType, body, messageBodyWriter, true));
        }

        NettyByteBufferFactory allocator = new NettyByteBufferFactory(nettyRequest.getChannelHandlerContext().alloc());
        try {
            return encodeNow(messageBodyWriter, allocator, nettyRequest, response, responseBodyType, mediaType, body);
        } catch (CodecException codecFailure) {
            MutableHttpResponse<?> errorResponse = this.routeExecutor.createDefaultErrorResponse(nettyRequest, codecFailure);
            Object errorBody = errorResponse.body();
            Argument<Object> errorType = Argument.ofInstance(errorBody);
            MediaType errorContentType = errorResponse.getContentType().orElse(MediaType.APPLICATION_JSON_TYPE);
            MessageBodyWriter<Object> errorWriter = this.messageBodyHandlerRegistry.getWriter(errorType, List.of(errorContentType));
            MutableHttpResponse<Object> typedErrorResponse = (MutableHttpResponse<Object>) errorResponse;

            if (onIoExecutor || !errorWriter.isBlocking()) {
                return encodeNow(errorWriter, allocator, nettyRequest, typedErrorResponse, errorType, errorContentType, errorBody);
            }
            return ExecutionFlow.async(this.getIoExecutor(), () -> encodeNow(errorWriter, allocator, nettyRequest, typedErrorResponse, errorType, errorContentType, errorBody));
        }
    }

    /**
     * Writes the body on the calling thread and wraps the encoded response in a completed flow.
     *
     * @param writer        the writer to use
     * @param bufferFactory factory for the output buffer
     * @param request       the request being answered
     * @param response      the response that carries status and headers
     * @param type          the type the body is written as
     * @param mediaType     the media type to write
     * @param body          the body to encode
     * @param <B>           the body type
     * @return a completed flow holding the encoded response
     * @throws CodecException if the writer fails to encode the body
     */
    private static <B> ExecutionFlow<ByteBodyHttpResponse<?>> encodeNow(MessageBodyWriter<B> writer, ByteBufferFactory<?, ?> bufferFactory, HttpRequest<?> request, MutableHttpResponse<B> response, Argument<B> type, MediaType mediaType, B body) {
        return ExecutionFlow.just(NettyResponseBodyWriterWrapper.wrap(writer).write(bufferFactory, request, response, type, mediaType, body));
    }

    /**
     * Converts a publisher-like body into a stream of {@link HttpContent} chunks.
     *
     * <p>Each emitted item is encoded in order ({@code concatMap}) with a {@link MessageBodyWriter}:
     * with route info, the route's writer is used when it fits the item, otherwise (and always
     * without route info) a writer is looked up for the runtime type of the item. When the route's
     * response is JSON formattable and the media type extension is {@code "json"}, the chunks are
     * passed through {@link JsonSubscriber#lift}. The request is stored in the Reactor context.
     *
     * @param request   the request being answered
     * @param response  the response whose content type and headers are used
     * @param body      the body to convert to a publisher
     * @param routeInfo the matched route info; may be {@code null}
     * @param context   channel handler context supplying the buffer allocator
     * @return the stream of encoded chunks
     */
    @SuppressWarnings("unchecked")
    private Flux<HttpContent> mapToHttpContent(NettyHttpRequest<?> request, MutableHttpResponse<?> response, Object body, RouteInfo<Object> routeInfo, ChannelHandlerContext context) {
        MediaType declaredType = response.getContentType().orElse(null);
        NettyByteBufferFactory allocator = new NettyByteBufferFactory(context.alloc());
        Flux<Object> messages = Flux.from(Publishers.convertToPublisher(this.conversionService, body));

        final MediaType effectiveType;
        final boolean jsonStream;
        final Flux<ByteBuffer<?>> encoded;
        if (routeInfo == null) {
            effectiveType = declaredType;
            jsonStream = false;
            encoded = messages.concatMap(message -> {
                Argument<Object> messageType = Argument.ofInstance(message);
                MessageBodyWriter<Object> writer = this.messageBodyHandlerRegistry.getWriter(messageType, effectiveType == null ? List.of() : List.of(effectiveType));
                return this.writeAsync(writer, messageType, effectiveType, message, response.getHeaders(), allocator);
            });
        } else {
            effectiveType = declaredType != null
                    ? declaredType
                    : this.routeExecutor.resolveDefaultResponseContentType(request, routeInfo);
            jsonStream = effectiveType != null
                    && effectiveType.getExtension().equals("json")
                    && routeInfo.isResponseBodyJsonFormattable();
            encoded = messages.concatMap(message -> {
                MessageBodyWriter<Object> writer = routeInfo.getMessageBodyWriter();
                Argument<Object> messageType = (Argument<Object>) routeInfo.getResponseBodyType();
                boolean routeWriterFits = writer != null
                        && messageType.isInstance(message)
                        && writer.isWriteable(messageType, effectiveType);
                if (!routeWriterFits) {
                    // Fall back to a writer selected by the runtime type of this item.
                    messageType = Argument.ofInstance(message);
                    writer = ResponseBodyWriter.wrap(this.messageBodyHandlerRegistry.getWriter(messageType, List.of(effectiveType)));
                }
                return this.writeAsync(writer, messageType, effectiveType, message, response.getHeaders(), allocator);
            });
        }

        Flux<HttpContent> chunks = encoded.map(buffer -> new DefaultHttpContent((ByteBuf) buffer.asNativeBuffer()));
        if (jsonStream) {
            chunks = JsonSubscriber.lift(chunks);
        }
        return chunks.contextWrite(reactorContext -> reactorContext.put("micronaut.http.server.request", request));
    }

    /**
     * Encodes a single object with the given writer and exposes the resulting buffer as a
     * publisher.
     *
     * <p>A blocking writer is invoked lazily on subscription, on a scheduler backed by the I/O
     * executor. The executor is obtained through {@link #getIoExecutor()} because the
     * {@code ioExecutor} field is initialised lazily and may still be {@code null} when the first
     * streamed item is written. A non-blocking writer is invoked immediately on the calling thread.
     *
     * @param messageBodyWriter the writer to use
     * @param type              the type the object is written as
     * @param mediaType         the media type to write
     * @param object            the object to encode
     * @param outgoingHeaders   the response headers handed to the writer
     * @param bufferFactory     factory for the output buffer
     * @param <T>               the object type
     * @return a publisher emitting the encoded buffer
     */
    private <T> Publisher<ByteBuffer<?>> writeAsync(@NonNull MessageBodyWriter<T> messageBodyWriter, @NonNull Argument<T> type, @NonNull MediaType mediaType, T object, @NonNull MutableHeaders outgoingHeaders, @NonNull ByteBufferFactory<?, ?> bufferFactory) {
        if (messageBodyWriter.isBlocking()) {
            Executor blockingExecutor = this.getIoExecutor();
            return Mono.<ByteBuffer<?>>defer(() -> Mono.just(messageBodyWriter.writeTo(type, mediaType, object, outgoingHeaders, bufferFactory)))
                    .subscribeOn(Schedulers.fromExecutor(blockingExecutor));
        }
        return Mono.just(messageBodyWriter.writeTo(type, mediaType, object, outgoingHeaders, bufferFactory));
    }

    /**
     * Converts a response without a separately encoded body into its final form. Streamed Netty
     * responses are forwarded to {@code writeStreamedWithErrorHandling}; any other response is
     * treated as a {@link FullHttpResponse} whose content becomes the byte body.
     *
     * @param message the response to convert
     * @param request the request being answered
     * @return a flow completing with the encoded response
     */
    private ExecutionFlow<ByteBodyHttpResponse<?>> writeFinalNettyResponse(MutableHttpResponse<?> message, NettyHttpRequest<?> request) {
        io.netty.handler.codec.http.HttpResponse nettyResponse = NettyHttpResponseBuilder.toHttpResponse(message);
        if (!(nettyResponse instanceof StreamedHttpResponse)) {
            ByteBuf fullContent = ((FullHttpResponse) nettyResponse).content();
            return ExecutionFlow.just(ByteBodyHttpResponseWrapper.wrap(message, new AvailableNettyByteBody(fullContent)));
        }
        return this.writeStreamedWithErrorHandling(request, message, (StreamedHttpResponse) nettyResponse);
    }

    /**
     * Subscribes a {@code LazySendingSubscriber} to the streamed content and returns the flow that
     * the subscriber completes with the response.
     *
     * @param request  the request being answered
     * @param response the response that carries status and headers
     * @param streamed the body chunks
     * @return the output flow of the subscriber
     */
    private ExecutionFlow<ByteBodyHttpResponse<?>> writeStreamedWithErrorHandling(NettyHttpRequest<?> request, HttpResponse<?> response, Publisher<HttpContent> streamed) {
        LazySendingSubscriber lazySubscriber = new LazySendingSubscriber(request, response);
        streamed.subscribe(lazySubscriber);
        return lazySubscriber.output;
    }

    /**
     * Marks the connection to be closed after the write when the request could not be decoded, or
     * when the response code is 500 or above and keep-alive on server errors is disabled.
     *
     * @param message        the response about to be written
     * @param request        the request being answered
     * @param outboundAccess the outbound access to mark for closing
     */
    private void closeConnectionIfError(HttpResponse<?> message, HttpRequest<?> request, OutboundAccess outboundAccess) {
        boolean decodeError = request instanceof NettyHttpRequest
                && ((NettyHttpRequest<?>) request).getNativeRequest().decoderResult().isFailure();
        if (decodeError) {
            outboundAccess.closeAfterWrite();
            return;
        }
        if (message.code() >= 500 && !this.serverConfiguration.isKeepAliveOnServerError()) {
            outboundAccess.closeAfterWrite();
        }
    }

    /**
     * Decides whether an error is a client connectivity problem that does not need to be reported.
     *
     * @param cause the error to classify
     * @return {@code true} if the error or its direct cause is a {@link ClosedChannelException},
     *         or if it is an {@link IOException} whose message matches
     *         {@code IGNORABLE_ERROR_MESSAGE}
     */
    boolean isIgnorable(Throwable cause) {
        if (cause instanceof ClosedChannelException) {
            return true;
        }
        if (cause.getCause() instanceof ClosedChannelException) {
            return true;
        }
        String text = cause.getMessage();
        if (!(cause instanceof IOException) || text == null) {
            return false;
        }
        return IGNORABLE_ERROR_MESSAGE.matcher(text).matches();
    }

    /**
     * Adapts a plain {@link MessageBodyWriter} to a {@link ResponseBodyWriter} by writing the body
     * into a Netty {@link ByteBuf}.
     *
     * @param <T> the body type
     */
    private static final class NettyResponseBodyWriterWrapper<T> extends ResponseBodyWriterWrapper<T> {
        private NettyResponseBodyWriterWrapper(MessageBodyWriter<T> wrapped) {
            super(wrapped);
        }

        /**
         * Returns the writer itself if it already is a {@link ResponseBodyWriter}, otherwise a
         * new wrapper around it.
         *
         * @param mbw the writer to adapt
         * @param <T> the body type
         * @return a response body writer backed by {@code mbw}
         */
        @SuppressWarnings("unchecked")
        static <T> ResponseBodyWriter<T> wrap(MessageBodyWriter<T> mbw) {
            return mbw instanceof ResponseBodyWriter
                    ? (ResponseBodyWriter<T>) mbw
                    : new NettyResponseBodyWriterWrapper<T>(mbw);
        }

        /**
         * Writes {@code object} into a freshly allocated buffer and wraps the buffer as the byte
         * body of the response. The buffer is released if writing fails.
         *
         * @param bufferFactory factory for the buffer; must be a {@link NettyByteBufferFactory}
         * @param request       the request being answered
         * @param httpResponse  the response whose headers are handed to the writer
         * @param type          the type the object is written as
         * @param mediaType     the media type to write
         * @param object        the object to encode
         * @return the response with the encoded body
         * @throws CodecException if the wrapped writer fails to encode the object
         */
        @NonNull
        public ByteBodyHttpResponse<?> write(@NonNull ByteBufferFactory<?, ?> bufferFactory, @NonNull HttpRequest<?> request, @NonNull MutableHttpResponse<T> httpResponse, @NonNull Argument<T> type, @NonNull MediaType mediaType, T object) throws CodecException {
            ByteBuf target = (ByteBuf) ((NettyByteBufferFactory) bufferFactory).buffer().asNativeBuffer();
            ByteBufOutputStream sink = new ByteBufOutputStream(target);
            try {
                this.writeTo(type, mediaType, object, httpResponse.getHeaders(), sink);
            } catch (Throwable writeFailure) {
                // Ownership was never handed over, so give the buffer back before propagating.
                target.release();
                throw writeFailure;
            }
            return ByteBodyHttpResponseWrapper.wrap(httpResponse, new AvailableNettyByteBody(target));
        }
    }

    /**
     * Subscriber for a streamed response body that delays producing the response until the first
     * upstream signal arrives.
     *
     * <p>It initially requests a single item. Depending on the first signal:
     * <ul>
     *   <li>item: {@code output} is completed with a response whose body is this publisher; the
     *       item is kept in {@code first} until the body subscriber requests it;</li>
     *   <li>error: {@code output} is completed from an encoded error response;</li>
     *   <li>completion: {@code output} is completed with an empty body.</li>
     * </ul>
     * Signals arriving after that are forwarded to {@code downstream}, or remembered in
     * {@code completion} while {@code first} has not been handed over yet.
     */
    private final class LazySendingSubscriber
    implements Subscriber<HttpContent>,
    Publisher<ByteBuf> {
        /** Marker stored in {@code completion} for a normal completion (an error is stored as the Throwable). */
        private static final Object COMPLETE = new Object();
        /** Set once {@code output} has been completed with a first item or an empty body. */
        boolean headersSent = false;
        /** Subscription to the source of {@link HttpContent} chunks. */
        Subscription upstream;
        /** Relay to the subscriber of the response body. */
        final DelayedSubscriber<ByteBuf> downstream = new DelayedSubscriber();
        /** First received chunk, held until the body subscriber requests data or cancels. */
        @Nullable
        HttpContent first;
        /** Terminal signal received while {@code first} was still held: {@code COMPLETE} or a Throwable. */
        Object completion = null;
        /**
         * Dispatches signals relative to the channel's event loop.
         * NOTE(review): {@code executeNow} presumably returns {@code true} when the task should be
         * run inline by the caller and otherwise schedules it - confirm against EventLoopFlow.
         */
        private final EventLoopFlow flow;
        private final NettyHttpRequest<?> request;
        /** Response carrying the status and headers to send with the body. */
        private final HttpResponse<?> headers;
        /** Completed with the response once the first upstream signal has been seen. */
        private final DelayedExecutionFlow<ByteBodyHttpResponse<?>> output = DelayedExecutionFlow.create();

        private LazySendingSubscriber(NettyHttpRequest<?> request, HttpResponse<?> headers) {
            this.request = request;
            this.headers = headers;
            this.flow = new EventLoopFlow((OrderedEventExecutor)request.getChannelHandlerContext().channel().eventLoop());
        }

        /**
         * Connects the subscriber of the response body. Its first request is served from
         * {@code first} (followed by any remembered terminal signal); remaining demand goes upstream.
         */
        public void subscribe(final Subscriber<? super ByteBuf> s) {
            this.downstream.onSubscribe(new Subscription(){

                public void request(long n) {
                    HttpContent first = LazySendingSubscriber.this.first;
                    if (first != null) {
                        // Hand over the held chunk before touching upstream.
                        LazySendingSubscriber.this.first = null;
                        s.onNext((Object)first.content());
                        if (LazySendingSubscriber.this.completion != null) {
                            // Upstream already terminated while the chunk was held: replay that signal.
                            if (LazySendingSubscriber.this.completion == COMPLETE) {
                                s.onComplete();
                            } else {
                                s.onError((Throwable)LazySendingSubscriber.this.completion);
                            }
                            return;
                        }
                        // The held chunk consumed one unit of demand (unbounded demand stays unbounded).
                        if (n != Long.MAX_VALUE && --n == 0L) {
                            return;
                        }
                    }
                    LazySendingSubscriber.this.upstream.request(n);
                }

                public void cancel() {
                    // Release the held chunk, if any, before cancelling upstream.
                    if (LazySendingSubscriber.this.first != null) {
                        LazySendingSubscriber.this.first.release();
                        LazySendingSubscriber.this.first = null;
                    }
                    LazySendingSubscriber.this.upstream.cancel();
                }
            });
            this.downstream.subscribe(s);
        }

        /** Stores the upstream subscription and requests the first item. */
        public void onSubscribe(Subscription s) {
            this.upstream = s;
            s.request(1L);
        }

        public void onNext(HttpContent httpContent) {
            if (this.flow.executeNow(() -> this.onNext0(httpContent))) {
                this.onNext0(httpContent);
            }
        }

        /** First item: hold it and complete {@code output}; later items: forward their content downstream. */
        private void onNext0(HttpContent httpContent) {
            if (this.headersSent) {
                this.downstream.onNext((Object)httpContent.content());
            } else {
                this.first = httpContent;
                this.headersSent = true;
                this.output.complete((Object)ByteBodyHttpResponseWrapper.wrap(this.headers, (CloseableByteBody)NettyBodyAdapter.adapt((Publisher)this, (EventLoop)this.request.getChannelHandlerContext().channel().eventLoop())));
            }
        }

        public void onError(Throwable t) {
            if (this.flow.executeNow(() -> this.onError0(t))) {
                this.onError0(t);
            }
        }

        /**
         * After the response was produced, the error is forwarded downstream (or remembered while
         * {@code first} is held). Before that, an error response is encoded and used as the output:
         * an {@link HttpStatusException} supplies the status and its body or message, any other
         * error goes through the default error response of the route executor.
         */
        private void onError0(Throwable t) {
            if (this.headersSent) {
                if (this.first != null) {
                    this.completion = t;
                } else {
                    this.downstream.onError(t);
                }
            } else {
                MutableHttpResponse response;
                if (t instanceof HttpStatusException) {
                    HttpStatusException hse = (HttpStatusException)t;
                    response = HttpResponse.status((HttpStatus)hse.getStatus());
                    if (hse.getBody().isPresent()) {
                        response.body(hse.getBody().get());
                    } else if (hse.getMessage() != null) {
                        response.body((Object)hse.getMessage());
                    }
                } else {
                    response = RoutingInBoundHandler.this.routeExecutor.createDefaultErrorResponse(this.request, t);
                }
                this.output.completeFrom(RoutingInBoundHandler.this.encodeHttpResponse(this.request, (HttpResponse<?>)response, response.body()));
            }
        }

        public void onComplete() {
            if (this.flow.executeNow(this::onComplete0)) {
                this.onComplete0();
            }
        }

        /**
         * After the response was produced, completion is forwarded downstream (or remembered while
         * {@code first} is held). Completion before any item yields a response with an empty body.
         */
        private void onComplete0() {
            if (this.headersSent) {
                if (this.first != null) {
                    this.completion = COMPLETE;
                } else {
                    this.downstream.onComplete();
                }
            } else {
                this.headersSent = true;
                this.output.complete((Object)ByteBodyHttpResponseWrapper.wrap(this.headers, (CloseableByteBody)AvailableNettyByteBody.empty()));
            }
        }
    }
}

