/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.heartbeat;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHeartbeatMonitor
implements HeartbeatMonitor {
    private final int heartbeatIntervalMillis;
    private final int missableHeartbeatCount;
    private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
    protected final ClusterCoordinator clusterCoordinator;
    protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
    private volatile ScheduledFuture<?> future;
    private volatile boolean stopped = true;

    public AbstractHeartbeatMonitor(ClusterCoordinator clusterCoordinator, NiFiProperties nifiProperties) {
        this.clusterCoordinator = clusterCoordinator;
        String heartbeatInterval = nifiProperties.getProperty("nifi.cluster.protocol.heartbeat.interval", "5 sec");
        this.heartbeatIntervalMillis = (int)FormatUtils.getTimeDuration((String)heartbeatInterval, (TimeUnit)TimeUnit.MILLISECONDS);
        this.missableHeartbeatCount = nifiProperties.getIntegerProperty("nifi.cluster.protocol.heartbeat.missable.max", Integer.valueOf(8));
        clusterCoordinator.registerEventListener((ClusterTopologyEventListener)new ClusterChangeEventListener());
    }

    public final synchronized void start() {
        if (!this.stopped) {
            logger.info("Attempted to start Heartbeat Monitor but it is already started. Stopping heartbeat monitor and re-starting it.");
            this.stop();
        }
        this.stopped = false;
        logger.info("Heartbeat Monitor started");
        try {
            this.onStart();
        }
        catch (Exception e) {
            logger.error("Failed to start Heartbeat Monitor", (Throwable)e);
        }
        this.future = this.flowEngine.scheduleWithFixedDelay(() -> {
            try {
                this.monitorHeartbeats();
            }
            catch (Exception e) {
                this.clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + String.valueOf(e));
                logger.error("Failed to process heartbeats", (Throwable)e);
            }
        }, (long)this.heartbeatIntervalMillis, (long)this.heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public final synchronized void stop() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        logger.info("Heartbeat Monitor stopped");
        try {
            if (this.future != null) {
                this.future.cancel(true);
            }
        }
        finally {
            this.onStop();
        }
    }

    protected boolean isStopped() {
        return this.stopped;
    }

    public NodeHeartbeat getLatestHeartbeat(NodeIdentifier nodeId) {
        return this.getLatestHeartbeats().get(nodeId);
    }

    protected ClusterCoordinator getClusterCoordinator() {
        return this.clusterCoordinator;
    }

    protected synchronized void monitorHeartbeats() {
        if (!this.clusterCoordinator.isActiveClusterCoordinator()) {
            this.purgeHeartbeats();
            logger.debug("It appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect.");
            return;
        }
        Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = this.getLatestHeartbeats();
        if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
            logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat");
            return;
        }
        StopWatch procStopWatch = new StopWatch(true);
        for (NodeHeartbeat heartbeat : latestHeartbeats.values()) {
            try {
                this.processHeartbeat(heartbeat);
            }
            catch (Exception e) {
                this.clusterCoordinator.reportEvent(null, Severity.ERROR, "Received heartbeat from " + String.valueOf(heartbeat.getNodeIdentifier()) + " but failed to process heartbeat due to " + String.valueOf(e));
                logger.error("Failed to process heartbeat from {}", (Object)heartbeat.getNodeIdentifier(), (Object)e);
            }
        }
        procStopWatch.stop();
        logger.info("Finished processing {} heartbeats in {}", (Object)latestHeartbeats.size(), (Object)procStopWatch.getDuration());
        long maxMillis = this.heartbeatIntervalMillis * this.missableHeartbeatCount;
        long currentTimestamp = System.currentTimeMillis();
        long threshold = currentTimestamp - maxMillis;
        for (NodeIdentifier nodeIdentifier : this.clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[]{NodeConnectionState.CONNECTED})) {
            NodeHeartbeat heartbeat = latestHeartbeats.get(nodeIdentifier);
            if (heartbeat == null) {
                long purgeTimestamp = this.getPurgeTimestamp();
                if (purgeTimestamp >= threshold) continue;
                long secondsSinceLastPurge = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - purgeTimestamp);
                this.clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT, "Have not received a heartbeat from node in " + secondsSinceLastPurge + " seconds");
                continue;
            }
            if (heartbeat.getTimestamp() >= threshold) continue;
            long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - heartbeat.getTimestamp());
            this.clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT, "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds");
            try {
                this.removeHeartbeat(nodeIdentifier);
            }
            catch (Exception e) {
                logger.warn("Failed to remove heartbeat for {}", (Object)nodeIdentifier, (Object)e);
            }
        }
    }

    private void processHeartbeat(NodeHeartbeat heartbeat) {
        NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
        if (this.clusterCoordinator.isBlockedByFirewall(Collections.singleton(nodeId.getSocketAddress()))) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
            this.clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall");
            this.removeHeartbeat(nodeId);
            return;
        }
        NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(nodeId);
        if (connectionStatus == null) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from unknown node " + nodeId.getFullDescription() + ". Removing heartbeat and requesting that node connect to cluster.");
            this.removeHeartbeat(nodeId);
            this.clusterCoordinator.requestNodeConnect(nodeId, null);
            return;
        }
        NodeConnectionState connectionState = connectionStatus.getState();
        if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster,though the Cluster Coordinator thought it was (node claimed state was " + String.valueOf(heartbeat.getConnectionStatus().getState()) + "). Marking as Disconnected and requesting that Node reconnect to cluster");
            this.clusterCoordinator.requestNodeConnect(nodeId, null);
            return;
        }
        if (NodeConnectionState.OFFLOADED == connectionState || NodeConnectionState.OFFLOADING == connectionState) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node that is offloading or offloaded. Removing this heartbeat.  Offloaded nodes will only be reconnected to the cluster by an explicit connection request or restarting the node.");
            this.removeHeartbeat(nodeId);
        }
        if (NodeConnectionState.DISCONNECTED == connectionState) {
            DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
            switch (disconnectionCode) {
                case LACK_OF_HEARTBEAT: 
                case UNABLE_TO_COMMUNICATE: 
                case NOT_YET_CONNECTED: 
                case MISMATCHED_FLOWS: 
                case MISSING_BUNDLE: 
                case NODE_SHUTDOWN: 
                case FAILED_TO_SERVICE_REQUEST: 
                case STARTUP_FAILURE: {
                    this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously disconnected due to " + String.valueOf(disconnectionCode) + ". Issuing reconnection request.");
                    this.clusterCoordinator.requestNodeConnect(nodeId, null);
                    break;
                }
                default: {
                    logger.info("Ignoring received heartbeat from disconnected node {}. Node was disconnected due to [{}]. Issuing disconnection request.", (Object)nodeId, (Object)disconnectionCode);
                    this.clusterCoordinator.requestNodeDisconnect(nodeId, disconnectionCode, connectionStatus.getReason());
                    this.removeHeartbeat(nodeId);
                }
            }
            return;
        }
        if (NodeConnectionState.DISCONNECTING == connectionStatus.getState()) {
            this.removeHeartbeat(nodeId);
            return;
        }
        if (NodeConnectionState.CONNECTING == connectionState) {
            Long connectionRequestTime = connectionStatus.getConnectionRequestTime();
            if (connectionRequestTime != null && heartbeat.getTimestamp() < connectionRequestTime) {
                this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.");
                this.removeHeartbeat(nodeId);
                return;
            }
            if (!this.clusterCoordinator.isApiReachable(nodeId)) {
                logger.info("Node API Address [{}] not reachable: cluster connection request deferred pending successful network connection", (Object)nodeId);
                return;
            }
            this.clusterCoordinator.finishNodeConnection(nodeId);
            this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
        }
        this.clusterCoordinator.validateHeartbeat(heartbeat);
    }

    protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats();

    protected abstract long getPurgeTimestamp();

    protected void onStart() {
    }

    protected void onStop() {
    }

    private class ClusterChangeEventListener
    implements ClusterTopologyEventListener {
        private ClusterChangeEventListener() {
        }

        public void onNodeAdded(NodeIdentifier nodeId) {
        }

        public void onNodeRemoved(NodeIdentifier nodeId) {
            AbstractHeartbeatMonitor.this.removeHeartbeat(nodeId);
        }

        public void onLocalNodeIdentifierSet(NodeIdentifier localNodeId) {
        }

        public void onNodeStateChange(NodeIdentifier nodeId, NodeConnectionState newState) {
        }
    }
}

