/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.transaction.queue;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InnerLoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;

/**
 * Bridge between the broker's transactional-message logic and the underlying
 * {@link MessageStore} / {@link BrokerController}.
 *
 * <p>It offers helpers to read half and op messages from the store, to write
 * half and op messages into it, and to query/commit the consume offsets of the
 * internal transaction consumer group.
 */
public class TransactionalMessageBridge {
    private static final InternalLogger LOGGER = InnerLoggerFactory.getLogger("RocketmqTransaction");

    /** Property that keeps the topic a half message was originally sent to. */
    private static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    /** Property that keeps the queue id a half message was originally sent to. */
    private static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    /** Property that keeps the queue offset of the originally prepared half message. */
    private static final String PROPERTY_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
    /** Op type handled by {@link #putOpMessage}; also used as the tag of the written op message. */
    private static final String OP_TYPE_REMOVE = "d";

    /** Cache of half-queue to op-queue mappings, filled lazily by {@link #writeOp}. */
    private final ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap =
        new ConcurrentHashMap<MessageQueue, MessageQueue>();
    private final BrokerController brokerController;
    private final MessageStore store;
    private final SocketAddress storeHost;

    /**
     * Creates the bridge and derives the store host address from the broker's
     * configured IP and the netty server's listen port.
     *
     * @param brokerController broker controller used for configuration, offsets, topics and stats
     * @param store            message store that half/op messages are read from and written to
     * @throws RuntimeException wrapping any exception raised while initialising the fields
     */
    public TransactionalMessageBridge(BrokerController brokerController, MessageStore store) {
        try {
            this.brokerController = brokerController;
            this.store = store;
            this.storeHost = new InetSocketAddress(
                brokerController.getBrokerConfig().getBrokerIP1(),
                brokerController.getNettyServerConfig().getListenPort());
        } catch (Exception e) {
            LOGGER.error("Init TransactionBridge error", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the consume offset recorded for the transaction consumer group on
     * the given queue.
     *
     * @param mq queue to look up
     * @return the recorded offset, or the queue's minimum offset in the store when
     *         the offset manager reports {@code -1}
     */
    public long fetchConsumeOffset(MessageQueue mq) {
        long offset = this.brokerController.getConsumerOffsetManager()
            .queryOffset(TransactionalMessageUtil.buildConsumerGroup(), mq.getTopic(), mq.getQueueId());
        if (offset == -1L) {
            // Nothing recorded yet: start from the oldest entry still in the queue.
            offset = this.store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());
        }
        return offset;
    }

    /**
     * Builds one {@link MessageQueue} per read queue of the given topic on this broker.
     *
     * @param topic topic whose queues are wanted (looked up, or created, via {@link #selectTopicConfig})
     * @return the queues; empty when no topic config is available or it has no read queues
     */
    public Set<MessageQueue> fetchMessageQueues(String topic) {
        Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
        TopicConfig topicConfig = this.selectTopicConfig(topic);
        if (topicConfig == null) {
            return mqSet;
        }
        int readQueueNums = topicConfig.getReadQueueNums();
        String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
        for (int queueId = 0; queueId < readQueueNums; ++queueId) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(brokerName);
            mq.setQueueId(queueId);
            mqSet.add(mq);
        }
        return mqSet;
    }

    /**
     * Commits {@code offset} as the transaction consumer group's consume offset
     * for the given queue, using the store host address as the client host.
     *
     * @param mq     queue whose offset is updated
     * @param offset offset to commit
     */
    public void updateConsumeOffset(MessageQueue mq, long offset) {
        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseSocketAddressAddr(this.storeHost),
            TransactionalMessageUtil.buildConsumerGroup(),
            mq.getTopic(),
            mq.getQueueId(),
            offset);
    }

