/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.http.server;

import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.HttpServerResponseImpl;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.server.events.HttpServerEventPublisher;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

public class HttpConnectionHandler<I, O>
implements ConnectionHandler<HttpServerRequest<I>, Object> {
    private static final Logger logger = LoggerFactory.getLogger(HttpConnectionHandler.class);
    private final RequestHandler<I, O> requestHandler;
    private final HttpServerEventPublisher eventPublisher;
    private final boolean sendHttp10ResponseFor10Request;

    public HttpConnectionHandler(RequestHandler<I, O> requestHandler, HttpServerEventPublisher eventPublisher, boolean sendHttp10ResponseFor10Request) {
        this.requestHandler = requestHandler;
        this.eventPublisher = eventPublisher;
        this.sendHttp10ResponseFor10Request = sendHttp10ResponseFor10Request;
    }

    @Override
    public Observable<Void> handle(final Connection<HttpServerRequest<I>, Object> c) {
        return c.getInput().nest().concatMap(new Func1<Observable<HttpServerRequest<I>>, Observable<Void>>(){

            @Override
            public Observable<Void> call(Observable<HttpServerRequest<I>> reqSource) {
                return reqSource.take(1).flatMap(new Func1<HttpServerRequest<I>, Observable<Void>>(){

                    @Override
                    public Observable<Void> call(HttpServerRequest<I> req) {
                        long startNanos;
                        long l = startNanos = HttpConnectionHandler.this.eventPublisher.publishingEnabled() ? Clock.newStartTimeNanos() : -1L;
                        if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                            HttpConnectionHandler.this.eventPublisher.onNewRequestReceived();
                        }
                        HttpServerResponse response = HttpConnectionHandler.this.newResponse(req, c);
                        return HttpConnectionHandler.this.handleRequest(req, startNanos, response, c);
                    }
                });
            }
        }).repeat().ambWith(c.closeListener());
    }

    private Observable<Void> handleRequest(HttpServerRequest<I> request, final long startTimeNanos, final HttpServerResponse<O> response, final Connection<HttpServerRequest<I>, Object> c) {
        Observable<Void> requestHandlingResult = null;
        try {
            if (request.decoderResult().isSuccess()) {
                requestHandlingResult = this.requestHandler.handle(request, response);
            }
            if (null == requestHandlingResult) {
                if (response.getStatus().equals(HttpResponseStatus.OK)) {
                    response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                }
                requestHandlingResult = response.write(Observable.empty());
            }
        }
        catch (Throwable throwable) {
            logger.error("Unexpected error while invoking HTTP user handler.", throwable);
            requestHandlingResult = response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR).write(Observable.empty());
        }
        if (this.eventPublisher.publishingEnabled()) {
            requestHandlingResult = requestHandlingResult.lift(new Observable.Operator<Void, Void>(){

                @Override
                public Subscriber<? super Void> call(final Subscriber<? super Void> o) {
                    if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                        HttpConnectionHandler.this.eventPublisher.onRequestHandlingStart(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                    }
                    return new Subscriber<Void>(o){

                        @Override
                        public void onCompleted() {
                            if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                                HttpConnectionHandler.this.eventPublisher.onRequestHandlingSuccess(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                            }
                            o.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                                HttpConnectionHandler.this.eventPublisher.onRequestHandlingFailed(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS, e);
                            }
                            logger.error("Unexpected error processing a request.", e);
                            o.onError(e);
                        }

                        @Override
                        public void onNext(Void aVoid) {
                        }
                    };
                }
            });
        }
        return requestHandlingResult.onErrorResumeNext(new Func1<Throwable, Observable<Void>>(){

            @Override
            public Observable<Void> call(Throwable throwable) {
                logger.error("Unexpected error while processing request.", throwable);
                return response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR).dispose().concatWith(c.close()).onErrorResumeNext(Observable.empty());
            }
        }).concatWith(request.dispose()).concatWith(response.dispose());
    }

    private HttpServerResponse<O> newResponse(HttpServerRequest<I> request, Connection<HttpServerRequest<I>, Object> c) {
        DefaultHttpResponse responseHeaders;
        HttpVersion version;
        HttpVersion httpVersion = version = this.sendHttp10ResponseFor10Request ? request.getHttpVersion() : HttpVersion.HTTP_1_1;
        if (request.decoderResult().isFailure()) {
            responseHeaders = new DefaultHttpResponse(version, HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE);
            responseHeaders.headers().set((CharSequence)HttpHeaderNames.CONNECTION, (Object)HttpHeaderValues.CLOSE).set((CharSequence)HttpHeaderNames.CONTENT_LENGTH, (Object)0);
        } else {
            responseHeaders = new DefaultHttpResponse(version, HttpResponseStatus.OK);
        }
        HttpServerResponse response = HttpServerResponseImpl.create(request, c, responseHeaders);
        this.setConnectionHeader(request, response);
        return response;
    }

    private void setConnectionHeader(HttpServerRequest<I> request, HttpServerResponse<O> response) {
        if (request.isKeepAlive()) {
            if (!request.getHttpVersion().isKeepAliveDefault()) {
                response.setHeader((CharSequence)HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
        } else {
            response.setHeader((CharSequence)HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        }
    }
}

