/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.manager.node;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.UpdateAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ClusterManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.TTLManager;
import org.apache.iotdb.confignode.manager.TriggerManager;
import org.apache.iotdb.confignode.manager.UDFManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeHandler;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NodeManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
    private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
    public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
    private final IManager configManager;
    protected final NodeInfo nodeInfo;
    private final ReentrantLock removeConfigNodeLock;
    private static final String CONSENSUS_WRITE_ERROR = "Failed in the write API executing the consensus layer due to: ";

    public NodeManager(IManager configManager, NodeInfo nodeInfo) {
        this.configManager = configManager;
        this.nodeInfo = nodeInfo;
        this.removeConfigNodeLock = new ReentrantLock();
    }

    public DataSet getSystemConfiguration() {
        ConfigurationResp dataSet = new ConfigurationResp();
        dataSet.setStatus(RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS));
        this.setGlobalConfig(dataSet);
        this.setRatisConfig(dataSet);
        this.setCQConfig(dataSet);
        return dataSet;
    }

    private void setGlobalConfig(ConfigurationResp dataSet) {
        ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
        CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
        TGlobalConfig globalConfig = new TGlobalConfig();
        globalConfig.setDataRegionConsensusProtocolClass(configNodeConfig.getDataRegionConsensusProtocolClass());
        globalConfig.setSchemaRegionConsensusProtocolClass(configNodeConfig.getSchemaRegionConsensusProtocolClass());
        globalConfig.setSeriesPartitionSlotNum(configNodeConfig.getSeriesSlotNum());
        globalConfig.setSeriesPartitionExecutorClass(configNodeConfig.getSeriesPartitionExecutorClass());
        globalConfig.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
        globalConfig.setTimePartitionOrigin(commonConfig.getTimePartitionOrigin());
        globalConfig.setReadConsistencyLevel(configNodeConfig.getReadConsistencyLevel());
        globalConfig.setDiskSpaceWarningThreshold(commonConfig.getDiskSpaceWarningThreshold());
        globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision());
        globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
        globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
        dataSet.setGlobalConfig(globalConfig);
    }

    private void setRatisConfig(ConfigurationResp dataSet) {
        ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
        TRatisConfig ratisConfig = new TRatisConfig();
        ratisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
        ratisConfig.setSchemaAppenderBufferSize(conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
        ratisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
        ratisConfig.setSchemaSnapshotTriggerThreshold(conf.getSchemaRegionRatisSnapshotTriggerThreshold());
        ratisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
        ratisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
        ratisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
        ratisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
        ratisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
        ratisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
        ratisConfig.setDataRegionGrpcLeaderOutstandingAppendsMax(conf.getDataRegionRatisGrpcLeaderOutstandingAppendsMax());
        ratisConfig.setSchemaRegionGrpcLeaderOutstandingAppendsMax(conf.getSchemaRegionRatisGrpcLeaderOutstandingAppendsMax());
        ratisConfig.setDataRegionLogForceSyncNum(conf.getDataRegionRatisLogForceSyncNum());
        ratisConfig.setSchemaRegionLogForceSyncNum(conf.getSchemaRegionRatisLogForceSyncNum());
        ratisConfig.setDataLeaderElectionTimeoutMin(conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
        ratisConfig.setSchemaLeaderElectionTimeoutMin(conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
        ratisConfig.setDataLeaderElectionTimeoutMax(conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
        ratisConfig.setSchemaLeaderElectionTimeoutMax(conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
        ratisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
        ratisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
        ratisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
        ratisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
        ratisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
        ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
        ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
        ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
        ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
        ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
        ratisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
        ratisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
        ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
        ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
        ratisConfig.setSchemaRegionPeriodicSnapshotInterval(conf.getSchemaRegionRatisPeriodicSnapshotInterval());
        ratisConfig.setDataRegionPeriodicSnapshotInterval(conf.getDataRegionRatisPeriodicSnapshotInterval());
        dataSet.setRatisConfig(ratisConfig);
    }

    private void setCQConfig(ConfigurationResp dataSet) {
        ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
        TCQConfig cqConfig = new TCQConfig();
        cqConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
        dataSet.setCqConfig(cqConfig);
    }

    /*
     * Exception decompiling
     */
    private TRuntimeConfiguration getRuntimeConfiguration() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    public DataSet registerDataNode(TDataNodeRegisterReq req) {
        DataNodeRegisterResp resp = new DataNodeRegisterResp();
        resp.setConfigNodeList(this.getRegisteredConfigNodes());
        int dataNodeId = this.nodeInfo.generateNextNodeId();
        this.getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId);
        RegisterDataNodePlan registerDataNodePlan = new RegisterDataNodePlan(req.getDataNodeConfiguration());
        registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
        try {
            this.getConsensusManager().write(registerDataNodePlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
        UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
        try {
            this.getConsensusManager().write(updateVersionInfoPlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
        PartitionMetrics.bindDataNodePartitionMetricsWhenUpdate((AbstractMetricService)MetricService.getInstance(), this.configManager, dataNodeId);
        this.getClusterSchemaManager().adjustMaxRegionGroupNum();
        resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
        resp.setDataNodeId(registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
        resp.setRuntimeConfiguration(this.getRuntimeConfiguration());
        return resp;
    }

    public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
        String clusterId = this.configManager.getClusterManager().getClusterIdWithRetry(CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS() / 2);
        TDataNodeRestartResp resp = new TDataNodeRestartResp();
        resp.setConfigNodeList(this.getRegisteredConfigNodes());
        if (clusterId == null) {
            resp.setStatus(new TSStatus(TSStatusCode.GET_CLUSTER_ID_ERROR.getStatusCode()).setMessage("clusterId has not generated"));
            return resp;
        }
        int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
        TDataNodeConfiguration dataNodeConfiguration = this.getRegisteredDataNode(nodeId);
        if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
            UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(req.getDataNodeConfiguration());
            try {
                this.getConsensusManager().write(updateDataNodePlan);
            }
            catch (ConsensusException e) {
                LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
            }
        }
        TNodeVersionInfo versionInfo = this.nodeInfo.getVersionInfo(nodeId);
        if (!req.getVersionInfo().equals(versionInfo)) {
            UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
            try {
                this.getConsensusManager().write(updateVersionInfoPlan);
            }
            catch (ConsensusException e) {
                LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
            }
        }
        resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
        resp.setRuntimeConfiguration(this.getRuntimeConfiguration());
        resp.setCorrectConsensusGroups(this.getPartitionManager().getAllReplicaSets(nodeId));
        return resp;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
        this.configManager.getProcedureManager().getEnv().getSubmitRegionMigrateLock().lock();
        LOGGER.info("NodeManager start to remove DataNode {}", (Object)removeDataNodePlan);
        try {
            TSStatus status;
            RemoveDataNodeHandler removeDataNodeHandler = this.configManager.getProcedureManager().getEnv().getRemoveDataNodeHandler();
            DataNodeToStatusResp preCheckStatus = removeDataNodeHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
            if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                LOGGER.error("The remove DataNode request check failed. req: {}, check result: {}", (Object)removeDataNodePlan, (Object)preCheckStatus.getStatus());
                DataNodeToStatusResp dataNodeToStatusResp = preCheckStatus;
                return dataNodeToStatusResp;
            }
            DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
            if (this.configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                dataSet.setStatus(new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode()).setMessage("Migrate the service on the removed DataNodes failed"));
                DataNodeToStatusResp dataNodeToStatusResp = dataSet;
                return dataNodeToStatusResp;
            }
            boolean removeSucceed = this.configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
            if (removeSucceed) {
                status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
                status.setMessage("Server accepted the request");
            } else {
                status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
                status.setMessage("Server rejected the request, maybe requests are too many");
            }
            dataSet.setStatus(status);
            LOGGER.info("NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}", (Object)removeDataNodePlan);
            DataNodeToStatusResp dataNodeToStatusResp = dataSet;
            return dataNodeToStatusResp;
        }
        finally {
            this.configManager.getProcedureManager().getEnv().getSubmitRegionMigrateLock().unlock();
        }
    }

    public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
        int nodeId = this.nodeInfo.generateNextNodeId();
        req.getConfigNodeLocation().setConfigNodeId(nodeId);
        this.configManager.getProcedureManager().addConfigNode(req);
        return new TConfigNodeRegisterResp().setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION).setConfigNodeId(nodeId);
    }

    public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo versionInfo) {
        TNodeVersionInfo recordVersionInfo = this.nodeInfo.getVersionInfo(configNodeId);
        if (!recordVersionInfo.equals(versionInfo)) {
            UpdateVersionInfoPlan updateConfigNodePlan = new UpdateVersionInfoPlan(versionInfo, configNodeId);
            try {
                return this.getConsensusManager().write(updateConfigNodePlan);
            }
            catch (ConsensusException e) {
                return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
            }
        }
        return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
    }

    public List<TAINodeInfo> getRegisteredAINodeInfoList() {
        ArrayList<TAINodeInfo> aiNodeInfoList = new ArrayList<TAINodeInfo>();
        for (TAINodeConfiguration aiNodeConfiguration : this.getRegisteredAINodes()) {
            TAINodeInfo aiNodeInfo = new TAINodeInfo();
            aiNodeInfo.setAiNodeId(aiNodeConfiguration.getLocation().getAiNodeId());
            aiNodeInfo.setStatus(this.getLoadManager().getNodeStatusWithReason(aiNodeInfo.getAiNodeId()));
            aiNodeInfo.setInternalAddress(aiNodeConfiguration.getLocation().getInternalEndPoint().ip);
            aiNodeInfo.setInternalPort(aiNodeConfiguration.getLocation().getInternalEndPoint().port);
            aiNodeInfoList.add(aiNodeInfo);
        }
        return aiNodeInfoList;
    }

    public List<TAINodeConfiguration> getRegisteredAINodes() {
        return this.nodeInfo.getRegisteredAINodes();
    }

    public TAINodeConfiguration getRegisteredAINode(int aiNodeId) {
        return this.nodeInfo.getRegisteredAINode(aiNodeId);
    }

    public synchronized DataSet registerAINode(TAINodeRegisterReq req) {
        if (!this.nodeInfo.getRegisteredAINodes().isEmpty()) {
            AINodeRegisterResp dataSet = new AINodeRegisterResp();
            dataSet.setConfigNodeList(Collections.emptyList());
            dataSet.setStatus(new TSStatus(TSStatusCode.REGISTER_AI_NODE_ERROR.getStatusCode()).setMessage("There is already one AINode in the cluster."));
            return dataSet;
        }
        int aiNodeId = this.nodeInfo.generateNextNodeId();
        this.getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId);
        RegisterAINodePlan registerAINodePlan = new RegisterAINodePlan(req.getAiNodeConfiguration());
        registerAINodePlan.getAINodeConfiguration().getLocation().setAiNodeId(aiNodeId);
        try {
            this.getConsensusManager().write(registerAINodePlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
        UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), aiNodeId);
        try {
            this.getConsensusManager().write(updateVersionInfoPlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
        AINodeRegisterResp resp = new AINodeRegisterResp();
        resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
        resp.setConfigNodeList(this.getRegisteredConfigNodes());
        resp.setAINodeId(registerAINodePlan.getAINodeConfiguration().getLocation().getAiNodeId());
        return resp;
    }

    public TSStatus removeAINode(RemoveAINodePlan removeAINodePlan) {
        TSStatus status;
        LOGGER.info("NodeManager start to remove AINode {}", (Object)removeAINodePlan);
        if (!this.nodeInfo.containsAINode(removeAINodePlan.getAINodeLocation().getAiNodeId())) {
            return new TSStatus(TSStatusCode.REMOVE_AI_NODE_ERROR.getStatusCode()).setMessage("AINode doesn't exist.");
        }
        boolean removeSucceed = this.configManager.getProcedureManager().removeAINode(removeAINodePlan);
        if (removeSucceed) {
            status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            status.setMessage("Server accepted the request");
        } else {
            status = new TSStatus(TSStatusCode.REMOVE_AI_NODE_ERROR.getStatusCode());
            status.setMessage("Server rejected the request, maybe requests are too many");
        }
        LOGGER.info("NodeManager submit RemoveAINodePlan finished, removeAINodePlan: {}", (Object)removeAINodePlan);
        return status;
    }

    public TAINodeRestartResp updateAINodeIfNecessary(TAINodeRestartReq req) {
        int nodeId = req.getAiNodeConfiguration().getLocation().getAiNodeId();
        TAINodeConfiguration aiNodeConfiguration = this.getRegisteredAINode(nodeId);
        if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) {
            UpdateAINodePlan updateAINodePlan = new UpdateAINodePlan(req.getAiNodeConfiguration());
            try {
                this.getConsensusManager().write(updateAINodePlan);
            }
            catch (ConsensusException e) {
                LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
            }
        }
        TNodeVersionInfo versionInfo = this.nodeInfo.getVersionInfo(nodeId);
        if (!req.getVersionInfo().equals(versionInfo)) {
            UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
            try {
                this.getConsensusManager().write(updateVersionInfoPlan);
            }
            catch (ConsensusException e) {
                LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
            }
        }
        TAINodeRestartResp resp = new TAINodeRestartResp();
        resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
        resp.setConfigNodeList(this.getRegisteredConfigNodes());
        return resp;
    }

    public AINodeConfigurationResp getAINodeConfiguration(GetAINodeConfigurationPlan req) {
        try {
            return (AINodeConfigurationResp)this.getConsensusManager().read(req);
        }
        catch (ConsensusException e) {
            LOGGER.warn("Failed in the read API executing the consensus layer due to: ", (Throwable)e);
            TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
            res.setMessage(e.getMessage());
            AINodeConfigurationResp response = new AINodeConfigurationResp();
            response.setStatus(res);
            return response;
        }
    }

    public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
        try {
            return (DataNodeConfigurationResp)this.getConsensusManager().read(req);
        }
        catch (ConsensusException e) {
            LOGGER.warn("Failed in the read API executing the consensus layer due to: ", (Throwable)e);
            TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
            res.setMessage(e.getMessage());
            DataNodeConfigurationResp response = new DataNodeConfigurationResp();
            response.setStatus(res);
            return response;
        }
    }

    public int getRegisteredNodeCount() {
        return this.nodeInfo.getRegisteredNodeCount();
    }

    public int getRegisteredDataNodeCount() {
        return this.nodeInfo.getRegisteredDataNodeCount();
    }

    public List<TDataNodeConfiguration> getRegisteredDataNodes() {
        return this.nodeInfo.getRegisteredDataNodes();
    }

    public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
        return this.nodeInfo.getRegisteredDataNode(dataNodeId);
    }

    public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
        ConcurrentHashMap<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<Integer, TDataNodeLocation>();
        this.nodeInfo.getRegisteredDataNodes().forEach(dataNodeConfiguration -> dataNodeLocations.put(dataNodeConfiguration.getLocation().getDataNodeId(), dataNodeConfiguration.getLocation()));
        return dataNodeLocations;
    }

    public Map<Integer, TConfigNodeLocation> getRegisteredConfigNodeLocations() {
        return this.nodeInfo.getRegisteredConfigNodes().stream().collect(Collectors.toMap(TConfigNodeLocation::getConfigNodeId, location -> location));
    }

    public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
        ArrayList<TDataNodeInfo> dataNodeInfoList = new ArrayList<TDataNodeInfo>();
        List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
        if (registeredDataNodes != null) {
            registeredDataNodes.forEach(registeredDataNode -> {
                TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
                int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
                dataNodeInfo.setDataNodeId(dataNodeId);
                dataNodeInfo.setStatus(this.getLoadManager().getNodeStatusWithReason(dataNodeId));
                dataNodeInfo.setRpcAddresss(registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
                dataNodeInfo.setRpcPort(registeredDataNode.getLocation().getClientRpcEndPoint().getPort());
                dataNodeInfo.setDataRegionNum(0);
                dataNodeInfo.setSchemaRegionNum(0);
                dataNodeInfo.setCpuCoreNum(registeredDataNode.getResource().getCpuCoreNum());
                dataNodeInfoList.add(dataNodeInfo);
            });
        }
        HashMap dataRegionNumMap = new HashMap();
        HashMap schemaRegionNumMap = new HashMap();
        List<TRegionReplicaSet> regionReplicaSets = this.getPartitionManager().getAllReplicaSets();
        regionReplicaSets.forEach(regionReplicaSet -> regionReplicaSet.getDataNodeLocations().forEach(dataNodeLocation -> {
            switch (regionReplicaSet.getRegionId().getType()) {
                case SchemaRegion: {
                    schemaRegionNumMap.computeIfAbsent(dataNodeLocation.getDataNodeId(), key -> new AtomicInteger()).getAndIncrement();
                    break;
                }
                default: {
                    dataRegionNumMap.computeIfAbsent(dataNodeLocation.getDataNodeId(), key -> new AtomicInteger()).getAndIncrement();
                }
            }
        }));
        AtomicInteger zero = new AtomicInteger(0);
        dataNodeInfoList.forEach(dataNodesInfo -> {
            dataNodesInfo.setSchemaRegionNum(schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
            dataNodesInfo.setDataRegionNum(dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
        });
        dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
        return dataNodeInfoList;
    }

    public int getDataNodeCpuCoreCount() {
        return this.nodeInfo.getDataNodeTotalCpuCoreCount();
    }

    public List<TConfigNodeLocation> getRegisteredConfigNodes() {
        return this.nodeInfo.getRegisteredConfigNodes();
    }

    public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
        return this.nodeInfo.getNodeVersionInfo();
    }

    public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
        ArrayList<TConfigNodeInfo> configNodeInfoList = new ArrayList<TConfigNodeInfo>();
        List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
        if (registeredConfigNodes != null) {
            registeredConfigNodes.forEach(configNodeLocation -> {
                TConfigNodeInfo info = new TConfigNodeInfo();
                int configNodeId = configNodeLocation.getConfigNodeId();
                info.setConfigNodeId(configNodeId);
                info.setStatus(this.getLoadManager().getNodeStatusWithReason(configNodeId));
                info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
                info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
                info.setRoleType(configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID ? RegionRoleType.Leader.name() : RegionRoleType.Follower.name());
                configNodeInfoList.add(info);
            });
        }
        configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
        return configNodeInfoList;
    }

    public void applyConfigNode(TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
        ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
        try {
            this.getConsensusManager().write(applyConfigNodePlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
        UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId());
        try {
            this.getConsensusManager().write(updateVersionInfoPlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
        this.removeConfigNodeLock.lock();
        try {
            if (this.getRegisteredConfigNodes().size() <= 1) {
                TSStatus tSStatus = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
                return tSStatus;
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() / 2);
            while (this.filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
                if (System.nanoTime() > deadline) {
                    TSStatus tSStatus = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because there is no other ConfigNode in Running status in current Cluster.");
                    return tSStatus;
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    TSStatus tSStatus = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed due to thread interruption.");
                    this.removeConfigNodeLock.unlock();
                    return tSStatus;
                }
            }
            if (!this.getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
                TSStatus e = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
                return e;
            }
            TConfigNodeLocation leader = this.getConsensusManager().getLeaderLocation();
            if (leader == null) {
                TSStatus tSStatus = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
                return tSStatus;
            }
            if (leader.getInternalEndPoint().equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
                TSStatus tSStatus = this.transferLeader(removeConfigNodePlan, this.getConsensusManager().getConsensusGroupId());
                return tSStatus;
            }
        }
        finally {
            this.removeConfigNodeLock.unlock();
        }
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("Successfully remove confignode.");
    }

    private TSStatus transferLeader(RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
        Optional<TConfigNodeLocation> optional = this.filterConfigNodeThroughStatus(NodeStatus.Running).stream().filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation())).findAny();
        TConfigNodeLocation newLeader = null;
        if (!optional.isPresent()) {
            return new TSStatus(TSStatusCode.TRANSFER_LEADER_ERROR.getStatusCode()).setMessage("Transfer ConfigNode leader failed because can not find any running ConfigNode.");
        }
        newLeader = optional.get();
        try {
            this.getConsensusManager().getConsensusImpl().transferLeader(groupId, new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
        }
        catch (ConsensusException e2) {
            return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
        }
        return new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setRedirectNode(newLeader.getInternalEndPoint()).setMessage("The ConfigNode to be removed is leader, already transfer Leader to " + newLeader + ".");
    }

    public List<TSStatus> merge() {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.MERGE, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public List<TSStatus> flush(TFlushReq req) {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.FLUSH, req, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public List<TSStatus> flushOnSpecificDN(TFlushReq req, Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.FLUSH, req, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public List<TSStatus> clearCache(Set<Integer> clearCacheOptions) {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.CLEAR_CACHE, clearCacheOptions, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public List<TSStatus> setConfiguration(TSetConfigurationReq req) {
        ArrayList<TSStatus> responseList = new ArrayList<TSStatus>();
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        HashMap<Integer, TDataNodeLocation> targetDataNodes = new HashMap<Integer, TDataNodeLocation>();
        int nodeId = req.getNodeId();
        if (dataNodeLocationMap.containsKey(nodeId)) {
            targetDataNodes.put(nodeId, dataNodeLocationMap.get(nodeId));
        } else if (nodeId < 0) {
            targetDataNodes.putAll(dataNodeLocationMap);
        }
        if (!targetDataNodes.isEmpty()) {
            DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap);
            CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
            responseList.addAll(clientHandler.getResponseList());
        }
        List<TConfigNodeLocation> configNodes = this.getRegisteredConfigNodes();
        for (TConfigNodeLocation configNode : configNodes) {
            if (configNode.getConfigNodeId() == CONF.getConfigNodeId() || nodeId >= 0 && nodeId != configNode.getConfigNodeId()) continue;
            TSStatus status = null;
            try {
                status = (TSStatus)SyncConfigNodeClientPool.getInstance().sendSyncRequestToConfigNodeWithRetry(configNode.getInternalEndPoint(), new TSetConfigurationReq(req.getConfigs(), configNode.getConfigNodeId()), CnToCnNodeRequestType.SET_CONFIGURATION);
            }
            catch (Exception e) {
                status = RpcUtils.getStatus((int)TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), (String)e.getMessage());
            }
            responseList.add(status);
        }
        return responseList;
    }

    public List<TSStatus> startRpairData() {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.START_REPAIR_DATA, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public List<TSStatus> stopRepairData() {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.STOP_REPAIR_DATA, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public List<TSStatus> submitLoadConfigurationTask() {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext dataNodeRequestContext = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(dataNodeRequestContext);
        return dataNodeRequestContext.getResponseList();
    }

    public TShowConfigurationResp showConfiguration(int nodeId) {
        TShowConfigurationResp resp = new TShowConfigurationResp();
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        if (dataNodeLocationMap.containsKey(nodeId)) {
            TDataNodeLocation dataNodeLocation = dataNodeLocationMap.get(nodeId);
            return (TShowConfigurationResp)SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(dataNodeLocation.getInternalEndPoint(), null, CnToDnSyncRequestType.SHOW_CONFIGURATION);
        }
        for (TConfigNodeLocation registeredConfigNode : this.getRegisteredConfigNodes()) {
            if (registeredConfigNode.getConfigNodeId() != nodeId) continue;
            resp = (TShowConfigurationResp)SyncConfigNodeClientPool.getInstance().sendSyncRequestToConfigNodeWithRetry(registeredConfigNode.getInternalEndPoint(), nodeId, CnToCnNodeRequestType.SHOW_CONFIGURATION);
            return resp;
        }
        return resp;
    }

    public List<TSStatus> setSystemStatus(String status) {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return clientHandler.getResponseList();
    }

    public TSStatus setDataNodeStatus(TSetDataNodeStatusReq setDataNodeStatusReq) {
        return (TSStatus)SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(setDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(), setDataNodeStatusReq.getStatus(), CnToDnSyncRequestType.SET_SYSTEM_STATUS);
    }

    public TSStatus killQuery(String queryId, int dataNodeId) {
        if (dataNodeId < 0) {
            return this.killAllQueries();
        }
        return this.killSpecificQuery(queryId, this.getRegisteredDataNodeLocations().get(dataNodeId));
    }

    private TSStatus killAllQueries() {
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
        return RpcUtils.squashResponseStatusList((List)clientHandler.getResponseList());
    }

    private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
        if (dataNodeLocation == null) {
            return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()).setMessage("The target DataNode is not existed, please ensure your input <queryId> is correct");
        }
        return (TSStatus)SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(dataNodeLocation.getInternalEndPoint(), queryId, CnToDnSyncRequestType.KILL_QUERY_INSTANCE);
    }

    public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus ... status) {
        return this.nodeInfo.getRegisteredConfigNodes(this.getLoadManager().filterConfigNodeThroughStatus(status));
    }

    public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus ... status) {
        return this.nodeInfo.getRegisteredDataNodes(this.getLoadManager().filterDataNodeThroughStatus(status));
    }

    public List<TDataNodeConfiguration> filterDataNodeThroughStatus(Function<NodeStatus, Boolean> statusPredicate) {
        return this.nodeInfo.getRegisteredDataNodes(this.getLoadManager().filterDataNodeThroughStatus(statusPredicate));
    }

    public Optional<TDataNodeLocation> getLowestLoadDataNode() {
        int dataNodeId = this.getLoadManager().getLowestLoadDataNode();
        return dataNodeId < 0 ? Optional.empty() : Optional.of(this.getRegisteredDataNode(dataNodeId).getLocation());
    }

    public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
        int dataNodeId = this.getLoadManager().getLowestLoadDataNode(new ArrayList<Integer>(nodes));
        return this.getRegisteredDataNode(dataNodeId).getLocation();
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }

    private ClusterSchemaManager getClusterSchemaManager() {
        return this.configManager.getClusterSchemaManager();
    }

    private ClusterManager getClusterManager() {
        return this.configManager.getClusterManager();
    }

    private PartitionManager getPartitionManager() {
        return this.configManager.getPartitionManager();
    }

    private LoadManager getLoadManager() {
        return this.configManager.getLoadManager();
    }

    private TriggerManager getTriggerManager() {
        return this.configManager.getTriggerManager();
    }

    private PipeManager getPipeManager() {
        return this.configManager.getPipeManager();
    }

    private UDFManager getUDFManager() {
        return this.configManager.getUDFManager();
    }

    private TTLManager getTTLManager() {
        return this.configManager.getTTLManager();
    }
}

