/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.Communicator;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.SetClusterStateRequest;
import com.yahoo.vespa.clustercontroller.core.Timer;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class SystemStateBroadcaster {
    public static Logger log = Logger.getLogger(SystemStateBroadcaster.class.getName());
    private final Timer timer;
    private final Object monitor;
    private ClusterState systemState;
    private final List<SetClusterStateRequest> replies = new LinkedList<SetClusterStateRequest>();
    private static final long minTimeBetweenNodeErrorLogging = 600000L;
    private final Map<Node, Long> lastErrorReported = new TreeMap<Node, Long>();
    private int lastClusterStateInSync = 0;
    private final ClusterStateWaiter waiter = new ClusterStateWaiter();

    public SystemStateBroadcaster(Timer timer, Object monitor) {
        this.timer = timer;
        this.monitor = monitor;
    }

    public void handleNewSystemState(ClusterState state) {
        this.systemState = state;
    }

    public ClusterState getClusterState() {
        return this.systemState;
    }

    private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
        long time = this.timer.getCurrentTimeInMillis();
        Long lastReported = this.lastErrorReported.get(info.getNode());
        boolean alreadySeen = lastReported != null && time - lastReported < 600000L;
        log.log(nodeOk && !alreadySeen ? LogLevel.WARNING : LogLevel.DEBUG, message);
        if (!alreadySeen) {
            this.lastErrorReported.put(info.getNode(), time);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean processResponses() {
        boolean anyResponsesFound = false;
        Object object = this.monitor;
        synchronized (object) {
            for (SetClusterStateRequest req : this.replies) {
                anyResponsesFound = true;
                NodeInfo info = req.getNodeInfo();
                boolean nodeOk = info.getReportedState().getState().oneOf("uir");
                int version = req.getSystemStateVersion();
                if (req.getReply().isError()) {
                    if (req.getReply().getReturnCode() == 9999) continue;
                    info.setSystemStateVersionAcknowledged(version, false);
                    if (info.getNewestSystemStateVersionSent() != version) continue;
                    this.reportNodeError(nodeOk, info, "Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage() + " from " + info + " setsystemstate request.");
                    continue;
                }
                info.setSystemStateVersionAcknowledged(version, true);
                log.log((Level)LogLevel.DEBUG, "Node " + info + " acked system state version " + version + ".");
                this.lastErrorReported.remove(info.getNode());
            }
            this.replies.clear();
        }
        return anyResponsesFound;
    }

    private boolean nodeNeedsClusterState(NodeInfo node) {
        if (node.getSystemStateVersionAcknowledged() == this.systemState.getVersion()) {
            return false;
        }
        if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) {
            return false;
        }
        return node.getReportedState().getState() != State.MAINTENANCE && node.getReportedState().getState() != State.DOWN && node.getReportedState().getState() != State.STOPPING;
    }

    private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
        return dbContext.getCluster().getNodeInfo().stream().filter(this::nodeNeedsClusterState).filter(node -> !this.newestStateAlreadySentToNode((NodeInfo)node)).collect(Collectors.toList());
    }

    private boolean newestStateAlreadySentToNode(NodeInfo node) {
        return node.getNewestSystemStateVersionSent() == this.systemState.getVersion();
    }

    void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException {
        if (this.systemState == null || this.lastClusterStateInSync == this.systemState.getVersion()) {
            return;
        }
        boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream().filter(NodeInfo::isDistributor).anyMatch(this::nodeNeedsClusterState);
        if (!anyOutdatedDistributorNodes && this.systemState.getVersion() > this.lastClusterStateInSync) {
            log.log((Level)LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
            this.lastClusterStateInSync = this.systemState.getVersion();
            fleetController.handleAllDistributorsInSync(database, dbContext);
        }
    }

    public boolean broadcastNewState(DatabaseHandler database, DatabaseHandler.Context dbContext, Communicator communicator, FleetController fleetController) throws InterruptedException {
        if (this.systemState == null) {
            return false;
        }
        if (!this.systemState.isOfficial()) {
            log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", this.systemState.getVersion()));
            this.systemState.setOfficial(true);
        }
        List<NodeInfo> recipients = this.resolveStateVersionSendSet(dbContext);
        for (NodeInfo node : recipients) {
            if (this.nodeNeedsToObserveStartupTimestamps(node)) {
                ClusterState newState = this.buildModifiedClusterState(dbContext);
                log.log((Level)LogLevel.DEBUG, "Sending modified system state version " + this.systemState.getVersion() + " to node " + node + ": " + newState);
                communicator.setSystemState(newState, node, this.waiter);
                continue;
            }
            log.log((Level)LogLevel.DEBUG, "Sending system state version " + this.systemState.getVersion() + " to node " + node + ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")");
            communicator.setSystemState(this.systemState, node, this.waiter);
        }
        return !recipients.isEmpty();
    }

    public int lastClusterStateVersionInSync() {
        return this.lastClusterStateInSync;
    }

    private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
        return node.getStartTimestamp() != 0L && node.getWentDownWithStartTime() == node.getStartTimestamp();
    }

    private ClusterState buildModifiedClusterState(DatabaseHandler.Context dbContext) {
        ClusterState newState = this.systemState.clone();
        for (NodeInfo n : dbContext.getCluster().getNodeInfo()) {
            NodeState ns = newState.getNodeState(n.getNode());
            if (n.isDistributor() || ns.getStartTimestamp() != 0L) continue;
            ns.setStartTimestamp(n.getStartTimestamp());
            newState.setNodeState(n.getNode(), ns);
        }
        return newState;
    }

    private class ClusterStateWaiter
    implements Communicator.Waiter<SetClusterStateRequest> {
        private ClusterStateWaiter() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void done(SetClusterStateRequest reply) {
            Object object = SystemStateBroadcaster.this.monitor;
            synchronized (object) {
                SystemStateBroadcaster.this.replies.add(reply);
            }
        }
    }
}

