/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.Validators;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.MQClientManager;
import com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner;
import com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper;
import com.alibaba.rocketmq.client.impl.consumer.RebalanceImpl;
import com.alibaba.rocketmq.client.impl.consumer.RebalancePullImpl;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ServiceState;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

/**
 * Internal implementation behind {@link DefaultMQPullConsumer}.
 *
 * <p>The instance moves through the {@link ServiceState} values: it is created in
 * {@code CREATE_JUST}, {@link #start()} moves it to {@code RUNNING}, and {@link #shutdown()}
 * moves it to {@code SHUTDOWN_ALREADY}. Most operations require the {@code RUNNING} state and
 * throw {@link MQClientException} otherwise.
 */
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private final Logger log = ClientLogger.getLog();
    private final DefaultMQPullConsumer defaultMQPullConsumer;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final RPCHook rpcHook;
    // Kept as ArrayList because the same list instance is handed to PullAPIWrapper in start().
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    /**
     * Creates the implementation for the given facade. Nothing is started here; call
     * {@link #start()} before using the consumer.
     *
     * @param defaultMQPullConsumer the public consumer object that supplies the configuration
     * @param rpcHook hook passed on when the client instance is obtained in {@link #start()}
     */
    public DefaultMQPullConsumerImpl(DefaultMQPullConsumer defaultMQPullConsumer, RPCHook rpcHook) {
        this.defaultMQPullConsumer = defaultMQPullConsumer;
        this.rpcHook = rpcHook;
    }

    /**
     * Creates a topic through the admin implementation of the client instance.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
        this.makeSureStateOK();
        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum);
    }

    /**
     * Guards operations that need a started consumer.
     *
     * @throws MQClientException if the current state is anything other than {@code RUNNING}
     */
    private void makeSureStateOK() throws MQClientException {
        if (this.serviceState != ServiceState.RUNNING) {
            throw new MQClientException("The consumer service state not OK, " + this.serviceState + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/214"), null);
        }
    }

    /**
     * Reads the consume offset of a queue from the offset store.
     *
     * @param mq the queue to look up
     * @param fromStore {@code true} to read with {@code READ_FROM_STORE}, {@code false} to read
     *        with {@code MEMORY_FIRST_THEN_STORE}
     * @return the value returned by the offset store
     * @throws MQClientException if the consumer is not running
     */
    public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
        this.makeSureStateOK();
        ReadOffsetType readType = fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE;
        return this.offsetStore.readOffset(mq, readType);
    }

    /**
     * Returns the queues of {@code topic} that are currently present in the rebalance
     * process-queue table.
     *
     * @param topic the topic to filter by; must not be {@code null}
     * @return a new set containing the matching queues (possibly empty)
     * @throws MQClientException if the consumer is not running
     * @throws IllegalArgumentException if {@code topic} is {@code null}
     */
    public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
        this.makeSureStateOK();
        if (null == topic) {
            throw new IllegalArgumentException("topic is null");
        }
        ConcurrentHashMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable();
        Set<MessageQueue> mqResult = new HashSet<MessageQueue>();
        for (MessageQueue mq : mqTable.keySet()) {
            if (mq.getTopic().equals(topic)) {
                mqResult.add(mq);
            }
        }
        return mqResult;
    }

    /**
     * Delegates to the admin implementation to fetch the publish queues of a topic.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
    }

    /**
     * Delegates to the admin implementation to fetch the subscribe queues of a topic.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
    }

    /**
     * Delegates to the admin implementation to look up the earliest message store time of a queue.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
    }

    /** Returns the consumer group configured on the facade. */
    @Override
    public String groupName() {
        return this.defaultMQPullConsumer.getConsumerGroup();
    }

    /** Returns the message model configured on the facade. */
    @Override
    public MessageModel messageModel() {
        return this.defaultMQPullConsumer.getMessageModel();
    }

    /** Always {@code CONSUME_ACTIVELY} for this implementation. */
    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_ACTIVELY;
    }

    /** Always {@code CONSUME_FROM_LAST_OFFSET} for this implementation. */
    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    }

    /**
     * Builds one "*" subscription (with sub-version 0) for every registered topic.
     *
     * <p>A topic whose subscription cannot be built is logged and skipped, so a single bad
     * topic no longer aborts the whole call with a NullPointerException.
     *
     * @return a new set of subscription data; empty when no topics are registered
     */
    @Override
    public Set<SubscriptionData> subscriptions() {
        Set<SubscriptionData> result = new HashSet<SubscriptionData>();
        Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
        if (topics == null) {
            return result;
        }
        // Iterate under the set's own monitor, as the original code did.
        synchronized (topics) {
            for (String topic : topics) {
                SubscriptionData data;
                try {
                    data = FilterAPI.buildSubscriptionData(this.groupName(), topic, "*");
                } catch (Exception e) {
                    this.log.error("parse subscription error", e);
                    // Nothing usable was built for this topic; move on to the next one.
                    continue;
                }
                data.setSubVersion(0L);
                result.add(data);
            }
        }
        return result;
    }

    /** Triggers a rebalance if a rebalance implementation is present. */
    @Override
    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            this.rebalanceImpl.doRebalance();
        }
    }

    /**
     * Persists the offsets of all queues currently in the process-queue table. Any failure
     * (including the consumer not being in the running state) is logged and not propagated.
     */
    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            if (allocateMq != null) {
                mqs.addAll(allocateMq);
            }
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            this.log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }

    /**
     * Stores the queue set for {@code topic} in the rebalance topic-subscribe table, but only
     * when the topic is present in the subscription table.
     */
    @Override
    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
        if (subTable != null && subTable.containsKey(topic)) {
            this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
        }
    }

    /**
     * @return {@code true} when {@code topic} is subscribed but has no entry in the
     *         topic-subscribe table yet; {@code false} otherwise
     */
    @Override
    public boolean isSubscribeTopicNeedUpdate(String topic) {
        ConcurrentHashMap<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
        if (subTable != null && subTable.containsKey(topic)) {
            return !this.rebalanceImpl.getTopicSubscribeInfoTable().containsKey(topic);
        }
        return false;
    }

    /** Returns the unit-mode flag configured on the facade. */
    @Override
    public boolean isUnitMode() {
        return this.defaultMQPullConsumer.isUnitMode();
    }

    /**
     * Delegates to the admin implementation to look up the max offset of a queue.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public long maxOffset(MessageQueue mq) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    }

    /**
     * Delegates to the admin implementation to look up the min offset of a queue.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public long minOffset(MessageQueue mq) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
    }

    /**
     * Synchronous pull with the block flag set to {@code false}.
     *
     * @see #pullBlockIfNotFound(MessageQueue, String, long, int)
     */
    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, false);
    }

    /**
     * Shared synchronous pull path.
     *
     * @param block value of the block flag passed to {@code PullSysFlag.buildSysFlag}; it also
     *        selects which configured timeout is used for the call
     * @return the pull result after it has been run through {@code processPullResult}
     * @throws MQClientException if the consumer is not running, an argument is invalid, or the
     *         sub-expression cannot be parsed
     */
    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        this.checkPullArguments(mq, offset, maxNums);
        this.subscriptionAutomatically(mq.getTopic());
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        SubscriptionData subscriptionData = this.parseSubscription(mq.getTopic(), subExpression);
        long timeoutMillis = this.pullTimeoutMillis(block);
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0L, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.SYNC, null);
        return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
    }

    /** Validates the arguments common to the sync and async pull paths. */
    private void checkPullArguments(MessageQueue mq, long offset, int maxNums) throws MQClientException {
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0L) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
    }

    /** Builds the subscription data for a pull, wrapping any failure in an MQClientException. */
    private SubscriptionData parseSubscription(String topic, String subExpression) throws MQClientException {
        try {
            return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
        }
    }

    /** Picks the suspend timeout for blocking pulls and the plain pull timeout otherwise. */
    private long pullTimeoutMillis(boolean block) {
        return block
            ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend()
            : this.defaultMQPullConsumer.getConsumerPullTimeoutMillis();
    }

    /**
     * Registers a "*" subscription for {@code topic} in the rebalance subscription table if the
     * topic is not there yet. This is best-effort: a failure is logged and otherwise ignored so
     * that the pull itself can still proceed.
     */
    private void subscriptionAutomatically(String topic) {
        if (this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
            return;
        }
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, "*");
            this.rebalanceImpl.getSubscriptionInner().putIfAbsent(topic, subscriptionData);
        } catch (Exception e) {
            // Previously swallowed silently; keep the best-effort behaviour but leave a trace.
            this.log.warn("subscriptionAutomatically failed, topic: " + topic, e);
        }
    }

    /**
     * Asynchronous pull with the block flag set to {@code false}.
     *
     * @see #pullBlockIfNotFound(MessageQueue, String, long, int, PullCallback)
     */
    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false);
    }

    /**
     * Shared asynchronous pull path. On success the raw result is run through
     * {@code processPullResult} before being handed to {@code pullCallback}; exceptions reported
     * to the internal callback are forwarded unchanged.
     *
     * @param block value of the block flag passed to {@code PullSysFlag.buildSysFlag}; it also
     *        selects which configured timeout is used for the call
     * @throws MQClientException if the consumer is not running, an argument is invalid, the
     *         sub-expression cannot be parsed, or the kernel call throws MQBrokerException
     */
    private void pullAsyncImpl(final MessageQueue mq, String subExpression, long offset, int maxNums, final PullCallback pullCallback, boolean block) throws MQClientException, RemotingException, InterruptedException {
        this.makeSureStateOK();
        this.checkPullArguments(mq, offset, maxNums);
        if (null == pullCallback) {
            throw new MQClientException("pullCallback is null", null);
        }
        this.subscriptionAutomatically(mq.getTopic());
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        final SubscriptionData subscriptionData = this.parseSubscription(mq.getTopic(), subExpression);
        long timeoutMillis = this.pullTimeoutMillis(block);
        try {
            this.pullAPIWrapper.pullKernelImpl(mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0L, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.ASYNC, new PullCallback() {

                @Override
                public void onException(Throwable e) {
                    pullCallback.onException(e);
                }

                @Override
                public void onSuccess(PullResult pullResult) {
                    pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
                }
            });
        } catch (MQBrokerException e) {
            // The async signature does not declare MQBrokerException, so it is wrapped.
            throw new MQClientException("pullAsync unknow exception", e);
        }
    }

    /** Synchronous pull with the block flag set to {@code true}. */
    public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true);
    }

    /** Asynchronous pull with the block flag set to {@code true}. */
    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true);
    }

    /**
     * Delegates to the admin implementation to query messages.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
    }

    /**
     * Delegates to the admin implementation to search the offset for a timestamp.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
    }

    /**
     * Sends a message back via {@code consumerSendMessageBack}. If that call fails for any
     * reason, the failure is logged and a new message carrying the same body, flag and
     * properties is sent to the group's retry topic through the client's default producer,
     * with the original topic recorded under the {@code RETRY_TOPIC} property.
     *
     * @param msg the message to send back
     * @param delayLevel delay level passed to {@code consumerSendMessageBack}
     */
    public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        String group = this.defaultMQPullConsumer.getConsumerGroup();
        try {
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(msg, group, delayLevel, 3000L);
        } catch (Exception e) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic());
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }

    /**
     * Shuts the consumer down. Only acts in the {@code RUNNING} state: offsets are persisted,
     * the consumer is unregistered, the client instance is shut down and the state becomes
     * {@code SHUTDOWN_ALREADY}. In every other state this is a no-op.
     */
    public void shutdown() {
        switch (this.serviceState) {
            case RUNNING: {
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                this.log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            }
            case CREATE_JUST:
            case SHUTDOWN_ALREADY:
            default: {
                break;
            }
        }
    }

    /**
     * Starts the consumer. Only valid in the {@code CREATE_JUST} state.
     *
     * <p>The state is set to {@code START_FAILED} up front, so an exception part-way through
     * leaves the consumer in that state; it becomes {@code RUNNING} only after the client
     * instance has been started. If registering the consumer group is rejected, the state is
     * reset to {@code CREATE_JUST} before the exception is thrown.
     *
     * @throws MQClientException if the configuration is invalid, the group could not be
     *         registered, or the consumer is in {@code RUNNING}, {@code SHUTDOWN_ALREADY} or
     *         {@code START_FAILED}
     */
    public void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: {
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPullConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), this.isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);

                // A store supplied on the facade wins; otherwise choose one by message model.
                if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPullConsumer.getMessageModel()) {
                        case BROADCASTING: {
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        }
                        case CLUSTERING: {
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        }
                        default: {
                            break;
                        }
                    }
                }
                this.offsetStore.load();

                boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/63"), null);
                }
                this.mQClientFactory.start();
                this.log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            }
            case RUNNING:
            case SHUTDOWN_ALREADY:
            case START_FAILED: {
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/214"), null);
            }
            default: {
                break;
            }
        }
    }

    /**
     * Copies every registered topic into the rebalance subscription table as a "*" subscription.
     *
     * @throws MQClientException wrapping any failure while building or storing a subscription
     */
    private void copySubscription() throws MQClientException {
        try {
            Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
            if (registerTopics == null) {
                return;
            }
            for (String topic : registerTopics) {
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, "*");
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    /**
     * Validates the facade configuration before start: the consumer group (via
     * {@code Validators.checkGroup}, then non-null and not {@code DEFAULT_CONSUMER}), the
     * message model and the allocate-queue strategy.
     *
     * @throws MQClientException on the first failed check
     */
    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());
        if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
            throw new MQClientException("consumerGroup is null" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (this.defaultMQPullConsumer.getConsumerGroup().equals("DEFAULT_CONSUMER")) {
            throw new MQClientException("consumerGroup can not equal DEFAULT_CONSUMER, please specify another one." + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPullConsumer.getMessageModel()) {
            throw new MQClientException("messageModel is null" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
        if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException("allocateMessageQueueStrategy is null" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/73"), null);
        }
    }

    /**
     * Updates the offset of a queue in the offset store, passing {@code false} as the third
     * argument of {@code updateOffset}.
     *
     * @throws MQClientException if the consumer is not running
     */
    public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
        this.makeSureStateOK();
        this.offsetStore.updateOffset(mq, offset, false);
    }

    /**
     * Delegates to the admin implementation to look up a message by id.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
    }

    /**
     * Adds a filter hook to the list that is handed to the pull API wrapper in {@link #start()}.
     *
     * @param hook the hook to register; must not be {@code null}
     * @throws NullPointerException if {@code hook} is {@code null}; the list is left untouched
     */
    public void registerFilterMessageHook(FilterMessageHook hook) {
        if (hook == null) {
            // Reject before mutating the list so a null entry never reaches the pull wrapper.
            throw new NullPointerException("hook is null");
        }
        this.filterMessageHookList.add(hook);
        this.log.info("register FilterMessageHook Hook, {}", hook.hookName());
    }

    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
        return this.defaultMQPullConsumer;
    }

    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    public PullAPIWrapper getPullAPIWrapper() {
        return this.pullAPIWrapper;
    }

    public void setPullAPIWrapper(PullAPIWrapper pullAPIWrapper) {
        this.pullAPIWrapper = pullAPIWrapper;
    }

    public ServiceState getServiceState() {
        return this.serviceState;
    }

    public void setServiceState(ServiceState serviceState) {
        this.serviceState = serviceState;
    }

    /**
     * Builds a running-info snapshot: the facade converted to properties plus the consumer
     * start timestamp, and the current result of {@link #subscriptions()}.
     */
    @Override
    public ConsumerRunningInfo consumerRunningInfo() {
        ConsumerRunningInfo info = new ConsumerRunningInfo();
        Properties prop = MixAll.object2Properties(this.defaultMQPullConsumer);
        // Stored as a boxed Long, matching the original behaviour.
        prop.put("PROP_CONSUMER_START_TIMESTAMP", Long.valueOf(this.consumerStartTimestamp));
        info.setProperties(prop);
        info.getSubscriptionSet().addAll(this.subscriptions());
        return info;
    }

    /** Returns the wall-clock time (milliseconds) captured when this object was constructed. */
    public long getConsumerStartTimestamp() {
        return this.consumerStartTimestamp;
    }
}

