/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.manager.load.service;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatService {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);
    private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
    protected IManager configManager;
    private final LoadCache loadCache;
    private final Object heartbeatScheduleMonitor = new Object();
    private Future<?> currentHeartbeatFuture;
    private final ScheduledExecutorService heartBeatExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor((String)ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName());
    private final AtomicLong heartbeatCounter = new AtomicLong(0L);
    private static final int configNodeListPeriodicallySyncInterval = 100;

    public HeartbeatService(IManager configManager, LoadCache loadCache) {
        this.setConfigManager(configManager);
        this.loadCache = loadCache;
    }

    protected void setConfigManager(IManager configManager) {
        this.configManager = configManager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startHeartbeatService() {
        Object object = this.heartbeatScheduleMonitor;
        synchronized (object) {
            if (this.currentHeartbeatFuture == null) {
                this.currentHeartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay((ScheduledExecutorService)this.heartBeatExecutor, this::heartbeatLoopBody, (long)0L, (long)HEARTBEAT_INTERVAL, (TimeUnit)TimeUnit.MILLISECONDS);
                LOGGER.info("Heartbeat service is started successfully.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopHeartbeatService() {
        Object object = this.heartbeatScheduleMonitor;
        synchronized (object) {
            if (this.currentHeartbeatFuture != null) {
                this.currentHeartbeatFuture.cancel(false);
                this.currentHeartbeatFuture = null;
                LOGGER.info("Heartbeat service is stopped successfully.");
            }
        }
    }

    private void heartbeatLoopBody() {
        Optional.ofNullable(this.getConsensusManager()).ifPresent(consensusManager -> {
            if (this.getConsensusManager().isLeader()) {
                this.pingRegisteredConfigNodes(this.genConfigNodeHeartbeatReq(), this.getNodeManager().getRegisteredConfigNodes());
                this.pingRegisteredDataNodes(this.genHeartbeatReq(), this.getNodeManager().getRegisteredDataNodes());
                this.pingRegisteredAINodes(this.genMLHeartbeatReq(), this.getNodeManager().getRegisteredAINodes());
            }
        });
    }

    private TDataNodeHeartbeatReq genHeartbeatReq() {
        Map<Integer, Set<Integer>> topologyMap;
        TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq();
        heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
        heartbeatReq.setLogicalClock(this.configManager.getConsensusManager().getConsensusImpl().getLogicalClock((ConsensusGroupId)ConfigNodeInfo.CONFIG_REGION_ID));
        heartbeatReq.setNeedJudgeLeader(true);
        heartbeatReq.setNeedSamplingLoad(this.heartbeatCounter.get() % 10L == 0L);
        Pair<Long, Long> schemaQuotaRemain = this.configManager.getClusterSchemaManager().getSchemaQuotaRemain();
        heartbeatReq.setTimeSeriesQuotaRemain(((Long)schemaQuotaRemain.left).longValue());
        heartbeatReq.setDeviceQuotaRemain(((Long)schemaQuotaRemain.right).longValue());
        heartbeatReq.setNeedPipeMetaList(!PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() && this.heartbeatCounter.get() % (long)PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() == 0L);
        if (!this.configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
            heartbeatReq.setSchemaRegionIds(this.configManager.getClusterQuotaManager().getSchemaRegionIds());
            heartbeatReq.setDataRegionIds(this.configManager.getClusterQuotaManager().getDataRegionIds());
            heartbeatReq.setSpaceQuotaUsage(this.configManager.getClusterQuotaManager().getSpaceQuotaUsage());
        }
        if ((topologyMap = this.configManager.getLoadManager().getLoadCache().getTopology()) != null) {
            heartbeatReq.setTopology(topologyMap);
            heartbeatReq.setDataNodes(this.configManager.getNodeManager().getRegisteredDataNodeLocations());
        }
        if (this.heartbeatCounter.get() % 100L == 0L) {
            heartbeatReq.setCurrentRegionOperations(this.configManager.getProcedureManager().getRegionOperationConsensusIds());
        }
        this.heartbeatCounter.getAndIncrement();
        return heartbeatReq;
    }

    private void addConfigNodeLocationsToReq(int dataNodeId, TDataNodeHeartbeatReq req) {
        Set<TEndPoint> confirmedConfigNodes = this.loadCache.getConfirmedConfigNodeEndPoints(dataNodeId);
        Set actualConfigNodes = this.getNodeManager().getRegisteredConfigNodes().stream().map(TConfigNodeLocation::getInternalEndPoint).collect(Collectors.toSet());
        if (!actualConfigNodes.equals(confirmedConfigNodes) || this.heartbeatCounter.get() % 100L == 0L) {
            req.setConfigNodeEndPoints(actualConfigNodes);
        }
    }

    private TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() {
        TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq();
        req.setTimestamp(System.nanoTime());
        return req;
    }

    private TAIHeartbeatReq genMLHeartbeatReq() {
        TAIHeartbeatReq heartbeatReq = new TAIHeartbeatReq();
        heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
        heartbeatReq.setNeedSamplingLoad(this.heartbeatCounter.get() % 10L == 0L);
        return heartbeatReq;
    }

    private void pingRegisteredConfigNodes(TConfigNodeHeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
        for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
            int configNodeId = configNodeLocation.getConfigNodeId();
            if (configNodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID || this.loadCache.checkAndSetHeartbeatProcessing(configNodeId)) continue;
            ConfigNodeHeartbeatHandler handler = this.getConfigNodeHeartbeatHandler(configNodeId);
            AsyncConfigNodeHeartbeatClientPool.getInstance().getConfigNodeHeartBeat(configNodeLocation.getInternalEndPoint(), heartbeatReq, handler);
        }
    }

    protected ConfigNodeHeartbeatHandler getConfigNodeHeartbeatHandler(int configNodeId) {
        return new ConfigNodeHeartbeatHandler(configNodeId, this.configManager.getLoadManager());
    }

    private void pingRegisteredDataNodes(TDataNodeHeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
        for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
            int dataNodeId = dataNodeInfo.getLocation().getDataNodeId();
            if (this.loadCache.checkAndSetHeartbeatProcessing(dataNodeId)) continue;
            DataNodeHeartbeatHandler handler = new DataNodeHeartbeatHandler(dataNodeId, this.configManager.getLoadManager(), this.configManager.getClusterQuotaManager().getDeviceNum(), this.configManager.getClusterQuotaManager().getTimeSeriesNum(), this.configManager.getClusterQuotaManager().getRegionDisk(), this.configManager.getClusterSchemaManager()::updateTimeSeriesUsage, this.configManager.getClusterSchemaManager()::updateDeviceUsage, this.configManager.getPipeManager().getPipeRuntimeCoordinator());
            this.configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
            this.addConfigNodeLocationsToReq(dataNodeId, heartbeatReq);
            AsyncDataNodeHeartbeatClientPool.getInstance().getDataNodeHeartBeat(dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
        }
    }

    private void pingRegisteredAINodes(TAIHeartbeatReq heartbeatReq, List<TAINodeConfiguration> registeredAINodes) {
        for (TAINodeConfiguration aiNodeInfo : registeredAINodes) {
            AINodeHeartbeatHandler handler = new AINodeHeartbeatHandler(aiNodeInfo.getLocation().getAiNodeId(), this.configManager.getLoadManager());
            AsyncAINodeHeartbeatClientPool.getInstance().getAINodeHeartBeat(aiNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
        }
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }

    private NodeManager getNodeManager() {
        return this.configManager.getNodeManager();
    }
}

