/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.group;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.cluster.ClusterNameService;
import org.joyqueue.broker.index.command.ConsumeIndexQueryRequest;
import org.joyqueue.broker.index.command.ConsumeIndexQueryResponse;
import org.joyqueue.broker.index.command.ConsumeIndexStoreRequest;
import org.joyqueue.broker.index.command.ConsumeIndexStoreResponse;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.broker.index.model.IndexMetadataAndError;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.group.GroupMetadataManager;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.model.OffsetAndMetadata;
import org.joyqueue.broker.kafka.model.OffsetMetadataAndError;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.command.JoyQueueCommand;
import org.joyqueue.network.transport.command.JoyQueuePayload;
import org.joyqueue.network.transport.session.session.TransportSession;
import org.joyqueue.network.transport.session.session.TransportSessionManager;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupOffsetManager
extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(GroupOffsetManager.class);
    private KafkaConfig config;
    private ClusterNameService clusterNameService;
    private GroupMetadataManager groupMetadataManager;
    private TransportSessionManager sessionManager;

    public GroupOffsetManager(KafkaConfig config, ClusterNameService clusterNameService, GroupMetadataManager groupMetadataManager, TransportSessionManager sessionManager) {
        this.config = config;
        this.clusterNameService = clusterNameService;
        this.groupMetadataManager = groupMetadataManager;
        this.sessionManager = sessionManager;
    }

    public Map<String, List<OffsetMetadataAndError>> getOffsets(final String groupId, Map<String, List<Integer>> topicAndPartitions) {
        Map<Broker, Map<String, List<Integer>>> brokerTopicPartitionMap = this.splitPartitionByBroker(topicAndPartitions);
        final CountDownLatch latch = new CountDownLatch(brokerTopicPartitionMap.size());
        final HashMap result = Maps.newHashMapWithExpectedSize((int)topicAndPartitions.size());
        for (Map.Entry<Broker, Map<String, List<Integer>>> entry : brokerTopicPartitionMap.entrySet()) {
            final Broker broker = entry.getKey();
            try {
                TransportSession session = this.sessionManager.getOrCreateSession(broker);
                final ConsumeIndexQueryRequest indexQueryRequest = new ConsumeIndexQueryRequest(groupId, entry.getValue());
                JoyQueueCommand request = new JoyQueueCommand((JoyQueuePayload)indexQueryRequest);
                session.async((Command)request, this.config.getOffsetSyncTimeout(), new CommandCallback(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onSuccess(Command request, Command response) {
                        Map map = result;
                        synchronized (map) {
                            ConsumeIndexQueryResponse payload = (ConsumeIndexQueryResponse)response.getPayload();
                            for (Map.Entry topicEntry : payload.getTopicPartitionIndex().entrySet()) {
                                String topic = (String)topicEntry.getKey();
                                List partitions = (List)result.get(topic);
                                if (partitions == null) {
                                    partitions = Lists.newLinkedList();
                                    result.put(topic, partitions);
                                }
                                for (Map.Entry partitionEntry : ((Map)topicEntry.getValue()).entrySet()) {
                                    IndexMetadataAndError indexMetadataAndError = (IndexMetadataAndError)partitionEntry.getValue();
                                    partitions.add(new OffsetMetadataAndError((Integer)partitionEntry.getKey(), indexMetadataAndError.getIndex(), indexMetadataAndError.getMetadata(), KafkaErrorCode.joyQueueCodeFor(indexMetadataAndError.getError())));
                                    if (((IndexMetadataAndError)partitionEntry.getValue()).getError() == JoyQueueCode.SUCCESS.getCode()) continue;
                                    logger.error("get offset error, broker: {}, topic: {}, partition: {}, group: {},code: {}", new Object[]{broker, topic, partitionEntry.getKey(), groupId, JoyQueueCode.valueOf((int)((IndexMetadataAndError)partitionEntry.getValue()).getError())});
                                }
                            }
                            latch.countDown();
                        }
                    }

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onException(Command request, Throwable cause) {
                        logger.error("get offset failed, async transport exception, broker: {}, request: {}, group: {}", new Object[]{broker, indexQueryRequest, groupId, cause});
                        Map map = result;
                        synchronized (map) {
                            for (Map.Entry topicEntry : indexQueryRequest.getTopicPartitions().entrySet()) {
                                String topic = (String)topicEntry.getKey();
                                List partitions = (List)result.get(topic);
                                if (partitions == null) {
                                    partitions = Lists.newLinkedList();
                                    result.put(topic, partitions);
                                }
                                for (Integer partition : (List)topicEntry.getValue()) {
                                    partitions.add(new OffsetMetadataAndError(partition, -1L, "", KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode()));
                                }
                            }
                            latch.countDown();
                        }
                    }
                });
            }
            catch (Throwable cause) {
                logger.error("get offset failed, async transport exception, broker: {}, topic: {}, group: {}", new Object[]{broker, entry.getValue(), groupId, cause});
                latch.countDown();
            }
        }
        try {
            if (!latch.await(this.config.getOffsetSyncTimeout(), TimeUnit.MILLISECONDS)) {
                logger.error("get offset timeout, partitions: {}, group: {}", topicAndPartitions, (Object)groupId);
            }
        }
        catch (InterruptedException e) {
            logger.error("get offset latch await exception, group: {}, partitions: {}", new Object[]{groupId, topicAndPartitions, e});
        }
        this.fillErrorOffset(groupId, result);
        return result;
    }

    protected void fillErrorOffset(String groupId, Map<String, List<OffsetMetadataAndError>> result) {
        GroupMetadata groupMetadata = this.groupMetadataManager.getGroup(groupId);
        if (groupMetadata == null) {
            return;
        }
        for (Map.Entry<String, List<OffsetMetadataAndError>> entry : result.entrySet()) {
            String topic = entry.getKey();
            for (OffsetMetadataAndError offsetMetadataAndError : entry.getValue()) {
                if (offsetMetadataAndError.getError() == KafkaErrorCode.NONE.getCode()) {
                    groupMetadata.putOffsetCache(topic, offsetMetadataAndError.getPartition(), new OffsetAndMetadata(offsetMetadataAndError.getOffset(), (short)offsetMetadataAndError.getPartition()));
                    continue;
                }
                OffsetAndMetadata offsetCache = groupMetadata.getOffsetCache(topic, offsetMetadataAndError.getPartition());
                if (offsetCache == null) continue;
                logger.info("fill error offset, topic: {}, partition: {}, group: {}, offset: {}", new Object[]{topic, entry.getKey(), groupId, offsetCache});
                offsetMetadataAndError.setOffset(offsetCache.getOffset());
                offsetMetadataAndError.setMetadata(offsetCache.getMetadata());
                offsetMetadataAndError.setError(KafkaErrorCode.NONE.getCode());
            }
        }
    }

    public Map<String, List<OffsetMetadataAndError>> saveOffsets(final String groupId, Map<String, List<OffsetAndMetadata>> offsets) {
        Map<Broker, Map<String, List<OffsetAndMetadata>>> brokerTopicPartitionMap = this.splitOffsetByBroker(offsets);
        final CountDownLatch latch = new CountDownLatch(brokerTopicPartitionMap.size());
        final HashMap result = Maps.newHashMapWithExpectedSize((int)offsets.size());
        for (Map.Entry<Broker, Map<String, List<OffsetAndMetadata>>> entry : brokerTopicPartitionMap.entrySet()) {
            final Broker broker = entry.getKey();
            try {
                TransportSession session = this.sessionManager.getOrCreateSession(broker);
                final ConsumeIndexStoreRequest indexStoreRequest = new ConsumeIndexStoreRequest(groupId, this.buildSaveOffsetParam(entry.getValue()));
                JoyQueueCommand request = new JoyQueueCommand((JoyQueuePayload)indexStoreRequest);
                session.async((Command)request, this.config.getOffsetSyncTimeout(), new CommandCallback(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onSuccess(Command request, Command response) {
                        Map map = result;
                        synchronized (map) {
                            ConsumeIndexStoreResponse payload = (ConsumeIndexStoreResponse)response.getPayload();
                            for (Map.Entry topicEntry : payload.getIndexStoreStatus().entrySet()) {
                                String topic = (String)topicEntry.getKey();
                                List partitions = (List)result.get(topic);
                                if (partitions == null) {
                                    partitions = Lists.newLinkedList();
                                    result.put(topic, partitions);
                                }
                                for (Map.Entry partitionEntry : ((Map)topicEntry.getValue()).entrySet()) {
                                    OffsetMetadataAndError offsetMetadataAndError = new OffsetMetadataAndError((Integer)partitionEntry.getKey(), -1L, "", KafkaErrorCode.joyQueueCodeFor(((Short)partitionEntry.getValue()).shortValue()));
                                    if (((Short)partitionEntry.getValue()).shortValue() != JoyQueueCode.SUCCESS.getCode()) {
                                        logger.error("save offset failed, broker: {}, topic: {}, partition: {}, group: {}, code: {}", new Object[]{broker, topic, partitionEntry.getKey(), groupId, JoyQueueCode.valueOf((int)((Short)partitionEntry.getValue()).shortValue())});
                                    }
                                    offsetMetadataAndError.setError(KafkaErrorCode.NONE.getCode());
                                    partitions.add(offsetMetadataAndError);
                                }
                            }
                            latch.countDown();
                        }
                    }

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onException(Command request, Throwable cause) {
                        logger.error("save offset failed, async transport exception, broker: {}, request: {}, group: {}", new Object[]{broker, indexStoreRequest, groupId, cause});
                        Map map = result;
                        synchronized (map) {
                            for (Map.Entry topicEntry : indexStoreRequest.getIndexMetadata().entrySet()) {
                                String topic = (String)topicEntry.getKey();
                                List partitions = (List)result.get(topic);
                                if (partitions == null) {
                                    partitions = Lists.newLinkedList();
                                    result.put(topic, partitions);
                                }
                                for (Map.Entry partitionEntry : ((Map)topicEntry.getValue()).entrySet()) {
                                    partitions.add(new OffsetMetadataAndError((Integer)partitionEntry.getKey(), -1L, "", KafkaErrorCode.NONE.getCode()));
                                }
                            }
                            latch.countDown();
                        }
                    }
                });
            }
            catch (Throwable cause) {
                logger.error("save offset failed, async transport exception, broker: {}, topic: {}, group: {}", new Object[]{broker, brokerTopicPartitionMap, groupId, cause});
                latch.countDown();
            }
        }
        try {
            if (!latch.await(this.config.getOffsetSyncTimeout(), TimeUnit.MILLISECONDS)) {
                logger.error("save offset timeout, offsets: {}, group: {}", brokerTopicPartitionMap, (Object)groupId);
            }
        }
        catch (InterruptedException e) {
            logger.error("save offset latch await exception, group: {}, offsets: {}", new Object[]{groupId, offsets, e});
        }
        this.fillOffsetCache(groupId, offsets);
        return result;
    }

    protected void fillOffsetCache(String groupId, Map<String, List<OffsetAndMetadata>> result) {
        GroupMetadata groupMetadata = this.groupMetadataManager.getGroup(groupId);
        if (groupMetadata == null) {
            return;
        }
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : result.entrySet()) {
            for (OffsetAndMetadata offsetAndMetadata : entry.getValue()) {
                groupMetadata.putOffsetCache(entry.getKey(), offsetAndMetadata.getPartition(), offsetAndMetadata);
            }
        }
    }

    protected Map<String, Map<Integer, IndexAndMetadata>> buildSaveOffsetParam(Map<String, List<OffsetAndMetadata>> topicAndPartitions) {
        HashMap result = Maps.newHashMapWithExpectedSize((int)topicAndPartitions.size());
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : topicAndPartitions.entrySet()) {
            String topic = entry.getKey();
            HashMap partitions = Maps.newHashMapWithExpectedSize((int)entry.getValue().size());
            result.put(topic, partitions);
            for (OffsetAndMetadata offsetAndMetadata : entry.getValue()) {
                partitions.put(offsetAndMetadata.getPartition(), new IndexAndMetadata(offsetAndMetadata.getOffset(), null));
            }
        }
        return result;
    }

    protected Map<Broker, Map<String, List<OffsetAndMetadata>>> splitOffsetByBroker(Map<String, List<OffsetAndMetadata>> offsets) {
        HashMap result = Maps.newHashMapWithExpectedSize((int)offsets.size());
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : offsets.entrySet()) {
            String topic = entry.getKey();
            TopicConfig topicConfig = this.clusterNameService.getNameService().getTopicConfig(TopicName.parse((String)topic));
            if (topicConfig == null) {
                logger.error("get leader failed, topic not exist, topic: {}", (Object)topic);
                continue;
            }
            for (OffsetAndMetadata offset : entry.getValue()) {
                List partitions;
                Broker broker = topicConfig.fetchBrokerByPartition((short)offset.getPartition());
                if (broker == null) {
                    logger.error("get leader failed, topic {}, partition {}, leader not available", (Object)topic, (Object)offset);
                    continue;
                }
                Map brokerTopicAndPartitions = (Map)result.get(broker);
                if (brokerTopicAndPartitions == null) {
                    brokerTopicAndPartitions = Maps.newHashMap();
                    result.put(broker, brokerTopicAndPartitions);
                }
                if ((partitions = (List)brokerTopicAndPartitions.get(topic)) == null) {
                    partitions = Lists.newLinkedList();
                    brokerTopicAndPartitions.put(topic, partitions);
                }
                partitions.add(offset);
            }
        }
        return result;
    }

    protected Map<Broker, Map<String, List<Integer>>> splitPartitionByBroker(Map<String, List<Integer>> topicAndPartitions) {
        HashMap result = Maps.newHashMapWithExpectedSize((int)topicAndPartitions.size());
        for (Map.Entry<String, List<Integer>> entry : topicAndPartitions.entrySet()) {
            String topic = entry.getKey();
            TopicConfig topicConfig = this.clusterNameService.getNameService().getTopicConfig(TopicName.parse((String)topic));
            if (topicConfig == null) {
                logger.error("get leader failed, topic not exist, topic: {}", (Object)topic);
                continue;
            }
            for (Integer partition : entry.getValue()) {
                List partitions;
                Broker broker = topicConfig.fetchBrokerByPartition((short)partition.intValue());
                if (broker == null) {
                    logger.error("get leader failed, topic {}, partition {}, leader not available", (Object)topic, (Object)partition);
                    continue;
                }
                Map brokerTopicAndPartitions = (Map)result.get(broker);
                if (brokerTopicAndPartitions == null) {
                    brokerTopicAndPartitions = Maps.newHashMap();
                    result.put(broker, brokerTopicAndPartitions);
                }
                if ((partitions = (List)brokerTopicAndPartitions.get(topic)) == null) {
                    partitions = Lists.newLinkedList();
                    brokerTopicAndPartitions.put(topic, partitions);
                }
                partitions.add(partition);
            }
        }
        return result;
    }
}

