package org.joyqueue.broker.kafka.coordinator.group;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.SyncGroupAssignment;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.group.callback.JoinCallback;
import org.joyqueue.broker.kafka.coordinator.group.callback.SyncCallback;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupDescribe;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupJoinGroupResult;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMemberMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupState;
import org.joyqueue.broker.kafka.message.compressor.lz4.KafkaLZ4BlockOutputStream;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/group/GroupBalanceHandler.class */
public class GroupBalanceHandler extends Service {
    // Logger bound to the runtime class, so a subclass logs under its own name.
    protected Logger logger = LoggerFactory.getLogger(getClass());
    // Kafka settings; joinGroup reads the session-timeout bounds from here.
    private KafkaConfig config;
    // Looks up (and, on first join, creates) GroupMetadata by group id.
    private GroupMetadataManager groupMetadataManager;
    // Collaborator that adds/updates/removes members, completes rebalances and schedules heartbeat expiration.
    private GroupBalanceManager groupBalanceManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.joyqueue.broker.kafka.coordinator.group.GroupBalanceHandler$1, reason: invalid class name */
    /* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/group/GroupBalanceHandler$1.class */
    /**
     * Decompiler-recovered switch lookup table for {@link GroupState}.
     *
     * <p>The array is indexed by {@code GroupState.ordinal()} and holds a small case number:
     * 1 = DEAD, 2 = PREPARINGREBALANCE, 3 = AWAITINGSYNC, 4 = EMPTY, 5 = STABLE.
     * Any state that is not registered below keeps the array default of 0, which matches none of
     * the case numbers assigned here.
     *
     * <p>Every assignment sits in its own try/catch so that a constant that cannot be resolved at
     * run time ({@link NoSuchFieldError}) only leaves its own slot at 0 instead of aborting the
     * whole static initializer. Do not merge the try blocks.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per GroupState constant, indexed by ordinal(); 0 means "no case assigned".
        static final /* synthetic */ int[] $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState = new int[GroupState.values().length];

