/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.netty4.Netty4ChunkedHttpResponse;
import org.elasticsearch.http.netty4.Netty4FullHttpResponse;
import org.elasticsearch.http.netty4.Netty4HttpChannel;
import org.elasticsearch.http.netty4.Netty4HttpRequest;
import org.elasticsearch.http.netty4.Netty4HttpResponse;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.NettyAllocator;

/**
 * Channel handler that supports HTTP pipelining: inbound requests are tagged with an increasing
 * read sequence number and outbound responses are written strictly in that sequence order.
 * Responses that arrive ahead of their turn are parked in a priority queue until every lower
 * sequence number has been written.
 *
 * <p>The handler also splits large full responses into several HTTP content messages, streams
 * chunked responses while the channel is writable, and queues writes that cannot be passed
 * downstream immediately.
 *
 * <p>State is kept in plain, unsynchronized fields; {@link #doFlush(ChannelHandlerContext)}
 * asserts that it runs on the channel's event loop.
 */
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {

    private static final Logger logger = LogManager.getLogger(Netty4HttpPipeliningHandler.class);

    /** Name of the system property that disables splitting of large full responses. */
    private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";

    /** Value of the {@link #DO_NOT_SPLIT} system property, {@code false} when unset. */
    private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean(System.getProperty(DO_NOT_SPLIT), false);

    /** Full responses with more readable bytes than this (99% of the suggested max allocation size) are split. */
    private static final int SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);

    /** Size hint, in bytes (256 KiB), passed to {@code encodeChunk} for each chunk of a chunked response. */
    private static final int MAX_BYTES_PER_CHUNK = 1 << 18;

    /** Maximum number of out-of-order responses that may be parked in {@link #outboundHoldingQueue}. */
    private final int maxEventsHeld;

    /** Responses that arrived before their turn, ordered by sequence number, paired with their promises. */
    private final PriorityQueue<Tuple<? extends Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;

    /** The chunked response currently being streamed, or {@code null} when none is in progress. */
    @Nullable
    private ChunkedWrite currentChunkedWrite;

    /** Sequence number assigned to the next inbound request. */
    private int readSequence;

    /** Sequence number of the next response that is allowed to be written. */
    private int writeSequence;

    /** Writes that could not be passed downstream immediately, in submission order. */
    private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

    private final Netty4HttpServerTransport serverTransport;

    /**
     * Creates a pipelining handler.
     *
     * @param maxEventsHeld   maximum number of out-of-order responses to hold before the channel is closed
     * @param serverTransport transport that receives incoming requests and exceptions
     */
    public Netty4HttpPipeliningHandler(int maxEventsHeld, Netty4HttpServerTransport serverTransport) {
        this.maxEventsHeld = maxEventsHeld;
        this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparingInt(t -> t.v1().getSequence()));
        this.serverTransport = serverTransport;
    }

    /**
     * Wraps the aggregated request in a {@link Netty4HttpRequest} carrying the next read sequence
     * number (together with the decoder failure, if any) and dispatches it.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        assert (msg instanceof FullHttpRequest) : "Should have fully aggregated message already but saw [" + msg + "]";
        final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
        final Netty4HttpRequest netty4HttpRequest;
        if (fullHttpRequest.decoderResult().isFailure()) {
            final Throwable cause = fullHttpRequest.decoderResult().cause();
            if (cause instanceof Error) {
                ExceptionsHelper.maybeDieOnAnotherThread(cause);
            }
            netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, toException(cause));
        } else {
            netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
        }
        handlePipelinedRequest(ctx, netty4HttpRequest);
    }

    /**
     * Hands the request to the server transport; the request is released if the hand-off throws.
     *
     * @param ctx              context whose channel carries the {@code HTTP_CHANNEL_KEY} attribute
     * @param pipelinedRequest the sequenced request to dispatch
     */
    protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
        final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
        boolean success = false;
        assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
        assert Transports.assertTransportThread();
        try {
            serverTransport.incomingRequest(pipelinedRequest, channel);
            success = true;
        } finally {
            if (success == false) {
                pipelinedRequest.release();
            }
        }
    }

    /**
     * Writes the response if it is next in sequence (followed by any parked responses that became
     * writable as a result); otherwise parks it in the holding queue.
     *
     * <p>If the holding queue is already at {@code maxEventsHeld}, or an unsupported response type
     * is encountered, the channel is closed. Whenever the response was neither parked nor written
     * successfully, {@code promise} is failed with a {@link ClosedChannelException}.
     *
     * @throws IOException if writing a response throws one (chunk encoding declares it)
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
        assert (msg instanceof Netty4HttpResponse) : "Invalid message type: " + msg.getClass();
        boolean success = false;
        try {
            final Netty4HttpResponse restResponse = (Netty4HttpResponse) msg;
            if (restResponse.getSequence() != writeSequence) {
                assert (restResponse.getSequence() > writeSequence)
                    : "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]";
                if (outboundHoldingQueue.size() >= maxEventsHeld) {
                    final int eventCount = outboundHoldingQueue.size() + 1;
                    throw new IllegalStateException(
                        "Too many pipelined events [" + eventCount + "]. Max events allowed [" + maxEventsHeld + "]."
                    );
                }
                assert (outboundHoldingQueue.stream().noneMatch(t -> t.v1().getSequence() == writeSequence))
                    : "duplicate outbound entries for seqno " + writeSequence;
                outboundHoldingQueue.add(new Tuple<>(restResponse, promise));
                success = true;
                return;
            }

            // The response is next in line: write it out ...
            doWrite(ctx, restResponse, promise);
            success = true;
            // ... then drain any parked responses whose turn has now come.
            doWriteQueued(ctx);
        } catch (IllegalStateException e) {
            // Deliberately not rethrown: the channel is closed and, if the write had not yet
            // succeeded, the promise is failed in the finally block below.
            ctx.channel().close();
        } finally {
            if (success == false) {
                promise.setFailure(new ClosedChannelException());
            }
        }
    }

    /** Writes parked responses for as long as the head of the holding queue matches the write sequence. */
    private void doWriteQueued(ChannelHandlerContext ctx) throws IOException {
        while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
            final Tuple<? extends Netty4HttpResponse, ChannelPromise> top = outboundHoldingQueue.poll();
            assert (top != null) : "we know the outbound holding queue to not be empty at this point";
            doWrite(ctx, top.v1(), top.v2());
        }
    }

    /**
     * Dispatches a response whose sequence number equals the write sequence to the full or
     * chunked write path.
     *
     * @throws IllegalStateException if the response is neither a full nor a chunked response
     */
    private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse, ChannelPromise promise) throws IOException {
        assert (currentChunkedWrite == null) : "unexpected existing write [" + currentChunkedWrite + "]";
        assert (readyResponse != null) : "cannot write null response";
        assert (readyResponse.getSequence() == writeSequence);
        if (readyResponse instanceof Netty4FullHttpResponse fullResponse) {
            doWriteFullResponse(ctx, fullResponse, promise);
        } else if (readyResponse instanceof Netty4ChunkedHttpResponse chunkedResponse) {
            doWriteChunkedResponse(ctx, chunkedResponse, promise);
        } else {
            assert (false) : readyResponse.getClass().getCanonicalName();
            throw new IllegalStateException("illegal message type: " + readyResponse.getClass().getCanonicalName());
        }
    }

    /**
     * Enqueues a full response, splitting it first when its content exceeds {@link #SPLIT_THRESHOLD}
     * and splitting is not disabled, then advances the write sequence.
     */
    private void doWriteFullResponse(ChannelHandlerContext ctx, Netty4FullHttpResponse readyResponse, ChannelPromise promise) {
        if (DO_NOT_SPLIT_HTTP_RESPONSES || readyResponse.content().readableBytes() <= SPLIT_THRESHOLD) {
            enqueueWrite(ctx, readyResponse, promise);
        } else {
            splitAndWrite(ctx, readyResponse, promise);
        }
        ++writeSequence;
    }

    /**
     * Starts a chunked write: records it as the current chunked write, enqueues the response head
     * and, if the head was written directly, keeps writing chunks while the channel is writable.
     * The write sequence only advances once the last chunk has been written.
     */
    private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyResponse, ChannelPromise promise)
        throws IOException {
        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        final ChannelPromise first = ctx.newPromise();
        // Cast selects PromiseCombiner#add(Future) rather than the add(Promise) overload.
        combiner.add((Future<Void>) first);
        final ChunkedRestResponseBody responseBody = readyResponse.body();
        assert (currentChunkedWrite == null);
        currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
        if (enqueueWrite(ctx, readyResponse, first)) {
            // The head went straight downstream; keep writing chunks until the channel stops being writable.
            while (ctx.channel().isWritable()) {
                if (writeChunk(ctx, combiner, responseBody)) {
                    finishChunkedWrite();
                    return;
                }
            }
        }
        // Otherwise the remaining chunks are written from doFlush once the channel is writable again.
    }

    /** Clears the current chunked write, advances the write sequence and finishes its promise combiner. */
    private void finishChunkedWrite() {
        assert (currentChunkedWrite != null);
        assert (currentChunkedWrite.responseBody().isDone());
        final ChunkedWrite finishingWrite = currentChunkedWrite;
        currentChunkedWrite = null;
        ++writeSequence;
        finishingWrite.combiner.finish(finishingWrite.onDone());
    }

    /**
     * Splits a full response into a header-only response followed by content slices of at most
     * {@link #SPLIT_THRESHOLD} bytes, the final slice being sent as the last HTTP content.
     * {@code promise} is completed through a combiner over all of the individual writes.
     */
    private void splitAndWrite(ChannelHandlerContext ctx, Netty4FullHttpResponse msg, ChannelPromise promise) {
        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        final DefaultHttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
        combiner.add(enqueueWrite(ctx, response));
        final ByteBuf content = msg.content();
        while (content.readableBytes() > SPLIT_THRESHOLD) {
            combiner.add(enqueueWrite(ctx, new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD))));
        }
        combiner.add(enqueueWrite(ctx, new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes()))));
        combiner.finish(promise);
    }

    /** Resumes pending writes when the channel is writable, then propagates the event. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException {
        if (ctx.channel().isWritable()) {
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

    /** Drains pending writes and flushes; falls back to a plain flush when the channel is inactive. */
    @Override
    public void flush(ChannelHandlerContext ctx) throws IOException {
        if (doFlush(ctx) == false) {
            ctx.flush();
        }
    }

    /** Runs {@link #doFlush(ChannelHandlerContext)} (which fails queued writes on an inactive channel), then propagates. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        doFlush(ctx);
        super.channelInactive(ctx);
    }

    /**
     * While the channel is writable, writes queued operations, then parked responses whose turn
     * has come, then chunks of the current chunked write; finally flushes the context.
     *
     * @return {@code false} if the channel was inactive on entry (queued writes are failed and
     *         nothing is flushed), {@code true} otherwise
     */
    private boolean doFlush(ChannelHandlerContext ctx) throws IOException {
        assert (ctx.executor().inEventLoop());
        final Channel channel = ctx.channel();
        if (channel.isActive() == false) {
            failQueuedWrites();
            return false;
        }
        while (channel.isWritable()) {
            WriteOperation currentWrite = queuedWrites.poll();
            if (currentWrite == null) {
                // Nothing queued: parked responses may be due, which can in turn enqueue writes.
                doWriteQueued(ctx);
                if (channel.isWritable() == false) {
                    break;
                }
                currentWrite = queuedWrites.poll();
            }
            if (currentWrite == null) {
                // Still nothing queued: continue the in-progress chunked write, if there is one.
                if (currentChunkedWrite == null) {
                    break;
                }
                if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.responseBody())) {
                    finishChunkedWrite();
                }
                continue;
            }
            ctx.write(currentWrite.msg, currentWrite.promise);
        }
        ctx.flush();
        if (channel.isActive() == false) {
            failQueuedWrites();
        }
        return true;
    }

    /**
     * Encodes the next chunk of {@code body} and writes it, as the last HTTP content if the body
     * reports done after encoding. The encoded bytes are closed when the write future completes.
     *
     * @return {@code true} if this was the final chunk
     */
    private boolean writeChunk(ChannelHandlerContext ctx, PromiseCombiner combiner, ChunkedRestResponseBody body) throws IOException {
        assert (body.isDone() == false) : "should not continue to try and serialize once done";
        final ReleasableBytesReference bytes = body.encodeChunk(MAX_BYTES_PER_CHUNK, serverTransport.recycler());
        final ByteBuf content = Netty4Utils.toByteBuf(bytes);
        final boolean done = body.isDone();
        final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
        f.addListener(ignored -> bytes.close());
        combiner.add(f);
        return done;
    }

    /** Fails every queued write, and the current chunked write if any, with a {@link ClosedChannelException}. */
    private void failQueuedWrites() {
        WriteOperation queuedWrite;
        while ((queuedWrite = queuedWrites.poll()) != null) {
            queuedWrite.failAsClosedChannel();
        }
        if (currentChunkedWrite != null) {
            safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException());
            currentChunkedWrite = null;
        }
    }

    /**
     * Fails the current chunked write and all parked responses with a
     * {@link ClosedChannelException}, then propagates the close.
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        if (currentChunkedWrite != null) {
            safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException());
            currentChunkedWrite = null;
        }
        final List<Tuple<? extends Netty4HttpResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();
        if (inflightResponses.isEmpty() == false) {
            final ClosedChannelException closedChannelException = new ClosedChannelException();
            for (Tuple<? extends Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
                safeFailPromise(inflightResponse.v2(), closedChannelException);
            }
        }
        ctx.close(promise);
    }

    /** Fails {@code promise} with {@code ex}, logging instead of propagating any runtime exception this raises. */
    private void safeFailPromise(ChannelPromise promise, Exception ex) {
        try {
            promise.setFailure(ex);
        } catch (RuntimeException e) {
            logger.error("unexpected error while releasing pipelined http responses", e);
        }
    }

    /** Enqueues {@code msg} with a fresh promise and returns that promise. */
    private Future<Void> enqueueWrite(ChannelHandlerContext ctx, HttpObject msg) {
        final ChannelPromise p = ctx.newPromise();
        enqueueWrite(ctx, msg, p);
        return p;
    }

    /**
     * Writes {@code msg} directly if the channel is writable and nothing is queued ahead of it;
     * otherwise appends it to the queue so that ordering is kept.
     *
     * @return {@code true} if the message was written directly, {@code false} if it was queued
     */
    private boolean enqueueWrite(ChannelHandlerContext ctx, HttpObject msg, ChannelPromise promise) {
        if (ctx.channel().isWritable() && queuedWrites.isEmpty()) {
            ctx.write(msg, promise);
            return true;
        }
        queuedWrites.add(new WriteOperation(msg, promise));
        return false;
    }

    /** Reports the failure to the server transport; anything that is not an {@link Exception} is wrapped in one. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
        final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
        serverTransport.onException(channel, toException(cause));
    }

    /**
     * Converts a throwable to an {@link Exception}: exceptions are returned as-is, {@code null}
     * stays {@code null}, and every other throwable (including {@link Error}) is wrapped. This
     * avoids a {@link ClassCastException} for throwables that are neither errors nor exceptions.
     */
    private static Exception toException(Throwable cause) {
        if (cause instanceof Exception exception) {
            return exception;
        }
        return cause == null ? null : new Exception(cause);
    }

    /** Empties the holding queue and returns a copy of what it contained. */
    private List<Tuple<? extends Netty4HttpResponse, ChannelPromise>> removeAllInflightResponses() {
        final List<Tuple<? extends Netty4HttpResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
        outboundHoldingQueue.clear();
        return responses;
    }

    /**
     * State of an in-progress chunked response.
     *
     * @param combiner     collects the futures of the individual writes
     * @param onDone       promise finished through the combiner, or failed if the channel closes
     * @param responseBody source of the remaining chunks
     */
    private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {
    }

    /**
     * A write that could not be passed downstream immediately.
     *
     * @param msg     the message to write
     * @param promise the promise to complete for the write
     */
    private record WriteOperation(HttpObject msg, ChannelPromise promise) {
        /** Fails the promise with a {@link ClosedChannelException} and releases the message. */
        void failAsClosedChannel() {
            promise.tryFailure(new ClosedChannelException());
            ReferenceCountUtil.release(msg);
        }
    }
}