    /**
     * Reads messages from the half topic.
     *
     * @param queueId queue to read from
     * @param offset  queue offset to start at
     * @param nums    maximum number of messages requested from the store
     * @return the pull result, or {@code null} when the store returns no result object
     */
    public PullResult getHalfMessage(int queueId, long offset, int nums) {
        String group = TransactionalMessageUtil.buildConsumerGroup();
        String topic = TransactionalMessageUtil.buildHalfTopic();
        SubscriptionData sub = new SubscriptionData(topic, "*");
        return this.getMessage(group, topic, queueId, offset, nums, sub);
    }

    /**
     * Reads messages from the op topic.
     *
     * @param queueId queue to read from
     * @param offset  queue offset to start at
     * @param nums    maximum number of messages requested from the store
     * @return the pull result, or {@code null} when the store returns no result object
     */
    public PullResult getOpMessage(int queueId, long offset, int nums) {
        String group = TransactionalMessageUtil.buildConsumerGroup();
        String topic = TransactionalMessageUtil.buildOpTopic();
        SubscriptionData sub = new SubscriptionData(topic, "*");
        return this.getMessage(group, topic, queueId, offset, nums, sub);
    }

    /**
     * Fetches messages from the store and translates the store status into a
     * {@link PullResult}. For a {@code FOUND} status the messages are decoded and
     * the broker's get statistics are updated.
     *
     * @param sub currently unused: the store is queried with a {@code null} filter
     * @return the pull result, or {@code null} when the store returns {@code null}
     */
    private PullResult getMessage(String group, String topic, int queueId, long offset, int nums, SubscriptionData sub) {
        GetMessageResult getMessageResult = this.store.getMessage(group, topic, queueId, offset, nums, null);
        if (getMessageResult == null) {
            LOGGER.error("Get message from store return null. topic={}, groupId={}, requestOffset={}", topic, group, offset);
            return null;
        }

        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
        List<MessageExt> foundList = null;
        switch (getMessageResult.getStatus()) {
            case FOUND: {
                pullStatus = PullStatus.FOUND;
                foundList = this.decodeMsgList(getMessageResult);
                this.brokerController.getBrokerStatsManager().incGroupGetNums(group, topic, getMessageResult.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize());
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                if (!foundList.isEmpty()) {
                    // Lag is measured against the store timestamp of the last decoded message.
                    MessageExt lastMsg = foundList.get(foundList.size() - 1);
                    long fallBehind = this.brokerController.getMessageStore().now() - lastMsg.getStoreTimestamp();
                    this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, fallBehind);
                }
                break;
            }
            case NO_MATCHED_MESSAGE: {
                pullStatus = PullStatus.NO_MATCHED_MSG;
                LOGGER.warn("No matched message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
                    getMessageResult.getStatus(), topic, group, offset);
                break;
            }
            case NO_MESSAGE_IN_QUEUE:
            case OFFSET_OVERFLOW_ONE: {
                pullStatus = PullStatus.NO_NEW_MSG;
                LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
                    getMessageResult.getStatus(), topic, group, offset);
                break;
            }
            case MESSAGE_WAS_REMOVING:
            case NO_MATCHED_LOGIC_QUEUE:
            case OFFSET_FOUND_NULL:
            case OFFSET_OVERFLOW_BADLY:
            case OFFSET_TOO_SMALL: {
                pullStatus = PullStatus.OFFSET_ILLEGAL;
                LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
                    getMessageResult.getStatus(), topic, group, offset);
                break;
            }
            default: {
                assert false;
                break;
            }
        }
        return new PullResult(pullStatus, getMessageResult.getNextBeginOffset(), getMessageResult.getMinOffset(),
            getMessageResult.getMaxOffset(), foundList);
    }

    /**
     * Decodes every buffer of the store result into a {@link MessageExt},
     * skipping buffers that decode to {@code null}. The store result is released
     * afterwards, even if decoding throws.
     *
     * @param getMessageResult store result to decode and release
     * @return the decoded messages, never {@code null}
     */
    private List<MessageExt> decodeMsgList(GetMessageResult getMessageResult) {
        List<MessageExt> foundList = new ArrayList<MessageExt>();
        try {
            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
            for (ByteBuffer bb : messageBufferList) {
                MessageExt msgExt = MessageDecoder.decode(bb, true, false);
                if (msgExt != null) {
                    foundList.add(msgExt);
                }
            }
        } finally {
            getMessageResult.release();
        }
        return foundList;
    }

    /**
     * Rewrites the given message into a half message (see
     * {@link #parseHalfMessageInner}) and stores it.
     *
     * @param messageInner message to store; it is modified in place
     * @return the store's put result
     */
    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        return this.store.putMessage(this.parseHalfMessageInner(messageInner));
    }

    /**
     * Turns a message into a half message in place: the original topic and queue
     * id are saved as properties, the transaction bits of the sys flag are reset,
     * and the message is redirected to queue 0 of the half topic.
     *
     * @param msgInner message to rewrite
     * @return the same instance, rewritten
     */
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), 0));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        // Re-serialise the properties so the string form includes the two entries added above.
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

    /**
     * Writes an op message for the given half message.
     *
     * @param messageExt half message the op refers to
     * @param opType     op type; only {@code "d"} triggers a write
     * @return for {@code "d"}, whether the op message was stored successfully;
     *         {@code true} for any other op type (nothing is written)
     */
    public boolean putOpMessage(MessageExt messageExt, String opType) {
        MessageQueue messageQueue = new MessageQueue(
            messageExt.getTopic(),
            this.brokerController.getBrokerConfig().getBrokerName(),
            messageExt.getQueueId());
        if (OP_TYPE_REMOVE.equals(opType)) {
            return this.addRemoveTagInTransactionOp(messageExt, messageQueue);
        }
        return true;
    }

    /**
     * Stores the message and hands back the raw store result.
     *
     * @param messageInner message to store
     * @return the store's put result
     */
    public PutMessageResult putMessageReturnResult(MessageExtBrokerInner messageInner) {
        LOGGER.debug("[BUG-TO-FIX] Thread:{} msgID:{}", Thread.currentThread().getName(), messageInner.getMsgId());
        return this.store.putMessage(messageInner);
    }

    /**
     * Stores the message and reports success as a boolean.
     *
     * @param messageInner message to store
     * @return {@code true} when the store reports {@code PUT_OK}; otherwise
     *         {@code false}, after logging the failure
     */
    public boolean putMessage(MessageExtBrokerInner messageInner) {
        PutMessageResult putMessageResult = this.store.putMessage(messageInner);
        if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            return true;
        }
        LOGGER.error("Put message failed, topic: {}, queueId: {}, msgId: {}",
            messageInner.getTopic(), messageInner.getQueueId(), messageInner.getMsgId());
        return false;
    }

    /**
     * Builds a fresh half message from {@code msgExt} (see
     * {@link #renewHalfMessageInner}) and records the prepared queue offset on
     * it: the value already present as a user property on {@code msgExt} if any,
     * otherwise {@code msgExt}'s own queue offset.
     *
     * @param msgExt half message to renew
     * @return the renewed message
     */
    public MessageExtBrokerInner renewImmunityHalfMessageInner(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = this.renewHalfMessageInner(msgExt);
        String queueOffsetFromPrepare = msgExt.getUserProperty(PROPERTY_PREPARED_QUEUE_OFFSET);
        String preparedOffset = queueOffsetFromPrepare != null
            ? queueOffsetFromPrepare
            : String.valueOf(msgExt.getQueueOffset());
        MessageAccessor.putProperty(msgInner, PROPERTY_PREPARED_QUEUE_OFFSET, preparedOffset);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

    /**
     * Copies topic, body, queue id, message id, sys flag, tags, properties, born
     * timestamp/host and store host from {@code msgExt} into a new broker-inner
     * message that does not wait for store acknowledgement.
     *
     * @param msgExt message to copy from
     * @return the new message
     */
    public MessageExtBrokerInner renewHalfMessageInner(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(msgExt.getTopic());
        msgInner.setBody(msgExt.getBody());
        msgInner.setQueueId(msgExt.getQueueId());
        msgInner.setMsgId(msgExt.getMsgId());
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setTags(msgExt.getTags());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setWaitStoreMsgOK(false);
        return msgInner;
    }

    /**
     * Wraps an op {@link Message} into a broker-inner message targeted at the
     * given queue, stamped with the current time, this broker's store host as
     * both born and store host, and a freshly assigned unique id.
     *
     * @param message      op message providing topic, body, tags and properties
     * @param messageQueue queue whose id the new message is written to
     * @return the broker-inner op message
     */
    private MessageExtBrokerInner makeOpMessageInner(Message message, MessageQueue messageQueue) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(message.getTopic());
        msgInner.setBody(message.getBody());
        msgInner.setQueueId(messageQueue.getQueueId());
        msgInner.setTags(message.getTags());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
        msgInner.setSysFlag(0);
        MessageAccessor.setProperties(msgInner, message.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
        msgInner.setBornTimestamp(System.currentTimeMillis());
        msgInner.setBornHost(this.storeHost);
        msgInner.setStoreHost(this.storeHost);
        msgInner.setWaitStoreMsgOK(false);
        MessageClientIDSetter.setUniqID(msgInner);
        return msgInner;
    }

    /**
     * Looks up the topic's config, creating it through the topic config manager
     * when it does not exist yet.
     *
     * @param topic topic name
     * @return the existing or newly created config (may be {@code null} if creation yields none)
     */
    private TopicConfig selectTopicConfig(String topic) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (topicConfig == null) {
            // NOTE(review): arguments look like (queueNums=1, perm=6, sysFlag=0) - confirm against TopicConfigManager.
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(topic, 1, 6, 0);
        }
        return topicConfig;
    }

    /**
     * Writes an op message tagged {@code "d"} whose body is the half message's
     * queue offset.
     *
     * @param messageExt   half message being marked
     * @param messageQueue queue of the half message
     * @return whether the op message was stored successfully
     */
    private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
        Message message = new Message(
            TransactionalMessageUtil.buildOpTopic(),
            OP_TYPE_REMOVE,
            String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
        // Propagate the store outcome instead of unconditionally reporting success.
        return this.writeOp(message, messageQueue);
    }

    /**
     * Stores an op message in the op queue that corresponds to the given half
     * queue, resolving (and caching) that op queue on first use.
     *
     * @param message op message to store
     * @param mq      half queue the op message belongs to
     * @return whether the store accepted the message
     */
    private boolean writeOp(Message message, MessageQueue mq) {
        MessageQueue opQueue = this.opQueueMap.get(mq);
        if (opQueue == null) {
            MessageQueue created = this.getOpQueueByHalf(mq);
            // Another thread may have registered a mapping in the meantime; prefer the one already in the map.
            MessageQueue existing = this.opQueueMap.putIfAbsent(mq, created);
            opQueue = existing != null ? existing : created;
        }
        return this.putMessage(this.makeOpMessageInner(message, opQueue));
    }

    /**
     * Derives the op queue for a half queue: op topic, same broker name and same queue id.
     *
     * @param halfMQ half queue
     * @return a new op queue descriptor
     */
    private MessageQueue getOpQueueByHalf(MessageQueue halfMQ) {
        MessageQueue opQueue = new MessageQueue();
        opQueue.setTopic(TransactionalMessageUtil.buildOpTopic());
        opQueue.setBrokerName(halfMQ.getBrokerName());
        opQueue.setQueueId(halfMQ.getQueueId());
        return opQueue;
    }

    /**
     * Looks up a message in the store by its commit log offset.
     *
     * @param commitLogOffset commit log offset of the message
     * @return whatever the store returns for that offset
     */
    public MessageExt lookMessageByOffset(long commitLogOffset) {
        return this.store.lookMessageByOffset(commitLogOffset);
    }

    /**
     * @return the broker controller this bridge was created with
     */
    public BrokerController getBrokerController() {
        return this.brokerController;
    }
}

