/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.handler;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.TopicMetadataRequest;
import org.joyqueue.broker.kafka.command.TopicMetadataResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.handler.AbstractKafkaCommandHandler;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.kafka.model.KafkaBroker;
import org.joyqueue.broker.kafka.model.KafkaPartitionMetadata;
import org.joyqueue.broker.kafka.model.KafkaTopicMetadata;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.Partition;
import org.joyqueue.domain.Subscription;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.delay.AbstractDelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicMetadataRequestHandler
extends AbstractKafkaCommandHandler
implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(TopicMetadataRequestHandler.class);
    private NameService nameService;
    private KafkaConfig config;
    private DelayedOperationManager delayPurgatory;

    @Override
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.nameService = kafkaContext.getBrokerContext().getNameService();
        this.config = kafkaContext.getConfig();
        this.delayPurgatory = new DelayedOperationManager("kafka-metadata-delayed");
        this.delayPurgatory.start();
    }

    public Command handle(final Transport transport, final Command command) {
        TopicMetadataRequest topicMetadataRequest = (TopicMetadataRequest)command.getPayload();
        String clientId = KafkaClientHelper.parseClient(topicMetadataRequest.getClientId());
        Map<String, TopicConfig> topicConfigs = null;
        topicConfigs = CollectionUtils.isEmpty(topicMetadataRequest.getTopics()) && StringUtils.isNotBlank((CharSequence)clientId) ? this.getAllTopicConfigs(clientId) : this.getTopicConfigs(topicMetadataRequest.getTopics());
        List<KafkaBroker> brokers = this.getTopicBrokers(topicConfigs);
        List<KafkaTopicMetadata> topicMetadata = this.getTopicMetadata(topicMetadataRequest.getTopics(), topicConfigs);
        TopicMetadataResponse topicMetadataResponse = new TopicMetadataResponse(topicMetadata, brokers);
        final Command response = new Command((Object)topicMetadataResponse);
        if (this.config.getLogDetail(clientId)) {
            logger.info("get topic metadata, transport: {}, app: {}, request: {}, response: {}", new Object[]{transport, clientId, topicMetadataRequest, topicMetadataResponse});
        }
        if (CollectionUtils.isEmpty(topicMetadata) && this.config.getMetadataDelayEnable()) {
            logger.info("get topic metadata, topics: {}, address: {}, metadata: {}, app: {}", new Object[]{topicMetadataRequest.getTopics(), transport.remoteAddress(), JSON.toJSONString(topicMetadata), topicMetadataRequest.getClientId()});
            this.delayPurgatory.tryCompleteElseWatch((DelayedOperation)new AbstractDelayedOperation(this.config.getMetadataDelay()){

                protected void onComplete() {
                    transport.acknowledge(command, response);
                }
            }, (Set)Sets.newHashSet((Object[])new DelayedOperationKey[]{new DelayedOperationKey(new Object[0])}));
            return null;
        }
        return response;
    }

    public int type() {
        return KafkaCommandType.METADATA.getCode();
    }

    protected Map<String, TopicConfig> getAllTopicConfigs(String clientId) {
        String[] appGroup = clientId.split("\\.");
        Map consumers = this.nameService.getTopicConfigByApp(clientId, Subscription.Type.CONSUMPTION);
        Map producers = this.nameService.getTopicConfigByApp(appGroup[0], Subscription.Type.PRODUCTION);
        HashMap result = Maps.newHashMap();
        if (MapUtils.isNotEmpty((Map)consumers)) {
            for (Map.Entry entry : consumers.entrySet()) {
                result.put(((TopicName)entry.getKey()).getFullName(), entry.getValue());
            }
        }
        if (MapUtils.isNotEmpty((Map)producers)) {
            for (Map.Entry entry : producers.entrySet()) {
                result.put(((TopicName)entry.getKey()).getFullName(), entry.getValue());
            }
        }
        return result;
    }

    protected Map<String, TopicConfig> getTopicConfigs(List<String> topics) {
        HashMap result = Maps.newHashMap();
        for (String topic : topics) {
            TopicConfig topicConfig = this.nameService.getTopicConfig(TopicName.parse((String)topic));
            if (topicConfig == null) continue;
            result.put(topic, topicConfig);
        }
        return result;
    }

    protected List<KafkaBroker> getTopicBrokers(Map<String, TopicConfig> topicConfigs) {
        HashSet result = Sets.newHashSet();
        for (Map.Entry<String, TopicConfig> topicEntry : topicConfigs.entrySet()) {
            for (Map.Entry entry : topicEntry.getValue().fetchAllBroker().entrySet()) {
                Broker broker = (Broker)entry.getValue();
                KafkaBroker kafkaBroker = new KafkaBroker(broker.getId(), broker.getIp(), broker.getPort());
                result.add(kafkaBroker);
            }
        }
        return Lists.newArrayList((Iterable)result);
    }

    protected List<KafkaTopicMetadata> getTopicMetadata(List<String> topics, Map<String, TopicConfig> topicConfigs) {
        LinkedList result = Lists.newLinkedList();
        if (CollectionUtils.isEmpty(topics)) {
            for (Map.Entry<String, TopicConfig> entry : topicConfigs.entrySet()) {
                List<KafkaPartitionMetadata> kafkaPartitionMetadata = this.getPartitionMetadata(entry.getValue());
                KafkaTopicMetadata kafkaTopicMetadata = new KafkaTopicMetadata(entry.getKey(), kafkaPartitionMetadata, KafkaErrorCode.NONE.getCode());
                result.add(kafkaTopicMetadata);
            }
        } else {
            for (String topic : topics) {
                TopicConfig topicConfig = topicConfigs.get(topic);
                if (topicConfig != null) {
                    List<KafkaPartitionMetadata> kafkaPartitionMetadata = this.getPartitionMetadata(topicConfig);
                    KafkaTopicMetadata kafkaTopicMetadata = new KafkaTopicMetadata(topic, kafkaPartitionMetadata, KafkaErrorCode.NONE.getCode());
                    result.add(kafkaTopicMetadata);
                    continue;
                }
                KafkaTopicMetadata kafkaTopicMetadata = new KafkaTopicMetadata(topic, Collections.emptyList(), KafkaErrorCode.TOPIC_AUTHORIZATION_FAILED.getCode());
                result.add(kafkaTopicMetadata);
            }
        }
        return result;
    }

    protected List<KafkaPartitionMetadata> getPartitionMetadata(TopicConfig topicConfig) {
        LinkedList result = Lists.newLinkedList();
        for (Partition partition : topicConfig.fetchPartitionMetadata()) {
            short errorCode = KafkaErrorCode.NONE.getCode();
            KafkaBroker leader = null;
            LinkedList replicas = Lists.newLinkedList();
            LinkedList isrs = Lists.newLinkedList();
            if (partition.getLeader() != null) {
                leader = new KafkaBroker(partition.getLeader().getId(), partition.getLeader().getIp(), partition.getLeader().getPort());
            } else {
                errorCode = KafkaErrorCode.LEADER_NOT_AVAILABLE.getCode();
            }
            if (CollectionUtils.isNotEmpty((Collection)partition.getReplicas())) {
                for (Broker replica : partition.getReplicas()) {
                    replicas.add(new KafkaBroker(replica.getId(), replica.getIp(), replica.getPort()));
                }
            }
            if (CollectionUtils.isNotEmpty((Collection)partition.getIsrs())) {
                for (Broker isr : partition.getIsrs()) {
                    isrs.add(new KafkaBroker(isr.getId(), isr.getIp(), isr.getPort()));
                }
            }
            KafkaPartitionMetadata kafkaPartitionMetadata = new KafkaPartitionMetadata(partition.getPartitionId(), leader, replicas, isrs, errorCode);
            result.add(kafkaPartitionMetadata);
        }
        return result;
    }
}

