/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.group;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.SyncGroupAssignment;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.group.GroupMetadataManager;
import org.joyqueue.broker.kafka.coordinator.group.callback.JoinCallback;
import org.joyqueue.broker.kafka.coordinator.group.delay.DelayedHeartbeat;
import org.joyqueue.broker.kafka.coordinator.group.delay.DelayedInitialJoin;
import org.joyqueue.broker.kafka.coordinator.group.delay.DelayedJoin;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMemberMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupState;
import org.joyqueue.toolkit.delay.DelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupBalanceManager
extends Service {
    protected Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private KafkaConfig config;
    private GroupMetadataManager groupMetadataManager;
    private DelayedOperationManager<DelayedJoin> joinPurgatory;
    private DelayedOperationManager<DelayedHeartbeat> heartbeatPurgatory;

    public GroupBalanceManager(KafkaConfig config, GroupMetadataManager groupMetadataManager) {
        this.config = config;
        this.groupMetadataManager = groupMetadataManager;
        this.joinPurgatory = new DelayedOperationManager("kafka-rebalance");
        this.heartbeatPurgatory = new DelayedOperationManager("kafka-heartbeat");
    }

    protected void doStart() {
        this.joinPurgatory.start();
        this.heartbeatPurgatory.start();
    }

    protected void doStop() {
        this.joinPurgatory.shutdown();
        this.heartbeatPurgatory.shutdown();
    }

    public GroupMemberMetadata addMemberAndRebalance(int rebalanceTimeoutMs, int sessionTimeoutMs, String clientId, String clientHost, Map<String, byte[]> protocols, GroupMetadata group, JoinCallback callback) {
        String memberId = this.generateMemberId(group, clientId, clientHost);
        GroupMemberMetadata member = new GroupMemberMetadata(memberId, group.getId(), clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocols);
        this.logger.info("add member, groupId: {}, state: {}, generationId: {}, leaderId: {}, memberId: {}, memberCount = {}, rebalanceTimeout:{}, sessionTimeout:{}", new Object[]{group.getId(), group.getState(), group.getGenerationId(), group.getLeaderId(), memberId, group.getAllMemberIds().size(), rebalanceTimeoutMs, sessionTimeoutMs});
        if (group.stateIs(GroupState.PREPARINGREBALANCE) && group.isNewGroup()) {
            group.setNewMemberAdded(true);
        }
        if (group.stateIs(GroupState.STABLE)) {
            group.addExpiredMember(member);
        }
        member.setAwaitingJoinCallback(callback);
        group.addMember(member);
        this.maybePrepareRebalance(group);
        return member;
    }

    protected String generateMemberId(GroupMetadata group, String clientId, String clientHost) {
        return group.getId() + "-" + clientId + "-" + clientHost + "-" + SystemClock.now();
    }

    public void updateMemberAndRebalance(GroupMetadata group, GroupMemberMetadata member, Map<String, byte[]> protocols, JoinCallback callback) {
        member.setSupportedProtocols(protocols);
        member.setAwaitingJoinCallback(callback);
        this.maybePrepareRebalance(group);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void maybePrepareRebalance(GroupMetadata group) {
        GroupMetadata groupMetadata = group;
        synchronized (groupMetadata) {
            if (group.canRebalance()) {
                this.prepareRebalance(group);
            }
        }
    }

    public void prepareRebalance(GroupMetadata group) {
        this.logger.info("prepare rebalance, groupId:{}, state:{}, generationId:{}, leaderId:{}", new Object[]{group.getId(), group.getState(), group.getGenerationId(), group.getLeaderId()});
        if (group.stateIs(GroupState.AWAITINGSYNC)) {
            this.resetAndPropagateAssignmentError(group, KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode());
        }
        int rebalanceTimeout = this.config.getRebalanceTimeout() != 0 ? this.config.getRebalanceTimeout() : group.getMaxRebalanceTimeout();
        int rebalanceDelay = this.config.getRebalanceInitialDelay();
        DelayedJoin delayedJoin = group.stateIs(GroupState.EMPTY) ? new DelayedInitialJoin(this, this.groupMetadataManager, group, this.joinPurgatory, rebalanceDelay, rebalanceDelay, Math.max(rebalanceTimeout - rebalanceDelay, 0)) : new DelayedJoin(this, this.groupMetadataManager, group, rebalanceTimeout);
        group.transitionStateTo(GroupState.PREPARINGREBALANCE);
        DelayedOperationKey groupKey = new DelayedOperationKey(new Object[]{group.getId()});
        HashSet delayedOperationKeys = Sets.newHashSet((Object[])new Object[]{groupKey});
        this.joinPurgatory.tryCompleteElseWatch((DelayedOperation)delayedJoin, (Set)delayedOperationKeys);
    }

    public void setAndPropagateAssignment(GroupMetadata group, Map<String, SyncGroupAssignment> assignment) {
        Preconditions.checkState((boolean)group.stateIs(GroupState.AWAITINGSYNC));
        List<GroupMemberMetadata> allMemberMetadata = group.getAllMembers();
        for (GroupMemberMetadata member : allMemberMetadata) {
            SyncGroupAssignment syncGroupAssignment = assignment.get(member.getId());
            HashMap topicPartitions = Maps.newHashMap();
            for (Map.Entry<String, List<Integer>> entry : syncGroupAssignment.getTopicPartitions().entrySet()) {
                ArrayList partitions = Lists.newArrayListWithCapacity((int)entry.getValue().size());
                for (Integer partition : entry.getValue()) {
                    partitions.add((short)partition.intValue());
                }
                topicPartitions.put(entry.getKey(), partitions);
            }
            member.setAssignment(syncGroupAssignment);
            member.setAssignments(topicPartitions);
        }
        this.propagateAssignment(group, KafkaErrorCode.NONE.getCode());
    }

    public void resetAndPropagateAssignmentError(GroupMetadata group, short errorCode) {
        Preconditions.checkState((boolean)group.stateIs(GroupState.AWAITINGSYNC));
        List<GroupMemberMetadata> memberMetadatas = group.getAllMembers();
        for (GroupMemberMetadata member : memberMetadatas) {
            member.setAssignment(null);
        }
        this.propagateAssignment(group, errorCode);
    }

    public void propagateAssignment(GroupMetadata group, short errorCode) {
        List<GroupMemberMetadata> memberMetadatas = group.getAllMembers();
        for (GroupMemberMetadata member : memberMetadatas) {
            if (member.getAwaitingSyncCallback() == null) continue;
            member.getAwaitingSyncCallback().sendResponseCallback(member.getAssignment(), errorCode);
            member.setAwaitingSyncCallback(null);
            this.completeAndScheduleNextHeartbeatExpiration(group, member);
        }
    }

    public void removeMemberAndUpdateGroup(GroupMetadata group, GroupMemberMetadata member) {
        this.logger.info("member {} in group {} has failed, group state is {}, member count is {}", new Object[]{member.getId(), group.getId(), group.getState(), group.getAllMemberIds().size()});
        group.removeMember(member.getId());
        switch (group.getState()) {
            case DEAD: 
            case EMPTY: {
                break;
            }
            case STABLE: 
            case AWAITINGSYNC: {
                this.maybePrepareRebalance(group);
                break;
            }
            case PREPARINGREBALANCE: {
                DelayedOperationKey groupKey = new DelayedOperationKey(new Object[]{group.getId()});
                this.joinPurgatory.checkAndComplete((Object)groupKey);
            }
        }
    }

    public void checkAndComplete(GroupMetadata groupMetadata) {
        this.joinPurgatory.checkAndComplete((Object)new DelayedOperationKey(new Object[]{groupMetadata.getId()}));
    }

    public void completeAndScheduleNextHeartbeatExpiration(GroupMetadata group, GroupMemberMetadata member) {
        member.setLatestHeartbeat(SystemClock.now());
        DelayedOperationKey memberKey = new DelayedOperationKey(new Object[]{member.getGroupId(), member.getId()});
        this.heartbeatPurgatory.checkAndComplete((Object)memberKey);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("handle heartbeat, group {}, member {}, latestHeartbeat is {}, sessionTimeout is {}", new Object[]{group.getId(), member.getId(), member.getLatestHeartbeat(), member.getSessionTimeout()});
        }
        long newHeartbeatDeadline = member.getLatestHeartbeat() + (long)member.getSessionTimeout();
        DelayedHeartbeat delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.getSessionTimeout());
        this.heartbeatPurgatory.tryCompleteElseWatch((DelayedOperation)delayedHeartbeat, (Set)Sets.newHashSet((Object[])new Object[]{memberKey}));
    }

    public void removeHeartbeatForLeavingMember(GroupMetadata group, GroupMemberMetadata member) {
        member.setLeaving(true);
        DelayedOperationKey memberKey = new DelayedOperationKey(new Object[]{member.getGroupId(), member.getId()});
        this.heartbeatPurgatory.checkAndComplete((Object)memberKey);
    }

    public boolean shouldKeepMemberAlive(GroupMemberMetadata member, long heartbeatDeadline) {
        return member.getAwaitingJoinCallback() != null || member.getAwaitingSyncCallback() != null || member.getLatestHeartbeat() + (long)member.getSessionTimeout() > heartbeatDeadline;
    }
}

