/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.nifty.core;

import com.facebook.nifty.core.TNiftyTransport;
import com.facebook.nifty.core.ThriftServerDef;
import com.facebook.nifty.core.ThriftTransportType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransport;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NiftyDispatcher
extends SimpleChannelUpstreamHandler {
    private static final Logger log = LoggerFactory.getLogger(NiftyDispatcher.class);
    private final TProcessorFactory processorFactory;
    private final TProtocolFactory inProtocolFactory;
    private final TProtocolFactory outProtocolFactory;
    private final Executor exe;
    private final int queuedResponseLimit;
    private final Map<Integer, ChannelBuffer> responseMap = new HashMap<Integer, ChannelBuffer>();
    private final AtomicInteger dispatcherSequenceId = new AtomicInteger(0);
    private final AtomicInteger lastResponseWrittenId = new AtomicInteger(0);

    public NiftyDispatcher(ThriftServerDef def) {
        this.processorFactory = def.getProcessorFactory();
        this.inProtocolFactory = def.getInProtocolFactory();
        this.outProtocolFactory = def.getOutProtocolFactory();
        this.queuedResponseLimit = def.getQueuedResponseLimit();
        this.exe = def.getExecutor();
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (e.getMessage() instanceof TNiftyTransport) {
            TNiftyTransport messageTransport = (TNiftyTransport)((Object)e.getMessage());
            this.processRequest(ctx, messageTransport);
        } else {
            ctx.sendUpstream((ChannelEvent)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processRequest(final ChannelHandlerContext ctx, final TNiftyTransport messageTransport) {
        final int requestSequenceId = this.dispatcherSequenceId.incrementAndGet();
        Map<Integer, ChannelBuffer> map = this.responseMap;
        synchronized (map) {
            if (requestSequenceId > this.lastResponseWrittenId.get() + this.queuedResponseLimit && !this.isChannelReadBlocked(ctx)) {
                this.blockChannelReads(ctx);
            }
        }
        this.exe.execute(new Runnable(){

            @Override
            public void run() {
                TProtocol inProtocol = NiftyDispatcher.this.inProtocolFactory.getProtocol((TTransport)messageTransport);
                TProtocol outProtocol = NiftyDispatcher.this.outProtocolFactory.getProtocol((TTransport)messageTransport);
                try {
                    NiftyDispatcher.this.processorFactory.getProcessor((TTransport)messageTransport).process(inProtocol, outProtocol);
                    NiftyDispatcher.this.writeResponse(ctx, messageTransport, requestSequenceId);
                }
                catch (TException e1) {
                    log.error("Exception while invoking!", (Throwable)e1);
                    NiftyDispatcher.this.closeChannel(ctx);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeResponse(ChannelHandlerContext ctx, TNiftyTransport messageTransport, int responseSequenceId) {
        Map<Integer, ChannelBuffer> map = this.responseMap;
        synchronized (map) {
            ChannelBuffer response = messageTransport.getOutputBuffer();
            ThriftTransportType transportType = messageTransport.getTransportType();
            int currentResponseId = this.lastResponseWrittenId.get() + 1;
            if (responseSequenceId != currentResponseId) {
                this.responseMap.put(responseSequenceId, response);
            } else {
                int lastRequestSequenceId;
                do {
                    response = this.addFraming(response, transportType);
                    Channels.write((Channel)ctx.getChannel(), (Object)response);
                    this.lastResponseWrittenId.incrementAndGet();
                } while (null != (response = this.responseMap.remove(++currentResponseId)));
                if (this.isChannelReadBlocked(ctx) && (lastRequestSequenceId = this.dispatcherSequenceId.get()) <= this.lastResponseWrittenId.get() + this.queuedResponseLimit) {
                    this.unblockChannelReads(ctx);
                }
            }
        }
    }

    private ChannelBuffer addFraming(ChannelBuffer response, ThriftTransportType transportType) {
        if (transportType == ThriftTransportType.UNFRAMED) {
            return response;
        }
        if (transportType == ThriftTransportType.FRAMED) {
            ChannelBuffer frameSizeBuffer = ChannelBuffers.buffer((int)4);
            frameSizeBuffer.writeInt(response.readableBytes());
            return ChannelBuffers.wrappedBuffer((ChannelBuffer[])new ChannelBuffer[]{frameSizeBuffer, response});
        }
        throw new UnsupportedOperationException("Header protocol is not supported");
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        this.closeChannel(ctx);
    }

    private void closeChannel(ChannelHandlerContext ctx) {
        if (ctx.getChannel().isOpen()) {
            ctx.getChannel().close();
        }
    }

    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        ctx.setAttachment((Object)ReadBlockedState.NOT_BLOCKED);
        super.channelOpen(ctx, e);
    }

    private boolean isChannelReadBlocked(ChannelHandlerContext ctx) {
        return ctx.getAttachment() == ReadBlockedState.BLOCKED;
    }

    private void blockChannelReads(ChannelHandlerContext ctx) {
        ctx.setAttachment((Object)ReadBlockedState.BLOCKED);
        ctx.getChannel().setReadable(false);
    }

    private void unblockChannelReads(ChannelHandlerContext ctx) {
        ctx.setAttachment((Object)ReadBlockedState.NOT_BLOCKED);
        ctx.getChannel().setReadable(true);
    }

    private static enum ReadBlockedState {
        NOT_BLOCKED,
        BLOCKED;

    }
}

