/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.network.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.group.GroupBalanceHandler;
import org.joyqueue.broker.kafka.coordinator.group.GroupBalanceManager;
import org.joyqueue.broker.kafka.coordinator.group.GroupCoordinator;
import org.joyqueue.broker.kafka.coordinator.group.GroupMetadataManager;
import org.joyqueue.broker.kafka.coordinator.group.GroupOffsetHandler;
import org.joyqueue.broker.kafka.coordinator.group.GroupOffsetManager;
import org.joyqueue.broker.kafka.coordinator.transaction.ProducerIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.ProducerSequenceManager;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionCoordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionHandler;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionMetadataManager;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionOffsetHandler;
import org.joyqueue.broker.kafka.coordinator.transaction.completion.TransactionCompletionHandler;
import org.joyqueue.broker.kafka.coordinator.transaction.completion.TransactionCompletionScheduler;
import org.joyqueue.broker.kafka.coordinator.transaction.log.TransactionLog;
import org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionSynchronizer;
import org.joyqueue.broker.kafka.manage.KafkaManageServiceFactory;
import org.joyqueue.broker.kafka.network.helper.KafkaProtocolHelper;
import org.joyqueue.broker.kafka.network.protocol.KafkaCodecFactory;
import org.joyqueue.broker.kafka.network.protocol.KafkaCommandHandlerFactory;
import org.joyqueue.broker.kafka.network.protocol.KafkaExceptionHandler;
import org.joyqueue.broker.kafka.session.KafkaConnectionHandler;
import org.joyqueue.broker.kafka.session.KafkaConnectionManager;
import org.joyqueue.broker.kafka.session.KafkaTransportHandler;
import org.joyqueue.network.protocol.CommandHandlerProvider;
import org.joyqueue.network.protocol.ExceptionHandlerProvider;
import org.joyqueue.network.protocol.ProtocolService;
import org.joyqueue.network.transport.codec.CodecFactory;
import org.joyqueue.network.transport.command.handler.CommandHandlerFactory;
import org.joyqueue.network.transport.command.handler.ExceptionHandler;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Protocol service for the "kafka" protocol type.
 *
 * <p>Once a {@link BrokerContext} is injected, this class builds the group and
 * transaction coordination components, the connection/transport handlers and the
 * shared {@link KafkaContext}. It also supplies the codec factory, command handler
 * factory, channel handler wrapper and exception handler for this protocol.
 */
