/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.netty4.Netty4HttpChannel;
import org.elasticsearch.http.netty4.Netty4HttpRequest;
import org.elasticsearch.http.netty4.Netty4HttpResponse;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.NettyAllocator;

public class Netty4HttpPipeliningHandler
extends ChannelDuplexHandler {
    private final Logger logger;
    private final int maxEventsHeld;
    private final PriorityQueue<Tuple<Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;
    private int readSequence;
    private int writeSequence;
    private final Netty4HttpServerTransport serverTransport;
    private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";
    private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean((String)System.getProperty("es.unsafe.do_not_split_http_responses"), (boolean)false);
    private static final int SPLIT_THRESHOLD = (int)((double)NettyAllocator.suggestedMaxAllocationSize() * 0.99);

    public Netty4HttpPipeliningHandler(Logger logger, int maxEventsHeld, Netty4HttpServerTransport serverTransport) {
        this.logger = logger;
        this.maxEventsHeld = maxEventsHeld;
        this.outboundHoldingQueue = new PriorityQueue<Tuple>(1, Comparator.comparingInt(t -> ((Netty4HttpResponse)((Object)((Object)t.v1()))).getSequence()));
        this.serverTransport = serverTransport;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Netty4HttpRequest netty4HttpRequest;
        assert (msg instanceof FullHttpRequest) : "Should have fully aggregated message already but saw [" + msg + "]";
        FullHttpRequest fullHttpRequest = (FullHttpRequest)msg;
        if (fullHttpRequest.decoderResult().isFailure()) {
            Exception nonError;
            Throwable cause = fullHttpRequest.decoderResult().cause();
            if (cause instanceof Error) {
                ExceptionsHelper.maybeDieOnAnotherThread((Throwable)cause);
                nonError = new Exception(cause);
            } else {
                nonError = (Exception)cause;
            }
            netty4HttpRequest = new Netty4HttpRequest(this.readSequence++, fullHttpRequest, nonError);
        } else {
            netty4HttpRequest = new Netty4HttpRequest(this.readSequence++, fullHttpRequest);
        }
        this.handlePipelinedRequest(ctx, netty4HttpRequest);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
        Netty4HttpChannel channel = (Netty4HttpChannel)ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
        boolean success = false;
        assert (Transports.assertDefaultThreadContext((ThreadContext)this.serverTransport.getThreadPool().getThreadContext()));
        assert (Transports.assertTransportThread());
        try {
            this.serverTransport.incomingRequest(pipelinedRequest, channel);
            success = true;
        }
        finally {
            if (!success) {
                pipelinedRequest.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        assert (msg instanceof Netty4HttpResponse) : "Invalid message type: " + msg.getClass();
        boolean success = false;
        try {
            Netty4HttpResponse response = (Netty4HttpResponse)((Object)msg);
            if (response.getSequence() != this.writeSequence) {
                assert (response.getSequence() > this.writeSequence) : "response sequence [" + response.getSequence() + "] we below write sequence [" + this.writeSequence + "]";
                if (this.outboundHoldingQueue.size() >= this.maxEventsHeld) {
                    int eventCount = this.outboundHoldingQueue.size() + 1;
                    throw new IllegalStateException("Too many pipelined events [" + eventCount + "]. Max events allowed [" + this.maxEventsHeld + "].");
                }
                this.outboundHoldingQueue.add((Tuple<Netty4HttpResponse, ChannelPromise>)new Tuple((Object)response, (Object)promise));
                success = true;
                return;
            }
            this.doWrite(ctx, response, promise);
            success = true;
            while (!this.outboundHoldingQueue.isEmpty() && ((Netty4HttpResponse)((Object)this.outboundHoldingQueue.peek().v1())).getSequence() == this.writeSequence) {
                Tuple<Netty4HttpResponse, ChannelPromise> top = this.outboundHoldingQueue.poll();
                assert (top != null) : "we know the outbound holding queue to not be empty at this point";
                this.doWrite(ctx, (Netty4HttpResponse)((Object)top.v1()), (ChannelPromise)top.v2());
            }
        }
        catch (IllegalStateException e) {
            ctx.channel().close();
        }
        finally {
            if (!success) {
                promise.setFailure((Throwable)new ClosedChannelException());
            }
        }
    }

    private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse, ChannelPromise promise) {
        if (DO_NOT_SPLIT_HTTP_RESPONSES || readyResponse.content().readableBytes() <= SPLIT_THRESHOLD) {
            ctx.write((Object)readyResponse, promise);
        } else {
            this.splitAndWrite(ctx, readyResponse, promise);
        }
        ++this.writeSequence;
    }

    private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, ChannelPromise promise) {
        PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        DefaultHttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
        combiner.add((Future)ctx.write((Object)response));
        ByteBuf content = msg.content();
        while (content.readableBytes() > SPLIT_THRESHOLD) {
            combiner.add((Future)ctx.write((Object)new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD))));
        }
        combiner.add((Future)ctx.write((Object)new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes()))));
        combiner.finish((Promise)promise);
    }

    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = this.removeAllInflightResponses();
        if (!inflightResponses.isEmpty()) {
            ClosedChannelException closedChannelException = new ClosedChannelException();
            for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
                try {
                    ((ChannelPromise)inflightResponse.v2()).setFailure((Throwable)closedChannelException);
                }
                catch (RuntimeException e) {
                    this.logger.error("unexpected error while releasing pipelined http responses", (Throwable)e);
                }
            }
        }
        ctx.close(promise);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ExceptionsHelper.maybeDieOnAnotherThread((Throwable)cause);
        assert (Transports.assertDefaultThreadContext((ThreadContext)this.serverTransport.getThreadPool().getThreadContext()));
        Netty4HttpChannel channel = (Netty4HttpChannel)ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
        if (cause instanceof Error) {
            this.serverTransport.onException(channel, new Exception(cause));
        } else {
            this.serverTransport.onException(channel, (Exception)cause);
        }
    }

    private List<Tuple<Netty4HttpResponse, ChannelPromise>> removeAllInflightResponses() {
        ArrayList<Tuple<Netty4HttpResponse, ChannelPromise>> responses = new ArrayList<Tuple<Netty4HttpResponse, ChannelPromise>>(this.outboundHoldingQueue);
        this.outboundHoldingQueue.clear();
        return responses;
    }
}

