/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.it.env.cluster.env;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.exception.PortOccupiedException;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.pool.ISessionPool;
import org.apache.iotdb.isession.pool.ITableSessionPool;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.ClusterConstant;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.env.cluster.config.MppClusterConfig;
import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
import org.apache.iotdb.it.env.cluster.env.MultiClusterEnv;
import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.itbase.env.BaseNodeWrapper;
import org.apache.iotdb.itbase.env.ClusterConfig;
import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
import org.apache.iotdb.itbase.runtime.NodeConnection;
import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
import org.apache.iotdb.itbase.runtime.RequestDelegate;
import org.apache.iotdb.itbase.runtime.SerialRequestDelegate;
import org.apache.iotdb.jdbc.Constant;
import org.apache.iotdb.jdbc.IoTDBConnection;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.TableSessionBuilder;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;

public abstract class AbstractEnv
implements BaseEnv {
    // Logger shared with the integration-test framework.
    private static final Logger logger = IoTDBTestLogger.logger;
    // Source of randomness used to pick a DataNode when sessions and session pools are opened.
    private final Random rand = new Random();
    // Wrappers of the ConfigNodes of this environment; replaced by a fresh list in initEnvironment.
    protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList();
    // Wrappers of the DataNodes of this environment; replaced by a fresh list in initEnvironment.
    protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList();
    // Wrappers of the AINodes; only replaced by a mutable list when initEnvironment is asked to add an AINode.
    protected List<AINodeWrapper> aiNodeWrapperList = Collections.emptyList();
    // Passed to every node wrapper on construction; reset to null by cleanClusterEnvironment.
    protected String testMethodName = null;
    // Passed to every node wrapper on construction.
    protected int index = 0;
    // Set by the constructors and passed to every node wrapper on construction.
    protected long startTime;
    // Upper bound of attempts in checkClusterStatus; overwritten by initEnvironment.
    protected int retryCount = 30;
    // Created in initEnvironment and closed in cleanClusterEnvironment.
    private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
    // Kill points handed to each newly created ConfigNode wrapper via setKillPoints.
    private List<String> configNodeKillPoints = new ArrayList<String>();
    // Kill points handed to each newly created DataNode wrapper via setKillPoints.
    private List<String> dataNodeKillPoints = new ArrayList<String>();
    // Returned by getConfig; replaced by a fresh instance in cleanClusterEnvironment.
    private MppClusterConfig clusterConfig;

    /** Creates an environment whose start time is the current system time in milliseconds. */
    protected AbstractEnv() {
        long now = System.currentTimeMillis();
        startTime = now;
        clusterConfig = new MppClusterConfig();
    }

    /**
     * Creates an environment with an explicitly supplied start time.
     *
     * @param startTime value stored in {@code this.startTime} and later handed to the node wrappers
     */
    protected AbstractEnv(long startTime) {
        clusterConfig = new MppClusterConfig();
        this.startTime = startTime;
    }

    /** Returns the cluster configuration object currently held by this environment. */
    @Override
    public ClusterConfig getConfig() {
        return clusterConfig;
    }

    /**
     * Fetches the {@code /metrics} page of every ConfigNode and then every DataNode.
     *
     * @param authHeader value forwarded unchanged to {@code getUrlContent}
     * @return the fetched contents, ConfigNodes first, each group in wrapper-list order
     */
    @Override
    public List<String> getMetricPrometheusReporterContents(String authHeader) {
        ArrayList<String> contents = new ArrayList<>();
        for (ConfigNodeWrapper node : configNodeWrapperList) {
            String url = "http://" + node.getIp() + ":" + node.getMetricPort() + "/metrics";
            contents.add(getUrlContent(url, authHeader));
        }
        for (DataNodeWrapper node : dataNodeWrapperList) {
            String url = "http://" + node.getIp() + ":" + node.getMetricPort() + "/metrics";
            contents.add(getUrlContent(url, authHeader));
        }
        return contents;
    }

    /**
     * Starts a cluster using the retry count currently stored in this environment.
     *
     * @param configNodesNum number of ConfigNodes to start
     * @param dataNodesNum number of DataNodes to start
     */
    protected void initEnvironment(int configNodesNum, int dataNodesNum) {
        initEnvironment(configNodesNum, dataNodesNum, retryCount);
    }

    /**
     * Starts a cluster without an AINode.
     *
     * @param configNodesNum number of ConfigNodes to start
     * @param dataNodesNum number of DataNodes to start
     * @param testWorkingRetryCount retry count forwarded to the four-argument overload
     */
    protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
        final boolean withAINode = false;
        initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, withAINode);
    }

    /**
     * Boots the cluster and verifies that it is usable.
     *
     * <p>Order of operations: the seed ConfigNode is created and started, a leader connection is
     * probed (a failure is only logged), the remaining ConfigNodes are started through a
     * {@code SerialRequestDelegate}, the DataNodes through a {@code ParallelRequestDelegate},
     * optionally one AINode is started, and finally {@link #checkClusterStatusWithoutUnknown()} runs.
     *
     * @param configNodesNum total number of ConfigNodes; the seed node is always started, further
     *     nodes are only added while the index is below this value
     * @param dataNodesNum number of DataNodes to start
     * @param retryCount stored in {@code this.retryCount}, which bounds later status checks
     * @param addAINode whether an AINode is started in addition
     * @throws AssertionError if starting the ConfigNodes or the DataNodes fails; the error carries
     *     the triggering {@link SQLException} as its cause
     */
    protected void initEnvironment(int configNodesNum, int dataNodesNum, int retryCount, boolean addAINode) {
        this.retryCount = retryCount;
        this.configNodeWrapperList = new ArrayList<>();
        this.dataNodeWrapperList = new ArrayList<>();
        this.clientManager = new IClientManager.Factory().createClientManager((IClientPoolFactory)new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
        String testClassName = this.getTestClassName();

        // The seed ConfigNode must be running before any other node is started.
        ConfigNodeWrapper seedConfigNodeWrapper = new ConfigNodeWrapper(true, "", testClassName, this.testMethodName, EnvUtils.searchAvailablePorts(), this.index, this instanceof MultiClusterEnv, this.startTime);
        seedConfigNodeWrapper.createNodeDir();
        seedConfigNodeWrapper.changeConfig((MppConfigNodeConfig)this.clusterConfig.getConfigNodeConfig(), (MppCommonConfig)this.clusterConfig.getConfigNodeCommonConfig(), (MppJVMConfig)this.clusterConfig.getConfigNodeJVMConfig());
        seedConfigNodeWrapper.createLogDir();
        seedConfigNodeWrapper.setKillPoints(this.configNodeKillPoints);
        seedConfigNodeWrapper.start();
        String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
        this.configNodeWrapperList.add(seedConfigNodeWrapper);

        // Probe only: a failure here is logged and start-up continues.
        try (SyncConfigNodeIServiceClient ignored = (SyncConfigNodeIServiceClient)this.getLeaderConfigNodeConnection()) {
            logger.info("The Seed-ConfigNode started successfully!");
        } catch (Exception e) {
            logger.error("Failed to get connection to the Seed-ConfigNode", e);
        }

        // Remaining ConfigNodes; index 0 is the seed node that is already running.
        List<String> configNodeEndpoints = new ArrayList<>();
        SerialRequestDelegate<Void> configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints);
        for (int i = 1; i < configNodesNum; ++i) {
            ConfigNodeWrapper configNodeWrapper = this.newConfigNode();
            this.configNodeWrapperList.add(configNodeWrapper);
            configNodeEndpoints.add(configNodeWrapper.getIpAndPortString());
            configNodesDelegate.addRequest(() -> {
                configNodeWrapper.start();
                return null;
            });
        }
        try {
            configNodesDelegate.requestAll();
        } catch (SQLException e) {
            logger.error("Start configNodes failed", e);
            // Keep the message and the cause so the test report shows why start-up failed.
            throw new AssertionError("Start configNodes failed", e);
        }

        List<String> dataNodeEndpoints = new ArrayList<>();
        ParallelRequestDelegate<Void> dataNodesDelegate = new ParallelRequestDelegate<>(dataNodeEndpoints, 100);
        for (int i = 0; i < dataNodesNum; ++i) {
            DataNodeWrapper dataNodeWrapper = this.newDataNode();
            dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString());
            this.dataNodeWrapperList.add(dataNodeWrapper);
            dataNodesDelegate.addRequest(() -> {
                dataNodeWrapper.start();
                return null;
            });
        }
        try {
            dataNodesDelegate.requestAll();
        } catch (SQLException e) {
            logger.error("Start dataNodes failed", e);
            throw new AssertionError("Start dataNodes failed", e);
        }

        if (addAINode) {
            this.aiNodeWrapperList = new ArrayList<>();
            this.startAINode(seedConfigNode, testClassName);
        }
        this.checkClusterStatusWithoutUnknown();
    }

    /**
     * Builds (but does not start) a non-seed ConfigNode wrapper that points at the first ConfigNode
     * of {@code configNodeWrapperList}, with directories, configuration and kill points applied.
     *
     * @return the prepared wrapper
     */
    private ConfigNodeWrapper newConfigNode() {
        String seedEndpoint = configNodeWrapperList.get(0).getIpAndPortString();
        boolean multiCluster = this instanceof MultiClusterEnv;
        ConfigNodeWrapper node = new ConfigNodeWrapper(false, seedEndpoint, getTestClassName(), testMethodName, EnvUtils.searchAvailablePorts(), index, multiCluster, startTime);
        node.createNodeDir();
        MppConfigNodeConfig nodeConfig = (MppConfigNodeConfig)clusterConfig.getConfigNodeConfig();
        MppCommonConfig commonConfig = (MppCommonConfig)clusterConfig.getConfigNodeCommonConfig();
        MppJVMConfig jvmConfig = (MppJVMConfig)clusterConfig.getConfigNodeJVMConfig();
        node.changeConfig(nodeConfig, commonConfig, jvmConfig);
        node.createLogDir();
        node.setKillPoints(configNodeKillPoints);
        return node;
    }

    /**
     * Builds (but does not start) a DataNode wrapper that points at the first ConfigNode of
     * {@code configNodeWrapperList}, with directories, configuration and kill points applied.
     *
     * @return the prepared wrapper
     */
    private DataNodeWrapper newDataNode() {
        String seedEndpoint = configNodeWrapperList.get(0).getIpAndPortString();
        boolean multiCluster = this instanceof MultiClusterEnv;
        DataNodeWrapper node = new DataNodeWrapper(seedEndpoint, getTestClassName(), testMethodName, EnvUtils.searchAvailablePorts(), index, multiCluster, startTime);
        node.createNodeDir();
        MppDataNodeConfig nodeConfig = (MppDataNodeConfig)clusterConfig.getDataNodeConfig();
        MppCommonConfig commonConfig = (MppCommonConfig)clusterConfig.getDataNodeCommonConfig();
        MppJVMConfig jvmConfig = (MppJVMConfig)clusterConfig.getDataNodeJVMConfig();
        node.changeConfig(nodeConfig, commonConfig, jvmConfig);
        node.createLogDir();
        node.setKillPoints(dataNodeKillPoints);
        return node;
    }

    /**
     * Creates one AINode wrapper, registers it in {@code aiNodeWrapperList} and starts it.
     * A start failure is logged and not rethrown.
     *
     * @param seedConfigNode endpoint string handed to the AINode wrapper
     * @param testClassName test class name handed to the AINode wrapper
     */
    private void startAINode(String seedConfigNode, String testClassName) {
        AINodeWrapper node = new AINodeWrapper(seedConfigNode, testClassName, testMethodName, index, EnvUtils.searchAvailablePorts(), startTime);
        aiNodeWrapperList.add(node);
        String endpoint = node.getIpAndPortString();
        node.createNodeDir();
        node.createLogDir();

        List<String> endpoints = Collections.singletonList(endpoint);
        ParallelRequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints, 100);
        delegate.addRequest(() -> {
            node.start();
            return null;
        });
        try {
            delegate.requestAll();
        } catch (SQLException e) {
            logger.error("Start aiNodes failed", e);
        }
    }

    /**
     * Derives the name of the running test class from the current thread's stack trace.
     *
     * @return the simple name of the first stack frame class whose fully qualified name ends with
     *     {@code "IT"} and whose simple name does not start with {@code "Abstract"};
     *     {@code "UNKNOWN-IT"} when no frame qualifies
     */
    public String getTestClassName() {
        StackTraceElement[] frames = Thread.currentThread().getStackTrace();
        for (StackTraceElement frame : frames) {
            String qualifiedName = frame.getClassName();
            if (qualifiedName.endsWith("IT")) {
                String simpleName = qualifiedName.substring(qualifiedName.lastIndexOf(".") + 1);
                if (!simpleName.startsWith("Abstract")) {
                    return simpleName;
                }
            }
        }
        return "UNKNOWN-IT";
    }

    /**
     * Counts how often each status string occurs among the values of the given map.
     *
     * @param nodeStatus map whose values are the status strings to count
     * @return a new map from status string to its number of occurrences
     */
    private Map<String, Integer> countNodeStatus(Map<Integer, String> nodeStatus) {
        HashMap<String, Integer> occurrences = new HashMap<>();
        for (String status : nodeStatus.values()) {
            int seenSoFar = occurrences.getOrDefault(status, 0);
            occurrences.put(status, seenSoFar + 1);
        }
        return occurrences;
    }

    /**
     * Waits, via {@code checkClusterStatus}, until the given node reports the expected status.
     * The process-status check always passes.
     *
     * @param nodeId key looked up in the node-status map
     * @param expectation status whose {@code getStatus()} value must equal the reported one
     */
    public void checkNodeInStatus(int nodeId, NodeStatus expectation) {
        Predicate<Map<Integer, String>> nodeMatches = statusById -> {
            return expectation.getStatus().equals(statusById.get(nodeId));
        };
        Predicate<Map<AbstractNodeWrapper, Integer>> anyProcessState = ignored -> true;
        checkClusterStatus(nodeMatches, anyProcessState);
    }

    /**
     * Waits until no node reports the status {@code "Unknown"} and every recorded process status is
     * {@code 0}, then runs the JDBC connection test.
     */
    public void checkClusterStatusWithoutUnknown() {
        Predicate<Map<Integer, String>> noUnknownNode = statusById ->
            statusById.values().stream().noneMatch(status -> "Unknown".equals(status));
        Predicate<Map<AbstractNodeWrapper, Integer>> allProcessesHealthy = codeByNode ->
            codeByNode.values().stream().allMatch(code -> code == 0);
        checkClusterStatus(noUnknownNode, allProcessesHealthy);
        testJDBCConnection();
    }

    /**
     * Waits until exactly one node reports {@code "Unknown"} while all others report
     * {@code "Running"}, and all but one recorded process status is {@code 0}; then runs the JDBC
     * connection test.
     */
    public void checkClusterStatusOneUnknownOtherRunning() {
        Predicate<Map<Integer, String>> oneUnknownRestRunning = statusById -> {
            Map<String, Integer> tally = countNodeStatus(statusById);
            if (tally.getOrDefault("Unknown", 0) != 1) {
                return false;
            }
            return tally.getOrDefault("Running", 0) == statusById.size() - 1;
        };
        Predicate<Map<AbstractNodeWrapper, Integer>> allButOneAlive = codeByNode -> {
            long alive = 0L;
            for (Integer code : codeByNode.values()) {
                if (code == 0) {
                    alive++;
                }
            }
            return alive == (long)(codeByNode.size() - 1);
        };
        checkClusterStatus(oneUnknownRestRunning, allButOneAlive);
        testJDBCConnection();
    }

    /**
     * Polls the leader ConfigNode until the cluster satisfies both predicates, retrying up to
     * {@code retryCount} times with a one-second pause between attempts.
     *
     * <p>Each attempt checks, in order: the status code of {@code showCluster}, the number of
     * reported nodes against the sizes of the three wrapper lists, {@code nodeStatusCheck} (only
     * when the previous checks passed), and {@code processStatusCheck} on a map that holds
     * {@code 0} for every live process and the value of {@code waitFor()} for every dead one.
     * When the process check fails, {@code handleProcessStatus} is invoked before the next attempt.
     *
     * <p>If the thread is interrupted while pausing, the interrupt flag is restored and polling
     * stops at once instead of burning through the remaining attempts.
     *
     * @param nodeStatusCheck predicate applied to the node-status map returned by {@code showCluster}
     * @param processStatusCheck predicate applied to the collected process-status map
     * @throws PortOccupiedException if the final process-status map contains the
     *     {@code PORT_OCCUPIED} status code
     * @throws AssertionError if the cluster never passed all checks; the last caught exception, if
     *     any, is attached as the cause
     */
    public void checkClusterStatus(Predicate<Map<Integer, String>> nodeStatusCheck, Predicate<Map<AbstractNodeWrapper, Integer>> processStatusCheck) {
        logger.info("Testing cluster environment...");
        Exception lastException = null;
        boolean showClusterPassed = true;
        boolean nodeSizePassed = true;
        boolean nodeStatusPassed = true;
        boolean processStatusPassed = true;
        TSStatus showClusterStatus = null;
        int actualNodeSize = 0;
        Map<Integer, String> lastNodeStatus = null;
        Map<AbstractNodeWrapper, Integer> processStatusMap = new HashMap<>();
        for (int i = 0; i < this.retryCount; ++i) {
            try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient)this.getLeaderConfigNodeConnection()) {
                // Every attempt starts from a clean slate so the final report reflects the last try.
                boolean passed = true;
                showClusterPassed = true;
                nodeSizePassed = true;
                nodeStatusPassed = true;
                processStatusPassed = true;
                processStatusMap.clear();

                TShowClusterResp showClusterResp = client.showCluster();
                if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    passed = false;
                    showClusterPassed = false;
                    showClusterStatus = showClusterResp.getStatus();
                }
                int expectedNodeSize = this.configNodeWrapperList.size() + this.dataNodeWrapperList.size() + this.aiNodeWrapperList.size();
                if (showClusterResp.getNodeStatus().size() != expectedNodeSize) {
                    passed = false;
                    nodeSizePassed = false;
                    actualNodeSize = showClusterResp.getNodeStatusSize();
                }
                if (passed && !(passed = nodeStatusCheck.test(showClusterResp.getNodeStatus()))) {
                    nodeStatusPassed = false;
                    lastNodeStatus = showClusterResp.getNodeStatus();
                }

                // Live processes are recorded as 0, dead ones with the value returned by waitFor().
                for (DataNodeWrapper dataNodeWrapper : this.dataNodeWrapperList) {
                    boolean alive = dataNodeWrapper.getInstance().isAlive();
                    processStatusMap.put(dataNodeWrapper, alive ? 0 : dataNodeWrapper.getInstance().waitFor());
                }
                for (ConfigNodeWrapper configNodeWrapper : this.configNodeWrapperList) {
                    boolean alive = configNodeWrapper.getInstance().isAlive();
                    processStatusMap.put(configNodeWrapper, alive ? 0 : configNodeWrapper.getInstance().waitFor());
                }
                for (AINodeWrapper aiNodeWrapper : this.aiNodeWrapperList) {
                    boolean alive = aiNodeWrapper.getInstance().isAlive();
                    processStatusMap.put(aiNodeWrapper, alive ? 0 : aiNodeWrapper.getInstance().waitFor());
                }

                processStatusPassed = processStatusCheck.test(processStatusMap);
                if (!processStatusPassed) {
                    passed = false;
                    this.handleProcessStatus(processStatusMap);
                }
                if (passed) {
                    logger.info("The cluster is now ready for testing!");
                    return;
                }
                logger.info("Retry {}: showClusterPassed={}, nodeSizePassed={}, nodeStatusPassed={}, processStatusPassed={}", i, showClusterPassed, nodeSizePassed, nodeStatusPassed, processStatusPassed);
            } catch (Exception e) {
                lastException = e;
            }
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                lastException = e;
                Thread.currentThread().interrupt();
                // With the flag set every further sleep would fail immediately; stop polling.
                break;
            }
        }

        if (lastException != null) {
            logger.error("exception in test Cluster with RPC, message: {}", lastException.getMessage(), lastException);
        }
        if (!showClusterPassed) {
            logger.error("Show cluster failed: {}", showClusterStatus);
        }
        if (!nodeSizePassed) {
            logger.error("Only {} nodes detected", actualNodeSize);
        }
        if (!nodeStatusPassed) {
            logger.error("Some node status incorrect: {}", lastNodeStatus);
        }
        if (!processStatusPassed) {
            logger.error("Some process status incorrect: {}", processStatusMap.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue)));
            if (processStatusMap.containsValue(TSStatusCode.PORT_OCCUPIED.getStatusCode())) {
                throw new PortOccupiedException();
            }
        }
        throw new AssertionError(String.format("After %d times retry, the cluster can't work!", this.retryCount), lastException);
    }

    /**
     * Reacts to the recorded process status of every node.
     *
     * <p>Any non-zero status is logged. For a node whose status equals the {@code PORT_OCCUPIED}
     * code, the occupation of its ports is listed, every managed DataNode, ConfigNode or AINode
     * whose pid shows up in that listing is restarted, and finally the node itself is restarted.
     * A failure to list the port occupation is logged and the node is still restarted.
     *
     * @param processStatusMap recorded status per node wrapper
     */
    private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatusMap) {
        for (Map.Entry<AbstractNodeWrapper, Integer> entry : processStatusMap.entrySet()) {
            AbstractNodeWrapper failedNode = entry.getKey();
            Integer exitCode = entry.getValue();
            if (exitCode != 0) {
                logger.info("Node {} is not running due to {}", failedNode.getId(), exitCode);
            }
            if (exitCode.intValue() == TSStatusCode.PORT_OCCUPIED.getStatusCode()) {
                try {
                    List<Integer> ports = Arrays.stream(failedNode.getPortList()).boxed().collect(Collectors.toList());
                    Map<Integer, Long> occupation = EnvUtils.listPortOccupation(ports);
                    logger.info("Check port result: {}", occupation);
                    for (DataNodeWrapper dataNode : dataNodeWrapperList) {
                        if (occupation.containsValue(dataNode.getPid())) {
                            logger.info("A port is occupied by another DataNode {}-{}, restart it", dataNode.getIpAndPortString(), dataNode.getPid());
                            dataNode.stop();
                            dataNode.start();
                        }
                    }
                    for (ConfigNodeWrapper configNode : configNodeWrapperList) {
                        if (occupation.containsValue(configNode.getPid())) {
                            logger.info("A port is occupied by another ConfigNode {}-{}, restart it", configNode.getIpAndPortString(), configNode.getPid());
                            configNode.stop();
                            configNode.start();
                        }
                    }
                    for (AINodeWrapper aiNode : aiNodeWrapperList) {
                        if (occupation.containsValue(aiNode.getPid())) {
                            logger.info("A port is occupied by another AINode {}-{}, restart it", aiNode.getIpAndPortString(), aiNode.getPid());
                            aiNode.stop();
                            aiNode.start();
                        }
                    }
                } catch (IOException e) {
                    logger.error("Cannot check port occupation", e);
                }
                logger.info("Restarting it");
                failedNode.stop();
                failedNode.start();
            }
        }
    }

    /**
     * Tears the cluster down: every DataNode, ConfigNode and AINode (in that order) is stopped
     * forcibly, its directory destroyed and its port lock file deleted; then the client manager is
     * closed, {@code testMethodName} is cleared and a fresh cluster configuration is installed.
     *
     * <p>A lock file that cannot be deleted is logged as an error and does not abort the clean-up.
     */
    @Override
    public void cleanClusterEnvironment() {
        // Typed list: a raw List cannot be iterated as AbstractNodeWrapper.
        List<AbstractNodeWrapper> allNodeWrappers = new ArrayList<>();
        allNodeWrappers.addAll(this.dataNodeWrapperList);
        allNodeWrappers.addAll(this.configNodeWrapperList);
        allNodeWrappers.addAll(this.aiNodeWrapperList);

        allNodeWrappers.stream().findAny().ifPresent(nodeWrapper -> logger.info("You can find logs at {}", nodeWrapper.getLogDirPath()));
        for (AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
            nodeWrapper.stopForcibly();
            nodeWrapper.destroyDir();
            String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
            if (!new File(lockPath).delete()) {
                logger.error("Delete lock file {} failed", lockPath);
            }
        }
        if (this.clientManager != null) {
            this.clientManager.close();
        }
        this.testMethodName = null;
        this.clusterConfig = new MppClusterConfig();
    }

    /**
     * Opens a cluster test connection made of one write connection and the read connections
     * returned by {@code getReadConnections}; no protocol version is specified.
     *
     * @param username user name forwarded to the connection helpers
     * @param password password forwarded to the connection helpers
     * @param sqlDialect SQL dialect forwarded to the connection helpers
     * @return the combined connection
     * @throws SQLException if a connection helper fails
     */
    @Override
    public Connection getConnection(String username, String password, String sqlDialect) throws SQLException {
        NodeConnection writer = getWriteConnection(null, username, password, sqlDialect);
        List<NodeConnection> readers = getReadConnections(null, username, password, sqlDialect);
        return new ClusterTestConnection(writer, readers);
    }

    /**
     * Opens a cluster test connection whose write and read connections both target the given
     * DataNode; no protocol version is specified.
     *
     * @param dataNodeWrapper DataNode to connect to
     * @param username user name forwarded to the connection helpers
     * @param password password forwarded to the connection helpers
     * @param sqlDialect SQL dialect forwarded to the connection helpers
     * @return the combined connection
     * @throws SQLException if a connection helper fails
     */
    @Override
    public Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password, String sqlDialect) throws SQLException {
        NodeConnection writer = getWriteConnectionWithSpecifiedDataNode(dataNodeWrapper, null, username, password, sqlDialect);
        List<NodeConnection> readers = getReadConnections(null, dataNodeWrapper, username, password, sqlDialect);
        return new ClusterTestConnection(writer, readers);
    }

    /**
     * Opens a cluster test connection with a write connection to the given DataNode and no read
     * connections. Any dialect other than {@code "table"} is treated as {@code "tree"}.
     *
     * @param dataNode DataNode to connect to
     * @param username user name forwarded to the connection helper
     * @param password password forwarded to the connection helper
     * @param sqlDialect requested SQL dialect
     * @return the write-only connection
     * @throws SQLException if the write connection cannot be opened
     */
    @Override
    public Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode, String username, String password, String sqlDialect) throws SQLException {
        String effectiveDialect;
        if ("table".equals(sqlDialect)) {
            effectiveDialect = "table";
        } else {
            effectiveDialect = "tree";
        }
        NodeConnection writer = getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, password, effectiveDialect);
        return new ClusterTestConnection(writer, Collections.emptyList());
    }

    /**
     * Opens a {@code "tree"} dialect cluster test connection that writes to the given DataNode and
     * reads through the connections returned by {@code getReadConnections}.
     *
     * @param dataNode DataNode that receives the write connection
     * @param username user name forwarded to the connection helpers
     * @param password password forwarded to the connection helpers
     * @return the combined connection
     * @throws SQLException if a connection helper fails
     */
    @Override
    public Connection getConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode, String username, String password) throws SQLException {
        NodeConnection writer = getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, password, "tree");
        List<NodeConnection> readers = getReadConnections(null, username, password, "tree");
        return new ClusterTestConnection(writer, readers);
    }

    /**
     * Opens a connection for the given protocol version.
     *
     * <p>When the system property {@code ReadAndVerifyWithMultiNode} (default {@code "true"}) equals
     * {@code "true"} ignoring case, a cluster test connection with write and read connections is
     * returned; otherwise only the underlying connection of a write connection is returned.
     *
     * @param version protocol version forwarded to the connection helpers
     * @param username user name forwarded to the connection helpers
     * @param password password forwarded to the connection helpers
     * @param sqlDialect SQL dialect forwarded to the connection helpers
     * @return the opened connection
     * @throws SQLException if a connection helper fails
     */
    @Override
    public Connection getConnection(Constant.Version version, String username, String password, String sqlDialect) throws SQLException {
        boolean verifyOnAllNodes = System.getProperty("ReadAndVerifyWithMultiNode", "true").equalsIgnoreCase("true");
        if (verifyOnAllNodes) {
            return new ClusterTestConnection(getWriteConnection(version, username, password, sqlDialect), getReadConnections(version, username, password, sqlDialect));
        }
        return getWriteConnection(version, username, password, sqlDialect).getUnderlyingConnecton();
    }

    /**
     * Opens a session against a randomly chosen DataNode using the builder's default credentials.
     *
     * @return the opened session
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    @Override
    public ISession getSessionConnection() throws IoTDBConnectionException {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        Session opened = new Session.Builder()
            .host(target.getIp())
            .port(target.getPort())
            .build();
        opened.open();
        return opened;
    }

    /**
     * Opens a session against a randomly chosen DataNode with the given zone id.
     *
     * @param zoneId zone id passed to the session builder
     * @return the opened session
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    @Override
    public ISession getSessionConnection(ZoneId zoneId) throws IoTDBConnectionException {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        Session opened = new Session.Builder()
            .host(target.getIp())
            .port(target.getPort())
            .zoneId(zoneId)
            .build();
        opened.open();
        return opened;
    }

    /**
     * Opens a session against a randomly chosen DataNode with the given credentials.
     *
     * @param userName user name passed to the session builder
     * @param password password passed to the session builder
     * @return the opened session
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    @Override
    public ISession getSessionConnection(String userName, String password) throws IoTDBConnectionException {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        Session opened = new Session.Builder()
            .host(target.getIp())
            .port(target.getPort())
            .username(userName)
            .password(password)
            .build();
        opened.open();
        return opened;
    }

    /**
     * Opens a redirection-enabled session as {@code root}/{@code root} against the given node URLs,
     * with fetch size 5000, a 1024-byte thrift default buffer and a 64 MiB thrift max frame size.
     *
     * @param nodeUrls node URLs passed to the session builder
     * @return the opened session
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    @Override
    public ISession getSessionConnection(List<String> nodeUrls) throws IoTDBConnectionException {
        Session opened = new Session.Builder()
            .nodeUrls(nodeUrls)
            .username("root")
            .password("root")
            .fetchSize(5000)
            .zoneId(null)
            .thriftDefaultBufferSize(1024)
            .thriftMaxFrameSize(64 * 1024 * 1024)
            .enableRedirection(true)
            .version(SessionConfig.DEFAULT_VERSION)
            .build();
        opened.open();
        return opened;
    }

    /**
     * Builds a table session against a randomly chosen DataNode using the builder's defaults.
     *
     * @return the built table session
     * @throws IoTDBConnectionException if the session cannot be built
     */
    @Override
    public ITableSession getTableSessionConnection() throws IoTDBConnectionException {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        List<String> urls = Collections.singletonList(target.getIpAndPortString());
        return new TableSessionBuilder().nodeUrls(urls).build();
    }

    /**
     * Builds a table session against a randomly chosen DataNode with the given credentials.
     *
     * @param userName user name passed to the session builder
     * @param password password passed to the session builder
     * @return the built table session
     * @throws IoTDBConnectionException if the session cannot be built
     */
    @Override
    public ITableSession getTableSessionConnection(String userName, String password) throws IoTDBConnectionException {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        List<String> urls = Collections.singletonList(target.getIpAndPortString());
        return new TableSessionBuilder()
            .nodeUrls(urls)
            .username(userName)
            .password(password)
            .build();
    }

    /**
     * Builds a table session against a randomly chosen DataNode with the given database set on
     * the builder.
     *
     * @param database database name passed to the session builder
     * @return the built table session
     * @throws IoTDBConnectionException if the session cannot be built
     */
    @Override
    public ITableSession getTableSessionConnectionWithDB(String database) throws IoTDBConnectionException {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        List<String> urls = Collections.singletonList(target.getIpAndPortString());
        return new TableSessionBuilder()
            .nodeUrls(urls)
            .database(database)
            .build();
    }

    /**
     * Builds a redirection-enabled table session as {@code root}/{@code root} against the given
     * node URLs, with fetch size 5000, a 1024-byte thrift default buffer and a 64 MiB thrift max
     * frame size.
     *
     * @param nodeUrls node URLs passed to the session builder
     * @return the built table session
     * @throws IoTDBConnectionException if the session cannot be built
     */
    @Override
    public ITableSession getTableSessionConnection(List<String> nodeUrls) throws IoTDBConnectionException {
        return new TableSessionBuilder()
            .nodeUrls(nodeUrls)
            .username("root")
            .password("root")
            .fetchSize(5000)
            .zoneId(null)
            .thriftDefaultBufferSize(1024)
            .thriftMaxFrameSize(64 * 1024 * 1024)
            .enableRedirection(true)
            .build();
    }

    /**
     * Builds a session pool as {@code root}/{@code root} against a randomly chosen DataNode.
     *
     * @param maxSize maximum pool size passed to the pool builder
     * @return the built session pool
     */
    @Override
    public ISessionPool getSessionPool(int maxSize) {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        return new SessionPool.Builder()
            .host(target.getIp())
            .port(target.getPort())
            .user("root")
            .password("root")
            .maxSize(maxSize)
            .build();
    }

    /**
     * Builds a table session pool as {@code root}/{@code root} against a randomly chosen DataNode.
     *
     * @param maxSize maximum pool size passed to the pool builder
     * @return the built table session pool
     */
    @Override
    public ITableSessionPool getTableSessionPool(int maxSize) {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        List<String> urls = Collections.singletonList(target.getIpAndPortString());
        return new TableSessionPoolBuilder()
            .nodeUrls(urls)
            .user("root")
            .password("root")
            .maxSize(maxSize)
            .build();
    }

    /**
     * Builds a table session pool as {@code root}/{@code root} against a randomly chosen DataNode
     * with the given database set on the builder.
     *
     * @param maxSize maximum pool size passed to the pool builder
     * @param database database name passed to the pool builder
     * @return the built table session pool
     */
    @Override
    public ITableSessionPool getTableSessionPool(int maxSize, String database) {
        int chosen = rand.nextInt(dataNodeWrapperList.size());
        DataNodeWrapper target = dataNodeWrapperList.get(chosen);
        List<String> urls = Collections.singletonList(target.getIpAndPortString());
        return new TableSessionPoolBuilder()
            .nodeUrls(urls)
            .user("root")
            .password("root")
            .database(database)
            .maxSize(maxSize)
            .build();
    }

    /**
     * Opens a write connection to one of the DataNodes of this environment by delegating to
     * {@link #getWriteConnectionFromDataNodeList}.
     *
     * <p>NOTE(review): the previous body read the system property {@code RandomSelectWriteNode} and
     * picked a DataNode into a local variable that was never used, so the property had no effect on
     * the returned connection. That dead selection has been removed; if the property is meant to
     * pin writes to the first DataNode, that has to be implemented deliberately.
     *
     * @param version protocol version forwarded to the connection helper
     * @param username user name forwarded to the connection helper
     * @param password password forwarded to the connection helper
     * @param sqlDialect SQL dialect forwarded to the connection helper
     * @return the write connection
     * @throws SQLException if there is no DataNode or none of them accepted the connection
     */
    protected NodeConnection getWriteConnection(Constant.Version version, String username, String password, String sqlDialect) throws SQLException {
        if (this.dataNodeWrapperList.isEmpty()) {
            // Fail with the declared exception type instead of an index/argument error.
            throw new SQLException("No DataNode is available to open a write connection");
        }
        return this.getWriteConnectionFromDataNodeList(this.dataNodeWrapperList, version, username, password, sqlDialect);
    }

    /**
     * Opens a JDBC write connection to the given DataNode.
     *
     * @param dataNode DataNode whose ip and port form the endpoint
     * @param version protocol version passed to {@code getParam}
     * @param username user name used to build the connection properties
     * @param password password used to build the connection properties
     * @param sqlDialect SQL dialect used to build the connection properties
     * @return a node connection marked as DataNode / WRITE for that endpoint
     * @throws SQLException if the driver cannot open the connection
     */
    protected NodeConnection getWriteConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode, Constant.Version version, String username, String password, String sqlDialect) throws SQLException {
        String endpoint = dataNode.getIp() + ":" + dataNode.getPort();
        String jdbcUrl = "jdbc:iotdb://" + endpoint + getParam(version, 0, "GMT+0");
        Connection jdbcConnection = DriverManager.getConnection(jdbcUrl, BaseEnv.constructProperties(username, password, sqlDialect));
        return new NodeConnection(endpoint, NodeConnection.NodeRole.DATA_NODE, NodeConnection.ConnectionRole.WRITE, jdbcConnection);
    }

    /**
     * Tries the given DataNodes in random order and returns the first write connection that can be
     * opened.
     *
     * @param dataNodeList candidate DataNodes; the list itself is not modified
     * @param version protocol version forwarded to the connection helper
     * @param username user name forwarded to the connection helper
     * @param password password forwarded to the connection helper
     * @param sqlDialect SQL dialect forwarded to the connection helper
     * @return the first successfully opened write connection
     * @throws SQLException if the list is empty, or the exception of the last failed attempt when
     *     every DataNode refused the connection
     */
    protected NodeConnection getWriteConnectionFromDataNodeList(List<DataNodeWrapper> dataNodeList, Constant.Version version, String username, String password, String sqlDialect) throws SQLException {
        List<DataNodeWrapper> candidates = new ArrayList<>(dataNodeList);
        if (candidates.isEmpty()) {
            // Without this guard the code below would dereference a null lastException.
            throw new SQLException("No DataNode is available to open a write connection");
        }
        Collections.shuffle(candidates);
        SQLException lastException = null;
        for (DataNodeWrapper dataNode : candidates) {
            try {
                return this.getWriteConnectionWithSpecifiedDataNode(dataNode, version, username, password, sqlDialect);
            } catch (SQLException e) {
                lastException = e;
            }
        }
        // Failures caused by a TTransportException are rethrown without being logged here.
        if (!(lastException.getCause() instanceof TTransportException)) {
            logger.error("Failed to get connection from any DataNode, last exception is ", lastException);
        }
        throw lastException;
    }

    /**
     * Opens one JDBC read connection per DataNode of this environment through a
     * {@code ParallelRequestDelegate}.
     *
     * @param version protocol version passed to {@code getParam}
     * @param username user name used to build the connection properties
     * @param password password used to build the connection properties
     * @param sqlDialect SQL dialect used to build the connection properties
     * @return the read connections returned by the delegate
     * @throws SQLException if the delegate reports a failure
     */
    protected List<NodeConnection> getReadConnections(Constant.Version version, String username, String password, String sqlDialect) throws SQLException {
        List<String> endpoints = new ArrayList<>();
        // Parameterized like the single-DataNode overload, so no raw types or casts are needed.
        ParallelRequestDelegate<NodeConnection> readConnRequestDelegate = new ParallelRequestDelegate<>(endpoints, 100);
        for (DataNodeWrapper dataNode : this.dataNodeWrapperList) {
            String endpoint = dataNode.getIpAndPortString();
            endpoints.add(endpoint);
            readConnRequestDelegate.addRequest(() -> new NodeConnection(endpoint, NodeConnection.NodeRole.DATA_NODE, NodeConnection.ConnectionRole.READ, DriverManager.getConnection("jdbc:iotdb://" + endpoint + this.getParam(version, 0, "GMT+0"), BaseEnv.constructProperties(username, password, sqlDialect))));
        }
        return readConnRequestDelegate.requestAll();
    }

    /**
     * Opens a single JDBC read connection to the given DataNode through a
     * {@code ParallelRequestDelegate}.
     *
     * @param version protocol version passed to {@code getParam}
     * @param dataNode DataNode to connect to
     * @param username user name used to build the connection properties
     * @param password password used to build the connection properties
     * @param sqlDialect SQL dialect used to build the connection properties
     * @return the read connections returned by the delegate
     * @throws SQLException if the delegate reports a failure
     */
    protected List<NodeConnection> getReadConnections(Constant.Version version, DataNodeWrapper dataNode, String username, String password, String sqlDialect) throws SQLException {
        ArrayList<String> targets = new ArrayList<>();
        ParallelRequestDelegate<NodeConnection> delegate = new ParallelRequestDelegate<>(targets, 100);
        targets.add(dataNode.getIpAndPortString());
        delegate.addRequest(() -> {
            String endpoint = dataNode.getIpAndPortString();
            String jdbcUrl = "jdbc:iotdb://" + dataNode.getIpAndPortString() + getParam(version, 0, "GMT+0");
            return new NodeConnection(endpoint, NodeConnection.NodeRole.DATA_NODE, NodeConnection.ConnectionRole.READ, DriverManager.getConnection(jdbcUrl, BaseEnv.constructProperties(username, password, sqlDialect)));
        });
        return delegate.requestAll();
    }

    /**
     * Verifies that every DataNode in {@code dataNodeWrapperList} accepts a JDBC connection.
     *
     * <p>One request per node is queued on a {@link ParallelRequestDelegate}. Each request tries to
     * connect up to {@code retryCount} times, sleeping one second after every failed attempt, and
     * closes the connection immediately on success. Credentials come from the {@code User} and
     * {@code Password} system properties, both defaulting to {@code root}.
     *
     * @throws AssertionError if {@code requestAll()} fails; the underlying exception is attached as the cause
     */
    protected void testJDBCConnection() {
        logger.info("Testing JDBC connection...");
        List<String> endpoints = this.dataNodeWrapperList.stream().map(AbstractNodeWrapper::getIpAndPortString).collect(Collectors.toList());
        ParallelRequestDelegate<Void> testDelegate = new ParallelRequestDelegate<Void>(endpoints, 100);
        for (DataNodeWrapper dataNode : this.dataNodeWrapperList) {
            String dataNodeEndpoint = dataNode.getIpAndPortString();
            testDelegate.addRequest(() -> {
                Exception lastException = null;
                for (int attempt = 0; attempt < this.retryCount; ++attempt) {
                    // The connection is opened inside the try so that a failed connect is retried
                    // instead of aborting the whole request on the first error.
                    try (IoTDBConnection ignored = (IoTDBConnection)DriverManager.getConnection("jdbc:iotdb://" + dataNodeEndpoint + this.getParam(null, 0, "GMT+0"), System.getProperty("User", "root"), System.getProperty("Password", "root"))) {
                        logger.info("Successfully connecting to DataNode: {}.", (Object)dataNodeEndpoint);
                        return null;
                    }
                    catch (Exception e) {
                        lastException = e;
                        TimeUnit.SECONDS.sleep(1L);
                    }
                }
                if (lastException != null) {
                    throw lastException;
                }
                return null;
            });
        }
        try {
            testDelegate.requestAll();
        }
        catch (Exception e) {
            logger.error("exception in test Cluster with RPC, message: {}", (Object)e.getMessage(), (Object)e);
            throw new AssertionError(String.format("After %d times retry, the cluster can't work!", this.retryCount), e);
        }
    }

    /**
     * Builds the query string appended to a JDBC URL.
     *
     * <p>The result always starts with {@code ?network_timeout=<timeout>}; {@code &version=...} and
     * {@code &time_zone=...} are appended, in that order, only for non-null arguments.
     *
     * @param version value for the {@code version} parameter, or {@code null} to leave it out
     * @param timeout value for the {@code network_timeout} parameter
     * @param timeZone value for the {@code time_zone} parameter, or {@code null} to leave it out
     * @return the assembled query string, including the leading {@code ?}
     */
    private String getParam(Constant.Version version, int timeout, String timeZone) {
        String query = "?network_timeout=" + timeout;
        if (version != null) {
            query = query + "&version=" + version;
        }
        if (timeZone != null) {
            query = query + "&time_zone=" + timeZone;
        }
        return query;
    }

    /**
     * @return the test method name last stored through {@code setTestMethodName}
     */
    public String getTestMethodName() {
        return testMethodName;
    }

    /**
     * Stores the name of the test method; the value is later read by
     * {@code getTestMethodName()} and {@code dumpTestJVMSnapshot()}.
     *
     * @param testMethodName the name to store
     */
    @Override
    public void setTestMethodName(String testMethodName) {
        this.testMethodName = testMethodName;
    }

    /**
     * Calls {@code executeJstack} with the current test method name on every ConfigNode wrapper
     * and then on every DataNode wrapper.
     */
    @Override
    public void dumpTestJVMSnapshot() {
        for (ConfigNodeWrapper configNode : this.configNodeWrapperList) {
            configNode.executeJstack(this.testMethodName);
        }
        for (DataNodeWrapper dataNode : this.dataNodeWrapperList) {
            dataNode.executeJstack(this.testMethodName);
        }
    }

    /**
     * @return a new list holding all ConfigNode wrappers followed by all DataNode wrappers
     */
    @Override
    public List<AbstractNodeWrapper> getNodeWrapperList() {
        List<AbstractNodeWrapper> allNodes = new ArrayList<AbstractNodeWrapper>();
        allNodes.addAll(this.configNodeWrapperList);
        allNodes.addAll(this.dataNodeWrapperList);
        return allNodes;
    }

    /**
     * @return the ConfigNode wrapper list held by this environment (the field itself, not a copy)
     */
    @Override
    public List<ConfigNodeWrapper> getConfigNodeWrapperList() {
        return configNodeWrapperList;
    }

    /**
     * @return the DataNode wrapper list held by this environment (the field itself, not a copy)
     */
    @Override
    public List<DataNodeWrapper> getDataNodeWrapperList() {
        return dataNodeWrapperList;
    }

    /**
     * Borrows a ConfigNode client whose {@code showCluster()} call reports {@code SUCCESS_STATUS}.
     *
     * <p>Up to {@code retryCount} passes are made over {@code configNodeWrapperList}. A node whose
     * process is not alive, whose client cannot be borrowed, or whose {@code showCluster()} call
     * fails or reports another status is skipped after a one-second sleep. A client that is not
     * handed to the caller is closed before moving on.
     *
     * @return the first client that answered {@code showCluster()} with {@code SUCCESS_STATUS};
     *     the caller is responsible for closing it
     * @throws IOException if no node produced a usable client; the last failure is attached as the
     *     cause. The message is "Empty configNode set" when no node was tried at all
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    @Override
    public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException, InterruptedException {
        Exception lastException = null;
        AbstractNodeWrapper lastErrorNode = null;
        for (int i = 0; i < this.retryCount; ++i) {
            for (ConfigNodeWrapper configNodeWrapper : this.configNodeWrapperList) {
                // Record the node before any check so that a dead node is reported as the failure
                // source instead of being mistaken for an empty ConfigNode set.
                lastErrorNode = configNodeWrapper;
                try {
                    if (!configNodeWrapper.getInstance().isAlive()) {
                        throw new IOException("ConfigNode " + configNodeWrapper.getId() + " is not alive");
                    }
                    SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient)this.clientManager.borrowClient(new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()));
                    TShowClusterResp resp;
                    try {
                        resp = client.showCluster();
                    }
                    catch (Exception e) {
                        // The client was borrowed but will not be returned to the caller: release it.
                        client.close();
                        throw e;
                    }
                    if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        return client;
                    }
                    client.close();
                    throw new Exception("Bad status: " + resp.getStatus().getCode() + " message: " + resp.getStatus().getMessage());
                }
                catch (Exception e) {
                    lastException = e;
                    TimeUnit.SECONDS.sleep(1L);
                }
            }
        }
        if (lastErrorNode != null) {
            throw new IOException("Failed to get connection to ConfigNode-Leader. Last error configNode: " + lastErrorNode.getIpAndPortString(), lastException);
        }
        throw new IOException("Empty configNode set");
    }

    /**
     * Borrows a client for the ConfigNode at the given position in {@code configNodeWrapperList}.
     *
     * <p>Borrowing is attempted up to 30 times, with a one-second sleep after each failure.
     *
     * @param index position of the ConfigNode in {@code configNodeWrapperList}
     * @return the borrowed client
     * @throws IOException if all attempts fail; the last failure is attached as the cause
     * @throws Exception if the sleep between attempts is interrupted, or {@code index} is out of range
     */
    @Override
    public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exception {
        Exception lastException = null;
        ConfigNodeWrapper configNodeWrapper = this.configNodeWrapperList.get(index);
        for (int i = 0; i < 30; ++i) {
            try {
                return (IConfigNodeRPCService.Iface)this.clientManager.borrowClient(new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()));
            }
            catch (Exception e) {
                lastException = e;
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        // Keep the original message text but also chain the exception so its stack trace survives.
        throw new IOException("Failed to get connection to this ConfigNode. Last error: " + lastException, lastException);
    }

    /**
     * Finds the position in {@code dataNodeWrapperList} of a DataNode hosting a SchemaRegion leader.
     *
     * <p>Up to 30 passes are made over {@code configNodeWrapperList}; each ConfigNode is asked for
     * its SchemaRegion list. On a {@code SUCCESS_STATUS} answer the regions whose role type equals
     * {@code "Leader"} are scanned in order, and the first one whose client RPC ip and port match a
     * DataNode wrapper decides the result. Any failure is followed by a one-second sleep before the
     * next ConfigNode is tried.
     *
     * @return the index of the matching DataNode wrapper, or {@code -1} when a successful answer
     *     contains no leader that matches any wrapper
     * @throws IOException if no ConfigNode answered successfully; the last failure is the cause.
     *     The message is "Empty configNode set" when no ConfigNode was tried
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    @Override
    public int getFirstLeaderSchemaRegionDataNodeIndex() throws IOException, InterruptedException {
        Exception lastException = null;
        AbstractNodeWrapper lastErrorNode = null;
        for (int attempt = 0; attempt < 30; ++attempt) {
            for (int configNodeId = 0; configNodeId < this.configNodeWrapperList.size(); ++configNodeId) {
                ConfigNodeWrapper configNode = this.configNodeWrapperList.get(configNodeId);
                lastErrorNode = configNode;
                try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient)this.clientManager.borrowClient(new TEndPoint(configNode.getIp(), configNode.getPort()))) {
                    TShowRegionReq request = new TShowRegionReq().setConsensusGroupType(TConsensusGroupType.SchemaRegion);
                    TShowRegionResp resp = client.showRegion(request);
                    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        throw new Exception("Bad status: " + resp.getStatus().getCode() + " message: " + resp.getStatus().getMessage());
                    }
                    for (TRegionInfo regionInfo : resp.getRegionInfoList()) {
                        if (regionInfo.getRoleType().equals("Leader")) {
                            String leaderIp = regionInfo.getClientRpcIp();
                            int leaderPort = regionInfo.getClientRpcPort();
                            for (int dataNodeId = 0; dataNodeId < this.dataNodeWrapperList.size(); ++dataNodeId) {
                                DataNodeWrapper candidate = this.dataNodeWrapperList.get(dataNodeId);
                                if (candidate.getIp().equals(leaderIp) && candidate.getPort() == leaderPort) {
                                    return dataNodeId;
                                }
                            }
                        }
                    }
                    logger.error("No leaders in all schemaRegions.");
                    return -1;
                }
                catch (Exception e) {
                    lastException = e;
                    TimeUnit.SECONDS.sleep(1L);
                }
            }
        }
        if (lastErrorNode == null) {
            throw new IOException("Empty configNode set");
        }
        throw new IOException("Failed to get the index of SchemaRegion-Leader from configNode. Last error configNode: " + lastErrorNode.getIpAndPortString(), lastException);
    }

    /**
     * Finds the position in {@code configNodeWrapperList} of the first ConfigNode whose
     * {@code showCluster()} call reports {@code SUCCESS_STATUS}.
     *
     * <p>Up to {@code retryCount} passes are made over the list; every failure (borrow error, call
     * error or non-success status) is followed by a one-second sleep before the next node is tried.
     * NOTE(review): the method name suggests only the leader answers with success - confirm against
     * the ConfigNode client behaviour.
     *
     * @return the index of the first ConfigNode that answered successfully
     * @throws IOException if no ConfigNode answered successfully; the last failure is the cause.
     *     The message is "Empty configNode set" when no ConfigNode was tried
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    @Override
    public int getLeaderConfigNodeIndex() throws IOException, InterruptedException {
        Exception lastException = null;
        AbstractNodeWrapper lastErrorNode = null;
        for (int attempt = 0; attempt < this.retryCount; ++attempt) {
            for (int configNodeId = 0; configNodeId < this.configNodeWrapperList.size(); ++configNodeId) {
                ConfigNodeWrapper candidate = this.configNodeWrapperList.get(configNodeId);
                lastErrorNode = candidate;
                try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient)this.clientManager.borrowClient(new TEndPoint(candidate.getIp(), candidate.getPort()))) {
                    TShowClusterResp resp = client.showCluster();
                    if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        return configNodeId;
                    }
                    throw new Exception("Bad status: " + resp.getStatus().getCode() + " message: " + resp.getStatus().getMessage());
                }
                catch (Exception e) {
                    lastException = e;
                    TimeUnit.SECONDS.sleep(1L);
                }
            }
        }
        if (lastErrorNode == null) {
            throw new IOException("Empty configNode set");
        }
        throw new IOException("Failed to get the index of ConfigNode-Leader. Last error configNode: " + lastErrorNode.getIpAndPortString(), lastException);
    }

    /**
     * Starts the ConfigNode at the given position in {@code configNodeWrapperList}.
     *
     * @param index position of the ConfigNode wrapper to start
     */
    @Override
    public void startConfigNode(int index) {
        ConfigNodeWrapper target = configNodeWrapperList.get(index);
        target.start();
    }

    /**
     * Calls {@code start()} on every ConfigNode wrapper, in list order.
     */
    @Override
    public void startAllConfigNodes() {
        configNodeWrapperList.forEach(configNode -> configNode.start());
    }

    /**
     * Stops the ConfigNode at the given position in {@code configNodeWrapperList}.
     *
     * @param index position of the ConfigNode wrapper to stop
     */
    @Override
    public void shutdownConfigNode(int index) {
        ConfigNodeWrapper target = configNodeWrapperList.get(index);
        target.stop();
    }

    /**
     * Calls {@code stop()} on every ConfigNode wrapper, in list order.
     */
    @Override
    public void shutdownAllConfigNodes() {
        configNodeWrapperList.forEach(configNode -> configNode.stop());
    }

    /**
     * Calls {@code stopForcibly()} on every ConfigNode wrapper, in list order.
     */
    @Override
    public void shutdownForciblyAllConfigNodes() {
        configNodeWrapperList.forEach(configNode -> configNode.stopForcibly());
    }

    /**
     * @param index position in {@code configNodeWrapperList}
     * @return the ConfigNode wrapper stored at that position
     */
    @Override
    public ConfigNodeWrapper getConfigNodeWrapper(int index) {
        ConfigNodeWrapper wrapper = configNodeWrapperList.get(index);
        return wrapper;
    }

    /**
     * @param index position in {@code dataNodeWrapperList}
     * @return the DataNode wrapper stored at that position
     */
    @Override
    public DataNodeWrapper getDataNodeWrapper(int index) {
        DataNodeWrapper wrapper = dataNodeWrapperList.get(index);
        return wrapper;
    }

    /**
     * Creates a new ConfigNode wrapper on ports from {@code EnvUtils.searchAvailablePorts()},
     * appends it to {@code configNodeWrapperList} and prepares it on disk.
     *
     * <p>The wrapper is constructed with the ip:port string of the first ConfigNode in the list,
     * then its node directory is created, the cluster's ConfigNode configuration is applied and its
     * log directory is created. The node is not started here.
     *
     * @return the newly created, already registered wrapper
     */
    @Override
    public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
        String firstConfigNodeEndpoint = this.configNodeWrapperList.get(0).getIpAndPortString();
        boolean partOfMultiCluster = this instanceof MultiClusterEnv;
        ConfigNodeWrapper created = new ConfigNodeWrapper(false, firstConfigNodeEndpoint, this.getTestClassName(), this.getTestMethodName(), EnvUtils.searchAvailablePorts(), this.index, partOfMultiCluster, this.startTime);
        this.configNodeWrapperList.add(created);
        created.createNodeDir();
        created.changeConfig((MppConfigNodeConfig)this.clusterConfig.getConfigNodeConfig(), (MppCommonConfig)this.clusterConfig.getConfigNodeCommonConfig(), (MppJVMConfig)this.clusterConfig.getConfigNodeJVMConfig());
        created.createLogDir();
        return created;
    }

    /**
     * Creates a new DataNode wrapper on ports from {@code EnvUtils.searchAvailablePorts()},
     * appends it to {@code dataNodeWrapperList} and prepares it on disk.
     *
     * <p>The wrapper is constructed with the ip:port string of the first ConfigNode in
     * {@code configNodeWrapperList}, then its node directory is created, the cluster's DataNode
     * configuration is applied and its log directory is created. The node is not started here.
     *
     * @return the newly created, already registered wrapper
     */
    @Override
    public DataNodeWrapper generateRandomDataNodeWrapper() {
        String firstConfigNodeEndpoint = this.configNodeWrapperList.get(0).getIpAndPortString();
        boolean partOfMultiCluster = this instanceof MultiClusterEnv;
        DataNodeWrapper created = new DataNodeWrapper(firstConfigNodeEndpoint, this.getTestClassName(), this.getTestMethodName(), EnvUtils.searchAvailablePorts(), this.index, partOfMultiCluster, this.startTime);
        this.dataNodeWrapperList.add(created);
        created.createNodeDir();
        created.changeConfig((MppDataNodeConfig)this.clusterConfig.getDataNodeConfig(), (MppCommonConfig)this.clusterConfig.getDataNodeCommonConfig(), (MppJVMConfig)this.clusterConfig.getDataNodeJVMConfig());
        created.createLogDir();
        return created;
    }

    /**
     * Generates a new DataNode wrapper and registers it through the two-argument overload.
     *
     * @param isNeedVerify forwarded unchanged to {@code registerNewDataNode(DataNodeWrapper, boolean)}
     */
    @Override
    public void registerNewDataNode(boolean isNeedVerify) {
        DataNodeWrapper freshDataNode = this.generateRandomDataNodeWrapper();
        this.registerNewDataNode(freshDataNode, isNeedVerify);
    }

    /**
     * Generates a new ConfigNode wrapper and registers it through the two-argument overload.
     *
     * @param isNeedVerify forwarded unchanged to {@code registerNewConfigNode(ConfigNodeWrapper, boolean)}
     */
    @Override
    public void registerNewConfigNode(boolean isNeedVerify) {
        ConfigNodeWrapper freshConfigNode = this.generateRandomConfigNodeWrapper();
        this.registerNewConfigNode(freshConfigNode, isNeedVerify);
    }

    /**
     * Starts the given ConfigNode through a {@link ParallelRequestDelegate} and optionally checks
     * the cluster afterwards.
     *
     * @param newConfigNodeWrapper the ConfigNode to start
     * @param isNeedVerify when {@code true}, {@code checkClusterStatusWithoutUnknown()} is called
     *     after the node has been started
     * @throws AssertionError if {@code requestAll()} fails with an {@link SQLException}; the
     *     exception is attached as the cause
     */
    @Override
    public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolean isNeedVerify) {
        ParallelRequestDelegate<Void> configNodeDelegate = new ParallelRequestDelegate<Void>(Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()), 100);
        configNodeDelegate.addRequest(() -> {
            newConfigNodeWrapper.start();
            return null;
        });
        try {
            configNodeDelegate.requestAll();
        }
        catch (SQLException e) {
            logger.error("Start configNode failed", (Throwable)e);
            // Carry the message and cause so the test report shows why the start failed.
            throw new AssertionError("Start configNode failed", e);
        }
        if (isNeedVerify) {
            this.checkClusterStatusWithoutUnknown();
        }
    }

    /**
     * Starts the given DataNode through a {@link ParallelRequestDelegate} and optionally checks
     * the cluster afterwards.
     *
     * @param newDataNodeWrapper the DataNode to start
     * @param isNeedVerify when {@code true}, {@code checkClusterStatusWithoutUnknown()} is called
     *     after the node has been started
     * @throws AssertionError if {@code requestAll()} fails with an {@link SQLException}; the
     *     exception is attached as the cause
     */
    @Override
    public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNeedVerify) {
        List<String> dataNodeEndpoints = Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
        ParallelRequestDelegate<Void> dataNodesDelegate = new ParallelRequestDelegate<Void>(dataNodeEndpoints, 100);
        dataNodesDelegate.addRequest(() -> {
            newDataNodeWrapper.start();
            return null;
        });
        try {
            dataNodesDelegate.requestAll();
        }
        catch (SQLException e) {
            logger.error("Start dataNodes failed", (Throwable)e);
            // Carry the message and cause so the test report shows why the start failed.
            throw new AssertionError("Start dataNodes failed", e);
        }
        if (isNeedVerify) {
            this.checkClusterStatusWithoutUnknown();
        }
    }

    /**
     * Starts the DataNode at the given position in {@code dataNodeWrapperList}.
     *
     * @param index position of the DataNode wrapper to start
     */
    @Override
    public void startDataNode(int index) {
        DataNodeWrapper target = dataNodeWrapperList.get(index);
        target.start();
    }

    /**
     * Calls {@code start()} on every DataNode wrapper, in list order.
     */
    @Override
    public void startAllDataNodes() {
        dataNodeWrapperList.forEach(dataNode -> dataNode.start());
    }

    /**
     * Stops the DataNode at the given position in {@code dataNodeWrapperList}.
     *
     * @param index position of the DataNode wrapper to stop
     */
    @Override
    public void shutdownDataNode(int index) {
        DataNodeWrapper target = dataNodeWrapperList.get(index);
        target.stop();
    }

    /**
     * Calls {@code stop()} on every DataNode wrapper, in list order.
     */
    @Override
    public void shutdownAllDataNodes() {
        dataNodeWrapperList.forEach(dataNode -> dataNode.stop());
    }

    /**
     * Calls {@code stopForcibly()} on every DataNode wrapper, in list order.
     */
    @Override
    public void shutdownForciblyAllDataNodes() {
        dataNodeWrapperList.forEach(dataNode -> dataNode.stopForcibly());
    }

    /**
     * Waits until every given node is reported with its expected status by {@code showCluster()}.
     *
     * <p>{@code nodes} and {@code targetStatus} are paired by position. Up to {@code retryCount}
     * rounds are made; in each round the cluster view is fetched through
     * {@code EnvFactory.getEnv().getLeaderConfigNodeConnection()}, nodes are looked up by their
     * ip:port string (internal endpoint for ConfigNodes, client RPC endpoint for DataNodes) and
     * compared against the expected status. A round that is not fully satisfied is followed by a
     * one-second sleep.
     *
     * @param nodes the nodes to check
     * @param targetStatus the expected status of the node at the same position in {@code nodes}
     * @throws IllegalStateException if the expected statuses are not reached within
     *     {@code retryCount} rounds; the last mismatch or connection failure is the cause
     * @throws RuntimeException if the thread is interrupted while sleeping between rounds; the
     *     interrupt flag is restored before throwing
     */
    @Override
    public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus) throws IllegalStateException {
        Throwable lastException = null;
        for (int i = 0; i < this.retryCount; ++i) {
            try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient)EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
                List<String> errorMessages = new ArrayList<String>(nodes.size());
                // Maps "ip:port" to the node id used as key in the status map of the response.
                Map<String, Integer> nodeIds = new HashMap<String, Integer>(nodes.size());
                TShowClusterResp showClusterResp = client.showCluster();
                showClusterResp.getConfigNodeList().forEach(node -> nodeIds.put(node.getInternalEndPoint().getIp() + ":" + node.getInternalEndPoint().getPort(), node.getConfigNodeId()));
                showClusterResp.getDataNodeList().forEach(node -> nodeIds.put(node.getClientRpcEndPoint().getIp() + ":" + node.getClientRpcEndPoint().getPort(), node.getDataNodeId()));
                for (int j = 0; j < nodes.size(); ++j) {
                    String endpoint = nodes.get(j).getIpAndPortString();
                    if (!nodeIds.containsKey(endpoint)) {
                        errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!");
                        continue;
                    }
                    String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
                    if (targetStatus.get(j).getStatus().equals(status)) {
                        continue;
                    }
                    errorMessages.add(String.format("Node %s is in status %s, but expected %s", endpoint, status, targetStatus.get(j)));
                }
                if (errorMessages.isEmpty()) {
                    return;
                }
                lastException = new IllegalStateException(String.join((CharSequence)". ", errorMessages));
            }
            catch (IOException | InterruptedException | ClientManagerException | TException e) {
                lastException = e;
            }
            try {
                TimeUnit.SECONDS.sleep(1L);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers further up before converting to unchecked.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        throw new IllegalStateException(lastException);
    }

    /**
     * @return the MQTT port of a DataNode picked at random from {@code dataNodeWrapperList},
     *     using a {@link Random} seeded with the current time in milliseconds
     */
    @Override
    public int getMqttPort() {
        Random random = new Random(System.currentTimeMillis());
        int chosen = random.nextInt(this.dataNodeWrapperList.size());
        return this.dataNodeWrapperList.get(chosen).getMqttPort();
    }

    /**
     * @return the ip of the first DataNode in {@code dataNodeWrapperList}
     */
    @Override
    public String getIP() {
        DataNodeWrapper firstDataNode = dataNodeWrapperList.get(0);
        return firstDataNode.getIp();
    }

    /**
     * @return the port of the first DataNode in {@code dataNodeWrapperList}, as a decimal string
     */
    @Override
    public String getPort() {
        int firstDataNodePort = dataNodeWrapperList.get(0).getPort();
        return Integer.toString(firstDataNodePort);
    }

    /**
     * @return {@code ClusterConstant.TEMPLATE_NODE_PATH} followed by the file separator and {@code sbin}
     */
    @Override
    public String getSbinPath() {
        StringBuilder path = new StringBuilder();
        path.append(ClusterConstant.TEMPLATE_NODE_PATH);
        path.append(File.separator);
        path.append("sbin");
        return path.toString();
    }

    /**
     * @return {@code ClusterConstant.TEMPLATE_NODE_PATH} followed by the file separator and {@code tools}
     */
    @Override
    public String getToolsPath() {
        StringBuilder path = new StringBuilder();
        path.append(ClusterConstant.TEMPLATE_NODE_PATH);
        path.append(File.separator);
        path.append("tools");
        return path.toString();
    }

    /**
     * @return the value of {@code ClusterConstant.TEMPLATE_NODE_LIB_PATH}
     */
    @Override
    public String getLibPath() {
        String libPath = ClusterConstant.TEMPLATE_NODE_LIB_PATH;
        return libPath;
    }

    /**
     * Looks up the local wrapper of the DataNode registered under the given id.
     *
     * <p>The DataNode list is fetched through {@code getLeaderConfigNodeConnection()}. For the
     * first entry whose id equals {@code nodeId}, a wrapper from {@code dataNodeWrapperList} with
     * the same RPC port is returned.
     * NOTE(review): wrappers are matched by port only, not by ip - confirm this is sufficient for
     * environments where several nodes could share a port on different hosts.
     *
     * @param nodeId the DataNode id to resolve
     * @return the matching wrapper, or an empty {@link Optional} when the id is unknown, no wrapper
     *     uses that port, or the lookup fails (the failure is logged)
     */
    @Override
    public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) {
        try (SyncConfigNodeIServiceClient leaderClient = (SyncConfigNodeIServiceClient)this.getLeaderConfigNodeConnection()) {
            TShowDataNodesResp resp = leaderClient.showDataNodes();
            for (TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
                if (dataNodeInfo.getDataNodeId() != nodeId) {
                    continue;
                }
                return this.dataNodeWrapperList.stream().filter(dataNodeWrapper -> dataNodeWrapper.getPort() == dataNodeInfo.getRpcPort()).findAny();
            }
            return Optional.empty();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt just because the lookup is best-effort.
                Thread.currentThread().interrupt();
            }
            // Still best-effort, but leave a trace so a failed lookup can be diagnosed.
            logger.warn("Failed to resolve DataNode id {} to a wrapper", (Object)Integer.valueOf(nodeId), (Object)e);
            return Optional.empty();
        }
    }

    /**
     * Stores the given list in the {@code configNodeKillPoints} field, replacing any previous value.
     *
     * @param killPoints the kill points to remember for ConfigNodes
     */
    @Override
    public void registerConfigNodeKillPoints(List<String> killPoints) {
        configNodeKillPoints = killPoints;
    }

    /**
     * Stores the given list in the {@code dataNodeKillPoints} field, replacing any previous value.
     *
     * @param killPoints the kill points to remember for DataNodes
     */
    @Override
    public void registerDataNodeKillPoints(List<String> killPoints) {
        dataNodeKillPoints = killPoints;
    }

    /**
     * Calls {@code clearAll()} on the ConfigNode client manager held by this environment.
     */
    public void clearClientManager() {
        clientManager.clearAll();
    }
}

