/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.alibaba.rocketmq.client.impl.consumer.PullRequest;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

public abstract class RebalanceImpl {
    protected static final Logger log = ClientLogger.getLog();
    protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap(64);
    protected final ConcurrentHashMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap();
    protected final ConcurrentHashMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;

    public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory) {
        this.consumerGroup = consumerGroup;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientFactory;
    }

    public void unlock(MessageQueue mq, boolean oneway) {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 0L, true);
        if (findBrokerResult != null) {
            UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);
            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000L, oneway);
                log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", new Object[]{this.consumerGroup, this.mQClientFactory.getClientId(), mq});
            }
            catch (Exception e) {
                log.error("unlockBatchMQ exception, " + mq, (Throwable)e);
            }
        }
    }

    public void unlockAll(boolean oneway) {
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
        for (Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
            FindBrokerResult findBrokerResult;
            String brokerName = entry.getKey();
            Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty() || (findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, 0L, true)) == null) continue;
            UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);
            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000L, oneway);
                for (MessageQueue mq : mqs) {
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue == null) continue;
                    processQueue.setLocked(false);
                    log.info("the message queue unlock OK, Group: {} {}", (Object)this.consumerGroup, (Object)mq);
                }
            }
            catch (Exception e) {
                log.error("unlockBatchMQ exception, " + mqs, (Throwable)e);
            }
        }
    }

    private HashMap<String, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
        HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
        for (MessageQueue mq : this.processQueueTable.keySet()) {
            Set<MessageQueue> mqs = result.get(mq.getBrokerName());
            if (null == mqs) {
                mqs = new HashSet<MessageQueue>();
                result.put(mq.getBrokerName(), mqs);
            }
            mqs.add(mq);
        }
        return result;
    }

    public boolean lock(MessageQueue mq) {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 0L, true);
        if (findBrokerResult != null) {
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);
            try {
                Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000L);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    if (processQueue == null) continue;
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
                boolean lockOK = lockedMq.contains(mq);
                log.info("the message queue lock {}, {} {}", new Object[]{lockOK ? "OK" : "Failed", this.consumerGroup, mq});
                return lockOK;
            }
            catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, (Throwable)e);
            }
        }
        return false;
    }

    public void lockAll() {
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
        for (Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
            FindBrokerResult findBrokerResult;
            String brokerName = entry.getKey();
            Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty() || (findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, 0L, true)) == null) continue;
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);
            try {
                ProcessQueue processQueue;
                Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000L);
                for (MessageQueue mq : lockOKMQSet) {
                    processQueue = this.processQueueTable.get(mq);
                    if (processQueue == null) continue;
                    if (!processQueue.isLocked()) {
                        log.info("the message queue locked OK, Group: {} {}", (Object)this.consumerGroup, (Object)mq);
                    }
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
                for (MessageQueue mq : mqs) {
                    if (lockOKMQSet.contains(mq) || (processQueue = this.processQueueTable.get(mq)) == null) continue;
                    processQueue.setLocked(false);
                    log.warn("the message queue locked Failed, Group: {} {}", (Object)this.consumerGroup, (Object)mq);
                }
            }
            catch (Exception e) {
                log.error("lockBatchMQ exception, " + mqs, (Throwable)e);
            }
        }
    }

    public void doRebalance() {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (Map.Entry entry : subTable.entrySet()) {
                String topic = (String)entry.getKey();
                try {
                    this.rebalanceByTopic(topic);
                }
                catch (Exception e) {
                    if (topic.startsWith("%RETRY%")) continue;
                    log.warn("rebalanceByTopic Exception", (Throwable)e);
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
    }

    private void rebalanceByTopic(String topic) {
        switch (this.messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet);
                    if (!changed) break;
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
                    break;
                }
                log.warn("doRebalance, {}, but the topic[{}] not exist.", (Object)this.consumerGroup, (Object)topic);
                break;
            }
            case CLUSTERING: {
                boolean changed;
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
                if (null == mqSet && !topic.startsWith("%RETRY%")) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", (Object)this.consumerGroup, (Object)topic);
                }
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", (Object)this.consumerGroup, (Object)topic);
                }
                if (mqSet == null || cidAll == null) break;
                ArrayList<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                }
                catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception", e);
                    return;
                }
                HashSet<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
                if (!(changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet))) break;
                log.info("rebalanced result changed. mqSet={}, ConsumerId={}, mqSize={}, cidSize={}", new Object[]{allocateResult, this.mQClientFactory.getClientId(), mqAll.size(), cidAll.size()});
                this.messageQueueChanged(topic, mqSet, allocateResultSet);
                log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, allocateResultSet});
                log.info("messageQueueChanged consumerIdList: {}", cidAll);
                break;
            }
        }
    }

    public abstract void messageQueueChanged(String var1, Set<MessageQueue> var2, Set<MessageQueue> var3);

    public void removeProcessQueue(MessageQueue mq) {
        ProcessQueue prev = this.processQueueTable.remove(mq);
        if (prev != null) {
            boolean droped = prev.isDroped();
            prev.setDroped(true);
            this.removeUnnecessaryMessageQueue(mq, prev);
            log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", new Object[]{this.consumerGroup, mq, droped});
        }
    }

    private boolean updateProcessQueueTableInRebalance(String topic, Set<MessageQueue> mqSet) {
        boolean changed = false;
        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();
            if (!mq.getTopic().equals(topic)) continue;
            if (!mqSet.contains(mq)) {
                pq.setDroped(true);
                if (!this.removeUnnecessaryMessageQueue(mq, pq)) continue;
                it.remove();
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", (Object)this.consumerGroup, (Object)mq);
                continue;
            }
            if (!pq.isPullExpired()) continue;
            pq.setDroped(true);
            if (!this.removeUnnecessaryMessageQueue(mq, pq)) continue;
            it.remove();
            changed = true;
            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", (Object)this.consumerGroup, (Object)mq);
        }
        ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (this.processQueueTable.containsKey(mq)) continue;
            PullRequest pullRequest = new PullRequest();
            pullRequest.setConsumerGroup(this.consumerGroup);
            pullRequest.setMessageQueue(mq);
            pullRequest.setProcessQueue(new ProcessQueue());
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0L) {
                pullRequest.setNextOffset(nextOffset);
                pullRequestList.add(pullRequest);
                changed = true;
                this.processQueueTable.put(mq, pullRequest.getProcessQueue());
                log.info("doRebalance, {}, add a new mq, {}", (Object)this.consumerGroup, (Object)mq);
                continue;
            }
            log.warn("doRebalance, {}, add new mq failed, {}", (Object)this.consumerGroup, (Object)mq);
        }
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }

    public abstract boolean removeUnnecessaryMessageQueue(MessageQueue var1, ProcessQueue var2);

    public abstract void dispatchPullRequest(List<PullRequest> var1);

    public abstract long computePullFromWhere(MessageQueue var1);

    private void truncateMessageQueueNotMyTopic() {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        for (MessageQueue mq : this.processQueueTable.keySet()) {
            ProcessQueue pq;
            if (subTable.containsKey(mq.getTopic()) || (pq = this.processQueueTable.remove(mq)) == null) continue;
            pq.setDroped(true);
            log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", (Object)this.consumerGroup, (Object)mq);
        }
    }

    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
        return this.subscriptionInner;
    }

    public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
        return this.processQueueTable;
    }

    public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
        return this.topicSubscribeInfoTable;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return this.allocateMessageQueueStrategy;
    }

    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }
}

