/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto;

import java.nio.channels.ClosedChannelException;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.functions.runtime.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.pulsar.functions.runtime.shaded.io.netty.channel.group.ChannelGroup;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound channel handler that passes decoded BookKeeper requests to a
 * {@link RequestProcessor} and accumulates serialized add responses in a
 * single buffer until {@link #flushPendingResponse()} is called.
 *
 * <p>{@link #prepareSendResponseV2(int, BookieProtocol.ParsedAddRequest)} and
 * {@link #flushPendingResponse()} are {@code synchronized} on this handler, so
 * the pending buffer is only touched by one thread at a time.
 */
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(BookieRequestHandler.class);

    /** Initial, and minimum, capacity hint used when allocating the pending response buffer. */
    private static final int DEFAULT_PENDING_RESPONSE_SIZE = 256;

    private final RequestProcessor requestProcessor;
    private final ChannelGroup allChannels;

    /** Context captured in {@link #channelActive(ChannelHandlerContext)}; {@code null} before that. */
    private ChannelHandlerContext ctx;

    /** Responses serialized since the last flush, or {@code null} when nothing is pending. */
    private ByteBuf pendingSendResponses;

    /** Capacity hint for the next pending buffer, adapted on every flush. */
    private int maxPendingResponsesSize = DEFAULT_PENDING_RESPONSE_SIZE;

    /**
     * Creates a handler bound to the given processor and channel group.
     *
     * @param conf server configuration; not read by this constructor
     * @param processor receiver of every request seen by {@link #channelRead}
     * @param allChannels group that each registered channel is added to
     */
    BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
        this.allChannels = allChannels;
        this.requestProcessor = processor;
    }

    /**
     * Returns the handler context recorded when the channel became active.
     *
     * @return the stored context, or {@code null} if {@code channelActive} has not run yet
     */
    public ChannelHandlerContext ctx() {
        return ctx;
    }

    /**
     * Logs the new connection, remembers its context and forwards the event
     * through the superclass implementation.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel connected {}", ctx.channel());
        this.ctx = ctx;
        super.channelActive(ctx);
    }

    /**
     * Adds the channel to the shared channel group. This override does not
     * forward the event any further.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        allChannels.add(ctx.channel());
    }

    /**
     * Logs the disconnection. This override does not forward the event any
     * further.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channels disconnected: {}", ctx.channel());
    }

    /**
     * Logs the failure. A {@link ClosedChannelException} is only reported at
     * info level; any other throwable is logged as an error and the channel is
     * closed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ClosedChannelException) {
            log.info("Client died before request could be completed on {}", ctx.channel(), cause);
        } else {
            log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
            ctx.close();
        }
    }

    /**
     * Hands protocol requests to the request processor; any other message is
     * passed on to the next inbound handler untouched.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean isRequest = msg instanceof BookkeeperProtocol.Request
                || msg instanceof BookieProtocol.Request;
        if (isRequest) {
            requestProcessor.processRequest(msg, this);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Serializes an add response into the pending buffer, allocating that
     * buffer first if none is currently pending.
     *
     * @param rc response code to serialize
     * @param req the add request being answered
     */
    public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
        ByteBuf target = pendingSendResponses;
        if (target == null) {
            // Lazily allocate using the size learned from previous flushes.
            target = ctx().alloc().directBuffer(maxPendingResponsesSize);
            pendingSendResponses = target;
        }
        BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, target);
    }

    /**
     * Sends the pending buffer, if any, and resets the pending state. When the
     * channel is no longer active the buffer is released instead of written.
     */
    public synchronized void flushPendingResponse() {
        ByteBuf batch = pendingSendResponses;
        if (batch == null) {
            return;
        }

        // New hint: midpoint of the previous hint and the size of this batch,
        // never dropping below the default.
        double blended = 0.5 * maxPendingResponsesSize + 0.5 * batch.readableBytes();
        maxPendingResponsesSize = (int) Math.max(blended, DEFAULT_PENDING_RESPONSE_SIZE);

        if (ctx().channel().isActive()) {
            ctx().writeAndFlush(batch, ctx.voidPromise());
        } else {
            batch.release();
        }
        pendingSendResponses = null;
    }
}

