/*
 * Decompiled with CFR 0.152.
 */
package com.vmware.xenon.services.common;

import com.vmware.xenon.common.MurmurHash3;
import com.vmware.xenon.common.NodeSelectorService;
import com.vmware.xenon.common.NodeSelectorState;
import com.vmware.xenon.common.Operation;
import com.vmware.xenon.common.Service;
import com.vmware.xenon.common.ServiceErrorResponse;
import com.vmware.xenon.common.StatelessService;
import com.vmware.xenon.common.UriUtils;
import com.vmware.xenon.common.Utils;
import com.vmware.xenon.services.common.NodeGroupBroadcastResponse;
import com.vmware.xenon.services.common.NodeGroupService;
import com.vmware.xenon.services.common.NodeGroupUtils;
import com.vmware.xenon.services.common.NodeSelectorForwardingService;
import com.vmware.xenon.services.common.NodeSelectorReplicationService;
import com.vmware.xenon.services.common.NodeSelectorSynchronizationService;
import com.vmware.xenon.services.common.NodeState;
import java.net.URI;
import java.util.Collection;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ConsistentHashingNodeSelectorService
extends StatelessService
implements NodeSelectorService {
    /** Cache of node id to its MurmurHash3 value; filled lazily during node selection and cleared by the convergence check. */
    private final ConcurrentSkipListMap<String, Long> hashedNodeIds = new ConcurrentSkipListMap<>();
    /** Requests parked while the node group is unavailable; drained during periodic maintenance. */
    private final ConcurrentLinkedQueue<NodeSelectorService.SelectAndForwardRequest> pendingRequests = new ConcurrentLinkedQueue<>();
    /** Most recent node group state accepted by {@code updateCachedNodeGroupState}. */
    private NodeGroupService.NodeGroupState cachedGroupState;
    /** Selector state built in {@code handleStart}; also used as the monitor guarding group state updates. */
    private NodeSelectorState cachedState;
    /** Utility service returned for URI paths ending in "/replication". */
    private NodeSelectorReplicationService replicationUtility;
    /** Set when a new group state is cached; reset when synchronization is scheduled. */
    private volatile boolean isSynchronizationRequired;
    /** Result of the last convergence check; reset whenever a new group state is cached. */
    private boolean isNodeGroupConverged;
    /** Number of "quorum not met" outcomes seen since the group last converged; used to silence repeated warnings. */
    private int synchQuorumWarningCount;

    /**
     * Creates the selector service with {@link NodeSelectorState} as its state type and
     * turns on the PERIODIC_MAINTENANCE and INSTRUMENTATION service options.
     */
    public ConsistentHashingNodeSelectorService() {
        super(NodeSelectorState.class);
        // The option toggles are dispatched explicitly to the superclass implementation.
        super.toggleOption(Service.ServiceOption.PERIODIC_MAINTENANCE, true);
        super.toggleOption(Service.ServiceOption.INSTRUMENTATION, true);
    }

    /**
     * Initializes the cached selector state from the start operation and launches the helper
     * services. The start operation is completed (or failed) by {@code startHelperServices}.
     *
     * @param start the service start operation; when it carries no body a default state
     *              pointing at the default node group is used
     */
    @Override
    public void handleStart(Operation start) {
        NodeSelectorState initialState;
        if (start.hasBody()) {
            initialState = start.getBody(NodeSelectorState.class);
        } else {
            // No explicit configuration: fall back to the default node group.
            initialState = new NodeSelectorState();
            initialState.nodeGroupLink = "/core/node-groups/default";
        }

        // Stamp the document fields regardless of where the state came from.
        initialState.documentSelfLink = this.getSelfLink();
        initialState.documentKind = Utils.buildKind(NodeSelectorState.class);
        initialState.documentOwner = this.getHost().getId();

        this.cachedState = initialState;
        this.replicationUtility = new NodeSelectorReplicationService(this);
        this.startHelperServices(start);
    }

    /**
     * Kicks off the four asynchronous start-up steps and completes {@code op} once all of them
     * have succeeded: the node group subscription, the initial node group GET, and the start of
     * the "synch" and "forwarding" child services. Any step reporting an error fails {@code op}.
     *
     * @param op the service start operation to complete or fail
     */
    private void startHelperServices(Operation op) {
        // One count per asynchronous step started below.
        AtomicInteger pendingSteps = new AtomicInteger(4);
        Operation.CompletionHandler joinHandler = (o, e) -> {
            if (e != null) {
                op.fail(e);
                return;
            }
            if (pendingSteps.decrementAndGet() == 0) {
                op.complete();
            }
        };

        // Step 1: subscribe to node group notifications.
        Operation subscribeToNodeGroup = Operation
                .createPost(UriUtils.buildSubscriptionUri(this.getHost(), this.cachedState.nodeGroupLink))
                .setCompletion(joinHandler)
                .setReferer(this.getUri());
        this.getHost().startSubscriptionService(subscribeToNodeGroup, this.handleNodeGroupNotification());

        // Step 2: fetch the current node group state and cache it.
        Operation getNodeGroup = Operation.createGet(this, this.cachedState.nodeGroupLink);
        getNodeGroup.setCompletion((o, e) -> {
            if (e != null) {
                this.logSevere(e);
            } else {
                this.updateCachedNodeGroupState(o.getBody(NodeGroupService.NodeGroupState.class), null);
            }
            joinHandler.handle(o, e);
        });
        this.sendRequest(getNodeGroup);

        // Steps 3 and 4: start the synchronization and forwarding child services.
        Operation startSynchPost = Operation
                .createPost(UriUtils.extendUri(this.getUri(), "synch"))
                .setCompletion(joinHandler);
        Operation startForwardingPost = Operation
                .createPost(UriUtils.extendUri(this.getUri(), "forwarding"))
                .setCompletion(joinHandler);
        this.getHost().startService(startSynchPost, new NodeSelectorSynchronizationService(this));
        this.getHost().startService(startForwardingPost, new NodeSelectorForwardingService(this));
    }

    /**
     * Builds the handler invoked for node group subscription notifications.
     * <p>
     * The notification is completed immediately. PATCH notifications whose body is an
     * {@code UpdateQuorumRequest} are applied as quorum updates; any other PATCH, and every POST,
     * is read as a full {@code NodeGroupState} and cached when it lists at least one node.
     * Notifications with any other action are ignored.
     *
     * @return the notification consumer
     */
    private Consumer<Operation> handleNodeGroupNotification() {
        return notifyOp -> {
            notifyOp.complete();

            Service.Action action = notifyOp.getAction();
            if (action != Service.Action.PATCH && action != Service.Action.POST) {
                return;
            }

            if (action == Service.Action.PATCH) {
                NodeGroupService.UpdateQuorumRequest quorumRequest =
                        notifyOp.getBody(NodeGroupService.UpdateQuorumRequest.class);
                if (NodeGroupService.UpdateQuorumRequest.KIND.equals(quorumRequest.kind)) {
                    this.updateCachedNodeGroupState(null, quorumRequest);
                    return;
                }
                // Not a quorum update: fall through and treat the body as group state.
            }

            NodeGroupService.NodeGroupState groupState =
                    notifyOp.getBody(NodeGroupService.NodeGroupState.class);
            boolean hasNodes = groupState.nodes != null && !groupState.nodes.isEmpty();
            if (hasNodes) {
                this.updateCachedNodeGroupState(groupState, null);
            }
        };
    }

    /**
     * Entry point for requests addressed to the selector.
     * <ul>
     * <li>GET returns the cached selector state.</li>
     * <li>DELETE is delegated to the superclass.</li>
     * <li>POST must carry a {@code SelectAndForwardRequest} with a key or a target path and is
     * handed to {@code selectAndForward}.</li>
     * <li>Every other action is rejected as not supported.</li>
     * </ul>
     *
     * @param op the inbound operation
     */
    @Override
    public void handleRequest(Operation op) {
        Service.Action action = op.getAction();

        if (action == Service.Action.GET) {
            op.setBody(this.cachedState).complete();
            return;
        }
        if (action == Service.Action.DELETE) {
            super.handleRequest(op);
            return;
        }
        if (action != Service.Action.POST) {
            this.getHost().failRequestActionNotSupported(op);
            return;
        }

        // POST: validate the select-and-forward request before acting on it.
        if (!op.hasBody()) {
            op.fail(new IllegalArgumentException("Body is required"));
            return;
        }
        NodeSelectorService.SelectAndForwardRequest request =
                op.getBody(NodeSelectorService.SelectAndForwardRequest.class);
        boolean hasSelector = request.key != null || request.targetPath != null;
        if (!hasSelector) {
            op.fail(new IllegalArgumentException("key or targetPath is required"));
            return;
        }
        this.selectAndForward(op, request);
    }

    /**
     * Returns the link of the node group this selector is bound to, as recorded in the
     * cached selector state.
     *
     * @return the node group link
     */
    @Override
    public String getNodeGroup() {
        NodeSelectorState selectorState = this.cachedState;
        return selectorState.nodeGroupLink;
    }

    /**
     * Selects the target node(s) for {@code body} using the currently cached node group state
     * and, depending on the request, answers, forwards, broadcasts or replicates {@code op}.
     *
     * @param op   the operation to complete or forward
     * @param body the selection request
     */
    @Override
    public void selectAndForward(Operation op, NodeSelectorService.SelectAndForwardRequest body) {
        // Take the group state reference once so the whole selection works on one snapshot.
        NodeGroupService.NodeGroupState groupSnapshot = this.cachedGroupState;
        this.selectAndForward(body, op, groupSnapshot);
    }

    /**
     * Core selection routine.
     * <ol>
     * <li>Queues (or fails) the request when the node group is not available.</li>
     * <li>For BROADCAST requests with no replication factor configured, targets every node in
     * {@code localState}; otherwise runs {@code selectNodes} to pick the owner and its
     * neighbours.</li>
     * <li>Without a target path, completes {@code op} with the selection response.</li>
     * <li>With BROADCAST, replicates (REPLICATE option) or broadcasts to the selected nodes.</li>
     * <li>Otherwise forwards a clone of {@code op} to the owner and relays its response.</li>
     * </ol>
     * If {@code selectNodes} fails the operation (quorum not met) processing stops; the
     * operation is neither completed nor forwarded afterwards.
     *
     * @param body       the selection request; its {@code associatedOp} is set to {@code op}
     * @param op         the operation to complete, fail or forward
     * @param localState the node group state to select from
     */
    private void selectAndForward(NodeSelectorService.SelectAndForwardRequest body, Operation op, NodeGroupService.NodeGroupState localState) {
        String keyValue = body.key != null ? body.key : body.targetPath;
        NodeSelectorService.SelectOwnerResponse response = new NodeSelectorService.SelectOwnerResponse();
        response.key = keyValue;
        body.associatedOp = op;
        if (this.queueRequestIfNodeGroupIsUnavailable(localState, body)) {
            return;
        }

        boolean isBroadcast = body.options != null
                && body.options.contains(NodeSelectorService.SelectAndForwardRequest.ForwardingOption.BROADCAST);

        if (isBroadcast && this.cachedState.replicationFactor == null) {
            // Unrestricted broadcast: every known node is a target, no owner selection needed.
            response.selectedNodes = localState.nodes.values();
        } else {
            this.selectNodes(op, response, localState);
            if (response.ownerNodeId == null) {
                // selectNodes populates the owner on every successful path; a missing owner
                // means it already failed the operation, so it must not be touched again.
                return;
            }
            if (body.targetPath == null) {
                // Pure selection request: report the result to the caller.
                op.setBodyNoCloning(response).complete();
                return;
            }
        }

        if (isBroadcast) {
            if (body.options.contains(NodeSelectorService.SelectAndForwardRequest.ForwardingOption.REPLICATE)) {
                this.replicateRequest(op, body, response);
            } else {
                this.broadcast(op, body, response);
            }
            return;
        }

        // Forward to the owner node and mirror its response onto the original operation.
        URI remoteService = UriUtils.buildUri(response.ownerNodeGroupReference.getScheme(), response.ownerNodeGroupReference.getHost(), response.ownerNodeGroupReference.getPort(), body.targetPath, body.targetQuery);
        Operation fwdOp = op.clone().setCompletion((o, e) -> {
            op.transferResponseHeadersFrom(o).setStatusCode(o.getStatusCode()).setBodyNoCloning(o.getBodyRaw());
            if (e != null) {
                op.fail(e);
                return;
            }
            op.complete();
        });
        this.getHost().getClient().send(fwdOp.setUri(remoteService));
    }

    /**
     * Picks the owner node for {@code response.key} and fills in {@code response}.
     * <p>
     * With a single node in the group the local node is the owner. Otherwise every available
     * node is ranked by the squared difference between the hash of its id and the hash of the
     * key; the closest node becomes the owner and the closest {@code replicationFactor} nodes
     * (one when no factor is configured) become the selected set. When fewer nodes are available
     * than the local node's membership quorum, {@code op} is failed and the owner fields of
     * {@code response} are left unset.
     *
     * @param op         the operation to fail when quorum is not met
     * @param response   the response to populate; {@code response.key} must already be set
     * @param localState the node group state to select from
     */
    private void selectNodes(Operation op, NodeSelectorService.SelectOwnerResponse response, NodeGroupService.NodeGroupState localState) {
        NodeState self = localState.nodes.get(this.getHost().getId());
        int quorum = self.membershipQuorum;
        int availableNodes = localState.nodes.size();

        if (availableNodes == 1) {
            // Single-node group: the local node owns everything.
            response.ownerNodeId = self.id;
            response.isLocalHostOwner = true;
            response.ownerNodeGroupReference = self.groupReference;
            response.selectedNodes = localState.nodes.values();
            return;
        }

        // Squared hash distance -> node, trimmed to the closest neighbourCount entries.
        TreeMap<Long, NodeState> closestNodes = new TreeMap<Long, NodeState>();
        long neighbourCount = this.cachedState.replicationFactor != null
                ? this.cachedState.replicationFactor
                : 1L;
        int keyHash = MurmurHash3.murmurhash3_x86_32(response.key, 0, response.key.length(), 0);

        for (NodeState candidate : localState.nodes.values()) {
            if (NodeState.isUnAvailable(candidate)) {
                --availableNodes;
                continue;
            }
            ++response.availableNodeCount;

            // Node id hashes are cached since ids are hashed on every selection.
            int nodeIdHash;
            Long cachedHash = this.hashedNodeIds.get(candidate.id);
            if (cachedHash != null) {
                nodeIdHash = cachedHash.intValue();
            } else {
                nodeIdHash = MurmurHash3.murmurhash3_x86_32(candidate.id, 0, candidate.id.length(), 0);
                this.hashedNodeIds.put(candidate.id, Long.valueOf(nodeIdHash));
            }

            // The subtraction is performed on ints (and may wrap) before being widened; this
            // is kept as-is because changing it would change which node is chosen as owner.
            long delta = nodeIdHash - keyHash;
            closestNodes.put(delta * delta, candidate);
            if (closestNodes.size() > neighbourCount) {
                closestNodes.remove(closestNodes.lastKey());
            }
        }

        if (availableNodes < quorum) {
            op.fail(new IllegalStateException("Available nodes: " + availableNodes + ", quorum:" + quorum));
            return;
        }

        NodeState owner = closestNodes.get(closestNodes.firstKey());
        response.ownerNodeId = owner.id;
        response.isLocalHostOwner = response.ownerNodeId.equals(this.getHost().getId());
        response.ownerNodeGroupReference = owner.groupReference;
        response.selectedNodes = closestNodes.values();
        response.membershipUpdateTimeMicros = localState.membershipUpdateTimeMicros;
    }

    /**
     * Sends {@code op} to every node in {@code selectRsp.selectedNodes} and completes it with a
     * {@link NodeGroupBroadcastResponse} once every node has been accounted for.
     * <p>
     * Unavailable nodes, and the local node when EXCLUDE_ENTRY_NODE is requested, are skipped
     * but still counted towards completion. Per-node failures and JSON response bodies are
     * collected in the broadcast response, keyed by the remote URI.
     *
     * @param op        the operation being broadcast; completed with the aggregated response
     * @param req       the selection request carrying options, target path and query
     * @param selectRsp the selection result listing the target nodes
     */
    private void broadcast(Operation op, NodeSelectorService.SelectAndForwardRequest req, NodeSelectorService.SelectOwnerResponse selectRsp) {
        Collection<NodeState> targets = selectRsp.selectedNodes;
        AtomicInteger pending = new AtomicInteger(targets.size());
        NodeGroupBroadcastResponse result = new NodeGroupBroadcastResponse();
        if (pending.get() == 0) {
            op.setBody(result).complete();
            return;
        }

        result.membershipQuorum = this.cachedGroupState.nodes.get(this.getHost().getId()).membershipQuorum;
        AtomicInteger contactedNodes = new AtomicInteger();

        Operation.CompletionHandler onNodeDone = (o, e) -> {
            if (e != null) {
                ServiceErrorResponse errorRsp = Utils.toServiceErrorResponse(e);
                result.failures.put(o.getUri(), errorRsp);
            } else if (o != null && o.hasBody()) {
                result.jsonResponses.put(o.getUri(), Utils.toJson(o.getBodyRaw()));
            }
            if (pending.decrementAndGet() != 0) {
                return;
            }
            // Last node accounted for: publish the totals and finish the original operation.
            result.nodeCount = this.cachedGroupState.nodes.size();
            result.availableNodeCount = contactedNodes.get();
            op.setBodyNoCloning(result).complete();
        };

        for (NodeState node : targets) {
            boolean isExcludedEntryNode =
                    req.options.contains(NodeSelectorService.SelectAndForwardRequest.ForwardingOption.EXCLUDE_ENTRY_NODE)
                            && node.id.equals(this.getHost().getId());
            boolean isUnavailable = NodeState.isUnAvailable(node);
            if (isExcludedEntryNode || isUnavailable) {
                // Skipped nodes still have to be counted, otherwise the join never finishes.
                onNodeDone.handle(null, null);
                continue;
            }

            URI remoteService = UriUtils.buildUri(
                    node.groupReference.getScheme(),
                    node.groupReference.getHost(),
                    node.groupReference.getPort(),
                    req.targetPath,
                    req.targetQuery);
            Operation remoteOp = Operation.createPost(remoteService)
                    .transferRequestHeadersFrom(op)
                    .addPragmaDirective("xn-no-fwd")
                    .setAction(op.getAction())
                    .setCompletion(onNodeDone)
                    .setReferer(op.getReferer())
                    .setExpiration(op.getExpirationMicrosUtc())
                    .setBody(op.getBodyRaw());

            result.receivers.add(remoteService);
            result.selectedNodes.put(node.id, node.groupReference);
            contactedNodes.incrementAndGet();
            this.getHost().sendRequest(remoteOp);
        }
    }

    /**
     * Hands the request to the replication utility using the cached node group state.
     * <p>
     * When no node group state has been cached yet the operation is failed with an
     * {@link IllegalStateException} and nothing is replicated.
     *
     * @param op       the operation to replicate
     * @param body     the selection request
     * @param response the selection result listing the replication targets
     */
    private void replicateRequest(Operation op, NodeSelectorService.SelectAndForwardRequest body, NodeSelectorService.SelectOwnerResponse response) {
        // Read the field once so the null check and the call see the same reference.
        NodeGroupService.NodeGroupState groupState = this.cachedGroupState;
        if (groupState == null) {
            op.fail(new IllegalStateException("Node group state is not available"));
            // Stop here: replicating with no group state would act on an already failed operation.
            return;
        }
        this.replicationUtility.replicateUpdate(groupState, op, body, response);
    }

    /**
     * Decides whether the request can be processed right now.
     * <p>
     * The associated operation is failed when the host is stopping or the operation has already
     * expired. When the node group is not available the request is parked in the pending queue
     * and the "queuedRequestCount" stat is incremented.
     *
     * @param localState the node group state to check for availability
     * @param body       the request; {@code body.associatedOp} must be set
     * @return {@code true} when the request was failed or queued and the caller must stop
     *         processing it, {@code false} when the caller should proceed
     */
    private boolean queueRequestIfNodeGroupIsUnavailable(NodeGroupService.NodeGroupState localState, NodeSelectorService.SelectAndForwardRequest body) {
        Operation pendingOp = body.associatedOp;

        if (this.getHost().isStopping()) {
            pendingOp.fail(new CancellationException("host is stopping"));
            return true;
        }

        long expiration = pendingOp.getExpirationMicrosUtc();
        if (expiration < Utils.getNowMicrosUtc()) {
            String message = String.format("Operation already expired, will not queue. Exp:%d, now:%d", expiration, Utils.getNowMicrosUtc());
            pendingOp.fail(new TimeoutException(message));
            return true;
        }

        boolean groupAvailable = NodeGroupUtils.isNodeGroupAvailable(this.getHost(), localState);
        if (!groupAvailable) {
            // Park the request; periodic maintenance retries it once the group is available.
            this.adjustStat("queuedRequestCount", 1.0);
            this.pendingRequests.add(body);
        }
        return !groupAvailable;
    }

    /**
     * Periodic maintenance: retries queued requests, then evaluates whether the node group has
     * converged and synchronization should be scheduled.
     *
     * @param maintOp the maintenance operation, completed once both steps have been triggered
     */
    @Override
    public void handleMaintenance(Operation maintOp) {
        this.performPendingRequestMaintenance();

        // Capture the membership timestamp now; the convergence check compares against it later.
        long membershipUpdateTime = this.cachedGroupState.membershipUpdateTimeMicros;
        this.checkAndScheduleSynchronization(membershipUpdateTime);

        maintOp.complete();
    }

    /**
     * Drains the pending request queue once the node group is available again. Each queued
     * request is re-run through {@code selectAndForward}; if the host is stopping the request's
     * operation is failed with a {@link CancellationException} instead.
     */
    private void performPendingRequestMaintenance() {
        if (this.pendingRequests.isEmpty()) {
            return;
        }
        if (!NodeGroupUtils.isNodeGroupAvailable(this.getHost(), this.cachedGroupState)) {
            // Still unavailable: leave everything queued for the next maintenance pass.
            return;
        }

        NodeSelectorService.SelectAndForwardRequest queued;
        // poll() returns null on an empty queue, which ends the drain.
        while ((queued = this.pendingRequests.poll()) != null) {
            if (!this.getHost().isStopping()) {
                this.selectAndForward(queued, queued.associatedOp, this.cachedGroupState);
            } else {
                queued.associatedOp.fail(new CancellationException());
            }
        }
    }

    /**
     * Schedules node group change maintenance (synchronization) when it is required and safe.
     * <p>
     * Nothing happens while the host is stopping. While membership has not settled, or the
     * group has not been confirmed converged, a convergence check is started instead.
     * Synchronization is scheduled only if peer synchronization is enabled on the host and a
     * group state change has flagged it as required; the flag is cleared when scheduling.
     *
     * @param membershipUpdateMicros the membership update time observed by the caller, passed on
     *                               to the convergence check
     */
    private void checkAndScheduleSynchronization(long membershipUpdateMicros) {
        if (this.getHost().isStopping()) {
            return;
        }

        boolean membershipSettled = NodeGroupUtils.isMembershipSettled(
                this.getHost(), this.getHost().getMaintenanceIntervalMicros(), this.cachedGroupState);
        if (!membershipSettled || !this.isNodeGroupConverged) {
            this.checkConvergence(membershipUpdateMicros);
            return;
        }

        boolean shouldSynchronize = this.getHost().isPeerSynchronizationEnabled() && this.isSynchronizationRequired;
        if (!shouldSynchronize) {
            return;
        }

        // Clear the flag before scheduling so a later group update can raise it again.
        this.isSynchronizationRequired = false;
        this.logInfo("Scheduling synchronization (%d nodes)", this.cachedGroupState.nodes.size());
        this.adjustStat("synchronizationCount", 1.0);
        this.getHost().scheduleNodeGroupChangeMaintenance(this.getSelfLink());
    }

    /**
     * Fetches the current node group state, caches it, and asks {@code NodeGroupUtils} to verify
     * convergence across the group.
     * <p>
     * On a successful check with membership quorum met, the node id hash cache is cleared and
     * the group is marked converged only if the cached membership update time still equals
     * {@code membershipUpdateMicros}. Missing quorum is logged as a warning a limited number of
     * times before being silenced; the counter resets once the group converges.
     *
     * @param membershipUpdateMicros the membership update time observed when the check was
     *                               requested
     */
    private void checkConvergence(long membershipUpdateMicros) {
        Operation.CompletionHandler onGroupState = (o, e) -> {
            if (e != null) {
                this.logSevere(e);
                return;
            }
            // Number of quorum warnings logged before the final "silenced" message.
            int quorumWarningsBeforeQuiet = 10;

            NodeGroupService.NodeGroupState ngs = o.getBody(NodeGroupService.NodeGroupState.class);
            this.updateCachedNodeGroupState(ngs, null);

            Operation convergenceOp = Operation.createPost(null)
                    .setReferer(this.getUri())
                    .setExpiration(Utils.getNowMicrosUtc() + this.getHost().getOperationTimeoutMicros());
            convergenceOp.setCompletion((o1, e1) -> {
                if (e1 != null) {
                    this.logWarning("Failed convergence check, will retry: %s", e1.getMessage());
                    return;
                }

                if (!NodeGroupUtils.hasMembershipQuorum(this.getHost(), this.cachedGroupState)) {
                    if (this.synchQuorumWarningCount < quorumWarningsBeforeQuiet) {
                        this.logWarning("Synchronization quorum not met");
                    } else if (this.synchQuorumWarningCount == quorumWarningsBeforeQuiet) {
                        this.logWarning("Synchronization quorum not met, warning will be silenced");
                    }
                    this.synchQuorumWarningCount++;
                    return;
                }

                this.hashedNodeIds.clear();
                synchronized (this.cachedState) {
                    // Converged only if membership has not changed since the check was requested.
                    this.isNodeGroupConverged =
                            membershipUpdateMicros == this.cachedGroupState.membershipUpdateTimeMicros;
                    if (this.isNodeGroupConverged) {
                        this.synchQuorumWarningCount = 0;
                    }
                }
            });
            NodeGroupUtils.checkConvergence(this.getHost(), ngs, convergenceOp);
        };

        this.sendRequest(Operation.createGet(this, this.cachedState.nodeGroupLink).setCompletion(onGroupState));
    }

    /**
     * Applies either a full node group state or a quorum update to the cached group state.
     * Exactly one of the two arguments is expected to be non-null.
     * <p>
     * A quorum update changes the membership quorum of the local node inside the cached state
     * (if any state is cached). A full state replaces the cached one unless the cached state
     * has a newer document update time; replacing it marks the group as not converged and flags
     * synchronization as required. All mutations happen while holding the monitor of the cached
     * selector state.
     *
     * @param ngs          the new node group state, or {@code null} for a quorum update
     * @param quorumUpdate the quorum update, or {@code null} for a full state update
     */
    private void updateCachedNodeGroupState(NodeGroupService.NodeGroupState ngs, NodeGroupService.UpdateQuorumRequest quorumUpdate) {
        // Logging happens outside the lock.
        if (ngs == null) {
            this.logInfo("Quorum update: %d", quorumUpdate.membershipQuorum);
        } else {
            NodeGroupService.NodeGroupState previous = this.cachedGroupState;
            if (previous != null && previous.nodes.size() != ngs.nodes.size()) {
                this.logInfo("Node count: %d", ngs.nodes.size());
            }
        }

        synchronized (this.cachedState) {
            if (quorumUpdate != null) {
                if (this.cachedGroupState != null) {
                    NodeState localNode = this.cachedGroupState.nodes.get(this.getHost().getId());
                    localNode.membershipQuorum = quorumUpdate.membershipQuorum;
                }
                return;
            }

            if (this.cachedGroupState == null) {
                this.cachedGroupState = ngs;
            }

            boolean cachedIsNewer = this.cachedGroupState.documentUpdateTimeMicros > ngs.documentUpdateTimeMicros;
            if (cachedIsNewer) {
                // Stale notification: keep what is already cached.
                return;
            }

            this.cachedGroupState = ngs;
            this.isNodeGroupConverged = false;
            this.isSynchronizationRequired = true;
        }
    }

    /**
     * Resolves the utility service for a URI path: the replication utility for paths ending in
     * "/replication", the superclass-provided service for paths ending in "/stats", and
     * {@code null} for anything else.
     *
     * @param uriPath the request path
     * @return the matching utility service, or {@code null} when none applies
     */
    @Override
    public Service getUtilityService(String uriPath) {
        Service utility;
        if (uriPath.endsWith("/replication")) {
            utility = this.replicationUtility;
        } else if (uriPath.endsWith("/stats")) {
            utility = super.getUtilityService(uriPath);
        } else {
            utility = null;
        }
        return utility;
    }
}

