/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;

public class RocketMqAdminUtil {
    public static String createUniqInstance(String prefix) {
        return prefix.concat("-").concat(UUID.randomUUID().toString());
    }

    public static RPCHook getAclRpcHook(String accessKey, String secretKey) {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqBaseConfiguration config, boolean autoCommit) {
        ClientConfig consumer = null;
        if (Objects.isNull(consumer)) {
            consumer = StringUtils.isBlank(config.getAccessKey()) && StringUtils.isBlank(config.getSecretKey()) ? new DefaultLitePullConsumer(config.getGroupId()) : new DefaultLitePullConsumer(config.getGroupId(), RocketMqAdminUtil.getAclRpcHook(config.getAccessKey(), config.getSecretKey()));
        }
        consumer.setNamesrvAddr(config.getNamesrvAddr());
        String uniqueName = RocketMqAdminUtil.createUniqInstance(config.getNamesrvAddr());
        consumer.setInstanceName(uniqueName);
        consumer.setUnitName(uniqueName);
        ((DefaultLitePullConsumer)consumer).setAutoCommit(autoCommit);
        if (config.getBatchSize() != null) {
            ((DefaultLitePullConsumer)consumer).setPullBatchSize(config.getBatchSize());
        }
        return consumer;
    }

    public static TransactionMQProducer initTransactionMqProducer(RocketMqBaseConfiguration config, TransactionListener listener) {
        AclClientRPCHook rpcHook = null;
        if (config.isAclEnable()) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(config.getAccessKey(), config.getSecretKey()));
        }
        TransactionMQProducer producer = new TransactionMQProducer(config.getGroupId(), rpcHook);
        producer.setNamesrvAddr(config.getNamesrvAddr());
        producer.setInstanceName(RocketMqAdminUtil.createUniqInstance(config.getNamesrvAddr()));
        producer.setLanguage(LanguageCode.JAVA);
        producer.setTransactionListener(listener);
        if (config.getMaxMessageSize() != null) {
            producer.setMaxMessageSize(config.getMaxMessageSize());
        }
        if (config.getSendMsgTimeout() != null) {
            producer.setSendMsgTimeout(config.getSendMsgTimeout());
        }
        return producer;
    }

    public static DefaultMQProducer initDefaultMqProducer(RocketMqBaseConfiguration config) {
        AclClientRPCHook rpcHook = null;
        if (config.isAclEnable()) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(config.getAccessKey(), config.getSecretKey()));
        }
        DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
        producer.setNamesrvAddr(config.getNamesrvAddr());
        producer.setInstanceName(RocketMqAdminUtil.createUniqInstance(config.getNamesrvAddr()));
        producer.setProducerGroup(config.getGroupId());
        producer.setLanguage(LanguageCode.JAVA);
        if (config.getMaxMessageSize() != null && config.getMaxMessageSize() > 0) {
            producer.setMaxMessageSize(config.getMaxMessageSize());
        }
        if (config.getSendMsgTimeout() != null && config.getMaxMessageSize() > 0) {
            producer.setSendMsgTimeout(config.getSendMsgTimeout());
        }
        return producer;
    }

    private static DefaultMQAdminExt startMQAdminTool(RocketMqBaseConfiguration config) throws MQClientException {
        DefaultMQAdminExt admin = config.isAclEnable() ? new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(config.getAccessKey(), config.getSecretKey()))) : new DefaultMQAdminExt();
        admin.setNamesrvAddr(config.getNamesrvAddr());
        admin.setAdminExtGroup(config.getGroupId());
        admin.setInstanceName(RocketMqAdminUtil.createUniqInstance(config.getNamesrvAddr()));
        admin.start();
        return admin;
    }

    public static void createTopic(RocketMqBaseConfiguration config, TopicConfig topicConfig) {
        DefaultMQAdminExt defaultMQAdminExt = null;
        try {
            defaultMQAdminExt = RocketMqAdminUtil.startMQAdminTool(config);
            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
            Set<String> clusterNameSet = clusterAddrTable.keySet();
            for (String clusterName : clusterNameSet) {
                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
                for (String addr : masterSet) {
                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                }
            }
        }
        catch (Exception e) {
            throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.CREATE_TOPIC_ERROR, e);
        }
        finally {
            if (defaultMQAdminExt != null) {
                defaultMQAdminExt.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static boolean topicExist(RocketMqBaseConfiguration config, String topic) {
        boolean foundTopicRouteInfo;
        block9: {
            DefaultMQAdminExt defaultMQAdminExt = null;
            foundTopicRouteInfo = false;
            try {
                defaultMQAdminExt = RocketMqAdminUtil.startMQAdminTool(config);
                TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
                if (topicRouteData != null) {
                    foundTopicRouteInfo = true;
                }
            }
            catch (Exception e) {
                if (e instanceof MQClientException) {
                    if (((MQClientException)e).getResponseCode() == 17) {
                        foundTopicRouteInfo = false;
                        break block9;
                    }
                    throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
                }
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
            }
            finally {
                if (defaultMQAdminExt != null) {
                    defaultMQAdminExt.shutdown();
                }
            }
        }
        return foundTopicRouteInfo;
    }

    public static List<Map<MessageQueue, TopicOffset>> offsetTopics(RocketMqBaseConfiguration config, List<String> topics) {
        ArrayList offsets = Lists.newArrayList();
        DefaultMQAdminExt adminClient = null;
        try {
            adminClient = RocketMqAdminUtil.startMQAdminTool(config);
            for (String topic : topics) {
                TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
                offsets.add(topicStatsTable.getOffsetTable());
            }
            ArrayList arrayList = offsets;
            return arrayList;
        }
        catch (InterruptedException | MQBrokerException | MQClientException | RemotingException e) {
            throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.GET_MIN_AND_MAX_OFFSETS_ERROR, e);
        }
        finally {
            if (adminClient != null) {
                adminClient.shutdown();
            }
        }
    }

    public static Map<MessageQueue, TopicOffset> flatOffsetTopics(RocketMqBaseConfiguration config, List<String> topics) {
        ConcurrentMap messageQueueTopicOffsets = Maps.newConcurrentMap();
        RocketMqAdminUtil.offsetTopics(config, topics).forEach(offsetTopic -> messageQueueTopicOffsets.putAll(offsetTopic));
        return messageQueueTopicOffsets;
    }

    public static Map<MessageQueue, Long> searchOffsetsByTimestamp(RocketMqBaseConfiguration config, Collection<MessageQueue> messageQueues, Long timestamp) {
        ConcurrentMap offsets = Maps.newConcurrentMap();
        DefaultMQAdminExt adminClient = null;
        try {
            adminClient = RocketMqAdminUtil.startMQAdminTool(config);
            for (MessageQueue messageQueue : messageQueues) {
                long offset = adminClient.searchOffset(messageQueue, timestamp);
                offsets.put(messageQueue, offset);
            }
            ConcurrentMap concurrentMap = offsets;
            return concurrentMap;
        }
        catch (MQClientException e) {
            throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR, e);
        }
        finally {
            if (adminClient != null) {
                adminClient.shutdown();
            }
        }
    }

    public static Map<MessageQueue, Long> currentOffsets(RocketMqBaseConfiguration config, List<String> topics, Set<MessageQueue> messageQueues) {
        DefaultMQAdminExt adminClient = null;
        try {
            adminClient = RocketMqAdminUtil.startMQAdminTool(config);
            ConcurrentMap consumerOffsets = Maps.newConcurrentMap();
            for (String topic : topics) {
                ConsumeStats consumeStats = adminClient.examineConsumeStats(config.getGroupId(), topic);
                consumerOffsets.putAll(consumeStats.getOffsetTable());
            }
            Map<MessageQueue, Long> map = consumerOffsets.keySet().stream().filter(messageQueue -> messageQueues.contains(messageQueue)).collect(Collectors.toMap(messageQueue -> messageQueue, messageQueue -> ((OffsetWrapper)consumerOffsets.get(messageQueue)).getConsumerOffset()));
            return map;
        }
        catch (InterruptedException | MQBrokerException | MQClientException | RemotingException e) {
            if (e instanceof MQClientException) {
                if (((MQClientException)e).getResponseCode() == 17) {
                    Map<MessageQueue, Long> map = Collections.emptyMap();
                    return map;
                }
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_ERROR, e);
            }
            throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_ERROR, e);
        }
        finally {
            if (adminClient != null) {
                adminClient.shutdown();
            }
        }
    }
}

