/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.factory;

import com.alibaba.rocketmq.client.ClientConfig;
import com.alibaba.rocketmq.client.admin.MQAdminExtInner;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.impl.ClientRemotingProcessor;
import com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.alibaba.rocketmq.client.impl.MQAdminImpl;
import com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
import com.alibaba.rocketmq.client.impl.MQClientManager;
import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner;
import com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.alibaba.rocketmq.client.impl.consumer.PullMessageService;
import com.alibaba.rocketmq.client.impl.consumer.RebalanceService;
import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.alibaba.rocketmq.client.impl.producer.MQProducerInner;
import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ServiceState;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData;
import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData;
import com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.protocol.route.BrokerData;
import com.alibaba.rocketmq.common.protocol.route.QueueData;
import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import java.io.UnsupportedEncodingException;
import java.net.DatagramSocket;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;

public class MQClientInstance {
    private static final long LockTimeoutMillis = 3000L;
    private final Logger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentHashMap<String, MQProducerInner> producerTable = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    private final ConcurrentHashMap<String, TopicRouteData> topicRouteTable = new ConcurrentHashMap();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentHashMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private DatagramSocket datagramSocket;
    private final ConsumerStatsManager consumerStatsManager;

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook);
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            this.log.info("user specfied name server address: {}", (Object)this.clientConfig.getNamesrvAddr());
        }
        this.clientId = clientId;
        this.mQAdminImpl = new MQAdminImpl(this);
        this.pullMessageService = new PullMessageService(this);
        this.rebalanceService = new RebalanceService(this);
        this.defaultMQProducer = new DefaultMQProducer("CLIENT_INNER_PRODUCER");
        this.defaultMQProducer.resetClientConfig(clientConfig);
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
        this.log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}", new Object[]{this.instanceIndex, this.clientId, this.clientConfig, MQVersion.getVersionDesc((int)MQVersion.CurrentVersion)});
    }

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
        this(clientConfig, instanceIndex, clientId, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws MQClientException {
        MQClientInstance mQClientInstance = this;
        synchronized (mQClientInstance) {
            switch (this.serviceState) {
                case CREATE_JUST: {
                    this.serviceState = ServiceState.START_FAILED;
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                    }
                    this.mQClientAPIImpl.start();
                    this.startScheduledTask();
                    this.pullMessageService.start();
                    this.rebalanceService.start();
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    this.log.info("the client factory [{}] start OK", (Object)this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                }
                case RUNNING: {
                    break;
                }
                case SHUTDOWN_ALREADY: {
                    break;
                }
                case START_FAILED: {
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                }
            }
        }
    }

    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    catch (Exception e) {
                        MQClientInstance.this.log.error("ScheduledTask fetchNameServerAddr exception", (Throwable)e);
                    }
                }
            }, 10000L, 120000L, TimeUnit.MILLISECONDS);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", (Throwable)e);
                }
            }
        }, 10L, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask sendHeartbeatToAllBroker exception", (Throwable)e);
                }
            }
        }, 1000L, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask persistAllConsumerOffset exception", (Throwable)e);
                }
            }
        }, 10000L, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask adjustThreadPool exception", (Throwable)e);
                }
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanOfflineBroker() {
        block9: {
            try {
                if (!this.lockNamesrv.tryLock(3000L, TimeUnit.MILLISECONDS)) break block9;
                try {
                    ConcurrentHashMap updatedTable = new ConcurrentHashMap();
                    Iterator<Map.Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerTable.hasNext()) {
                        Map.Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                        String brokerName = entry.getKey();
                        HashMap<Long, String> oneTable = entry.getValue();
                        HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
                        cloneAddrTable.putAll(oneTable);
                        Iterator it = cloneAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Map.Entry ee = it.next();
                            String addr = (String)ee.getValue();
                            if (this.isBrokerAddrExistInTopicRouteTable(addr)) continue;
                            it.remove();
                            this.log.info("the broker addr[{} {}] is offline, remove it", (Object)brokerName, (Object)addr);
                        }
                        if (cloneAddrTable.isEmpty()) {
                            itBrokerTable.remove();
                            this.log.info("the broker[{}] name's host is offline, remove it", (Object)brokerName);
                            continue;
                        }
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                    if (!updatedTable.isEmpty()) {
                        this.brokerAddrTable.putAll(updatedTable);
                    }
                }
                finally {
                    this.lockNamesrv.unlock();
                }
            }
            catch (InterruptedException e) {
                this.log.warn("cleanOfflineBroker Exception", (Throwable)e);
            }
        }
    }

    private boolean isBrokerAddrExistInTopicRouteTable(String addr) {
        for (Map.Entry<String, TopicRouteData> entry : this.topicRouteTable.entrySet()) {
            TopicRouteData topicRouteData = entry.getValue();
            List bds = topicRouteData.getBrokerDatas();
            for (BrokerData bd : bds) {
                boolean exist;
                if (bd.getBrokerAddrs() == null || !(exist = bd.getBrokerAddrs().containsValue(addr))) continue;
                return true;
            }
        }
        return false;
    }

    private void persistAllConsumerOffset() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            impl.persistConsumerOffset();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            }
            catch (Exception e) {
                this.log.error("sendHeartbeatToAllBroker exception", (Throwable)e);
            }
            finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            this.log.warn("lock heartBeat, but failed.");
        }
    }

    private void uploadFilterClassToAllFilterServer(String consumerGroup, String className, String topic) throws UnsupportedEncodingException {
        URL classFile = FilterAPI.classFile((String)className);
        byte[] classBody = null;
        int classCRC = 0;
        try {
            String fileContent = MixAll.file2String((URL)classFile);
            classBody = fileContent.getBytes("UTF-8");
            classCRC = UtilAll.crc32((byte[])classBody);
        }
        catch (Exception e1) {
            this.log.warn("uploadFilterClassToAllFilterServer Exception, ClassFile: {} ClassName: {} {}", new Object[]{classFile, className, RemotingHelper.exceptionSimpleDesc((Throwable)e1)});
        }
        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
        if (topicRouteData != null && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
            for (Map.Entry next : topicRouteData.getFilterServerTable().entrySet()) {
                List value = (List)next.getValue();
                for (String fsAddr : value) {
                    try {
                        this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, className, classCRC, classBody, 5000L);
                        this.log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {} ClassFile: {}", new Object[]{fsAddr, consumerGroup, topic, className, classFile});
                    }
                    catch (Exception e) {
                        this.log.error("uploadFilterClassToAllFilterServer Exception", (Throwable)e);
                    }
                }
            }
        } else {
            this.log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}", new Object[]{consumerGroup, topic, className});
        }
    }

    private void uploadFilterClassSource() {
        for (Map.Entry<String, MQConsumerInner> next : this.consumerTable.entrySet()) {
            MQConsumerInner consumer = next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY != consumer.consumeType()) continue;
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {
                if (!sub.isClassFilterMode()) continue;
                String consumerGroup = consumer.groupName();
                String className = sub.getSubString();
                String topic = sub.getTopic();
                try {
                    this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic);
                }
                catch (Exception e) {
                    this.log.error("uploadFilterClassToAllFilterServer Exception", (Throwable)e);
                }
            }
        }
    }

    private void sendHeartbeatToAllBroker() {
        HeartbeatData heartbeatData = this.prepareHeartbeatData();
        boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            this.log.warn("sending hearbeat, but no consumer and no producer");
            return;
        }
        for (Map.Entry<String, HashMap<Long, String>> entry : this.brokerAddrTable.entrySet()) {
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable == null) continue;
            for (Long id : oneTable.keySet()) {
                String addr = oneTable.get(id);
                if (addr == null || consumerEmpty && id != 0L) continue;
                try {
                    this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000L);
                    this.log.info("send heart beat to broker[{} {} {}] success", new Object[]{brokerName, id, addr});
                    this.log.info(heartbeatData.toString());
                }
                catch (Exception e) {
                    this.log.error("send heart beat to broker exception", (Throwable)e);
                }
            }
        }
    }

    private HeartbeatData prepareHeartbeatData() {
        Object impl;
        HeartbeatData heartbeatData = new HeartbeatData();
        heartbeatData.setClientID(this.clientId);
        for (String group : this.consumerTable.keySet()) {
            impl = this.consumerTable.get(group);
            if (impl == null) continue;
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());
            heartbeatData.getConsumerDataSet().add(consumerData);
        }
        for (String group : this.producerTable.keySet()) {
            impl = this.producerTable.get(group);
            if (impl == null) continue;
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(group);
            heartbeatData.getProducerDataSet().add(producerData);
        }
        return heartbeatData;
    }

    public void updateTopicRouteInfoFromNameServer() {
        Object impl;
        HashSet<String> topicList = new HashSet<String>();
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            Set<SubscriptionData> subList;
            impl = entry.getValue();
            if (impl == null || (subList = impl.subscriptions()) == null) continue;
            for (SubscriptionData subData : subList) {
                topicList.add(subData.getTopic());
            }
        }
        for (Map.Entry<String, Object> entry : this.producerTable.entrySet()) {
            impl = (MQProducerInner)entry.getValue();
            if (impl == null) continue;
            Set<String> lst = impl.getPublishTopicList();
            topicList.addAll(lst);
        }
        for (String string : topicList) {
            this.updateTopicRouteInfoFromNameServer(string);
        }
    }

    public boolean updateTopicRouteInfoFromNameServer(String topic) {
        return this.updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean updateTopicRouteInfoFromNameServer(String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (!this.lockNamesrv.tryLock(3000L, TimeUnit.MILLISECONDS)) {
                this.log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", (Object)3000L);
                return false;
            }
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 3000L);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000L);
                }
                if (topicRouteData != null) {
                    Object impl;
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = this.topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        this.log.info("the topic[{}] route info changed, odl[{}] ,new[{}]", new Object[]{topic, old, topicRouteData});
                    }
                    if (!changed) return false;
                    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                    }
                    TopicPublishInfo publishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
                    for (Map.Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                        impl = entry.getValue();
                        if (impl == null) continue;
                        impl.updateTopicPublishInfo(topic, publishInfo);
                    }
                    Set<MessageQueue> subscribeInfo = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                    Iterator<Map.Entry<String, Object>> it = this.consumerTable.entrySet().iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            this.log.info("topicRouteTable.put TopicRouteData[{}]", (Object)cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            boolean bl = true;
                            return bl;
                        }
                        Map.Entry<String, Object> entry = it.next();
                        impl = (MQConsumerInner)entry.getValue();
                        if (impl == null) continue;
                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                    }
                }
                this.log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", (Object)topic);
                return false;
            }
            catch (Exception e) {
                if (topic.startsWith("%RETRY%")) return false;
                if (topic.equals("TBW102")) return false;
                this.log.warn("updateTopicRouteInfoFromNameServer Exception", (Throwable)e);
                return false;
            }
            finally {
                this.lockNamesrv.unlock();
            }
        }
        catch (InterruptedException e) {
            this.log.warn("updateTopicRouteInfoFromNameServer Exception", (Throwable)e);
        }
        return false;
    }

    private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
        if (olddata == null || nowdata == null) {
            return true;
        }
        TopicRouteData old = olddata.cloneTopicRouteData();
        TopicRouteData now = nowdata.cloneTopicRouteData();
        Collections.sort(old.getQueueDatas());
        Collections.sort(old.getBrokerDatas());
        Collections.sort(now.getQueueDatas());
        Collections.sort(now.getBrokerDatas());
        return !old.equals((Object)now);
    }

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(String topic, TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            String[] brokers;
            for (String broker : brokers = route.getOrderTopicConf().split(";")) {
                String[] item = broker.split(":");
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; ++i) {
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }
            info.setOrderTopic(true);
        } else {
            List qds = route.getQueueDatas();
            Collections.sort(qds);
            for (QueueData qd : qds) {
                if (!PermName.isWriteable((int)qd.getPerm())) continue;
                BrokerData brokerData = null;
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (!bd.getBrokerName().equals(qd.getBrokerName())) continue;
                    brokerData = bd;
                    break;
                }
                if (null == brokerData || !brokerData.getBrokerAddrs().containsKey(0L)) continue;
                for (int i = 0; i < qd.getWriteQueueNums(); ++i) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
            info.setOrderTopic(false);
        }
        return info;
    }

    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(String topic, TopicRouteData route) {
        HashSet<MessageQueue> mqList = new HashSet<MessageQueue>();
        List qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (!PermName.isReadable((int)qd.getPerm())) continue;
            for (int i = 0; i < qd.getReadQueueNums(); ++i) {
                MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                mqList.add(mq);
            }
        }
        return mqList;
    }

    private boolean isNeedUpdateTopicRouteInfo(String topic) {
        Object impl;
        Map.Entry<String, Object> entry;
        boolean result = false;
        Iterator<Map.Entry<String, Object>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            entry = it.next();
            impl = entry.getValue();
            if (impl == null) continue;
            result = impl.isPublishTopicNeedUpdate(topic);
        }
        it = this.consumerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            entry = it.next();
            impl = (MQConsumerInner)entry.getValue();
            if (impl == null) continue;
            result = impl.isSubscribeTopicNeedUpdate(topic);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        if (!this.consumerTable.isEmpty()) {
            return;
        }
        if (!this.adminExtTable.isEmpty()) {
            return;
        }
        if (this.producerTable.size() > 1) {
            return;
        }
        MQClientInstance mQClientInstance = this;
        synchronized (mQClientInstance) {
            switch (this.serviceState) {
                case CREATE_JUST: {
                    break;
                }
                case RUNNING: {
                    this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
                    this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                    this.pullMessageService.shutdown(true);
                    this.scheduledExecutorService.shutdown();
                    this.mQClientAPIImpl.shutdown();
                    this.rebalanceService.shutdown();
                    if (this.datagramSocket != null) {
                        this.datagramSocket.close();
                        this.datagramSocket = null;
                    }
                    MQClientManager.getInstance().removeClientFactory(this.clientId);
                    this.log.info("the client factory [{}] shutdown OK", (Object)this.clientId);
                    break;
                }
                case SHUTDOWN_ALREADY: {
                    break;
                }
            }
        }
    }

    public boolean registerConsumer(String group, MQConsumerInner consumer) {
        if (null == group || null == consumer) {
            return false;
        }
        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
        if (prev != null) {
            this.log.warn("the consumer group[" + group + "] exist already.");
            return false;
        }
        return true;
    }

    public void unregisterConsumer(String group) {
        this.consumerTable.remove(group);
        this.unregisterClientWithLock(null, group);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unregisterClientWithLock(String producerGroup, String consumerGroup) {
        block8: {
            try {
                if (this.lockHeartbeat.tryLock(3000L, TimeUnit.MILLISECONDS)) {
                    try {
                        this.unregisterClient(producerGroup, consumerGroup);
                        break block8;
                    }
                    catch (Exception e) {
                        this.log.error("unregisterClient exception", (Throwable)e);
                        break block8;
                    }
                    finally {
                        this.lockHeartbeat.unlock();
                    }
                }
                this.log.warn("lock heartBeat, but failed.");
            }
            catch (InterruptedException e) {
                this.log.warn("unregisterClientWithLock exception", (Throwable)e);
            }
        }
    }

    private void unregisterClient(String producerGroup, String consumerGroup) {
        for (Map.Entry<String, HashMap<Long, String>> entry : this.brokerAddrTable.entrySet()) {
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable == null) continue;
            for (Long id : oneTable.keySet()) {
                String addr = oneTable.get(id);
                if (addr == null) continue;
                try {
                    this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000L);
                    this.log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", new Object[]{producerGroup, consumerGroup, brokerName, id, addr});
                }
                catch (RemotingException e) {
                    this.log.error("unregister client exception from broker: " + addr, (Throwable)e);
                }
                catch (MQBrokerException e) {
                    this.log.error("unregister client exception from broker: " + addr, (Throwable)e);
                }
                catch (InterruptedException e) {
                    this.log.error("unregister client exception from broker: " + addr, (Throwable)e);
                }
            }
        }
    }

    public boolean registerProducer(String group, DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            this.log.warn("the producer group[{}] exist already.", (Object)group);
            return false;
        }
        return true;
    }

    public void unregisterProducer(String group) {
        this.producerTable.remove(group);
        this.unregisterClientWithLock(group, null);
    }

    public boolean registerAdminExt(String group, MQAdminExtInner admin) {
        if (null == group || null == admin) {
            return false;
        }
        MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin);
        if (prev != null) {
            this.log.warn("the admin group[{}] exist already.", (Object)group);
            return false;
        }
        return true;
    }

    public void unregisterAdminExt(String group) {
        this.adminExtTable.remove(group);
    }

    public void rebalanceImmediately() {
        this.rebalanceService.wakeup();
    }

    public void doRebalance() {
        for (String group : this.consumerTable.keySet()) {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl == null) continue;
            try {
                impl.doRebalance();
            }
            catch (Exception e) {
                this.log.error("doRebalance exception", (Throwable)e);
            }
        }
    }

    public MQProducerInner selectProducer(String group) {
        return this.producerTable.get(group);
    }

    public MQConsumerInner selectConsumer(String group) {
        return this.consumerTable.get(group);
    }

    public FindBrokerResult findBrokerAddressInAdmin(String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        HashMap<Long, String> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr == null) continue;
                found = true;
                if (0L == id) {
                    slave = false;
                    break;
                }
                slave = true;
                break;
            }
        }
        if (found) {
            return new FindBrokerResult(brokerAddr, slave);
        }
        return null;
    }

    public String findBrokerAddressInPublish(String brokerName) {
        HashMap<Long, String> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            return map.get(0L);
        }
        return null;
    }

    public FindBrokerResult findBrokerAddressInSubscribe(String brokerName, long brokerId, boolean onlyThisBroker) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        HashMap<Long, String> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            slave = brokerId != 0L;
            boolean bl = found = brokerAddr != null;
            if (!found && !onlyThisBroker) {
                Map.Entry<Long, String> entry = map.entrySet().iterator().next();
                brokerAddr = entry.getValue();
                slave = entry.getKey() != 0L;
                found = true;
            }
        }
        if (found) {
            return new FindBrokerResult(brokerAddr, slave);
        }
        return null;
    }

    public List<String> findConsumerIdList(String topic, String group) {
        String brokerAddr = this.findBrokerAddrByTopic(topic);
        if (null == brokerAddr) {
            this.updateTopicRouteInfoFromNameServer(topic);
            brokerAddr = this.findBrokerAddrByTopic(topic);
        }
        if (null != brokerAddr) {
            try {
                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000L);
            }
            catch (Exception e) {
                this.log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, (Throwable)e);
            }
        }
        return null;
    }

    public String findBrokerAddrByTopic(String topic) {
        List brokers;
        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
        if (topicRouteData != null && !(brokers = topicRouteData.getBrokerDatas()).isEmpty()) {
            BrokerData bd = (BrokerData)brokers.get(0);
            return bd.selectBrokerAddr();
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        DefaultMQPushConsumerImpl consumer = null;
        try {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl == null || !(impl instanceof DefaultMQPushConsumerImpl)) {
                this.log.info("[reset-offset] consumer dose not exist. group={}", (Object)group);
                return;
            }
            consumer = (DefaultMQPushConsumerImpl)impl;
            ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (MessageQueue mq : processQueueTable.keySet()) {
                if (!topic.equals(mq.getTopic())) continue;
                ProcessQueue pq = processQueueTable.get(mq);
                pq.setDroped(true);
                pq.clear();
            }
            for (MessageQueue mq : offsetTable.keySet()) {
                consumer.updateConsumeOffset(mq, offsetTable.get(mq));
                this.log.info("[reset-offset] reset offsetTable. topic={}, group={}, mq={}, offset={}", new Object[]{topic, group, mq, offsetTable.get(mq)});
            }
            consumer.getOffsetStore().persistAll(offsetTable.keySet());
            try {
                TimeUnit.SECONDS.sleep(10L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            for (MessageQueue mq : offsetTable.keySet()) {
                consumer.updateConsumeOffset(mq, offsetTable.get(mq));
                this.log.info("[reset-offset] reset offsetTable. topic={}, group={}, mq={}, offset={}", new Object[]{topic, group, mq, offsetTable.get(mq)});
            }
            consumer.getOffsetStore().persistAll(offsetTable.keySet());
            Iterator<MessageQueue> iterator = offsetTable.keySet().iterator();
            processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            while (iterator.hasNext()) {
                MessageQueue mq;
                mq = iterator.next();
                processQueueTable.remove(mq);
            }
        }
        finally {
            consumer.getRebalanceImpl().doRebalance();
        }
    }

    public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)impl;
            return consumer.getOffsetStore().cloneOffsetTable(topic);
        }
        if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
            DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl)impl;
            return consumer.getOffsetStore().cloneOffsetTable(topic);
        }
        return Collections.EMPTY_MAP;
    }

    public TopicRouteData getAnExistTopicRouteData(String topic) {
        return this.topicRouteTable.get(topic);
    }

    public MQClientAPIImpl getMQClientAPIImpl() {
        return this.mQClientAPIImpl;
    }

    public MQAdminImpl getMQAdminImpl() {
        return this.mQAdminImpl;
    }

    public String getClientId() {
        return this.clientId;
    }

    public long getBootTimestamp() {
        return this.bootTimestamp;
    }

    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    public PullMessageService getPullMessageService() {
        return this.pullMessageService;
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return this.defaultMQProducer;
    }

    public ConcurrentHashMap<String, TopicRouteData> getTopicRouteTable() {
        return this.topicRouteTable;
    }

    public void adjustThreadPool() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl == null) continue;
            try {
                if (!(impl instanceof DefaultMQPushConsumerImpl)) continue;
                DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl)impl;
                dmq.adjustThreadPool();
            }
            catch (Exception e) {}
        }
    }

    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String consumerGroup, String brokerName) {
        MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
        if (null != mqConsumerInner) {
            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)mqConsumerInner;
            ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
            return result;
        }
        return null;
    }

    public ConsumerRunningInfo consumerRunningInfo(String consumerGroup) {
        MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
        ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
        List nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();
        String nsAddr = "";
        if (nsList != null) {
            for (String addr : nsList) {
                nsAddr = nsAddr + addr + ";";
            }
        }
        consumerRunningInfo.getProperties().put("PROP_NAMESERVER_ADDR", nsAddr);
        consumerRunningInfo.getProperties().put("PROP_CONSUME_TYPE", mqConsumerInner.consumeType());
        consumerRunningInfo.getProperties().put("PROP_CLIENT_VERSION", MQVersion.getVersionDesc((int)MQVersion.CurrentVersion));
        return consumerRunningInfo;
    }

    public ConsumerStatsManager getConsumerStatsManager() {
        return this.consumerStatsManager;
    }
}

