/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.tubemq;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.queue.tubemq.ConsumerGroupResponse;
import org.apache.inlong.manager.pojo.queue.tubemq.TopicResponse;
import org.apache.inlong.manager.pojo.queue.tubemq.TubeBrokerInfo;
import org.apache.inlong.manager.pojo.queue.tubemq.TubeHttpResponse;
import org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class TubeMQOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
    private static final Integer SUCCESS_CODE = 0;
    private static final String TOPIC_NAME = "&topicName=";
    private static final String GROUP_NAME = "&groupName=";
    private static final String BROKER_ID = "&brokerId=";
    private static final String CREATE_USER = "&createUser=";
    private static final String CONF_MOD_AUTH_TOKEN = "&confModAuthToken=";
    private static final String MSG_COUNT = "&msgCount=";
    private static final String FILTER_CONDS = "&filterConds=";
    private static final String QUERY_TOPIC_PATH = "/webapi.htm?method=admin_query_cluster_topic_view";
    private static final String QUERY_BROKER_PATH = "/webapi.htm?method=admin_query_broker_run_status";
    private static final String ADD_TOPIC_PATH = "/webapi.htm?method=admin_add_new_topic_record";
    private static final String QUERY_CONSUMER_PATH = "/webapi.htm?method=admin_query_allowed_consumer_group_info";
    private static final String ADD_CONSUMER_PATH = "/webapi.htm?method=admin_add_authorized_consumergroup_info";
    private static final String QUERY_MESSAGE_PATH = "/broker.htm?method=admin_snapshot_message";
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    public DeserializeOperatorFactory deserializeOperatorFactory;

    public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) {
        String masterUrl = tubeCluster.getMasterWebUrl();
        LOGGER.info("begin to create TubeMQ topic {} in master {}", (Object)topicName, (Object)masterUrl);
        if (StringUtils.isEmpty((CharSequence)masterUrl) || StringUtils.isEmpty((CharSequence)topicName)) {
            throw new BusinessException("TubeMQ master url or TubeMQ topic cannot be null");
        }
        if (this.isTopicExist(masterUrl, topicName)) {
            LOGGER.warn("TubeMQ topic {} already exists in {}, skip to create", (Object)topicName, (Object)masterUrl);
            return;
        }
        this.createTopicOpt(masterUrl, topicName, tubeCluster.getToken(), operator);
        LOGGER.info("success to create TubeMQ topic {} in {}", (Object)topicName, (Object)masterUrl);
    }

    public void createConsumerGroup(TubeClusterInfo tubeCluster, String topic, String consumerGroup, String operator) {
        String masterUrl = tubeCluster.getMasterWebUrl();
        LOGGER.info("begin to create consumer group {} for topic {} in master {}", new Object[]{consumerGroup, topic, masterUrl});
        if (StringUtils.isEmpty((CharSequence)masterUrl) || StringUtils.isEmpty((CharSequence)consumerGroup) || StringUtils.isEmpty((CharSequence)topic)) {
            throw new BusinessException("TubeMQ master url, consumer group, or TubeMQ topic cannot be null");
        }
        if (!this.isTopicExist(masterUrl, topic)) {
            LOGGER.warn("cannot create TubeMQ consumer group {}, as the topic {} not exists in master {}", new Object[]{consumerGroup, topic, masterUrl});
            return;
        }
        if (this.isConsumerGroupExist(masterUrl, topic, consumerGroup)) {
            LOGGER.warn("TubeMQ consumer group {} already exists for topic {} in master {}, skip to create", new Object[]{consumerGroup, topic, masterUrl});
            return;
        }
        this.createConsumerGroupOpt(masterUrl, topic, consumerGroup, tubeCluster.getToken(), operator);
        LOGGER.info("success to create TubeMQ consumer group {} for topic {} in {}", new Object[]{consumerGroup, topic, masterUrl});
    }

    public boolean isTopicExist(String masterUrl, String topicName) {
        LOGGER.info("begin to check if the TubeMQ topic {} exists", (Object)topicName);
        String url = masterUrl + QUERY_TOPIC_PATH + TOPIC_NAME + topicName;
        try {
            TopicResponse topicView = (TopicResponse)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, (HttpHeaders)new HttpHeaders(), TopicResponse.class);
            if (CollectionUtils.isEmpty((Collection)topicView.getData())) {
                LOGGER.warn("TubeMQ topic {} not exists in {}", (Object)topicName, (Object)url);
                return false;
            }
            LOGGER.info("TubeMQ topic {} exists in {}", (Object)topicName, (Object)url);
            return true;
        }
        catch (Exception e) {
            String msg = String.format("failed to check if the topic %s exist in ", topicName);
            LOGGER.error(msg + url, (Throwable)e);
            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
        }
    }

    public boolean isConsumerGroupExist(String masterUrl, String topicName, String consumerGroup) {
        LOGGER.info("begin to check if the consumer group {} exists on topic {}", (Object)consumerGroup, (Object)topicName);
        String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName + GROUP_NAME + consumerGroup;
        try {
            ConsumerGroupResponse response = (ConsumerGroupResponse)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, (HttpHeaders)new HttpHeaders(), ConsumerGroupResponse.class);
            if (CollectionUtils.isEmpty((Collection)response.getData())) {
                LOGGER.warn("TubeMQ consumer group {} not exists for topic {} in {}, response={}", new Object[]{consumerGroup, topicName, url, response});
                return false;
            }
            LOGGER.info("TubeMQ consumer group {} exists for topic {} in {}, response={}", new Object[]{consumerGroup, topicName, url, response});
            return true;
        }
        catch (Exception e) {
            String msg = String.format("failed to check if the consumer group %s for topic %s exist in ", consumerGroup, topicName);
            LOGGER.error(msg + url, (Throwable)e);
            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
        }
    }

    private TubeBrokerInfo getBrokerInfo(String masterUrl) {
        String url = masterUrl + QUERY_BROKER_PATH;
        try {
            TubeBrokerInfo brokerInfo = (TubeBrokerInfo)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, (HttpHeaders)new HttpHeaders(), TubeBrokerInfo.class);
            if (brokerInfo.getErrCode() != SUCCESS_CODE.intValue()) {
                String msg = "failed to query TubeMQ broker from %s, error: %s";
                LOGGER.error(String.format(msg, url, brokerInfo.getErrMsg()));
                throw new BusinessException(String.format(msg, masterUrl, brokerInfo.getErrMsg()));
            }
            brokerInfo.divideBrokerListByStatus();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("success to query TubeMQ broker from {}, result {}", (Object)url, (Object)brokerInfo.getData());
            }
            return brokerInfo;
        }
        catch (Exception e) {
            String msg = "failed to query TubeMQ broker from %s";
            LOGGER.error(String.format(msg, url), (Throwable)e);
            throw new BusinessException(String.format(msg, masterUrl) + ", error: " + e.getMessage());
        }
    }

    private void createTopicOpt(String masterUrl, String topicName, String token, String operator) {
        LOGGER.info(String.format("begin to create TubeMQ topic %s in master %s", topicName, masterUrl));
        TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
        List allBrokers = brokerView.getAllBrokerIdList();
        if (CollectionUtils.isEmpty((Collection)allBrokers)) {
            String msg = String.format("cannot create topic %s, as not any brokers found in %s", topicName, masterUrl);
            LOGGER.error(msg);
            throw new BusinessException(msg);
        }
        String url = masterUrl + ADD_TOPIC_PATH + TOPIC_NAME + topicName + BROKER_ID + StringUtils.join((Iterable)allBrokers, (String)",") + CREATE_USER + operator + CONF_MOD_AUTH_TOKEN + token;
        try {
            TubeHttpResponse response = (TubeHttpResponse)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, (HttpHeaders)new HttpHeaders(), TubeHttpResponse.class);
            if (response.getErrCode() != SUCCESS_CODE.intValue()) {
                String msg = String.format("failed to create TubeMQ topic %s, error: %s", topicName, response.getErrMsg());
                LOGGER.error(msg + " in {} for brokers {}", (Object)masterUrl, (Object)allBrokers);
                throw new BusinessException(msg);
            }
            LOGGER.info("success to create TubeMQ topic {} in {}", (Object)topicName, (Object)url);
        }
        catch (Exception e) {
            String msg = String.format("failed to create TubeMQ topic %s in %s", topicName, masterUrl);
            LOGGER.error(msg, (Throwable)e);
            throw new BusinessException(msg + ", error: " + e.getMessage());
        }
    }

    private void createConsumerGroupOpt(String masterUrl, String topicName, String consumerGroup, String token, String operator) {
        LOGGER.info(String.format("begin to create consumer group %s for topic %s in master %s", consumerGroup, topicName, masterUrl));
        String url = masterUrl + ADD_CONSUMER_PATH + TOPIC_NAME + topicName + GROUP_NAME + consumerGroup + CREATE_USER + operator + CONF_MOD_AUTH_TOKEN + token;
        try {
            TubeHttpResponse response = (TubeHttpResponse)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, (HttpHeaders)new HttpHeaders(), TubeHttpResponse.class);
            if (response.getErrCode() != SUCCESS_CODE.intValue()) {
                String msg = String.format("failed to create TubeMQ consumer group %s for topic %s, error: %s", consumerGroup, topicName, response.getErrMsg());
                LOGGER.error(msg + ", url {}", (Object)url);
                throw new BusinessException(msg);
            }
            LOGGER.info("success to create TubeMQ topic {} in {}", (Object)topicName, (Object)url);
        }
        catch (Exception e) {
            String msg = String.format("failed to create TubeMQ topic %s in %s", topicName, masterUrl);
            LOGGER.error(msg, (Throwable)e);
            throw new BusinessException(msg + ", error: " + e.getMessage());
        }
    }

    public List<BriefMQMessage> queryLastMessage(TubeClusterInfo tubeCluster, String topicName, InlongStreamInfo streamInfo, QueryMessageRequest request) {
        LOGGER.info("begin to query message for topic {} in cluster: {}", (Object)topicName, (Object)tubeCluster);
        String masterUrl = tubeCluster.getMasterWebUrl();
        TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
        String brokerUrl = brokerView.getOnlineBrokerAddress();
        ArrayList<BriefMQMessage> messageList = new ArrayList<BriefMQMessage>();
        try {
            if (StringUtils.isEmpty((CharSequence)brokerUrl) || StringUtils.isEmpty((CharSequence)topicName)) {
                throw new BusinessException("TubeMQ master url or TubeMQ topic cannot be null");
            }
            if (!this.isTopicExist(masterUrl, topicName)) {
                LOGGER.error("TubeMQ topic {} not exists in {}, skip to query", (Object)topicName, (Object)masterUrl);
                throw new BusinessException("TubeMQ master url or TubeMQ topic cannot be null");
            }
            String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + TOPIC_NAME + topicName + MSG_COUNT + request.getMessageCount() + FILTER_CONDS + streamInfo.getInlongStreamId();
            TubeMessageResponse response = (TubeMessageResponse)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, (HttpHeaders)new HttpHeaders(), TubeMessageResponse.class);
            if (response.getErrCode() != SUCCESS_CODE.intValue() && response.getErrCode() != 200) {
                String msg = String.format("failed to query message for topic %s, error: %s", topicName, response.getErrMsg());
                LOGGER.error(msg + " in {} for broker {}", (Object)masterUrl, (Object)brokerUrl);
                throw new BusinessException(msg);
            }
            int index = 0;
            for (TubeMessageResponse.TubeDataInfo tubeDataInfo : response.getDataSet()) {
                HashMap<String, String> map = new HashMap<String, String>();
                for (String kv : tubeDataInfo.getAttr().split(",")) {
                    map.put(kv.split("=")[0], kv.split("=")[1]);
                }
                MessageWrapType messageWrapType = MessageWrapType.forType((String)streamInfo.getWrapType());
                if (map.get("msgEnType") != null) {
                    messageWrapType = MessageWrapType.valueOf((int)Integer.parseInt((String)map.get("msgEnType")));
                }
                byte[] messageData = Base64.getDecoder().decode(tubeDataInfo.getData());
                DeserializeOperator deserializeOperator = this.deserializeOperatorFactory.getInstance(messageWrapType);
                deserializeOperator.decodeMsg(streamInfo, messageList, messageData, map, index, request);
            }
            LOGGER.info("success query messages for topic={}", (Object)topicName);
        }
        catch (Exception e) {
            String errMsg = "failed to query messages: ";
            LOGGER.error(errMsg, (Throwable)e);
            throw new BusinessException(errMsg + e.getMessage());
        }
        return messageList;
    }
}

