/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageService;
import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.alibaba.rocketmq.client.impl.consumer.MessageQueueLock;
import com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.client.stat.ConsumerStat;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/**
 * {@link ConsumeMessageService} implementation that hands messages to a
 * {@link MessageListenerOrderly} one queue at a time.
 *
 * <p>Each {@link ConsumeRequest} runs on {@link #consumeExecutor} and processes its queue while
 * holding the per-queue monitor obtained from {@link MessageQueueLock#fetchLockObject}, so two
 * requests for the same {@link MessageQueue} never run the listener concurrently inside this
 * service. Delayed re-submissions and the periodic lock refresh run on a separate
 * single-threaded scheduler.
 *
 * <p>In {@code CLUSTERING} mode the service additionally asks the rebalance implementation to
 * lock all queues periodically (see {@link #start()}) and to unlock them on
 * {@link #shutdown()}.
 */
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    private static final Logger log = ClientLogger.getLog();

    /**
     * Upper bound, in milliseconds, for one uninterrupted consume loop of a single
     * {@link ConsumeRequest}; read from the system property
     * {@code rocketmq.client.maxTimeConsumeContinuously} (default 60000).
     */
    private static final long MAX_TIME_CONSUME_CONTINUOUSLY =
        Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));

    /** Set by {@link #shutdown()}; once true, no further lock requests are issued. */
    private volatile boolean stopped = false;
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final MessageListenerOrderly messageListener;
    /** Unbounded work queue backing {@link #consumeExecutor}. */
    private final BlockingQueue<Runnable> consumeRequestQueue;
    /** Pool on which {@link ConsumeRequest}s are executed. */
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    /** Source of the per-queue monitor objects used to serialize consumption of a queue. */
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
    /** Single-threaded scheduler for periodic locking and delayed re-submission. */
    private final ScheduledExecutorService scheduledExecutorService;

    /**
     * Creates the service and its two executors; nothing is scheduled until {@link #start()}.
     *
     * @param defaultMQPushConsumerImpl owning consumer implementation; supplies the consumer
     *        configuration, rebalance implementation, offset store, hooks and statistics
     * @param messageListener user callback invoked with each batch of messages
     */
    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            60000L,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactory() {
                private final AtomicLong threadIndex = new AtomicLong(0L);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "ConsumeMessageThread-" + ConsumeMessageOrderlyService.this.consumerGroup + "-" + this.threadIndex.incrementAndGet());
                }
            });
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ConsumeMessageScheduledThread-" + ConsumeMessageOrderlyService.this.consumerGroup);
            }
        });
    }

    /**
     * Starts the periodic lock task. Only in {@code CLUSTERING} mode: after an initial delay of
     * one second, {@link #lockMQPeriodically()} runs every
     * {@code ProcessQueue.RebalanceLockInterval} milliseconds.
     */
    @Override
    public void start() {
        if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000L, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Marks the service as stopped, shuts down both executors (without waiting for running
     * tasks) and, in {@code CLUSTERING} mode, releases all queue locks.
     */
    @Override
    public void shutdown() {
        this.stopped = true;
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();
        if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            this.unlockAllMQ();
        }
    }

    /** Asks the rebalance implementation to unlock every queue ({@code unlockAll(false)}). */
    public synchronized void unlockAllMQ() {
        this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
    }

    /** Asks the rebalance implementation to lock all queues, unless the service is stopped. */
    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }

    /**
     * Tries to lock a single queue through the rebalance implementation.
     *
     * @param mq queue to lock
     * @return the result of the lock attempt, or {@code false} when the service is stopped
     */
    public synchronized boolean lockOneMQ(MessageQueue mq) {
        if (this.stopped) {
            return false;
        }
        return this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(mq);
    }

    /**
     * Schedules a lock attempt for {@code mq} followed by a new consume request.
     *
     * <p>After {@code delayMills} the queue is locked via {@link #lockOneMQ(MessageQueue)}; the
     * consume request is then re-submitted after 10 ms when the lock succeeded, or after
     * 3000 ms when it did not.
     *
     * @param mq queue to lock and consume
     * @param processQueue local buffer associated with {@code mq}
     * @param delayMills delay, in milliseconds, before the lock attempt
     */
    public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, long delayMills) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
                long resubmitDelay = lockOK ? 10L : 3000L;
                ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, resubmitDelay);
            }
        }, delayMills, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the statistics object held by the owning consumer's stat manager
     */
    public ConsumerStat getConsumerStat() {
        return this.defaultMQPushConsumerImpl.getConsumerStatManager().getConsumertat();
    }

    /**
     * Applies the listener's verdict for one batch: commits, rolls back or re-queues the batch on
     * the process queue, updates the OK/failed counters and, when a commit produced a
     * non-negative offset, records it in the offset store.
     *
     * <p>With auto-commit enabled on {@code context}, {@code COMMIT} and {@code ROLLBACK} are
     * not meaningful; they are logged and then handled exactly like {@code SUCCESS}.
     *
     * @param msgs batch that was passed to the listener
     * @param status listener result; must not be {@code null} (the switch would throw)
     * @param context context that was passed to the listener; supplies the auto-commit flag and
     *        the suspend time
     * @param consumeRequest request that owns the process queue and message queue
     * @return {@code true} if the caller should keep consuming the queue, {@code false} if the
     *         queue was suspended and a delayed consume request has been scheduled
     */
    public boolean processConsumeResult(List<MessageExt> msgs, ConsumeOrderlyStatus status, ConsumeOrderlyContext context, ConsumeRequest consumeRequest) {
        boolean continueConsume = true;
        long commitOffset = -1L;
        if (context.isAutoCommit()) {
            switch (status) {
                case COMMIT:
                case ROLLBACK:
                    log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
                    // intentional fall through: treated as SUCCESS in auto-commit mode
                case SUCCESS:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStat().getConsumeMsgOKTotal().addAndGet(msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    this.getConsumerStat().getConsumeMsgFailedTotal().addAndGet(msgs.size());
                    break;
                default:
                    break;
            }
        } else {
            switch (status) {
                case SUCCESS:
                    // manual commit mode: only count the batch, nothing is committed yet
                    this.getConsumerStat().getConsumeMsgOKTotal().addAndGet(msgs.size());
                    break;
                case COMMIT:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStat().getConsumeMsgOKTotal().addAndGet(msgs.size());
                    break;
                case ROLLBACK:
                    consumeRequest.getProcessQueue().rollback();
                    this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    this.getConsumerStat().getConsumeMsgFailedTotal().addAndGet(msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    this.getConsumerStat().getConsumeMsgFailedTotal().addAndGet(msgs.size());
                    break;
                default:
                    break;
            }
        }
        if (commitOffset >= 0L) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
        }
        return continueConsume;
    }

    /**
     * Schedules a new consume request for the queue after a delay.
     *
     * @param processQueue local buffer of the queue
     * @param messageQueue queue to consume
     * @param suspendTimeMillis requested delay in milliseconds; clamped to the range
     *        [10, 30000]
     */
    private void submitConsumeRequestLater(final ProcessQueue processQueue, final MessageQueue messageQueue, long suspendTimeMillis) {
        long timeMillis = Math.min(Math.max(suspendTimeMillis, 10L), 30000L);
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Submits a {@link ConsumeRequest} for the queue to the consume pool.
     *
     * @param msgs unused by this implementation; the request takes its messages from
     *        {@code processQueue} when it runs
     * @param processQueue local buffer of the queue
     * @param messageQueue queue to consume
     * @param dispathToConsume when {@code false} nothing is submitted
     */
    @Override
    public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume) {
        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

    /**
     * Changes the core size of the consume pool. Values outside {@code 1..Short.MAX_VALUE} are
     * ignored.
     *
     * @param corePoolSize new core pool size
     */
    @Override
    public void updateCorePoolSize(int corePoolSize) {
        if (corePoolSize <= 0 || corePoolSize > Short.MAX_VALUE) {
            return;
        }
        // Since Java 9, ThreadPoolExecutor.setCorePoolSize throws IllegalArgumentException when
        // the core size exceeds the maximum. The pool uses an unbounded queue, so the maximum
        // has no other effect; widen it first so the requested core size is always accepted.
        if (corePoolSize > this.consumeExecutor.getMaximumPoolSize()) {
            this.consumeExecutor.setMaximumPoolSize(corePoolSize);
        }
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }

    /**
     * Task that drains one {@link ProcessQueue} and feeds its messages, batch by batch, to the
     * orderly listener.
     */
    class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        /**
         * @param processQueue local buffer to take messages from
         * @param messageQueue queue the buffer belongs to
         */
        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        /**
         * Consumes the queue under its per-queue monitor.
         *
         * <p>The loop keeps taking batches until the buffer is empty, the queue is dropped, the
         * queue lock is missing or expired (CLUSTERING only; a re-lock is scheduled), the
         * continuous-consume time budget is exceeded (a new request is scheduled), or
         * {@link ConsumeMessageOrderlyService#processConsumeResult} asks to stop. A listener
         * that throws or returns {@code null} is treated as
         * {@code SUSPEND_CURRENT_QUEUE_A_MOMENT}.
         */
        @Override
        public void run() {
            if (this.processQueue.isDroped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            final Object objLock = ConsumeMessageOrderlyService.this.messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                boolean mayConsume = MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || this.processQueue.isLocked()
                    || !this.processQueue.isLockExpired();
                if (!mayConsume) {
                    if (this.processQueue.isDroped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100L);
                    return;
                }

                final long beginTime = System.currentTimeMillis();
                boolean continueConsume = true;
                while (continueConsume) {
                    if (this.processQueue.isDroped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10L);
                        break;
                    }
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10L);
                        break;
                    }
                    // Yield the worker thread after the time budget; a fresh request continues later.
                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(this.processQueue, this.messageQueue, 10L);
                        break;
                    }

                    int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    if (msgs.isEmpty()) {
                        // nothing buffered for this queue: end the loop
                        break;
                    }

                    ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                    ConsumeOrderlyStatus status = null;
                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setMq(this.messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }

                    long beginTimestamp = System.currentTimeMillis();
                    // Acquire the lock BEFORE entering try: if acquisition fails, the finally
                    // block must not call unlock() on a lock this thread does not hold.
                    this.processQueue.getLockConsume().lock();
                    try {
                        if (this.processQueue.isDroped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }
                        status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(msgs, context);
                    }
                    catch (Throwable e) {
                        // A failing listener must not kill the worker; status stays null and is
                        // mapped to SUSPEND_CURRENT_QUEUE_A_MOMENT below.
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue});
                    }
                    finally {
                        this.processQueue.getLockConsume().unlock();
                    }

                    if (null == status || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", new Object[]{ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue});
                    }
                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // The context is null when no hook was present before consumption; guard so
                    // a hook appearing in between cannot cause a NullPointerException here.
                    if (null != consumeMessageContext && ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }
                    ConsumeMessageOrderlyService.this.getConsumerStat().getConsumeMsgRTTotal().addAndGet(consumeRT);
                    MixAll.compareAndIncreaseOnly((AtomicLong)ConsumeMessageOrderlyService.this.getConsumerStat().getConsumeMsgRTMax(), consumeRT);
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                }
            }
        }

        /** @return the local buffer this request consumes from */
        public ProcessQueue getProcessQueue() {
            return this.processQueue;
        }

        /** @return the queue this request consumes */
        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }
    }
}

