/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.Validators;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
import com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.MQClientManager;
import com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageService;
import com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner;
import com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper;
import com.alibaba.rocketmq.client.impl.consumer.PullRequest;
import com.alibaba.rocketmq.client.impl.consumer.RebalanceImpl;
import com.alibaba.rocketmq.client.impl.consumer.RebalancePushImpl;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.client.stat.ConsumerStatManager;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ServiceState;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

public class DefaultMQPushConsumerImpl
implements MQConsumerInner {
    private static final long PullTimeDelayMillsWhenException = 3000L;
    private static final long PullTimeDelayMillsWhenFlowControl = 50L;
    private static final long PullTimeDelayMillsWhenSuspend = 1000L;
    private static final long BrokerSuspendMaxTimeMillis = 15000L;
    private static final long ConsumerTimeoutMillisWhenSuspend = 30000L;
    private final Logger log = ClientLogger.getLog();
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private final ConsumerStatManager consumerStatManager = new ConsumerStatManager();
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private volatile boolean pause = false;
    private boolean consumeOrderly = false;
    private MessageListener messageListenerInner;
    private OffsetStore offsetStore;
    private ConsumeMessageService consumeMessageService;
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList();
    private long flowControlTimes1 = 0L;
    private long flowControlTimes2 = 0L;

    public void registerFilterMessageHook(FilterMessageHook hook) {
        this.filterMessageHookList.add(hook);
        this.log.info("register FilterMessageHook Hook, {}", (Object)hook.hookName());
    }

    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer) {
        this.defaultMQPushConsumer = defaultMQPushConsumer;
    }

    public boolean hasHook() {
        return !this.consumeMessageHookList.isEmpty();
    }

    public void registerConsumeMessageHook(ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        this.log.info("register consumeMessageHook Hook, {}", (Object)hook.hookName());
    }

    public void executeHookBefore(ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                }
                catch (Throwable e) {}
            }
        }
    }

    public void executeHookAfter(ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageAfter(context);
                }
                catch (Throwable e) {}
            }
        }
    }

    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum);
    }

    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
        if (null == result) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
        }
        if (null == result) {
            throw new MQClientException("The topic[" + topic + "] not exist", null);
        }
        return result;
    }

    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
    }

    public long maxOffset(MessageQueue mq) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    }

    public long minOffset(MessageQueue mq) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
    }

    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    @Override
    public String groupName() {
        return this.defaultMQPushConsumer.getConsumerGroup();
    }

    @Override
    public MessageModel messageModel() {
        return this.defaultMQPushConsumer.getMessageModel();
    }

    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_PASSIVELY;
    }

    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return this.defaultMQPushConsumer.getConsumeFromWhere();
    }

    @Override
    public Set<SubscriptionData> subscriptions() {
        HashSet<SubscriptionData> subSet = new HashSet<SubscriptionData>();
        subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
        return subSet;
    }

    @Override
    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            this.rebalanceImpl.doRebalance();
        }
    }

    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            HashSet<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            if (allocateMq != null) {
                mqs.addAll(allocateMq);
            }
            this.offsetStore.persistAll(mqs);
        }
        catch (Exception e) {
            this.log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", (Throwable)e);
        }
    }

    @Override
    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null && subTable.containsKey(topic)) {
            this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
        }
    }

    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
        return this.rebalanceImpl.getSubscriptionInner();
    }

    @Override
    public boolean isSubscribeTopicNeedUpdate(String topic) {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null && subTable.containsKey(topic)) {
            return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
        }
        return false;
    }

    private void correctTagsOffset(PullRequest pullRequest) {
        if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
            this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
        }
    }

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDroped()) {
            this.log.info("the pull request[{}] is droped.", (Object)pullRequest.toString());
            return;
        }
        try {
            this.makeSureStateOK();
        }
        catch (MQClientException e) {
            this.log.warn("pullMessage exception, consumer state not ok", (Throwable)e);
            this.executePullRequestLater(pullRequest, 3000L);
            return;
        }
        if (this.isPause()) {
            this.log.warn("consumer was paused, execute pull request later. instanceName={}", (Object)this.defaultMQPushConsumer.getInstanceName());
            this.executePullRequestLater(pullRequest, 1000L);
            return;
        }
        long size = processQueue.getMsgCount().get();
        if (size > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.flowControlTimes1++ % 1000L == 0L) {
                this.log.warn("the consumer message buffer is full, so do flow control, {} {} {}", new Object[]{size, pullRequest, this.flowControlTimes1});
            }
            return;
        }
        if (!this.consumeOrderly && processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.flowControlTimes2++ % 1000L == 0L) {
                this.log.warn("the queue's messages, span too long, so do flow control, {} {} {}", new Object[]{processQueue.getMaxSpan(), pullRequest, this.flowControlTimes2});
            }
            return;
        }
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, 3000L);
            this.log.warn("find the consumer's subscription failed, {}", (Object)pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback(){

            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND: {
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatManager().getConsumertat().getPullTimesTotal().incrementAndGet();
                            DefaultMQPushConsumerImpl.this.getConsumerStatManager().getConsumertat().getPullRTTotal().addAndGet(pullRT);
                            boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispathToConsume);
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                break;
                            }
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        }
                        case NO_NEW_MSG: {
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        }
                        case NO_MATCHED_MSG: {
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        }
                        case OFFSET_ILLEGAL: {
                            DefaultMQPushConsumerImpl.this.log.warn("the pull request offset illegal, {} {}", (Object)pullRequest.toString(), (Object)pullResult.toString());
                            if (pullRequest.getNextOffset() < pullResult.getMinOffset()) {
                                pullRequest.setNextOffset(pullResult.getMinOffset());
                            } else if (pullRequest.getNextOffset() > pullResult.getMaxOffset()) {
                                pullRequest.setNextOffset(pullResult.getMaxOffset());
                            }
                            pullRequest.getProcessQueue().setDroped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable(){

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.log.warn("fix the pull request offset, {}", (Object)pullRequest);
                                    }
                                    catch (Throwable e) {
                                        DefaultMQPushConsumerImpl.this.log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000L);
                            break;
                        }
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
                    DefaultMQPushConsumerImpl.this.log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 3000L);
            }
        };
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel() && (commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY)) > 0L) {
            commitOffsetEnable = true;
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        int sysFlag = PullSysFlag.buildSysFlag((boolean)commitOffsetEnable, (boolean)true, (subExpression != null ? 1 : 0) != 0, (boolean)classFilter);
        try {
            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, 15000L, 30000L, CommunicationMode.ASYNC, pullCallback);
        }
        catch (Exception e) {
            this.log.error("pullKernelImpl exception", (Throwable)e);
            this.executePullRequestLater(pullRequest, 3000L);
        }
    }

    public void executePullRequestImmediately(PullRequest pullRequest) {
        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
    }

    public void executeTaskLater(Runnable r, long timeDelay) {
        this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
    }

    private void executePullRequestLater(PullRequest pullRequest, long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }

    public boolean isPause() {
        return this.pause;
    }

    public void setPause(boolean pause) {
        this.pause = pause;
    }

    private void makeSureStateOK() throws MQClientException {
        if (this.serviceState != ServiceState.RUNNING) {
            throw new MQClientException("The consumer service state not OK, " + this.serviceState + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/214"), null);
        }
    }

    public ConsumerStatManager getConsumerStatManager() {
        return this.consumerStatManager;
    }

    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException {
        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
    }

    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }

    public void resume() {
        this.pause = false;
        this.log.info("resume this consumer, {}", (Object)this.defaultMQPushConsumer.getConsumerGroup());
    }

    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
    }

    public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 3000L);
        }
        catch (Exception e) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), (Throwable)e);
            Message newMsg = new Message(MixAll.getRetryTopic((String)this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties((Message)newMsg, (Map)msg.getProperties());
            MessageAccessor.putProperty((Message)newMsg, (String)"RETRY_TOPIC", (String)msg.getTopic());
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }

    public void shutdown() {
        switch (this.serviceState) {
            case CREATE_JUST: {
                break;
            }
            case RUNNING: {
                this.consumeMessageService.shutdown();
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                this.log.info("the consumer [{}] shutdown OK", (Object)this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            }
            case SHUTDOWN_ALREADY: {
                break;
            }
        }
    }

    public void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: {
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING: {
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        }
                        case CLUSTERING: {
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        }
                    }
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/63"), null);
                }
                this.mQClientFactory.start();
                this.log.info("the consumer [{}] start OK", (Object)this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            }
            case RUNNING: 
            case SHUTDOWN_ALREADY: 
            case START_FAILED: {
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/214"), null);
            }
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
            throw new MQClientException("consumerGroup is null" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getConsumerGroup().equals("DEFAULT_CONSUMER")) {
            throw new MQClientException("consumerGroup can not equal DEFAULT_CONSUMER, please specify another one." + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPushConsumer.getMessageModel()) {
            throw new MQClientException("messageModel is null" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
            throw new MQClientException("consumeFromWhere is null" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        Date dt = UtilAll.parseDate((String)this.defaultMQPushConsumer.getConsumeTimestamp(), (String)"yyyyMMddHHmmss");
        if (null == dt) {
            throw new MQClientException("consumeTimestamp is invalid, yyyyMMddHHmmss" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException("allocateMessageQueueStrategy is null" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPushConsumer.getSubscription()) {
            throw new MQClientException("subscription is null" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPushConsumer.getMessageListener()) {
            throw new MQClientException("messageListener is null" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
        boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
        if (!orderly && !concurrently) {
            throw new MQClientException("messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
            throw new MQClientException("consumeThreadMin Out of range [1, 1000]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
            throw new MQClientException("consumeThreadMax Out of range [1, 1000]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException("consumeConcurrentlyMaxSpan Out of range [1, 65535]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
            throw new MQClientException("pullThresholdForQueue Out of range [1, 65535]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getPullInterval() < 0L || this.defaultMQPushConsumer.getPullInterval() > 65535L) {
            throw new MQClientException("pullInterval Out of range [0, 65535]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException("consumeMessageBatchMaxSize Out of range [1, 1024]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
            throw new MQClientException("pullBatchSize Out of range [1, 1024]" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
    }

    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (Map.Entry<String, String> entry : sub.entrySet()) {
                    String topic = entry.getKey();
                    String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)this.defaultMQPushConsumer.getConsumerGroup(), (String)topic, (String)subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING: {
                    break;
                }
                case CLUSTERING: {
                    String retryTopic = MixAll.getRetryTopic((String)this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)this.defaultMQPushConsumer.getConsumerGroup(), (String)retryTopic, (String)"*");
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                }
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public MessageListener getMessageListenerInner() {
        return this.messageListenerInner;
    }

    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (Map.Entry entry : subTable.entrySet()) {
                String topic = (String)entry.getKey();
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    }

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)this.defaultMQPushConsumer.getConsumerGroup(), (String)topic, (String)subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public void suspend() {
        this.pause = true;
        this.log.info("suspend this consumer, {}", (Object)this.defaultMQPushConsumer.getConsumerGroup());
    }

    public void unsubscribe(String topic) {
        this.rebalanceImpl.getSubscriptionInner().remove(topic);
    }

    public void updateConsumeOffset(MessageQueue mq, long offset) {
        this.offsetStore.updateOffset(mq, offset, false);
    }

    public void updateCorePoolSize(int corePoolSize) {
        this.consumeMessageService.updateCorePoolSize(corePoolSize);
    }

    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
    }

    public RebalanceImpl getRebalanceImpl() {
        return this.rebalanceImpl;
    }

    public boolean isConsumeOrderly() {
        return this.consumeOrderly;
    }

    public void setConsumeOrderly(boolean consumeOrderly) {
        this.consumeOrderly = consumeOrderly;
    }

    @Override
    public boolean isUnitMode() {
        return this.defaultMQPushConsumer.isUnitMode();
    }

    public void resetOffsetByTimeStamp(long timeStamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        for (String topic : this.rebalanceImpl.getSubscriptionInner().keySet()) {
            Set<MessageQueue> mqs = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
            HashMap<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
            for (MessageQueue mq : mqs) {
                long offset = this.searchOffset(mq, timeStamp);
                offsetTable.put(mq, offset);
            }
            this.mQClientFactory.resetOffset(topic, this.groupName(), offsetTable);
        }
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public ServiceState getServiceState() {
        return this.serviceState;
    }

    public void setServiceState(ServiceState serviceState) {
        this.serviceState = serviceState;
    }

    private long computeDuijiTotal() {
        long msgDuijiCntTotal = 0L;
        ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> next : processQueueTable.entrySet()) {
            ProcessQueue value = next.getValue();
            msgDuijiCntTotal += value.getMsgDuijiCnt();
        }
        return msgDuijiCntTotal;
    }

    public void adjustThreadPool() {
        long computeDuijiTotal = this.computeDuijiTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
        long incThreshold = (long)((double)adjustThreadPoolNumsThreshold * 1.0);
        long decThreshold = (long)((double)adjustThreadPoolNumsThreshold * 0.8);
        if (computeDuijiTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }
        if (computeDuijiTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }
}

