/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.handler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.ListOffsetsRequest;
import org.joyqueue.broker.kafka.command.ListOffsetsResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.handler.AbstractKafkaCommandHandler;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka command handler for {@code LIST_OFFSETS} requests.
 *
 * <p>For every topic/partition pair in the request it resolves an offset from the
 * local store, based on the requested timestamp, and returns one
 * {@link ListOffsetsResponse.PartitionOffsetResponse} per requested partition.
 * Failures are reported per partition through the response error code; they never
 * abort the handling of the remaining partitions.
 */
public class ListOffsetsRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(ListOffsetsRequestHandler.class);

    /** Sentinel timestamp for which the store's left index of the partition is returned. */
    private static final long EARLIEST_TIMESTAMP = -2L;
    /** Sentinel timestamp for which the store's right index of the partition is returned. */
    private static final long LATEST_TIMESTAMP = -1L;
    /** Offset placed in the response when no offset could be resolved. */
    private static final long UNKNOWN_OFFSET = -1L;

    private ClusterManager clusterManager;
    private StoreService storeService;
    private KafkaConfig config;

    /**
     * Captures the collaborators this handler needs from the Kafka context.
     *
     * @param kafkaContext context providing the broker's cluster manager and store
     *                     service, plus the Kafka configuration
     */
    @Override
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.clusterManager = kafkaContext.getBrokerContext().getClusterManager();
        this.storeService = kafkaContext.getBrokerContext().getStoreService();
        this.config = kafkaContext.getConfig();
    }

    /**
     * Resolves the offsets for all partitions named in a {@link ListOffsetsRequest}.
     *
     * @param transport transport the request arrived on; only used for logging
     * @param command   command whose payload is a {@link ListOffsetsRequest}
     * @return a command wrapping a {@link ListOffsetsResponse} keyed by the same
     *         topic names as the request
     */
    public Command handle(Transport transport, Command command) {
        ListOffsetsRequest request = (ListOffsetsRequest) command.getPayload();
        Map<String, List<ListOffsetsRequest.PartitionOffsetRequest>> partitionRequestMap = request.getPartitionRequests();
        Map<String, List<ListOffsetsResponse.PartitionOffsetResponse>> partitionResponseMap =
                Maps.newHashMapWithExpectedSize(partitionRequestMap.size());

        for (Map.Entry<String, List<ListOffsetsRequest.PartitionOffsetRequest>> entry : partitionRequestMap.entrySet()) {
            TopicName topicName = TopicName.parse(entry.getKey());
            List<ListOffsetsRequest.PartitionOffsetRequest> partitionRequests = entry.getValue();
            List<ListOffsetsResponse.PartitionOffsetResponse> partitionResponses =
                    Lists.newArrayListWithCapacity(partitionRequests.size());

            for (ListOffsetsRequest.PartitionOffsetRequest partitionRequest : partitionRequests) {
                partitionResponses.add(
                        this.getOffsetByTimestamp(topicName, partitionRequest.getPartition(), partitionRequest.getTime()));
            }
            // The response is keyed by the topic string exactly as the client sent it.
            partitionResponseMap.put(entry.getKey(), partitionResponses);
        }

        // Detailed request/response logging is switched on per client id by configuration.
        if (this.config.getLogDetail(request.getClientId())) {
            logger.info("list offset, transport: {}, app: {}, request: {}, response: {}",
                    transport, request.getClientId(), partitionRequestMap, partitionResponseMap);
        }

        return new Command(new ListOffsetsResponse(partitionResponseMap));
    }

    /**
     * Looks up the offset of a single partition for the given timestamp.
     *
     * <p>{@link #LATEST_TIMESTAMP} selects the store's right index,
     * {@link #EARLIEST_TIMESTAMP} selects the store's left index, and any other value
     * is passed to the store as a timestamp to search by.
     *
     * @param topic     topic the partition belongs to
     * @param partition partition id; narrowed to {@code short} for the cluster and store calls
     * @param timestamp requested timestamp or one of the sentinel values
     * @return a response carrying the resolved offset and {@link KafkaErrorCode#NONE}, or
     *         {@link #UNKNOWN_OFFSET} together with an error code when the partition group
     *         is unknown or the lookup throws
     */
    private ListOffsetsResponse.PartitionOffsetResponse getOffsetByTimestamp(TopicName topic, int partition, long timestamp) {
        try {
            short partitionId = (short) partition;
            PartitionGroup partitionGroup = this.clusterManager.getPartitionGroup(topic, partitionId);
            if (partitionGroup == null) {
                logger.error("list offset error, partitionGroup not exist, topic: {}, partition: {}", topic, partition);
                return new ListOffsetsResponse.PartitionOffsetResponse(
                        partition, KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode(), timestamp, UNKNOWN_OFFSET);
            }

            PartitionGroupStore partitionGroupStore = this.storeService.getStore(topic.getFullName(), partitionGroup.getGroup());
            long offset;
            if (timestamp == LATEST_TIMESTAMP) {
                offset = partitionGroupStore.getRightIndex(partitionId);
            } else if (timestamp == EARLIEST_TIMESTAMP) {
                offset = partitionGroupStore.getLeftIndex(partitionId);
            } else {
                offset = partitionGroupStore.getIndex(partitionId, timestamp);
            }
            return new ListOffsetsResponse.PartitionOffsetResponse(partition, KafkaErrorCode.NONE.getCode(), timestamp, offset);
        } catch (Exception e) {
            // Trailing throwable is logged with its stack trace by SLF4J; the failure is
            // reported to the client through the error code instead of being rethrown.
            logger.error("list offset exception, topic: {}, partition: {}", topic, partition, e);
            short errorCode = KafkaErrorCode.exceptionFor(e);
            return new ListOffsetsResponse.PartitionOffsetResponse(partition, errorCode, timestamp, UNKNOWN_OFFSET);
        }
    }

    /**
     * @return the numeric code of {@link KafkaCommandType#LIST_OFFSETS}
     */
    public int type() {
        return KafkaCommandType.LIST_OFFSETS.getCode();
    }
}

