/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.group;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.group.GroupBalanceManager;
import org.joyqueue.broker.kafka.coordinator.group.GroupMetadataManager;
import org.joyqueue.broker.kafka.coordinator.group.GroupOffsetManager;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupState;
import org.joyqueue.broker.kafka.model.OffsetAndMetadata;
import org.joyqueue.broker.kafka.model.OffsetMetadataAndError;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles offset commit and offset fetch requests for Kafka consumer groups.
 *
 * <p>Commit requests are validated against the coordinator ownership of the group and the
 * group's current metadata (state, membership, generation) before the offsets are handed to
 * the {@link GroupOffsetManager}. Validation failures are reported per partition through
 * {@link OffsetMetadataAndError} entries carrying a {@link KafkaErrorCode} code.
 */
public class GroupOffsetHandler
extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(GroupOffsetHandler.class);
    // Not read by any method in this class; kept because it is part of the constructor contract.
    private final KafkaConfig config;
    private final Coordinator coordinator;
    private final GroupMetadataManager groupMetadataManager;
    private final GroupBalanceManager groupBalanceManager;
    private final GroupOffsetManager groupOffsetManager;

    /**
     * Creates a handler wired to the given collaborators.
     *
     * @param config               Kafka configuration
     * @param coordinator          used to check whether this broker currently coordinates a group
     * @param groupMetadataManager used to look up group metadata by group id
     * @param groupBalanceManager  used to reschedule a member's heartbeat expiration after a commit
     * @param groupOffsetManager   used to save and read offsets
     */
    public GroupOffsetHandler(KafkaConfig config, Coordinator coordinator, GroupMetadataManager groupMetadataManager, GroupBalanceManager groupBalanceManager, GroupOffsetManager groupOffsetManager) {
        this.config = config;
        this.coordinator = coordinator;
        this.groupMetadataManager = groupMetadataManager;
        this.groupBalanceManager = groupBalanceManager;
        this.groupOffsetManager = groupOffsetManager;
    }

    /**
     * Commits offsets for a group.
     *
     * <p>Decision order:
     * <ol>
     *   <li>handler not started, or this broker is not the group's current coordinator:
     *       every partition gets {@code COORDINATOR_NOT_AVAILABLE};</li>
     *   <li>blank {@code memberId}: offsets are saved without any group validation;</li>
     *   <li>no metadata for the group: offsets are saved when {@code generationId < 0},
     *       otherwise every partition gets {@code ILLEGAL_GENERATION};</li>
     *   <li>otherwise the request is validated by {@link #handleCommitOffsets}.</li>
     * </ol>
     *
     * @param groupId      consumer group id
     * @param memberId     member id of the committing consumer; may be blank
     * @param generationId generation the consumer believes the group is in
     * @param offsets      offsets to commit, keyed by topic
     * @return per-topic commit results
     */
    public Map<String, List<OffsetMetadataAndError>> commitOffsets(String groupId, String memberId, int generationId, Map<String, List<OffsetAndMetadata>> offsets) {
        if (!isStarted()) {
            return buildCommitError(offsets, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
        if (!coordinator.isCurrentGroup(groupId)) {
            logger.info("group {} coordinator changed", groupId);
            return buildCommitError(offsets, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
        if (StringUtils.isBlank(memberId)) {
            return groupOffsetManager.saveOffsets(groupId, offsets);
        }

        GroupMetadata group = groupMetadataManager.getGroup(groupId);
        if (group == null) {
            // Serializing the offsets is only worth paying for when the line is actually emitted.
            if (logger.isInfoEnabled()) {
                logger.info("offset commit, group({}) is null, member id is {}, generationId is {}, offsetMetadata is {}",
                        groupId, memberId, generationId, JSON.toJSONString(offsets));
            }
            if (generationId < 0) {
                return groupOffsetManager.saveOffsets(groupId, offsets);
            }
            return buildCommitError(offsets, KafkaErrorCode.ILLEGAL_GENERATION.getCode());
        }
        return handleCommitOffsets(group, groupId, memberId, generationId, offsets);
    }

    /**
     * Validates a commit against existing group metadata and saves the offsets when it is valid.
     *
     * <p>Checks, in order:
     * <ol>
     *   <li>group is {@code DEAD}: {@code UNKNOWN_MEMBER_ID};</li>
     *   <li>group is {@code EMPTY} and {@code generationId < 0}: offsets are saved;</li>
     *   <li>{@code memberId} is not a member of the group: {@code UNKNOWN_MEMBER_ID};</li>
     *   <li>group is {@code AWAITINGSYNC}: {@code REBALANCE_IN_PROGRESS};</li>
     *   <li>{@code generationId} differs from the group's generation: {@code ILLEGAL_GENERATION};</li>
     *   <li>otherwise the member's heartbeat expiration is rescheduled and the offsets are saved.</li>
     * </ol>
     *
     * @param group        metadata of the group, never {@code null} here
     * @param groupId      consumer group id
     * @param memberId     member id of the committing consumer
     * @param generationId generation the consumer believes the group is in
     * @param offsets      offsets to commit, keyed by topic
     * @return per-topic commit results
     */
    protected Map<String, List<OffsetMetadataAndError>> handleCommitOffsets(GroupMetadata group, String groupId, String memberId, int generationId, Map<String, List<OffsetAndMetadata>> offsets) {
        if (group.stateIs(GroupState.DEAD)) {
            return buildCommitError(offsets, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode());
        }
        // This check has to run before the membership check: an EMPTY group presumably has no
        // members (TODO confirm against GroupMetadata), so testing membership first would reject
        // every such commit and leave this branch unreachable. Accepting it is consistent with
        // commitOffsets, which saves offsets for an unknown group when generationId < 0.
        if (generationId < 0 && group.stateIs(GroupState.EMPTY)) {
            return groupOffsetManager.saveOffsets(groupId, offsets);
        }
        if (!group.isHasMember(memberId)) {
            return buildCommitError(offsets, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode());
        }
        if (group.stateIs(GroupState.AWAITINGSYNC)) {
            return buildCommitError(offsets, KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode());
        }
        if (generationId != group.getGenerationId()) {
            return buildCommitError(offsets, KafkaErrorCode.ILLEGAL_GENERATION.getCode());
        }

        // A valid commit is also used to reschedule the member's heartbeat expiration.
        groupBalanceManager.completeAndScheduleNextHeartbeatExpiration(group, group.getMember(memberId));
        return groupOffsetManager.saveOffsets(groupId, offsets);
    }

    /**
     * Fetches the offsets of the requested partitions for a group.
     *
     * <p>Only the started state of this handler is checked; group state and coordinator
     * ownership are not consulted before delegating to the offset manager.
     *
     * @param groupId            consumer group id
     * @param topicAndPartitions partitions to look up, keyed by topic
     * @return per-topic offsets, or {@code COORDINATOR_NOT_AVAILABLE} for every requested
     *         partition when the handler is not started
     */
    public Map<String, List<OffsetMetadataAndError>> fetchOffsets(String groupId, Map<String, List<Integer>> topicAndPartitions) {
        if (!isStarted()) {
            return buildFetchError(topicAndPartitions, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
        return groupOffsetManager.getOffsets(groupId, topicAndPartitions);
    }

    /**
     * Builds a fetch response in which every requested partition carries the same error code.
     *
     * @param topicAndPartitions requested partitions, keyed by topic
     * @param errorCode          error code to attach to every partition
     * @return a new map with one error entry per requested partition
     */
    protected Map<String, List<OffsetMetadataAndError>> buildFetchError(Map<String, List<Integer>> topicAndPartitions, short errorCode) {
        Map<String, List<OffsetMetadataAndError>> result = Maps.newHashMapWithExpectedSize(topicAndPartitions.size());
        for (Map.Entry<String, List<Integer>> entry : topicAndPartitions.entrySet()) {
            List<OffsetMetadataAndError> errors = Lists.newArrayListWithCapacity(entry.getValue().size());
            for (Integer partition : entry.getValue()) {
                errors.add(new OffsetMetadataAndError(partition, errorCode));
            }
            result.put(entry.getKey(), errors);
        }
        return result;
    }

    /**
     * Builds a commit response in which every submitted partition carries the same error code.
     *
     * @param offsets   offsets that were submitted for commit, keyed by topic
     * @param errorCode error code to attach to every partition
     * @return a new map with one error entry per submitted partition
     */
    protected Map<String, List<OffsetMetadataAndError>> buildCommitError(Map<String, List<OffsetAndMetadata>> offsets, short errorCode) {
        Map<String, List<OffsetMetadataAndError>> result = Maps.newHashMapWithExpectedSize(offsets.size());
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : offsets.entrySet()) {
            List<OffsetMetadataAndError> errors = Lists.newArrayListWithCapacity(entry.getValue().size());
            for (OffsetAndMetadata offsetAndMetadata : entry.getValue()) {
                errors.add(new OffsetMetadataAndError(offsetAndMetadata.getPartition(), errorCode));
            }
            result.put(entry.getKey(), errors);
        }
        return result;
    }
}

