/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.ClusterStateVersionSpecificRequest;
import com.yahoo.vespa.clustercontroller.core.Communicator;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.FleetControllerContext;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.SetClusterStateRequest;
import com.yahoo.vespa.clustercontroller.core.Timer;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class SystemStateBroadcaster {
    private static Logger log = Logger.getLogger(SystemStateBroadcaster.class.getName());
    private final FleetControllerContext context;
    private final Timer timer;
    private final Object monitor;
    private ClusterStateBundle clusterStateBundle;
    private final List<SetClusterStateRequest> setClusterStateReplies = new LinkedList<SetClusterStateRequest>();
    private final List<ActivateClusterStateVersionRequest> activateClusterStateVersionReplies = new LinkedList<ActivateClusterStateVersionRequest>();
    private static final long minTimeBetweenNodeErrorLogging = 600000L;
    private final Map<Node, Long> lastErrorReported = new TreeMap<Node, Long>();
    private int lastOfficialStateVersion = -1;
    private int lastStateVersionBundleAcked = 0;
    private int lastClusterStateVersionConverged = 0;
    private ClusterStateBundle lastClusterStateBundleConverged;
    private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter();
    private final ActivateClusterStateVersionWaiter activateClusterStateVersionWaiter = new ActivateClusterStateVersionWaiter();

    public SystemStateBroadcaster(FleetControllerContext context, Timer timer, Object monitor) {
        this.context = context;
        this.timer = timer;
        this.monitor = monitor;
    }

    public void handleNewClusterStates(ClusterStateBundle state) {
        this.clusterStateBundle = state;
    }

    public ClusterState getClusterState() {
        return this.clusterStateBundle.getBaselineClusterState();
    }

    public boolean hasBroadcastedClusterStateBundle() {
        return this.clusterStateBundle != null;
    }

    public void resetBroadcastedClusterStateBundle() {
        this.clusterStateBundle = null;
    }

    public ClusterStateBundle getClusterStateBundle() {
        return this.clusterStateBundle;
    }

    public ClusterStateBundle getLastClusterStateBundleConverged() {
        return this.lastClusterStateBundleConverged;
    }

    private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
        long time = this.timer.getCurrentTimeInMillis();
        Long lastReported = this.lastErrorReported.get(info.getNode());
        boolean alreadySeen = lastReported != null && time - lastReported < 600000L;
        this.context.log(log, nodeOk && !alreadySeen ? Level.WARNING : Level.FINE, message);
        if (!alreadySeen) {
            this.lastErrorReported.put(info.getNode(), time);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean processResponses() {
        boolean anyResponsesFound = false;
        Object object = this.monitor;
        synchronized (object) {
            anyResponsesFound = !this.setClusterStateReplies.isEmpty() || !this.activateClusterStateVersionReplies.isEmpty();
            this.processSetClusterStateResponses();
            this.processActivateClusterStateVersionResponses();
        }
        return anyResponsesFound;
    }

    private void processActivateClusterStateVersionResponses() {
        for (ActivateClusterStateVersionRequest req : this.activateClusterStateVersionReplies) {
            NodeInfo info = req.getNodeInfo();
            int version = req.getClusterStateVersion();
            boolean success = true;
            ClusterStateVersionSpecificRequest.Reply reply = req.getReply();
            if (reply.isError()) {
                if (reply.getReturnCode() != 106) {
                    this.context.log(log, Level.FINE, () -> String.format("Activation NACK for node %s with version %d, message %s", info, version, reply.getReturnMessage()));
                    success = false;
                } else {
                    this.context.log(log, Level.FINE, () -> String.format("Node %s did not understand state activation RPC; implicitly treating state %d as activated on node", info, version));
                }
            } else if (reply.getActualVersion() != version) {
                boolean nodeOk = SystemStateBroadcaster.nodeReportsSelfAsAvailable(info);
                this.reportNodeError(nodeOk, info, String.format("Activation of version %d did not take effect, node %s reports it has an actual pending version of %d. Racing with another controller?", version, info, reply.getActualVersion()));
                success = false;
            } else {
                this.context.log(log, Level.FINE, () -> String.format("Node %s reports successful activation of state version %d", info, version));
            }
            info.setSystemStateVersionActivationAcked(version, success);
        }
        this.activateClusterStateVersionReplies.clear();
    }

    private static boolean nodeReportsSelfAsAvailable(NodeInfo info) {
        return info.getReportedState().getState().oneOf("uir");
    }

    private void processSetClusterStateResponses() {
        for (SetClusterStateRequest req : this.setClusterStateReplies) {
            NodeInfo info = req.getNodeInfo();
            int version = req.getClusterStateVersion();
            if (req.getReply().isError()) {
                info.setClusterStateBundleVersionAcknowledged(version, false);
                if (req.getReply().getReturnCode() == 9999 || info.getNewestSystemStateVersionSent() != version) continue;
                boolean nodeOk = SystemStateBroadcaster.nodeReportsSelfAsAvailable(info);
                this.reportNodeError(nodeOk, info, String.format("Got error response %d: %s from %s setdistributionstates request.", req.getReply().getReturnCode(), req.getReply().getReturnMessage(), info));
                continue;
            }
            info.setClusterStateBundleVersionAcknowledged(version, true);
            this.context.log(log, Level.FINE, () -> String.format("Node %s ACKed system state version %d.", info, version));
            this.lastErrorReported.remove(info.getNode());
        }
        this.setClusterStateReplies.clear();
    }

    private static boolean nodeIsReachable(NodeInfo node) {
        if (node.getRpcAddress() == null || node.isNotInSlobrok()) {
            return false;
        }
        return node.getReportedState().getState() != State.MAINTENANCE && node.getReportedState().getState() != State.DOWN && node.getReportedState().getState() != State.STOPPING;
    }

    private boolean nodeNeedsClusterStateBundle(NodeInfo node) {
        if (node.getClusterStateVersionBundleAcknowledged() == this.clusterStateBundle.getVersion()) {
            return false;
        }
        return SystemStateBroadcaster.nodeIsReachable(node);
    }

    private boolean nodeNeedsClusterStateActivation(NodeInfo node) {
        if (node.getClusterStateVersionActivationAcked() == this.clusterStateBundle.getVersion()) {
            return false;
        }
        return SystemStateBroadcaster.nodeIsReachable(node);
    }

    private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.DatabaseContext dbContext) {
        return dbContext.getCluster().getNodeInfos().stream().filter(this::nodeNeedsClusterStateBundle).filter(node -> !this.newestStateBundleAlreadySentToNode((NodeInfo)node)).collect(Collectors.toList());
    }

    private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.DatabaseContext dbContext) {
        return dbContext.getCluster().getNodeInfos().stream().filter(this::nodeNeedsClusterStateActivation).filter(node -> !this.newestStateActivationAlreadySentToNode((NodeInfo)node)).collect(Collectors.toList());
    }

    private boolean newestStateBundleAlreadySentToNode(NodeInfo node) {
        return node.getNewestSystemStateVersionSent() == this.clusterStateBundle.getVersion();
    }

    private boolean newestStateActivationAlreadySentToNode(NodeInfo node) {
        return node.getClusterStateVersionActivationSent() == this.clusterStateBundle.getVersion();
    }

    void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException {
        if (this.clusterStateBundle == null || this.currentClusterStateIsConverged()) {
            return;
        }
        int currentStateVersion = this.clusterStateBundle.getVersion();
        boolean anyDistributorsNeedStateBundle = dbContext.getCluster().getNodeInfos().stream().filter(NodeInfo::isDistributor).anyMatch(this::nodeNeedsClusterStateBundle);
        if (!anyDistributorsNeedStateBundle && currentStateVersion > this.lastStateVersionBundleAcked) {
            this.markCurrentClusterStateBundleAsReceivedByAllDistributors();
            if (this.clusterStateBundle.deferredActivation()) {
                this.context.log(log, Level.FINE, () -> String.format("All distributors have ACKed cluster state version %d, sending activation", currentStateVersion));
            } else {
                this.markCurrentClusterStateAsConverged(database, dbContext, fleetController);
            }
            return;
        }
        if (anyDistributorsNeedStateBundle || !this.clusterStateBundle.deferredActivation()) {
            return;
        }
        boolean anyDistributorsNeedActivation = dbContext.getCluster().getNodeInfos().stream().filter(NodeInfo::isDistributor).anyMatch(this::nodeNeedsClusterStateActivation);
        if (!anyDistributorsNeedActivation && currentStateVersion > this.lastClusterStateVersionConverged) {
            this.markCurrentClusterStateAsConverged(database, dbContext, fleetController);
        } else {
            this.context.log(log, Level.FINE, () -> String.format("distributors still need activation in state %d (last converged: %d)", currentStateVersion, this.lastClusterStateVersionConverged));
        }
    }

    private void markCurrentClusterStateBundleAsReceivedByAllDistributors() {
        this.lastStateVersionBundleAcked = this.clusterStateBundle.getVersion();
    }

    private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException {
        this.context.log(log, Level.FINE, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
        this.lastClusterStateVersionConverged = this.clusterStateBundle.getVersion();
        this.lastClusterStateBundleConverged = this.clusterStateBundle;
        fleetController.handleAllDistributorsInSync(database, dbContext);
    }

    private boolean currentClusterStateIsConverged() {
        return this.lastClusterStateVersionConverged == this.clusterStateBundle.getVersion();
    }

    private boolean currentBundleVersionIsTaggedOfficial() {
        return this.clusterStateBundle.getVersion() == this.lastOfficialStateVersion;
    }

    private void tagCurrentBundleVersionAsOfficial() {
        this.lastOfficialStateVersion = this.clusterStateBundle.getVersion();
    }

    public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator, int lastClusterStateVersionWrittenToZooKeeper) {
        if (this.clusterStateBundle == null || this.clusterStateBundle.getVersion() == 0) {
            return false;
        }
        if (this.clusterStateBundle.getVersion() != lastClusterStateVersionWrittenToZooKeeper) {
            return false;
        }
        ClusterState baselineState = this.clusterStateBundle.getBaselineClusterState();
        if (!this.currentBundleVersionIsTaggedOfficial()) {
            this.context.log(log, Level.INFO, "Publishing cluster state version " + baselineState.getVersion());
            this.tagCurrentBundleVersionAsOfficial();
        }
        List<NodeInfo> recipients = this.resolveStateVersionSendSet(dbContext);
        for (NodeInfo node : recipients) {
            if (SystemStateBroadcaster.nodeNeedsToObserveStartupTimestamps(node)) {
                ClusterStateBundle modifiedBundle = this.clusterStateBundle.cloneWithMapper(state -> SystemStateBroadcaster.buildModifiedClusterState(state, dbContext));
                this.context.log(log, Level.FINE, () -> "Sending modified cluster state version " + baselineState.getVersion() + " to node " + node + ": " + modifiedBundle);
                communicator.setSystemState(modifiedBundle, node, this.setClusterStateWaiter);
                continue;
            }
            this.context.log(log, Level.FINE, () -> "Sending system state version " + baselineState.getVersion() + " to node " + node + ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")");
            communicator.setSystemState(this.clusterStateBundle, node, this.setClusterStateWaiter);
        }
        return !recipients.isEmpty();
    }

    public boolean broadcastStateActivationsIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator) {
        if (this.clusterStateBundle == null || this.clusterStateBundle.getVersion() == 0 || !this.currentBundleVersionIsTaggedOfficial()) {
            return false;
        }
        if (!this.clusterStateBundle.deferredActivation() || !this.allDistributorsHaveAckedSentClusterStateBundle()) {
            return false;
        }
        List<NodeInfo> recipients = this.resolveStateActivationSendSet(dbContext);
        for (NodeInfo node : recipients) {
            this.context.log(log, Level.FINE, () -> "Sending cluster state activation to node " + node + " for version " + this.clusterStateBundle.getVersion());
            communicator.activateClusterStateVersion(this.clusterStateBundle.getVersion(), node, this.activateClusterStateVersionWaiter);
        }
        return !recipients.isEmpty();
    }

    private boolean allDistributorsHaveAckedSentClusterStateBundle() {
        return this.lastStateVersionBundleAcked == this.clusterStateBundle.getVersion();
    }

    public int lastClusterStateVersionInSync() {
        return this.lastClusterStateVersionConverged;
    }

    private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
        return node.getStartTimestamp() != 0L && node.getWentDownWithStartTime() == node.getStartTimestamp();
    }

    private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.DatabaseContext dbContext) {
        ClusterState newState = sourceState.clone();
        for (NodeInfo n : dbContext.getCluster().getNodeInfos()) {
            NodeState ns = newState.getNodeState(n.getNode());
            if (n.isDistributor() || ns.getStartTimestamp() != 0L) continue;
            ns.setStartTimestamp(n.getStartTimestamp());
            newState.setNodeState(n.getNode(), ns);
        }
        return newState;
    }

    private class SetClusterStateWaiter
    implements Communicator.Waiter<SetClusterStateRequest> {
        private SetClusterStateWaiter() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void done(SetClusterStateRequest reply) {
            Object object = SystemStateBroadcaster.this.monitor;
            synchronized (object) {
                SystemStateBroadcaster.this.setClusterStateReplies.add(reply);
            }
        }
    }

    private class ActivateClusterStateVersionWaiter
    implements Communicator.Waiter<ActivateClusterStateVersionRequest> {
        private ActivateClusterStateVersionWaiter() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void done(ActivateClusterStateVersionRequest reply) {
            Object object = SystemStateBroadcaster.this.monitor;
            synchronized (object) {
                SystemStateBroadcaster.this.activateClusterStateVersionReplies.add(reply);
            }
        }
    }
}

