/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.session;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.joyqueue.broker.kafka.command.ProduceRequest;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.session.KafkaChannelTransport;
import org.joyqueue.network.transport.ChannelTransport;
import org.joyqueue.network.transport.RequestBarrier;
import org.joyqueue.network.transport.TransportHelper;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.support.DefaultCommandDispatcher;
import org.joyqueue.network.transport.handler.CommandInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class KafkaTransportHandler
extends ChannelDuplexHandler {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaTransportHandler.class);
    private KafkaConfig config;

    public KafkaTransportHandler(KafkaConfig config) {
        this.config = config;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Command command;
        Channel channel = ctx.channel();
        RequestBarrier requestBarrier = null;
        ChannelTransport transport = TransportHelper.getTransport((Channel)channel);
        if (transport == null) {
            CommandInvocation commandInvocation;
            if (ctx.pipeline().last() instanceof CommandInvocation && (commandInvocation = (CommandInvocation)ctx.pipeline().last()).getCommandDispatcher() instanceof DefaultCommandDispatcher) {
                requestBarrier = ((DefaultCommandDispatcher)commandInvocation.getCommandDispatcher()).getRequestBarrier();
            }
            if (requestBarrier != null) {
                transport = TransportHelper.getOrNewTransport((Channel)channel, requestBarrier);
            }
        }
        if (!(transport instanceof KafkaChannelTransport)) {
            KafkaChannelTransport kafkaTransport = new KafkaChannelTransport(transport);
            transport = TransportHelper.compareAndSet((Channel)channel, (ChannelTransport)transport, (ChannelTransport)kafkaTransport) ? kafkaTransport : TransportHelper.getTransport((Channel)channel);
        }
        if (!((command = (Command)msg).getPayload() instanceof ProduceRequest) || ((ProduceRequest)command.getPayload()).getRequiredAcks() != 0) {
            ((KafkaChannelTransport)transport).acquire(command);
        }
        super.channelRead(ctx, msg);
    }
}