public class KafkaProtocol extends Service
        implements ProtocolService, BrokerContextAware, CommandHandlerProvider, ExceptionHandlerProvider {

    protected static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);

    private KafkaConfig config;
    private Coordinator coordinator;

    // Group coordination components.
    private GroupMetadataManager groupMetadataManager;
    private GroupOffsetManager groupOffsetManager;
    private GroupBalanceManager groupBalanceManager;
    private GroupOffsetHandler groupOffsetHandler;
    private GroupBalanceHandler groupBalanceHandler;
    private GroupCoordinator groupCoordinator;

    // Transaction coordination components.
    private ProducerIdManager producerIdManager;
    private TransactionIdManager transactionIdManager;
    private ProducerSequenceManager producerSequenceManager;
    private TransactionMetadataManager transactionMetadataManager;
    private TransactionLog transactionLog;
    private TransactionSynchronizer transactionSynchronizer;
    private TransactionCompletionHandler transactionCompletionHandler;
    private TransactionCompletionScheduler transactionCompletionScheduler;
    private TransactionHandler transactionHandler;
    private TransactionOffsetHandler transactionOffsetHandler;
    private TransactionCoordinator transactionCoordinator;

    // Connection and transport handling.
    private KafkaConnectionManager connectionManager;
    private KafkaConnectionHandler connectionHandler;
    private KafkaTransportHandler transportHandler;

    private KafkaContext kafkaContext;

    /**
     * Builds every Kafka component from the given broker context and registers the
     * Kafka manage/monitor services with the broker.
     *
     * @param brokerContext the broker context supplying configuration, coordinator,
     *                      cluster, session, produce/consume and name services
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        // Broker-level metadata managers obtained under the "kafka" key; they are wrapped
        // by the Kafka-specific managers constructed below.
        org.joyqueue.broker.coordinator.group.GroupMetadataManager brokerGroupMetadata =
                brokerContext.getCoordinatorService().getOrCreateGroupMetadataManager("kafka");
        org.joyqueue.broker.coordinator.transaction.TransactionMetadataManager brokerTransactionMetadata =
                brokerContext.getCoordinatorService().getOrCreateTransactionMetadataManager("kafka");

        config = new KafkaConfig(brokerContext.getPropertySupplier());
        coordinator = new Coordinator(brokerContext.getCoordinatorService().getCoordinator());

        // Group side.
        groupMetadataManager = new GroupMetadataManager(config, brokerGroupMetadata);
        groupOffsetManager = new GroupOffsetManager(
                config, brokerContext.getClusterManager(), groupMetadataManager, coordinator.getSessionManager());
        groupBalanceManager = new GroupBalanceManager(config, groupMetadataManager);
        groupOffsetHandler = new GroupOffsetHandler(
                config, coordinator, groupMetadataManager, groupBalanceManager, groupOffsetManager);
        groupBalanceHandler = new GroupBalanceHandler(config, groupMetadataManager, groupBalanceManager);
        groupCoordinator = new GroupCoordinator(
                coordinator, groupBalanceHandler, groupOffsetHandler, groupMetadataManager);

        // Transaction side.
        producerIdManager = new ProducerIdManager();
        transactionIdManager = new TransactionIdManager();
        producerSequenceManager = new ProducerSequenceManager(config);
        transactionMetadataManager = new TransactionMetadataManager(config, brokerTransactionMetadata);
        transactionLog = new TransactionLog(
                config, brokerContext.getProduce(), brokerContext.getConsume(), coordinator,
                brokerContext.getClusterManager());
        transactionSynchronizer = new TransactionSynchronizer(
                config, transactionIdManager, transactionLog, coordinator.getSessionManager(),
                brokerContext.getNameService());
        transactionCompletionHandler = new TransactionCompletionHandler(
                config, coordinator, transactionMetadataManager, transactionLog, transactionSynchronizer);
        transactionCompletionScheduler = new TransactionCompletionScheduler(config, transactionCompletionHandler);
        transactionHandler = new TransactionHandler(
                coordinator, transactionMetadataManager, producerIdManager, transactionSynchronizer,
                brokerContext.getNameService());
        transactionOffsetHandler = new TransactionOffsetHandler(
                coordinator, transactionMetadataManager, transactionSynchronizer);
        transactionCoordinator = new TransactionCoordinator(
                coordinator, transactionMetadataManager, transactionHandler, transactionOffsetHandler);

        // Session / transport side.
        connectionManager = new KafkaConnectionManager(brokerContext.getSessionManager());
        connectionHandler = new KafkaConnectionHandler(connectionManager);
        transportHandler = new KafkaTransportHandler(config);

        kafkaContext = new KafkaContext(
                config, groupCoordinator, transactionCoordinator, transactionIdManager,
                producerSequenceManager, brokerContext);
        registerManage(brokerContext, kafkaContext);
    }

    /**
     * Registers the Kafka manage and monitor services with the broker manage service
     * under the keys {@code kafkaManageService} and {@code kafkaMonitorService}.
     *
     * @param brokerContext the broker context whose manage service receives the registrations
     * @param kafkaContext  the Kafka context handed to the manage service factory
     */
    protected void registerManage(BrokerContext brokerContext, KafkaContext kafkaContext) {
        KafkaManageServiceFactory factory = new KafkaManageServiceFactory(brokerContext, kafkaContext);
        // Held as Object so the same registerService overload is selected as with an explicit cast.
        Object manageService = factory.getKafkaManageService();
        brokerContext.getBrokerManageService().registerService("kafkaManageService", manageService);
        Object monitorService = factory.getKafkaMonitorService();
        brokerContext.getBrokerManageService().registerService("kafkaMonitorService", monitorService);
    }

    /**
     * Starts the group components first, then the transaction components, in the
     * order listed below.
     *
     * @throws Exception if any component fails to start
     */
    public void doStart() throws Exception {
        // Group components.
        groupOffsetManager.start();
        groupBalanceManager.start();
        groupOffsetHandler.start();
        groupBalanceHandler.start();
        groupCoordinator.start();

        // Transaction components.
        transactionCoordinator.start();
        transactionLog.start();
        transactionSynchronizer.start();
        transactionHandler.start();
        transactionOffsetHandler.start();
        transactionCompletionHandler.start();
        transactionCompletionScheduler.start();
    }

    /**
     * Stops the group components, then the transaction components, in the order
     * listed below.
     */
    protected void doStop() {
        // Group components: the coordinator is stopped before its managers and handlers.
        groupCoordinator.stop();
        groupOffsetManager.stop();
        groupBalanceManager.stop();
        groupOffsetHandler.stop();
        groupBalanceHandler.stop();

        // Transaction components.
        transactionCompletionScheduler.stop();
        transactionCompletionHandler.stop();
        transactionOffsetHandler.stop();
        transactionHandler.stop();
        transactionSynchronizer.stop();
        transactionLog.stop();
        transactionCoordinator.stop();
    }

    /**
     * Reports whether the buffer is handled by this protocol, as decided by
     * {@link KafkaProtocolHelper#isSupport}.
     *
     * @param buffer the inbound bytes to inspect
     * @return the result of the helper check
     */
    public boolean isSupport(ByteBuf buffer) {
        return KafkaProtocolHelper.isSupport(buffer);
    }

    /**
     * @return a new {@link KafkaCodecFactory}
     */
    public CodecFactory createCodecFactory() {
        return new KafkaCodecFactory();
    }

    /**
     * @return a new {@link KafkaCommandHandlerFactory} bound to this protocol's Kafka context
     */
    public CommandHandlerFactory createCommandHandlerFactory() {
        return new KafkaCommandHandlerFactory(kafkaContext);
    }

    /**
     * Wraps the given handler in a channel initializer that appends the Kafka
     * transport handler, then the Kafka connection handler, then the given handler
     * to the end of the channel pipeline.
     *
     * @param channelHandler the handler to append after the Kafka-specific handlers
     * @return the channel initializer performing that setup
     */
    public ChannelHandler getCommandHandler(final ChannelHandler channelHandler) {
        return new ChannelInitializer<Channel>() {

            protected void initChannel(Channel ch) throws Exception {
                // Three separate appends, in this exact order.
                ch.pipeline()
                        .addLast(KafkaProtocol.this.transportHandler)
                        .addLast(KafkaProtocol.this.connectionHandler)
                        .addLast(channelHandler);
            }
        };
    }

    /**
     * @return a new {@link KafkaExceptionHandler}
     */
    public ExceptionHandler getExceptionHandler() {
        return new KafkaExceptionHandler();
    }

    /**
     * @return the protocol type identifier, {@code "kafka"}
     */
    public String type() {
        return "kafka";
    }
}