        static {
            try {
                // case 1: DEAD
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.DEAD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0 when the constant is missing.
            }
            try {
                // case 2: PREPARINGREBALANCE
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.PREPARINGREBALANCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0 when the constant is missing.
            }
            try {
                // case 3: AWAITINGSYNC
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.AWAITINGSYNC.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0 when the constant is missing.
            }
            try {
                // case 4: EMPTY
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.EMPTY.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the slot stays 0 when the constant is missing.
            }
            try {
                // case 5: STABLE
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.STABLE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // Deliberately ignored: the slot stays 0 when the constant is missing.
            }
        }
    }

    /**
     * Creates a handler wired to its collaborators. The arguments are stored as-is; no
     * validation or copying is performed.
     *
     * @param kafkaConfig          Kafka settings (session-timeout bounds are read from it)
     * @param groupMetadataManager registry used to look up and create group metadata
     * @param groupBalanceManager  collaborator that performs the membership/rebalance work
     */
    public GroupBalanceHandler(KafkaConfig kafkaConfig, GroupMetadataManager groupMetadataManager, GroupBalanceManager groupBalanceManager) {
        this.groupBalanceManager = groupBalanceManager;
        this.groupMetadataManager = groupMetadataManager;
        this.config = kafkaConfig;
    }

    /**
     * Handles a join-group request: validates it, resolves (or creates) the group and then runs
     * {@link #doJoinGroup} while holding the group's monitor.
     *
     * <p>The result is always delivered through {@code joinCallback}. Requests are rejected with
     * <ul>
     *   <li>{@code COORDINATOR_NOT_AVAILABLE} when this service is not started,</li>
     *   <li>{@code INVALID_GROUP_ID} when {@link #validGroupId} rejects {@code str},</li>
     *   <li>{@code INVALID_SESSION_TIMEOUT} when {@code i2} lies outside the configured
     *       {@code [sessionMinTimeout, sessionMaxTimeout]} range,</li>
     *   <li>{@code UNKNOWN_MEMBER_ID} when the group does not exist yet but a non-empty member id
     *       was supplied.</li>
     * </ul>
     *
     * @param str          group id
     * @param str2         member id; must be the empty string when the group does not exist yet
     * @param str3         forwarded unchanged to {@code doJoinGroup} (presumably the client id — confirm against callers)
     * @param str4         forwarded unchanged to {@code doJoinGroup} (presumably the client host — confirm against callers)
     * @param i            forwarded unchanged to {@code doJoinGroup} (presumably the rebalance timeout — confirm against callers)
     * @param i2           session timeout, checked against the configured bounds
     * @param str5         protocol type; used when a new group has to be created
     * @param map          protocols offered by the member, keyed by protocol name
     * @param joinCallback receives the join result or the error
     */
    public void joinGroup(String str, String str2, String str3, String str4, int i, int i2, String str5, Map<String, byte[]> map, JoinCallback joinCallback) {
        if (!isStarted()) {
            joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str2, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode()));
            return;
        }
        if (!validGroupId(str)) {
            joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str2, KafkaErrorCode.INVALID_GROUP_ID.getCode()));
            return;
        }
        // Accept only min <= timeout <= max. The bounds used to be swapped here, which rejected
        // practically every request with INVALID_SESSION_TIMEOUT.
        if (i2 < this.config.getSessionMinTimeout() || i2 > this.config.getSessionMaxTimeout()) {
            joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str2, KafkaErrorCode.INVALID_SESSION_TIMEOUT.getCode()));
            return;
        }
        GroupMetadata group = this.groupMetadataManager.getGroup(str);
        if (group == null) {
            // Only a member without an id may bring a group into existence.
            if (!str2.equals("")) {
                joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str2, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode()));
                return;
            }
            group = this.groupMetadataManager.getOrCreateGroup(new GroupMetadata(str, str5));
        }
        // All state transitions of a group are serialized on the group instance itself.
        synchronized (group) {
            doJoinGroup(group, str2, str3, str4, i, i2, str5, map, joinCallback);
        }
    }

    /**
     * Performs the state-dependent part of a join. {@link #joinGroup} invokes this while holding
     * the monitor of {@code groupMetadata}.
     *
     * <p>Pre-checks (each answers through {@code joinCallback} and returns):
     * <ul>
     *   <li>non-empty group whose protocol type differs from {@code str4}, or which does not
     *       support the offered protocols: {@code INCONSISTENT_GROUP_PROTOCOL};</li>
     *   <li>empty group joined with no protocols or an empty protocol type:
     *       {@code INCONSISTENT_GROUP_PROTOCOL};</li>
     *   <li>non-empty member id that the group does not contain: {@code UNKNOWN_MEMBER_ID}.</li>
     * </ul>
     *
     * <p>Afterwards the action depends on the group state:
     * <ul>
     *   <li>DEAD: {@code UNKNOWN_MEMBER_ID};</li>
     *   <li>PREPARINGREBALANCE: add the new member, or update the existing one, and rebalance;</li>
     *   <li>AWAITINGSYNC: a known member whose protocols still match is answered with the current
     *       generation (the leader additionally receives the member metadata); otherwise
     *       add/update and rebalance;</li>
     *   <li>EMPTY / STABLE: a known non-leader whose protocols still match is answered with the
     *       current generation; the leader, or a member with changed protocols, triggers a
     *       rebalance; a new member is added and triggers a rebalance.</li>
     * </ul>
     * Finally, if the group is in PREPARINGREBALANCE, the balance manager is asked to check
     * whether the pending join can be completed.
     *
     * @param groupMetadata group being joined
     * @param str           member id, empty string for a member that has none yet
     * @param str2          passed to {@code addMemberAndRebalance} (presumably the client id — confirm)
     * @param str3          passed to {@code addMemberAndRebalance} (presumably the client host — confirm)
     * @param i             passed to {@code addMemberAndRebalance} (presumably the rebalance timeout — confirm)
     * @param i2            passed to {@code addMemberAndRebalance} (presumably the session timeout — confirm)
     * @param str4          protocol type requested by the member
     * @param map           protocols offered by the member, keyed by protocol name
     * @param joinCallback  receives the join result or the error
     */
    protected void doJoinGroup(GroupMetadata groupMetadata, String str, String str2, String str3, int i, int i2, String str4, Map<String, byte[]> map, JoinCallback joinCallback) {
        if (!groupMetadata.stateIs(GroupState.EMPTY) && (!groupMetadata.getProtocolType().equals(str4) || !groupMetadata.supportsProtocols(map.keySet()))) {
            joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str, KafkaErrorCode.INCONSISTENT_GROUP_PROTOCOL.getCode()));
            return;
        }
        if (groupMetadata.stateIs(GroupState.EMPTY) && (map.isEmpty() || str4.isEmpty())) {
            joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str, KafkaErrorCode.INCONSISTENT_GROUP_PROTOCOL.getCode()));
            return;
        }
        // An empty member id marks a member that is joining for the first time.
        final boolean newMember = str.equals("");
        if (!newMember && !groupMetadata.isHasMember(str)) {
            joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode()));
            return;
        }

        // Switch on the enum itself; the decompiled ordinal table with unrelated LZ4 block-size
        // constants as case labels obscured which state each branch handled.
        switch (groupMetadata.getState()) {
            case DEAD:
                joinCallback.sendResponseCallback(GroupJoinGroupResult.buildError(str, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode()));
                break;
            case PREPARINGREBALANCE:
                if (newMember) {
                    this.groupBalanceManager.addMemberAndRebalance(i, i2, str2, str3, map, groupMetadata, joinCallback);
                } else {
                    this.groupBalanceManager.updateMemberAndRebalance(groupMetadata, groupMetadata.getMember(str), map, joinCallback);
                }
                break;
            case AWAITINGSYNC:
                if (newMember) {
                    this.groupBalanceManager.addMemberAndRebalance(i, i2, str2, str3, map, groupMetadata, joinCallback);
                } else {
                    GroupMemberMetadata member = groupMetadata.getMember(str);
                    if (member.matches(map)) {
                        // Unchanged member re-joining: repeat the current generation. Only the
                        // leader is handed the member metadata.
                        joinCallback.sendResponseCallback(new GroupJoinGroupResult(str.equals(groupMetadata.getLeaderId()) ? groupMetadata.currentMemberMetadata() : Collections.emptyMap(), str, groupMetadata.getGenerationId(), groupMetadata.getProtocol(), groupMetadata.getLeaderId(), KafkaErrorCode.NONE.getCode()));
                    } else {
                        this.groupBalanceManager.updateMemberAndRebalance(groupMetadata, member, map, joinCallback);
                    }
                }
                break;
            case EMPTY:
            case STABLE:
                if (newMember) {
                    this.groupBalanceManager.addMemberAndRebalance(i, i2, str2, str3, map, groupMetadata, joinCallback);
                } else {
                    GroupMemberMetadata member = groupMetadata.getMember(str);
                    if (!str.equals(groupMetadata.getLeaderId()) && member.matches(map)) {
                        // Unchanged follower: answer with the current generation, no rebalance.
                        joinCallback.sendResponseCallback(new GroupJoinGroupResult(Collections.emptyMap(), str, groupMetadata.getGenerationId(), groupMetadata.getProtocol(), groupMetadata.getLeaderId(), KafkaErrorCode.NONE.getCode()));
                    } else {
                        // The leader re-joined or the member changed its protocols.
                        this.groupBalanceManager.updateMemberAndRebalance(groupMetadata, member, map, joinCallback);
                    }
                }
                break;
            default:
                // No other state is handled; nothing to do.
                break;
        }

        if (groupMetadata.stateIs(GroupState.PREPARINGREBALANCE)) {
            this.groupBalanceManager.checkAndComplete(groupMetadata);
        }
    }

    /**
     * Handles a sync-group request by delegating to {@link #doSyncGroup} under the group's monitor.
     *
     * <p>Answers {@code COORDINATOR_NOT_AVAILABLE} when this service is not started and
     * {@code UNKNOWN_MEMBER_ID} when no group with id {@code str} exists; in both cases the
     * callback receives a {@code null} assignment.
     *
     * @param str          group id
     * @param i            generation id claimed by the member
     * @param str2         member id
     * @param map          assignments keyed by member id, as passed through to {@code doSyncGroup}
     * @param syncCallback receives the assignment or the error code
     */
    public void syncGroup(String str, int i, String str2, Map<String, SyncGroupAssignment> map, SyncCallback syncCallback) {
        if (!isStarted()) {
            syncCallback.sendResponseCallback(null, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
            return;
        }

        final GroupMetadata target = this.groupMetadataManager.getGroup(str);
        if (target == null) {
            syncCallback.sendResponseCallback(null, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode());
            return;
        }

        synchronized (target) {
            doSyncGroup(target, i, str2, map, syncCallback);
        }
    }

    /**
     * Performs the state-dependent part of a sync. {@link #syncGroup} invokes this while holding
     * the monitor of {@code groupMetadata}.
     *
     * <p>An unknown member is answered with {@code UNKNOWN_MEMBER_ID}; a generation mismatch with
     * {@code ILLEGAL_GENERATION}. Otherwise, by group state:
     * <ul>
     *   <li>DEAD / EMPTY: {@code UNKNOWN_MEMBER_ID};</li>
     *   <li>PREPARINGREBALANCE: {@code REBALANCE_IN_PROGRESS};</li>
     *   <li>AWAITINGSYNC: the callback is parked on the member and its heartbeat expiration is
     *       rescheduled. If the caller is the leader, every member missing from {@code map} gets a
     *       {@code null} entry, the assignment is propagated and the group moves to STABLE;</li>
     *   <li>STABLE: the member's stored assignment is returned immediately and its heartbeat
     *       expiration is rescheduled.</li>
     * </ul>
     *
     * @param groupMetadata group being synced
     * @param i             generation id claimed by the member
     * @param str           member id
     * @param map           assignments keyed by member id; mutated (missing members are added with
     *                      a {@code null} value) when the caller is the leader
     * @param syncCallback  receives the assignment or the error code
     */
    protected void doSyncGroup(GroupMetadata groupMetadata, int i, String str, Map<String, SyncGroupAssignment> map, SyncCallback syncCallback) {
        this.logger.info("sync group, groupId = {}, memberId = {}, memberCount = {}", groupMetadata.getId(), str, groupMetadata.getAllMemberIds().size());
        if (!groupMetadata.isHasMember(str)) {
            syncCallback.sendResponseCallback(null, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode());
            return;
        }
        if (i != groupMetadata.getGenerationId()) {
            syncCallback.sendResponseCallback(null, KafkaErrorCode.ILLEGAL_GENERATION.getCode());
            return;
        }

        // Switch on the enum itself; the decompiled ordinal table with unrelated LZ4 block-size
        // constants as case labels obscured which state each branch handled.
        switch (groupMetadata.getState()) {
            case DEAD:
            case EMPTY:
                syncCallback.sendResponseCallback(null, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode());
                return;
            case PREPARINGREBALANCE:
                syncCallback.sendResponseCallback(null, KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode());
                return;
            case AWAITINGSYNC: {
                // Park the callback on the member; it is not answered in this branch.
                groupMetadata.getMember(str).setAwaitingSyncCallback(syncCallback);
                this.groupBalanceManager.completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.getMember(str));
                if (!str.equals(groupMetadata.getLeaderId())) {
                    return;
                }
                // Leader path: make sure every current member has an entry, even if only null.
                Set<String> unassigned = Sets.newHashSet();
                unassigned.addAll(groupMetadata.getAllMemberIds());
                unassigned.removeAll(map.keySet());
                for (String memberId : unassigned) {
                    map.put(memberId, null);
                }
                // Re-validate state and generation right before committing the assignment.
                if (groupMetadata.stateIs(GroupState.AWAITINGSYNC) && i == groupMetadata.getGenerationId()) {
                    this.logger.info("sync group {}, transition to STABLE state, generation id is {}", groupMetadata.getId(), i);
                    this.groupBalanceManager.setAndPropagateAssignment(groupMetadata, map);
                    groupMetadata.transitionStateTo(GroupState.STABLE);
                }
                return;
            }
            case STABLE:
                syncCallback.sendResponseCallback(groupMetadata.getMember(str).getAssignment(), KafkaErrorCode.NONE.getCode());
                this.groupBalanceManager.completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.getMember(str));
                return;
            default:
                // No other state is handled; the callback is left unanswered, as before.
                return;
        }
    }

    /**
     * Removes a member from a group on its own request.
     *
     * @param str  group id
     * @param str2 id of the member that leaves
     * @return {@code COORDINATOR_NOT_AVAILABLE} when this service is not started;
     *         {@code UNKNOWN_MEMBER_ID} when the group does not exist, is DEAD, or does not
     *         contain the member; {@code NONE} after the member has been removed
     */
    public short leaveGroup(String str, String str2) {
        if (!isStarted()) {
            return KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode();
        }

        final GroupMetadata target = this.groupMetadataManager.getGroup(str);
        if (target == null) {
            return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
        }

        // Logged before the lock is taken, so the reported state is only a snapshot.
        this.logger.info("member leave group, memberId: {}, group: {}, state: {}", str2, str, target.getState());

        synchronized (target) {
            final boolean removable = !target.stateIs(GroupState.DEAD) && target.isHasMember(str2);
            if (!removable) {
                return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
            }

            final GroupMemberMetadata leaving = target.getMember(str2);
            // Order matters: drop the heartbeat, update the group, then record the member as expired.
            this.groupBalanceManager.removeHeartbeatForLeavingMember(target, leaving);
            this.groupBalanceManager.removeMemberAndUpdateGroup(target, leaving);
            target.addExpiredMember(leaving);
            return KafkaErrorCode.NONE.getCode();
        }
    }

    /**
     * Handles a heartbeat from a group member.
     *
     * <p>The state-dependent work in {@link #doHeartbeat} reads the group's state, membership
     * and generation and may reschedule the member's heartbeat expiration, so it runs under the
     * group's monitor, the same lock the join, sync and leave paths of this class hold while
     * they touch the group.
     *
     * @param str  group id
     * @param str2 member id
     * @param i    generation id claimed by the member
     * @return {@code COORDINATOR_NOT_AVAILABLE} when this service is not started,
     *         {@code UNKNOWN_MEMBER_ID} when the group does not exist, otherwise the code
     *         produced by {@code doHeartbeat}
     */
    public short heartbeat(String str, String str2, int i) {
        if (!isStarted()) {
            return KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode();
        }
        GroupMetadata group = this.groupMetadataManager.getGroup(str);
        if (group == null) {
            return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
        }
        // Serialize with concurrent join/sync/leave so state and generation are read consistently.
        synchronized (group) {
            return doHeartbeat(group, str2, i);
        }
    }

    /**
     * Computes the heartbeat response for a member according to the group state.
     *
     * <ul>
     *   <li>DEAD / EMPTY: {@code UNKNOWN_MEMBER_ID};</li>
     *   <li>PREPARINGREBALANCE: unknown member gives {@code UNKNOWN_MEMBER_ID}, wrong generation
     *       gives {@code ILLEGAL_GENERATION}; otherwise the heartbeat expiration is rescheduled
     *       and {@code REBALANCE_IN_PROGRESS} is returned;</li>
     *   <li>AWAITINGSYNC: {@code UNKNOWN_MEMBER_ID} for an unknown member, otherwise
     *       {@code REBALANCE_IN_PROGRESS} (the generation is not checked and nothing is
     *       rescheduled);</li>
     *   <li>STABLE: same validation as PREPARINGREBALANCE, then the heartbeat expiration is
     *       rescheduled and {@code NONE} is returned;</li>
     *   <li>any other state: an error is logged and {@code ILLEGAL_GENERATION} is returned.</li>
     * </ul>
     *
     * @param groupMetadata group the member belongs to
     * @param str           member id
     * @param i             generation id claimed by the member
     * @return the error code to send back
     */
    protected short doHeartbeat(GroupMetadata groupMetadata, String str, int i) {
        // Switch on the enum itself; the decompiled ordinal table with unrelated LZ4 block-size
        // constants as case labels obscured which state each branch handled.
        switch (groupMetadata.getState()) {
            case DEAD:
            case EMPTY:
                return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
            case PREPARINGREBALANCE:
                if (!groupMetadata.isHasMember(str)) {
                    return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
                }
                if (i != groupMetadata.getGenerationId()) {
                    return KafkaErrorCode.ILLEGAL_GENERATION.getCode();
                }
                // Keep the member alive, but tell it that it has to re-join.
                this.groupBalanceManager.completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.getMember(str));
                return KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode();
            case AWAITINGSYNC:
                if (!groupMetadata.isHasMember(str)) {
                    return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
                }
                return KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode();
            case STABLE:
                if (!groupMetadata.isHasMember(str)) {
                    return KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode();
                }
                if (i != groupMetadata.getGenerationId()) {
                    return KafkaErrorCode.ILLEGAL_GENERATION.getCode();
                }
                this.groupBalanceManager.completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.getMember(str));
                return KafkaErrorCode.NONE.getCode();
            default:
                this.logger.error("handle heartbeat, invalid group state {} of group {}", groupMetadata.getState(), groupMetadata.getId());
                return KafkaErrorCode.ILLEGAL_GENERATION.getCode();
        }
    }

    /**
     * Builds a description for each requested group that is currently known.
     *
     * <p>Group ids for which no metadata exists are skipped, so the result may be shorter than
     * {@code list}. The order of the result follows the order of {@code list}.
     *
     * @param list ids of the groups to describe
     * @return a new mutable list with one description per known group; never {@code null}
     */
    public List<GroupDescribe> describeGroups(List<String> list) {
        List<GroupDescribe> describes = Lists.newLinkedList();
        for (String groupId : list) {
            GroupDescribe describe = buildDescribeGroup(this.groupMetadataManager.getGroup(groupId));
            if (describe != null) {
                describes.add(describe);
            }
        }
        return describes;
    }

    /**
     * Converts group metadata into its describe-groups representation.
     *
     * <p>The description carries the group id, protocol type, protocol, the state's
     * {@code toString()} value, the {@code NONE} error code and a copy of the member list.
     *
     * @param groupMetadata metadata to describe, may be {@code null}
     * @return the description, or {@code null} when {@code groupMetadata} is {@code null}
     */
    protected GroupDescribe buildDescribeGroup(GroupMetadata groupMetadata) {
        if (groupMetadata == null) {
            return null;
        }
        // A second null check that used to follow setGroupId could never be true and was removed.
        GroupDescribe groupDescribe = new GroupDescribe();
        groupDescribe.setGroupId(groupMetadata.getId());
        groupDescribe.setProtocolType(groupMetadata.getProtocolType());
        groupDescribe.setProtocol(groupMetadata.getProtocol());
        groupDescribe.setState(groupMetadata.getState().toString());
        groupDescribe.setErrCode(KafkaErrorCode.NONE.getCode());
        // Copy the members so the description does not share the group's collection.
        groupDescribe.setMembers(Lists.newArrayList(groupMetadata.getAllMembers()));
        return groupDescribe;
    }

    /**
     * Tells whether a group id is acceptable.
     *
     * @param str candidate group id, may be {@code null}
     * @return {@code false} when {@code StringUtils.isBlank} considers the id blank, {@code true} otherwise
     */
    protected boolean validGroupId(String str) {
        final boolean blank = StringUtils.isBlank(str);
        return !blank;
    }
}
