/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.session;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.command.ApiVersionsRequest;
import org.joyqueue.broker.kafka.command.FetchRequest;
import org.joyqueue.broker.kafka.command.FindCoordinatorRequest;
import org.joyqueue.broker.kafka.command.ProduceRequest;
import org.joyqueue.broker.kafka.session.KafkaConnectionManager;
import org.joyqueue.network.session.Language;
import org.joyqueue.network.transport.ChannelTransport;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.TransportHelper;
import org.joyqueue.network.transport.command.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class KafkaConnectionHandler
extends ChannelDuplexHandler {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaConnectionHandler.class);
    private KafkaConnectionManager kafkaConnectionManager;

    public KafkaConnectionHandler(KafkaConnectionManager kafkaConnectionManager) {
        this.kafkaConnectionManager = kafkaConnectionManager;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Command) {
            this.connectionStatistic(ctx, (Command)msg);
        }
        super.channelRead(ctx, msg);
    }

    protected void connectionStatistic(ChannelHandlerContext ctx, Command command) {
        Channel channel = ctx.channel();
        Object payload = command.getPayload();
        ChannelTransport transport = TransportHelper.getTransport((Channel)channel);
        if (payload instanceof FetchRequest) {
            FetchRequest fetchRequest = (FetchRequest)payload;
            this.kafkaConnectionManager.addConnection((Transport)transport, fetchRequest.getClientId(), String.valueOf(fetchRequest.getVersion()));
            for (Map.Entry<String, List<FetchRequest.PartitionRequest>> entry : fetchRequest.getPartitionRequests().entrySet()) {
                this.kafkaConnectionManager.addConsumer((Transport)transport, entry.getKey());
            }
        } else if (payload instanceof ProduceRequest) {
            ProduceRequest produceRequest = (ProduceRequest)payload;
            this.kafkaConnectionManager.addConnection((Transport)transport, produceRequest.getClientId(), String.valueOf(produceRequest.getVersion()));
            for (Map.Entry<String, List<ProduceRequest.PartitionRequest>> entry : produceRequest.getPartitionRequests().entrySet()) {
                this.kafkaConnectionManager.addProducer((Transport)transport, entry.getKey());
            }
        } else if (!(payload instanceof FindCoordinatorRequest) && payload instanceof ApiVersionsRequest) {
            ApiVersionsRequest apiVersionsRequest = (ApiVersionsRequest)payload;
            if (StringUtils.isBlank((CharSequence)apiVersionsRequest.getClientSoftwareVersion())) {
                return;
            }
            String language = StringUtils.replace((String)apiVersionsRequest.getClientSoftwareName(), (String)"apache-kafka-", (String)"");
            String version = apiVersionsRequest.getClientSoftwareVersion();
            if (Language.parse((String)language).equals((Object)Language.OTHER)) {
                version = apiVersionsRequest.getClientSoftwareName() + "-" + apiVersionsRequest.getClientSoftwareVersion();
            }
            this.kafkaConnectionManager.addConnection((Transport)transport, apiVersionsRequest.getClientId(), version, Language.parse((String)language));
        }
    }
}

