/*
 * Decompiled with CFR 0.152.
 */
package com.vmware.xenon.services.common;

import com.vmware.xenon.common.FNVHash;
import com.vmware.xenon.common.NodeSelectorService;
import com.vmware.xenon.common.NodeSelectorState;
import com.vmware.xenon.common.Operation;
import com.vmware.xenon.common.Service;
import com.vmware.xenon.common.ServiceConfigUpdateRequest;
import com.vmware.xenon.common.ServiceConfiguration;
import com.vmware.xenon.common.ServiceDocument;
import com.vmware.xenon.common.ServiceErrorResponse;
import com.vmware.xenon.common.StatelessService;
import com.vmware.xenon.common.UriUtils;
import com.vmware.xenon.common.Utils;
import com.vmware.xenon.services.common.NodeGroupBroadcastResponse;
import com.vmware.xenon.services.common.NodeGroupService;
import com.vmware.xenon.services.common.NodeGroupUtils;
import com.vmware.xenon.services.common.NodeSelectorForwardingService;
import com.vmware.xenon.services.common.NodeSelectorReplicationService;
import com.vmware.xenon.services.common.NodeSelectorSynchronizationService;
import com.vmware.xenon.services.common.NodeState;
import java.net.URI;
import java.util.Collection;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ConsistentHashingNodeSelectorService
extends StatelessService
implements NodeSelectorService {
    private long operationQueueLimit = 10000L;
    private AtomicLong pendingOperationCount = new AtomicLong();
    private ConcurrentLinkedQueue<NodeSelectorService.SelectAndForwardRequest> pendingRequestQueue = new ConcurrentLinkedQueue();
    private NodeGroupService.NodeGroupState cachedGroupState;
    private NodeSelectorState cachedState;
    private NodeSelectorReplicationService replicationUtility;
    private volatile boolean isSynchronizationRequired;
    private boolean isNodeGroupConverged;
    private int synchQuorumWarningCount;

    public ConsistentHashingNodeSelectorService() {
        super(NodeSelectorState.class);
        super.toggleOption(Service.ServiceOption.CORE, true);
        super.toggleOption(Service.ServiceOption.PERIODIC_MAINTENANCE, true);
        super.toggleOption(Service.ServiceOption.INSTRUMENTATION, true);
    }

    @Override
    public void handleStart(Operation start) {
        NodeSelectorState state = null;
        if (!start.hasBody()) {
            state = new NodeSelectorState();
            state.nodeGroupLink = "/core/node-groups/default";
        } else {
            state = start.getBody(NodeSelectorState.class);
        }
        this.getHost().getClient().setConnectionLimitPerTag("xn-cnx-tag-replication", NodeSelectorService.REPLICATION_TAG_CONNECTION_LIMIT);
        this.getHost().getClient().setConnectionLimitPerTag("xn-cnx-tag-synch", NodeSelectorService.SYNCHRONIZATION_TAG_CONNECTION_LIMIT);
        this.getHost().getClient().setConnectionLimitPerTag("xn-cnx-tag-p2p-fwd", FORWARDING_TAG_CONNECTION_LIMIT);
        state.documentSelfLink = this.getSelfLink();
        state.documentKind = Utils.buildKind(NodeSelectorState.class);
        state.documentOwner = this.getHost().getId();
        this.cachedState = state;
        this.replicationUtility = new NodeSelectorReplicationService(this);
        this.startHelperServices(start);
    }

    private void startHelperServices(Operation op) {
        this.allocateUtilityService();
        AtomicInteger remaining = new AtomicInteger(4);
        Operation.CompletionHandler h = (o, e) -> {
            if (e != null) {
                op.fail(e);
                return;
            }
            if (remaining.decrementAndGet() != 0) {
                return;
            }
            op.complete();
        };
        Operation subscribeToNodeGroup = Operation.createPost(UriUtils.buildSubscriptionUri(this.getHost(), this.cachedState.nodeGroupLink)).setCompletion(h).setReferer(this.getUri());
        this.getHost().startSubscriptionService(subscribeToNodeGroup, this.handleNodeGroupNotification());
        this.sendRequest(Operation.createGet(this, this.cachedState.nodeGroupLink).setCompletion((o, e) -> {
            if (e == null) {
                NodeGroupService.NodeGroupState ngs = o.getBody(NodeGroupService.NodeGroupState.class);
                this.updateCachedNodeGroupState(ngs, null);
            } else if (!this.getHost().isStopping()) {
                this.logSevere(e);
            }
            h.handle(o, e);
        }));
        Operation startSynchPost = Operation.createPost(UriUtils.extendUri(this.getUri(), "synch")).setCompletion(h);
        Operation startForwardingPost = Operation.createPost(UriUtils.extendUri(this.getUri(), "forwarding")).setCompletion(h);
        this.getHost().startService(startSynchPost, new NodeSelectorSynchronizationService(this));
        this.getHost().startService(startForwardingPost, new NodeSelectorForwardingService(this));
    }

    private Consumer<Operation> handleNodeGroupNotification() {
        return notifyOp -> {
            notifyOp.complete();
            NodeGroupService.NodeGroupState ngs = null;
            if (notifyOp.getAction() == Service.Action.PATCH) {
                NodeGroupService.UpdateQuorumRequest bd = notifyOp.getBody(NodeGroupService.UpdateQuorumRequest.class);
                if (NodeGroupService.UpdateQuorumRequest.KIND.equals(bd.kind)) {
                    this.updateCachedNodeGroupState(null, bd);
                    return;
                }
            } else if (notifyOp.getAction() != Service.Action.POST) {
                return;
            }
            ngs = notifyOp.getBody(NodeGroupService.NodeGroupState.class);
            if (ngs.nodes == null || ngs.nodes.isEmpty()) {
                return;
            }
            this.updateCachedNodeGroupState(ngs, null);
        };
    }

    @Override
    public void authorizeRequest(Operation op) {
        if (op.getAction() != Service.Action.POST && op.getAction() != Service.Action.GET) {
            super.authorizeRequest(op);
            return;
        }
        op.complete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleRequest(Operation op) {
        if (op.getAction() == Service.Action.GET) {
            if (!NodeSelectorState.isAvailable(this.cachedState)) {
                NodeSelectorState nodeSelectorState = this.cachedState;
                synchronized (nodeSelectorState) {
                    NodeSelectorState.updateStatus(this.getHost(), this.cachedGroupState, this.cachedState);
                }
            }
            op.setBody(this.cachedState).complete();
            return;
        }
        if (op.getAction() == Service.Action.DELETE) {
            super.handleRequest(op);
            return;
        }
        if (op.getAction() == Service.Action.PATCH) {
            super.handleRequest(op);
            return;
        }
        if (op.getAction() != Service.Action.POST) {
            Operation.failActionNotSupported(op);
            return;
        }
        if (!op.hasBody()) {
            op.fail(new IllegalArgumentException("Body is required"));
            return;
        }
        NodeSelectorService.SelectAndForwardRequest body = op.getBody(NodeSelectorService.SelectAndForwardRequest.class);
        if (body.key == null && body.targetPath == null) {
            op.fail(new IllegalArgumentException("key or targetPath is required"));
            return;
        }
        this.selectAndForward(op, body);
    }

    @Override
    public void handlePatch(Operation patch) {
        if (!patch.hasBody()) {
            patch.fail(new IllegalArgumentException("Body is required"));
            return;
        }
        ServiceDocument s = patch.getBody(ServiceDocument.class);
        if (s.documentKind == null) {
            patch.fail(new IllegalArgumentException("Kind is required"));
            return;
        }
        if (NodeSelectorService.UpdateReplicationQuorumRequest.KIND.equals(s.documentKind)) {
            this.updateReplicationQuorum(patch, patch.getBody(NodeSelectorService.UpdateReplicationQuorumRequest.class));
            return;
        }
        patch.fail(new IllegalArgumentException("Unexpected request kind " + s.documentKind));
    }

    @Override
    public void handleConfigurationRequest(Operation op) {
        if (op.getAction() == Service.Action.PATCH && op.hasBody()) {
            ServiceConfigUpdateRequest body = op.getBody(ServiceConfigUpdateRequest.class);
            if (body.operationQueueLimit != null) {
                this.operationQueueLimit = body.operationQueueLimit.intValue();
            }
        }
        if (op.getAction() == Service.Action.GET) {
            ServiceConfiguration cfg = Utils.buildServiceConfig(new ServiceConfiguration(), this);
            cfg.epoch = 0L;
            cfg.operationQueueLimit = (int)this.operationQueueLimit;
            op.setBodyNoCloning(cfg).complete();
            return;
        }
        super.handleConfigurationRequest(op);
    }

    @Override
    public String getNodeGroupPath() {
        return this.cachedState.nodeGroupLink;
    }

    @Override
    public void selectAndForward(Operation op, NodeSelectorService.SelectAndForwardRequest body) {
        this.selectAndForward(body, op, this.cachedGroupState);
    }

    @Override
    public NodeSelectorService.SelectOwnerResponse findOwnerNode(String path) {
        return this.selectNodes(path, this.cachedGroupState);
    }

    private void selectAndForward(NodeSelectorService.SelectAndForwardRequest forwardRequest, Operation op, NodeGroupService.NodeGroupState localState) {
        String keyValue = forwardRequest.key != null ? forwardRequest.key : forwardRequest.targetPath;
        forwardRequest.associatedOp = op;
        if (this.queueRequestIfNodeGroupIsUnavailable(localState, forwardRequest)) {
            return;
        }
        if (this.cachedState.replicationFactor == null && forwardRequest.options != null && forwardRequest.options.contains((Object)NodeSelectorService.SelectAndForwardRequest.ForwardingOption.BROADCAST)) {
            NodeSelectorService.SelectOwnerResponse response = new NodeSelectorService.SelectOwnerResponse();
            response.key = keyValue;
            Collection<NodeState> collection = response.selectedNodes = forwardRequest.candidateNodes == null ? localState.nodes.values() : (Collection<NodeState>)localState.nodes.values().stream().filter(ns -> forwardRequest.candidateNodes.contains(ns.id)).collect(Collectors.toList());
            if (forwardRequest.options.contains((Object)NodeSelectorService.SelectAndForwardRequest.ForwardingOption.REPLICATE)) {
                this.replicateRequest(op, forwardRequest, response);
                return;
            }
            this.broadcast(op, forwardRequest, response);
            return;
        }
        NodeSelectorService.SelectOwnerResponse response = this.selectNodes(keyValue, localState);
        int availableNodeCount = response.availableNodeCount;
        int quorum = this.cachedState.membershipQuorum;
        if (availableNodeCount < quorum) {
            op.fail(new IllegalStateException("Available nodes: " + availableNodeCount + ", quorum:" + quorum));
            return;
        }
        if (forwardRequest.targetPath == null) {
            op.setBodyNoCloning(response).complete();
            return;
        }
        if (forwardRequest.options != null && forwardRequest.options.contains((Object)NodeSelectorService.SelectAndForwardRequest.ForwardingOption.BROADCAST)) {
            if (forwardRequest.options.contains((Object)NodeSelectorService.SelectAndForwardRequest.ForwardingOption.REPLICATE)) {
                if (op.getAction() == Service.Action.DELETE) {
                    response.selectedNodes = localState.nodes.values();
                }
                this.replicateRequest(op, forwardRequest, response);
            } else {
                this.broadcast(op, forwardRequest, response);
            }
            return;
        }
        URI remoteService = UriUtils.buildServiceUri(response.ownerNodeGroupReference.getScheme(), response.ownerNodeGroupReference.getHost(), response.ownerNodeGroupReference.getPort(), forwardRequest.targetPath, forwardRequest.targetQuery, null);
        Operation fwdOp = op.clone().setCompletion((o, e) -> {
            op.transferResponseHeadersFrom(o).setStatusCode(o.getStatusCode()).setBodyNoCloning(o.getBodyRaw());
            if (e != null) {
                op.fail(e);
                return;
            }
            op.complete();
        });
        this.getHost().getClient().send(fwdOp.setUri(remoteService));
    }

    private NodeSelectorService.SelectOwnerResponse selectNodes(String key, NodeGroupService.NodeGroupState localState) {
        return this.selectNodes(key, localState, null);
    }

    private NodeSelectorService.SelectOwnerResponse selectNodes(String key, NodeGroupService.NodeGroupState localState, Collection<String> candidateNodes) {
        NodeState self = localState.nodes.get(this.getHost().getId());
        NodeSelectorService.SelectOwnerResponse response = new NodeSelectorService.SelectOwnerResponse();
        response.key = key;
        if (localState.nodes.size() == 1) {
            response.ownerNodeId = self.id;
            response.isLocalHostOwner = true;
            response.ownerNodeGroupReference = self.groupReference;
            response.selectedNodes = localState.nodes.values();
            response.membershipUpdateTimeMicros = localState.membershipUpdateTimeMicros;
            response.availableNodeCount = 1;
            return response;
        }
        int neighbourCount = 1;
        if (this.cachedState.replicationFactor != null) {
            neighbourCount = this.cachedState.replicationFactor.intValue();
        }
        ClosestNNeighbours closestNodes = new ClosestNNeighbours(neighbourCount);
        long keyHash = FNVHash.compute(response.key);
        Collection<String> nodeIds = candidateNodes != null ? candidateNodes : localState.nodes.keySet();
        for (String nodeId : nodeIds) {
            NodeState m = localState.nodes.get(nodeId);
            if (NodeState.isUnAvailable(m)) continue;
            ++response.availableNodeCount;
            long distance = m.getNodeIdHash() - keyHash;
            distance *= distance;
            distance = Math.abs(distance);
            closestNodes.put(distance, m);
        }
        NodeState closest = (NodeState)closestNodes.firstEntry().getValue();
        response.ownerNodeId = closest.id;
        response.isLocalHostOwner = response.ownerNodeId.equals(this.getHost().getId());
        response.ownerNodeGroupReference = closest.groupReference;
        response.selectedNodes = closestNodes.values();
        response.membershipUpdateTimeMicros = localState.membershipUpdateTimeMicros;
        return response;
    }

    private void broadcast(Operation op, NodeSelectorService.SelectAndForwardRequest req, NodeSelectorService.SelectOwnerResponse selectRsp) {
        Collection<NodeState> members = selectRsp.selectedNodes;
        AtomicInteger remaining = new AtomicInteger(members.size());
        NodeGroupBroadcastResponse rsp = new NodeGroupBroadcastResponse();
        if (remaining.get() == 0) {
            op.setBody(rsp).complete();
            return;
        }
        rsp.membershipQuorum = this.cachedState.membershipQuorum;
        AtomicInteger availableNodeCount = new AtomicInteger();
        Operation.CompletionHandler c = (o, e) -> {
            if (e != null) {
                ServiceErrorResponse errorRsp = Utils.toServiceErrorResponse(e);
                errorRsp.statusCode = o.getStatusCode();
                rsp.failures.put(o.getUri(), errorRsp);
            } else if (o != null && o.hasBody()) {
                rsp.jsonResponses.put(o.getUri(), Utils.toJson(o.getBodyRaw()));
            }
            if (remaining.decrementAndGet() != 0) {
                return;
            }
            rsp.nodeCount = this.cachedGroupState.nodes.size();
            rsp.availableNodeCount = availableNodeCount.get();
            op.setBodyNoCloning(rsp).complete();
        };
        for (NodeState m : members) {
            boolean skipNode = false;
            if (req.options.contains((Object)NodeSelectorService.SelectAndForwardRequest.ForwardingOption.EXCLUDE_ENTRY_NODE) && m.id.equals(this.getHost().getId())) {
                skipNode = true;
            }
            if (skipNode = NodeState.isUnAvailable(m) | skipNode) {
                c.handle(null, null);
                continue;
            }
            URI remoteService = UriUtils.buildUri(m.groupReference.getScheme(), m.groupReference.getHost(), m.groupReference.getPort(), req.targetPath, req.targetQuery);
            Operation remoteOp = Operation.createPost(remoteService).transferRequestHeadersFrom(op).addPragmaDirective("xn-no-fwd").setAction(op.getAction()).setCompletion(c).transferRefererFrom(op).setExpiration(op.getExpirationMicrosUtc()).setBody(op.getBodyRaw());
            rsp.receivers.add(remoteService);
            rsp.selectedNodes.put(m.id, m.groupReference);
            availableNodeCount.incrementAndGet();
            this.getHost().sendRequest(remoteOp);
        }
    }

    private void replicateRequest(Operation op, NodeSelectorService.SelectAndForwardRequest body, NodeSelectorService.SelectOwnerResponse response) {
        if (this.cachedGroupState == null) {
            op.fail(null);
        }
        this.replicationUtility.replicateUpdate(this.cachedGroupState, op, body, response, this.cachedState.replicationQuorum);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean queueRequestIfNodeGroupIsUnavailable(NodeGroupService.NodeGroupState localState, NodeSelectorService.SelectAndForwardRequest body) {
        Operation op = body.associatedOp;
        if (this.getHost().isStopping()) {
            op.fail(new CancellationException("host is stopping"));
            return true;
        }
        if (op.getExpirationMicrosUtc() < Utils.getSystemNowMicrosUtc()) {
            op.fail(new CancellationException(String.format("Operation already expired, will not queue. Exp:%d, now:%d", op.getExpirationMicrosUtc(), Utils.getSystemNowMicrosUtc())));
            return true;
        }
        if (!NodeSelectorState.isAvailable(this.cachedState)) {
            NodeSelectorState nodeSelectorState = this.cachedState;
            synchronized (nodeSelectorState) {
                NodeSelectorState.updateStatus(this.getHost(), localState, this.cachedState);
            }
        }
        if (NodeSelectorState.isAvailable(this.cachedState)) {
            return false;
        }
        if (this.operationQueueLimit <= this.pendingOperationCount.get()) {
            this.adjustStat("limitExceededFailedRequestCount", 1.0);
            Operation.failLimitExceeded(op, -2147483641, "pendingRequestQueue on " + this.getSelfLink());
            return true;
        }
        this.adjustStat("queuedRequestCount", 1.0);
        body.associatedOp = null;
        body = Utils.clone(body);
        body.associatedOp = op;
        this.pendingOperationCount.incrementAndGet();
        this.pendingRequestQueue.add(body);
        return true;
    }

    @Override
    public void handleMaintenance(Operation maintOp) {
        this.performPendingRequestMaintenance();
        if (this.checkAndScheduleSynchronization(this.cachedGroupState.membershipUpdateTimeMicros, maintOp)) {
            return;
        }
        maintOp.complete();
    }

    private void performPendingRequestMaintenance() {
        if (this.pendingRequestQueue.isEmpty()) {
            return;
        }
        while (!this.pendingRequestQueue.isEmpty()) {
            if (!NodeSelectorState.isAvailable(this.cachedState)) {
                NodeSelectorState.updateStatus(this.getHost(), this.cachedGroupState, this.cachedState);
                return;
            }
            NodeSelectorService.SelectAndForwardRequest req = this.pendingRequestQueue.poll();
            if (req == null) break;
            this.pendingOperationCount.decrementAndGet();
            if (this.getHost().isStopping()) {
                req.associatedOp.fail(new CancellationException("Host is stopping"));
                continue;
            }
            this.selectAndForward(req, req.associatedOp, this.cachedGroupState);
        }
    }

    private boolean checkAndScheduleSynchronization(long membershipUpdateMicros, Operation maintOp) {
        if (this.getHost().isStopping()) {
            return false;
        }
        if (!this.isSynchronizationRequired) {
            return false;
        }
        if (!NodeGroupUtils.isMembershipSettled(this.getHost(), this.getHost().getMaintenanceIntervalMicros(), this.cachedGroupState)) {
            this.checkConvergence(membershipUpdateMicros, maintOp);
            return true;
        }
        if (!this.isNodeGroupConverged) {
            this.checkConvergence(membershipUpdateMicros, maintOp);
            return true;
        }
        if (!this.getHost().isPeerSynchronizationEnabled() || !this.isSynchronizationRequired) {
            return false;
        }
        this.isSynchronizationRequired = false;
        this.logInfo("Scheduling synchronization (%d nodes)", this.cachedGroupState.nodes.size());
        this.adjustStat("synchronizationCount", 1.0);
        this.getHost().scheduleNodeGroupChangeMaintenance(this.getSelfLink());
        return false;
    }

    private void checkConvergence(long membershipUpdateMicros, Operation maintOp) {
        Operation.CompletionHandler c = (o, e) -> {
            if (e != null) {
                if (!this.getHost().isStopping()) {
                    this.logSevere(e);
                }
                maintOp.complete();
                return;
            }
            int quorumWarningsBeforeQuiet = 10;
            if (!o.hasBody()) {
                this.logWarning("Missing node group state", new Object[0]);
                maintOp.complete();
                return;
            }
            NodeGroupService.NodeGroupState ngs = o.getBody(NodeGroupService.NodeGroupState.class);
            this.updateCachedNodeGroupState(ngs, null);
            Operation op = Operation.createPost(null).setReferer(this.getUri()).setExpiration(Utils.fromNowMicrosUtc(this.getHost().getOperationTimeoutMicros()));
            NodeGroupUtils.checkConvergence(this.getHost(), ngs, op.setCompletion((o1, e1) -> {
                if (e1 != null) {
                    this.logWarning("Failed convergence check, will retry: %s", e1.getMessage());
                    maintOp.complete();
                    return;
                }
                if (!NodeGroupUtils.hasMembershipQuorum(this.getHost(), this.cachedGroupState)) {
                    if (this.synchQuorumWarningCount < 10) {
                        this.logWarning("Synchronization quorum not met", new Object[0]);
                    } else if (this.synchQuorumWarningCount == 10) {
                        this.logWarning("Synchronization quorum not met, warning will be silenced", new Object[0]);
                    }
                    ++this.synchQuorumWarningCount;
                    maintOp.complete();
                    return;
                }
                NodeSelectorState nodeSelectorState = this.cachedState;
                synchronized (nodeSelectorState) {
                    boolean bl = this.isNodeGroupConverged = membershipUpdateMicros == this.cachedGroupState.membershipUpdateTimeMicros;
                    if (this.isNodeGroupConverged) {
                        this.synchQuorumWarningCount = 0;
                    }
                }
                maintOp.complete();
            }));
        };
        this.sendRequest(Operation.createGet(this, this.cachedState.nodeGroupLink).setCompletion(c));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateCachedNodeGroupState(NodeGroupService.NodeGroupState ngs, NodeGroupService.UpdateQuorumRequest quorumUpdate) {
        if (ngs != null) {
            boolean logMsg;
            NodeGroupService.NodeGroupState currentState = this.cachedGroupState;
            boolean isAvailable = NodeSelectorState.isAvailable(this.getHost(), ngs);
            boolean isCurrentlyAvailable = currentState != null && NodeSelectorState.isAvailable(this.getHost(), currentState);
            boolean bl = logMsg = isAvailable != isCurrentlyAvailable || currentState != null && currentState.nodes.size() != ngs.nodes.size();
            if (currentState != null && logMsg) {
                this.logInfo("Node count: %d, available: %s, update time: %d (%d)", ngs.nodes.size(), isAvailable, ngs.membershipUpdateTimeMicros, ngs.localMembershipUpdateTimeMicros);
            }
        } else if (quorumUpdate.membershipQuorum != null) {
            this.logInfo("Quorum update: %d", quorumUpdate.membershipQuorum);
        }
        long now = Utils.getNowMicrosUtc();
        NodeSelectorState nodeSelectorState = this.cachedState;
        synchronized (nodeSelectorState) {
            this.cachedState.status = NodeSelectorState.Status.UNAVAILABLE;
            if (quorumUpdate != null) {
                this.cachedState.documentUpdateTimeMicros = now;
                if (quorumUpdate.membershipQuorum != null) {
                    this.cachedState.membershipQuorum = quorumUpdate.membershipQuorum;
                }
                if (this.cachedGroupState != null) {
                    if (quorumUpdate.membershipQuorum != null) {
                        this.cachedGroupState.nodes.get((Object)this.getHost().getId()).membershipQuorum = quorumUpdate.membershipQuorum;
                    }
                    if (quorumUpdate.locationQuorum != null) {
                        this.cachedGroupState.nodes.get((Object)this.getHost().getId()).locationQuorum = quorumUpdate.locationQuorum;
                    }
                }
                return;
            }
            if (this.cachedGroupState == null) {
                this.cachedGroupState = ngs;
            }
            if (this.cachedGroupState.documentUpdateTimeMicros > ngs.documentUpdateTimeMicros) {
                return;
            }
            NodeSelectorState.updateStatus(this.getHost(), ngs, this.cachedState);
            this.cachedState.documentUpdateTimeMicros = now;
            this.cachedState.membershipUpdateTimeMicros = ngs.membershipUpdateTimeMicros;
            this.cachedGroupState = ngs;
            this.isNodeGroupConverged = false;
            this.isSynchronizationRequired = true;
        }
    }

    @Override
    public void updateReplicationQuorum(Operation op, NodeSelectorService.UpdateReplicationQuorumRequest r) {
        int replicationFactor;
        if (r.replicationQuorum == null) {
            op.fail(new IllegalArgumentException("replication quorum is required"));
            return;
        }
        int replicationQuorum = r.replicationQuorum;
        int n = replicationFactor = this.cachedState.replicationFactor != null ? this.cachedState.replicationFactor.intValue() : this.cachedGroupState.nodes.size();
        if (replicationQuorum > replicationFactor) {
            String errorMsg = String.format("replicationQuorum %d > replicationFactor %d", replicationQuorum, replicationFactor);
            op.fail(new IllegalArgumentException(errorMsg));
            return;
        }
        this.logInfo("replicationQuorum update from %d to %d", this.cachedState.replicationQuorum, replicationQuorum);
        this.cachedState.replicationQuorum = replicationQuorum;
        if (!r.isGroupUpdate) {
            op.complete();
            return;
        }
        r.isGroupUpdate = false;
        AtomicInteger pending = new AtomicInteger(this.cachedGroupState.nodes.size());
        Operation.CompletionHandler c = (o, e) -> {
            if (e != null) {
                op.fail(e);
                return;
            }
            int p = pending.decrementAndGet();
            if (p != 0) {
                return;
            }
            op.complete();
        };
        for (NodeState node : this.cachedGroupState.nodes.values()) {
            if (!NodeState.isAvailable(node, this.getHost().getId(), true)) {
                c.handle(null, null);
                continue;
            }
            URI peerNodeSelectorService = UriUtils.buildUri(node.groupReference.getScheme(), node.groupReference.getHost(), node.groupReference.getPort(), this.getSelfLink(), null);
            Operation p = Operation.createPatch(peerNodeSelectorService).setBody(r).setCompletion(c);
            this.sendRequest(p);
        }
    }

    @Override
    public Service getUtilityService(String uriPath) {
        if (uriPath.endsWith("/replication")) {
            return this.replicationUtility;
        }
        return super.getUtilityService(uriPath);
    }

    private static final class ClosestNNeighbours
    extends TreeMap<Long, NodeState> {
        private static final long serialVersionUID = 0L;
        private final int maxN;

        public ClosestNNeighbours(int maxN) {
            super(Long::compare);
            this.maxN = maxN;
        }

        @Override
        public NodeState put(Long key, NodeState value) {
            if (this.size() < this.maxN) {
                return super.put(key, value);
            }
            if (this.comparator().compare(key, this.lastKey()) <= 0) {
                NodeState old = super.put(key, value);
                if (old == null) {
                    this.remove(this.lastKey());
                }
                return old;
            }
            return null;
        }
    }
}

