/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.handler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.kafka.KafkaAcknowledge;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.ProduceRequest;
import org.joyqueue.broker.kafka.command.ProduceResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.converter.CheckResultConverter;
import org.joyqueue.broker.kafka.coordinator.transaction.ProducerSequenceManager;
import org.joyqueue.broker.kafka.handler.AbstractKafkaCommandHandler;
import org.joyqueue.broker.kafka.handler.ProduceHandler;
import org.joyqueue.broker.kafka.handler.TransactionProduceHandler;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.kafka.message.KafkaBrokerMessage;
import org.joyqueue.broker.kafka.message.converter.KafkaMessageConverter;
import org.joyqueue.broker.kafka.model.ProducePartitionGroupRequest;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.broker.network.traffic.Traffic;
import org.joyqueue.broker.producer.ProduceConfig;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.response.BooleanResponse;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.delay.AbstractDelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.joyqueue.toolkit.network.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@org.joyqueue.network.protocol.annotation.ProduceHandler
public class ProduceRequestHandler
extends AbstractKafkaCommandHandler
implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(ProduceRequestHandler.class);
    private ClusterManager clusterManager;
    private ProduceConfig produceConfig;
    private ProduceHandler produceHandler;
    private TransactionProduceHandler transactionProduceHandler;
    private ProducerSequenceManager producerSequenceManager;
    private SessionManager sessionManager;
    private KafkaConfig config;
    private DelayedOperationManager<DelayedOperation> delayPurgatory;

    @Override
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.clusterManager = kafkaContext.getBrokerContext().getClusterManager();
        this.produceConfig = new ProduceConfig(kafkaContext.getBrokerContext().getPropertySupplier());
        this.produceHandler = new ProduceHandler(kafkaContext.getBrokerContext().getProduce());
        this.transactionProduceHandler = new TransactionProduceHandler(kafkaContext.getConfig(), kafkaContext.getBrokerContext().getProduce(), kafkaContext.getTransactionCoordinator(), kafkaContext.getTransactionIdManager());
        this.producerSequenceManager = kafkaContext.getProducerSequenceManager();
        this.sessionManager = kafkaContext.getBrokerContext().getSessionManager();
        this.config = kafkaContext.getConfig();
        this.delayPurgatory = new DelayedOperationManager("kafka-produce-delay");
        this.delayPurgatory.start();
    }

    public Command handle(final Transport transport, final Command request) {
        ProduceRequest produceRequest = (ProduceRequest)request.getPayload();
        KafkaAcknowledge kafkaAcknowledge = KafkaAcknowledge.valueOf(produceRequest.getRequiredAcks());
        QosLevel qosLevel = KafkaAcknowledge.convertToQosLevel(kafkaAcknowledge);
        String clientId = KafkaClientHelper.parseClient(produceRequest.getClientId());
        Map<String, List<ProduceRequest.PartitionRequest>> partitionRequestMap = produceRequest.getPartitionRequests();
        final HashMap partitionResponseMap = Maps.newHashMapWithExpectedSize((int)partitionRequestMap.size());
        final CountDownLatch latch = new CountDownLatch(produceRequest.getPartitionNum());
        final boolean isNeedAck = !qosLevel.equals((Object)QosLevel.ONE_WAY);
        String clientIp = ((InetSocketAddress)transport.remoteAddress()).getHostString();
        byte[] clientAddress = IpUtil.toByte((InetSocketAddress)((InetSocketAddress)transport.remoteAddress()));
        Connection connection = SessionHelper.getConnection((Transport)transport);
        final Traffic traffic = new Traffic(clientId);
        final boolean[] isNeedDelay = new boolean[]{false};
        final boolean[] singleTopic = new boolean[]{partitionRequestMap.size() == 1};
        for (Map.Entry<String, List<ProduceRequest.PartitionRequest>> partitionRequestEntry : partitionRequestMap.entrySet()) {
            boolean singleGroup;
            TopicName topic = TopicName.parse((String)partitionRequestEntry.getKey());
            HashMap partitionGroupRequestMap = Maps.newHashMap();
            final ArrayList partitionResponses = Lists.newArrayListWithCapacity((int)partitionRequestEntry.getValue().size());
            partitionResponseMap.put(topic.getFullName(), partitionResponses);
            String producerId = connection.getProducer(topic.getFullName(), clientId);
            Producer producer = this.sessionManager.getProducerById(producerId);
            TopicConfig topicConfig = this.clusterManager.getTopicConfig(topic);
            for (ProduceRequest.PartitionRequest partitionRequest : partitionRequestEntry.getValue()) {
                if (producer == null) {
                    this.buildPartitionResponse(partitionRequest.getPartition(), null, KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode(), partitionRequest.getMessages(), partitionResponses);
                    latch.countDown();
                    isNeedDelay[0] = true;
                    continue;
                }
                short checkCode = this.checkPartitionRequest(transport, produceRequest, partitionRequest, topic, producer, clientIp);
                if (checkCode != KafkaErrorCode.NONE.getCode()) {
                    this.buildPartitionResponse(partitionRequest.getPartition(), null, checkCode, partitionRequest.getMessages(), partitionResponses);
                    latch.countDown();
                    isNeedDelay[0] = true;
                    continue;
                }
                this.splitByPartitionGroup(topicConfig, topic, producer, clientAddress, traffic, partitionRequest, partitionGroupRequestMap);
            }
            boolean bl = singleGroup = partitionGroupRequestMap.size() == 1;
            if (singleTopic[0] && !singleGroup) {
                singleTopic[0] = false;
            }
            if (singleTopic[0] && isNeedAck && partitionResponses.size() == partitionRequestEntry.getValue().size()) {
                return this.delayResponse(transport, request, this.generateResponse(traffic, partitionResponseMap));
            }
            for (final Map.Entry partitionGroupEntry : partitionGroupRequestMap.entrySet()) {
                EventListener<ProduceResponse.PartitionResponse> listener = new EventListener<ProduceResponse.PartitionResponse>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onEvent(ProduceResponse.PartitionResponse produceResponse) {
                        List<Integer> partitions = ((ProducePartitionGroupRequest)partitionGroupEntry.getValue()).getPartitions();
                        List list = partitionResponses;
                        synchronized (list) {
                            for (Integer partition : partitions) {
                                partitionResponses.add(new ProduceResponse.PartitionResponse(partition, 0L, produceResponse.getErrorCode()));
                                latch.countDown();
                            }
                        }
                        if (produceResponse.getErrorCode() != KafkaErrorCode.NONE.getCode()) {
                            isNeedDelay[0] = true;
                        }
                        if (isNeedAck && singleTopic[0]) {
                            Command response = null;
                            response = isNeedDelay[0] ? ProduceRequestHandler.this.delayResponse(transport, request, ProduceRequestHandler.this.generateResponse(traffic, partitionResponseMap)) : ProduceRequestHandler.this.generateResponse(traffic, partitionResponseMap);
                            if (response != null) {
                                transport.acknowledge(request, response);
                            }
                        }
                    }
                };
                if (produceRequest.isTransaction()) {
                    this.transactionProduceHandler.produceMessage(produceRequest, produceRequest.getTransactionalId(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), qosLevel, producer, (ProducePartitionGroupRequest)partitionGroupEntry.getValue(), listener);
                    continue;
                }
                this.produceHandler.produceMessage(produceRequest, qosLevel, producer, (ProducePartitionGroupRequest)partitionGroupEntry.getValue(), listener);
            }
        }
        if (!isNeedAck || singleTopic[0]) {
            return null;
        }
        try {
            boolean isDone = latch.await(Math.min(produceRequest.getAckTimeoutMs(), this.config.getProduceTimeout()), TimeUnit.MILLISECONDS);
            if (!isDone) {
                isNeedDelay[0] = true;
                logger.warn("wait produce timeout, transport: {}, app: {}, topics: {}", new Object[]{transport.remoteAddress(), clientId, produceRequest.getPartitionRequests().keySet()});
            }
        }
        catch (InterruptedException e) {
            logger.error("wait produce exception, transport: {}, app: {}, topics: {}", new Object[]{transport.remoteAddress(), clientId, produceRequest.getPartitionRequests().keySet(), e});
        }
        Command response = this.generateResponse(traffic, partitionResponseMap);
        if (isNeedDelay[0]) {
            return this.delayResponse(transport, request, response);
        }
        return response;
    }

    protected Command delayResponse(final Transport transport, final Command request, final Command response) {
        if (this.config.getProduceDelayEnable()) {
            return response;
        }
        this.delayPurgatory.tryCompleteElseWatch((DelayedOperation)new AbstractDelayedOperation(this.config.getProduceDelay()){

            protected void onComplete() {
                transport.acknowledge(request, response);
            }
        }, (Set)Sets.newHashSet((Object[])new Object[]{new DelayedOperationKey(new Object[0])}));
        return null;
    }

    protected Command generateResponse(Traffic traffic, Map<String, List<ProduceResponse.PartitionResponse>> partitionResponseMap) {
        ProduceResponse produceResponse = new ProduceResponse(traffic, partitionResponseMap);
        return new Command((Object)produceResponse);
    }

    protected short checkPartitionRequest(Transport transport, ProduceRequest produceRequest, ProduceRequest.PartitionRequest partitionRequest, TopicName topic, Producer producer, String clientIp) {
        short checkAndFillMessageResult = this.checkAndFillMessages(partitionRequest.getMessages());
        if (checkAndFillMessageResult != KafkaErrorCode.NONE.getCode()) {
            return checkAndFillMessageResult;
        }
        BooleanResponse checkResult = this.clusterManager.checkWritable(topic, producer.getApp(), clientIp, (short)partitionRequest.getPartition());
        if (!checkResult.isSuccess() && !checkResult.getJoyQueueCode().equals((Object)JoyQueueCode.FW_BROKER_NOT_WRITABLE)) {
            logger.warn("checkWritable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}", new Object[]{transport, topic, partitionRequest.getPartition(), producer.getApp(), checkResult.getJoyQueueCode()});
            return CheckResultConverter.convertProduceCode(checkResult.getJoyQueueCode());
        }
        int baseSequence = partitionRequest.getMessages().get(0).getBaseSequence();
        if (baseSequence != -1) {
            if (!this.producerSequenceManager.checkSequence(producer.getApp(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), partitionRequest.getPartition(), baseSequence)) {
                logger.warn("out of order sequence, topic: {}, app: {}, partition: {}, transactionId: {}, producerId: {}, producerEpoch: {}, sequence: {}", new Object[]{producer.getTopic(), producer.getApp(), partitionRequest.getPartition(), produceRequest.getTransactionalId(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), baseSequence});
                return KafkaErrorCode.OUT_OF_ORDER_SEQUENCE_NUMBER.getCode();
            }
            this.producerSequenceManager.updateSequence(producer.getApp(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), partitionRequest.getPartition(), baseSequence);
        }
        return KafkaErrorCode.NONE.getCode();
    }

    protected void splitByPartitionGroup(TopicConfig topicConfig, TopicName topic, Producer producer, byte[] clientAddress, Traffic traffic, ProduceRequest.PartitionRequest partitionRequest, Map<Integer, ProducePartitionGroupRequest> partitionGroupRequestMap) {
        PartitionGroup partitionGroup = topicConfig.fetchPartitionGroupByPartition((short)partitionRequest.getPartition());
        ProducePartitionGroupRequest producePartitionGroupRequest = partitionGroupRequestMap.get(partitionGroup.getGroup());
        if (producePartitionGroupRequest == null) {
            producePartitionGroupRequest = new ProducePartitionGroupRequest(Lists.newLinkedList(), Lists.newLinkedList(), Lists.newLinkedList(), Maps.newHashMap(), Maps.newHashMap());
            partitionGroupRequestMap.put(partitionGroup.getGroup(), producePartitionGroupRequest);
        }
        LinkedList brokerMessages = Lists.newLinkedList();
        for (KafkaBrokerMessage message : partitionRequest.getMessages()) {
            BrokerMessage brokerMessage = KafkaMessageConverter.toBrokerMessage(producer.getTopic(), partitionRequest.getPartition(), producer.getApp(), clientAddress, message);
            brokerMessages.add(brokerMessage);
        }
        traffic.record(topic.getFullName(), partitionRequest.getTraffic(), partitionRequest.getSize());
        producePartitionGroupRequest.getPartitions().add(partitionRequest.getPartition());
        producePartitionGroupRequest.getMessages().addAll(brokerMessages);
        producePartitionGroupRequest.getMessageMap().put(partitionRequest.getPartition(), brokerMessages);
        producePartitionGroupRequest.getKafkaMessages().addAll(partitionRequest.getMessages());
        producePartitionGroupRequest.getKafkaMessageMap().put(partitionRequest.getPartition(), partitionRequest.getMessages());
    }

    protected short checkAndFillMessages(List<KafkaBrokerMessage> messages) {
        for (KafkaBrokerMessage message : messages) {
            if (ArrayUtils.getLength((Object)message.getKey()) > this.produceConfig.getBusinessIdLength()) {
                return KafkaErrorCode.MESSAGE_TOO_LARGE.getCode();
            }
            if (ArrayUtils.getLength((Object)message.getValue()) <= this.produceConfig.getBodyLength()) continue;
            return KafkaErrorCode.MESSAGE_TOO_LARGE.getCode();
        }
        return KafkaErrorCode.NONE.getCode();
    }

    protected void buildPartitionResponse(int partition, long[] indices, short code, List<KafkaBrokerMessage> messages, List<ProduceResponse.PartitionResponse> partitionResponses) {
        if (ArrayUtils.isEmpty((long[])indices)) {
            partitionResponses.add(new ProduceResponse.PartitionResponse(partition, 0L, code));
        } else {
            partitionResponses.add(new ProduceResponse.PartitionResponse(partition, indices[0], code));
        }
    }

    public int type() {
        return KafkaCommandType.PRODUCE.getCode();
    }
}

