/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.base.Objects;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AntiEntropyService {
    /** Logger shared by the service and all of its nested classes. */
    private static final Logger logger = LoggerFactory.getLogger(AntiEntropyService.class);
    /** Singleton through which the nested handlers, sessions and validators reach the service. */
    public static final AntiEntropyService instance = new AntiEntropyService();
    /**
     * Executor that runs the RepairFuture tasks created by the submit*RepairSession methods.
     * NOTE(review): the arguments look like "4 threads, 60 second keep-alive" -- confirm against
     * JMXConfigurableThreadPoolExecutor.
     */
    private static final ThreadPoolExecutor executor = new JMXConfigurableThreadPoolExecutor(4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AntiEntropySessions"), "internal");
    /** Repair sessions keyed by session name; rendezvous() uses it to route incoming trees. */
    private final ConcurrentMap<String, RepairSession> sessions = new ConcurrentHashMap<String, RepairSession>();

    /** Not public: the service is meant to be reached through {@link #instance}. */
    protected AntiEntropyService() {
    }

    /**
     * Creates a repair session for the given range and column families and queues it for
     * execution on the session executor.
     *
     * @param range     token range to repair
     * @param tablename name of the table that owns the column families
     * @param cfnames   column families to repair
     * @return the future wrapping the new session, already handed to the executor
     */
    public RepairFuture submitRepairSession(Range range, String tablename, String ... cfnames) {
        RepairSession session = new RepairSession(range, tablename, cfnames);
        RepairFuture future = session.getFuture();
        executor.execute(future);
        return future;
    }

    /**
     * Creates a repair session that takes its session id and range from an existing tree
     * request, then queues it for execution. The session built this way is registered in
     * {@code sessions} by its constructor, i.e. before it starts running.
     *
     * @param req       request supplying the session id and range
     * @param tablename name of the table that owns the column families
     * @param cfnames   column families to repair
     * @return the future wrapping the new session, already handed to the executor
     */
    RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String ... cfnames) {
        RepairSession session = new RepairSession(req, tablename, cfnames);
        RepairFuture future = session.getFuture();
        executor.execute(future);
        return future;
    }

    /**
     * Returns the endpoints, other than this node, that replicate {@code range} for
     * {@code table} and can take part in a repair.
     *
     * @param table table whose range-to-endpoint map is consulted
     * @param range range to look up; it must be an exact key of that map
     * @return a mutable set of neighbors, or an empty set when the range is not a key of the
     *         map; endpoints whose gossiped version is 1 or lower are left out
     */
    static Set<InetAddress> getNeighbors(String table, Range range) {
        Map<Range, List<InetAddress>> replicasByRange = StorageService.instance.getRangeToAddressMap(table);
        if (!replicasByRange.containsKey(range)) {
            return Collections.emptySet();
        }

        Set<InetAddress> neighbors = new HashSet<InetAddress>(replicasByRange.get(range));
        neighbors.remove(FBUtilities.getLocalAddress());

        // Explicit iterator so that excluded endpoints can be dropped while walking the set.
        for (Iterator<InetAddress> it = neighbors.iterator(); it.hasNext(); ) {
            InetAddress endpoint = it.next();
            if (Gossiper.instance.getVersion(endpoint) <= 1) {
                logger.info("Excluding " + endpoint + " from repair because it is on version 0.7 or sooner. You should consider updating this node before running repair again.");
                it.remove();
            }
        }
        return neighbors;
    }

    /**
     * Hands a received merkle tree to the repair job at the head of its session's queue.
     * When that job has collected every requested tree, its differencers are submitted and
     * the next job (if any) sends out its tree requests; after the last job the session's
     * {@code differencingDone} condition is signalled.
     *
     * @param request identifies the session, the column family and the endpoint the tree came from
     * @param tree    the merkle tree received for that request
     */
    private void rendezvous(TreeRequest request, MerkleTree tree) {
        RepairSession session = sessions.get(request.sessionid);
        if (session == null) {
            logger.warn("Got a merkle tree response for unknown repair session {}: either this node has been restarted since the session was started, or the session has been interrupted for an unknown reason. ", (Object)request.sessionid);
            return;
        }

        RepairSession.RepairJob job = session.jobs.peek();
        if (job == null) {
            // failedNode() empties the job queue while the session is still registered, so a
            // tree that arrives after a failure finds nothing to attach to. Drop it instead of
            // tripping an assertion (or a NullPointerException when assertions are off).
            logger.debug("Ignoring merkle tree from {} for repair session {}: no job is pending", request.endpoint, request.sessionid);
            return;
        }

        if (job.addTree(request, tree) != 0) {
            // Still waiting on trees from other endpoints.
            return;
        }

        logger.debug("All trees received for {}/{}", session.getName(), request.cf.right);
        job.submitDifferencers();

        // This job is done collecting trees: drop it and start the next one, if there is one.
        session.jobs.poll();
        RepairSession.RepairJob nextJob = session.jobs.peek();
        if (nextJob == null) {
            session.differencingDone.signalAll();
        } else {
            nextJob.sendTreeRequests();
        }
    }

    /**
     * Builds a tree request for one column family and sends it one-way to {@code remote},
     * serialized for the version the gossiper reports for that endpoint.
     *
     * @param sessionid repair session the request belongs to
     * @param remote    endpoint asked to build the tree
     * @param range     range the tree should cover
     * @param ksname    table name
     * @param cfname    column family name
     * @return the request that was sent
     */
    TreeRequest request(String sessionid, InetAddress remote, Range range, String ksname, String cfname) {
        CFPair columnFamily = new CFPair(ksname, cfname);
        TreeRequest treeRequest = new TreeRequest(sessionid, remote, range, columnFamily);
        MessagingService messaging = MessagingService.instance();
        Message message = TreeRequestVerbHandler.makeVerb(treeRequest, Gossiper.instance.getVersion(remote));
        messaging.sendOneWay(message, remote);
        return treeRequest;
    }

    /**
     * Sends the tree held by {@code validator} back to the endpoint recorded in its request.
     * Any failure while building or sending the message is logged and not rethrown.
     *
     * @param validator holder of the request and the tree built for it
     * @param local     address to put in the message as its sender
     */
    void respond(Validator validator, InetAddress local) {
        MessagingService messaging = MessagingService.instance();
        try {
            Message treeResponse = TreeResponseVerbHandler.makeVerb(local, validator);
            logger.info("Sending AEService tree for " + validator.request);
            messaging.sendOneWay(treeResponse, validator.request.endpoint);
        } catch (Exception e) {
            logger.error("Could not send valid tree for request " + validator.request, (Throwable)e);
        }
    }

    /**
     * Future handle for a repair session: a {@link FutureTask} that runs the session and has
     * {@code null} as its result value.
     */
    public static class RepairFuture extends FutureTask {
        /** The session this future executes. */
        public final RepairSession session;

        /**
         * @param session session to run when the future is executed
         */
        RepairFuture(RepairSession session) {
            super(session, null);
            this.session = session;
        }
    }

    class RepairSession
    extends WrappedRunnable
    implements IEndpointStateChangeSubscriber,
    IFailureDetectionEventListener {
        // Session identifier; returned by getName() and used as the key in the service's session map.
        private final String sessionName;
        // Table that owns the column families being repaired.
        private final String tablename;
        // Column families to repair; one RepairJob is created per entry when the session runs.
        private final String[] cfnames;
        // Range being repaired.
        private final Range range;
        // Set by failedNode(); runMayThrow() rethrows it after the completion condition fires.
        private volatile Exception exception;
        // Flipped once by convict() so that failedNode() is invoked at most one time.
        private final AtomicBoolean isFailed = new AtomicBoolean(false);
        // Neighbors for (tablename, range), computed once in the constructor via getNeighbors().
        private final Set<InetAddress> endpoints;
        // Jobs in cfnames order; the head is the job currently collecting merkle trees.
        final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
        // Jobs by column family name; an entry is removed once all of its sync jobs have completed.
        final Map<String, RepairJob> activeJobs = new ConcurrentHashMap<String, RepairJob>();
        // Signalled when activeJobs becomes empty or when the session fails; runMayThrow() waits on it.
        private final SimpleCondition completed = new SimpleCondition();
        // Signalled when no job is left to collect trees for, when the session cannot start, or on failure.
        public final Condition differencingDone = new SimpleCondition();

        /**
         * Creates a session that takes its id and range from {@code req} and registers it in
         * the service's session map immediately.
         */
        public RepairSession(TreeRequest req, String tablename, String ... cfnames) {
            this(req.sessionid, req.range, tablename, cfnames);
            instance.sessions.put(getName(), this);
        }

        /**
         * Creates a session with a freshly generated {@code manual-repair-<uuid>} id.
         */
        public RepairSession(Range range, String tablename, String ... cfnames) {
            this("manual-repair-" + UUID.randomUUID(), range, tablename, cfnames);
        }

        /**
         * Common initialisation: stores the arguments and resolves the neighbors to repair with.
         */
        private RepairSession(String id, Range range, String tablename, String[] cfnames) {
            assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
            this.sessionName = id;
            this.tablename = tablename;
            this.cfnames = cfnames;
            this.range = range;
            this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
        }

        /** @return the identifier of this session */
        public String getName() {
            return sessionName;
        }

        /** @return a new future that runs this session when executed */
        RepairFuture getFuture() {
            RepairFuture future = new RepairFuture(this);
            return future;
        }

        /**
         * Runs the session: checks that there are live neighbors, registers the session and its
         * failure listeners, creates one job per column family, starts the first job and then
         * blocks until the session completes or fails.
         *
         * <p>Returns normally without doing any work when there are no neighbors or when one of
         * them is reported dead by the failure detector.
         *
         * @throws Exception the exception recorded by {@code failedNode()} if the session failed
         * @throws RuntimeException if the thread is interrupted while waiting for completion
         */
        @Override
        public void runMayThrow() throws Exception {
            if (endpoints.isEmpty()) {
                differencingDone.signalAll();
                logger.info("No neighbors to repair with for " + tablename + " on " + range + ": " + getName() + " completed.");
                return;
            }
            for (InetAddress endpoint : endpoints) {
                if (!FailureDetector.instance.isAlive(endpoint)) {
                    differencingDone.signalAll();
                    logger.info("Could not proceed on repair because a neighbor (" + endpoint + ") is dead: " + getName() + " failed.");
                    return;
                }
            }

            instance.sessions.put(getName(), this);
            Gossiper.instance.register(this);
            FailureDetector.instance.registerFailureDetectionEventListener(this);
            try {
                for (String cfname : cfnames) {
                    RepairJob job = new RepairJob(cfname);
                    jobs.offer(job);
                    activeJobs.put(cfname, job);
                }

                // The listeners are already registered, so failedNode() may have emptied the
                // queue by now; in that case skip straight to the wait, which reports the failure.
                RepairJob firstJob = jobs.peek();
                if (firstJob != null) {
                    firstJob.sendTreeRequests();
                }

                completed.await();

                // Read the volatile field once so the logged and the thrown exception are the same.
                Exception failure = exception;
                // cfnames is an array: render it explicitly, "%s" would print its identity hash.
                String cfs = java.util.Arrays.toString(cfnames);
                if (failure != null) {
                    logger.error(String.format("Repair session %s (on cfs %s, range %s) failed with the following error", getName(), cfs, range), (Throwable)failure);
                    throw failure;
                }
                logger.info(String.format("Repair session %s (on cfs %s, range %s) completed successfully", getName(), cfs, range));
            } catch (InterruptedException e) {
                // Keep the original exception as the cause so the interruption point is not lost.
                throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.", e);
            } finally {
                FailureDetector.instance.unregisterFailureDetectionEventListener(this);
                Gossiper.instance.unregister(this);
                instance.sessions.remove(getName());
            }
        }

        /**
         * Records that synchronization with {@code remote} has finished for {@code cfname}.
         * When that was the last outstanding endpoint of the job the job is retired, and when no
         * active job remains the session's completion condition is signalled.
         *
         * @param remote endpoint whose synchronization finished
         * @param cfname column family the synchronization was for
         */
        void completed(InetAddress remote, String cfname) {
            logger.debug("Repair completed for {} on {}", (Object)remote, (Object)cfname);
            RepairJob job = activeJobs.get(cfname);
            if (job == null) {
                // failedNode() empties activeJobs, and differencers or stream callbacks that were
                // already in flight can still report afterwards. Nothing is left to update.
                return;
            }
            if (!job.completedSynchronizationJob(remote)) {
                return;
            }
            activeJobs.remove(cfname);
            if (activeJobs.isEmpty()) {
                completed.signalAll();
            }
        }

        /**
         * Fails the session because {@code remote} died: records the error, discards all
         * pending and active jobs and wakes up everything waiting on the session.
         *
         * @param remote the endpoint that was convicted
         */
        void failedNode(InetAddress remote) {
            String reason = String.format("Problem during repair session %s, endpoint %s died", sessionName, remote);
            logger.error(reason);

            // The exception must be in place before the waiters are released below.
            exception = new IOException(reason);

            jobs.clear();
            activeJobs.clear();

            differencingDone.signalAll();
            completed.signalAll();
        }

        /** Gossip callback; a joining endpoint does not affect a running session. */
        @Override
        public void onJoin(InetAddress endpoint, EndpointState epState) {
        }

        /** Gossip callback; application state changes are ignored. */
        @Override
        public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {
        }

        /** Gossip callback; ignored. */
        @Override
        public void onAlive(InetAddress endpoint, EndpointState state) {
        }

        /** Gossip callback; ignored here, failures reach the session through convict(). */
        @Override
        public void onDead(InetAddress endpoint, EndpointState state) {
        }

        /** A removed endpoint is convicted with the largest possible phi, which passes convict()'s threshold check. */
        @Override
        public void onRemove(InetAddress endpoint) {
            convict(endpoint, Double.MAX_VALUE);
        }

        /** A restarted endpoint is convicted with the largest possible phi, which passes convict()'s threshold check. */
        @Override
        public void onRestart(InetAddress endpoint, EndpointState epState) {
            convict(endpoint, Double.MAX_VALUE);
        }

        /**
         * Failure-detector callback. Fails the session when {@code endpoint} is one of its
         * neighbors and {@code phi} is not below twice the configured conviction threshold.
         * Only the first qualifying conviction has any effect.
         *
         * @param endpoint endpoint being convicted
         * @param phi      suspicion level reported for the endpoint
         */
        @Override
        public void convict(InetAddress endpoint, double phi) {
            // The checks run left to right and stop at the first one that does not hold; the
            // compare-and-set comes last so the flag is only flipped for a real failure.
            boolean sessionJustFailed = endpoints.contains(endpoint)
                    && !(phi < (double)(2 * DatabaseDescriptor.getPhiConvictThreshold()))
                    && isFailed.compareAndSet(false, true);
            if (sessionJustFailed) {
                failedNode(endpoint);
            }
        }

        /**
         * Compares two merkle trees for one column family and one remote endpoint. When the
         * trees differ, the mismatching ranges are streamed to and requested from that endpoint.
         */
        class Differencer implements Runnable {
            /** Column family being compared. */
            public final String cfname;
            /** Endpoint the comparison is made against. */
            public final InetAddress remote;
            /** First tree handed to {@code MerkleTree.difference}. */
            public final MerkleTree ltree;
            /** Second tree handed to {@code MerkleTree.difference}. */
            public final MerkleTree rtree;
            /** Ranges found to differ; filled in by {@link #run()}. */
            public List<Range> differences;

            Differencer(String cfname, InetAddress remote, MerkleTree ltree, MerkleTree rtree) {
                this.cfname = cfname;
                this.remote = remote;
                this.ltree = ltree;
                this.rtree = rtree;
                this.differences = new ArrayList<Range>();
            }

            /**
             * Computes the differing ranges. Consistent trees complete the synchronization right
             * away; otherwise a streaming repair is started.
             *
             * @throws RuntimeException wrapping the IOException if the streaming repair cannot be started
             */
            @Override
            public void run() {
                InetAddress local = FBUtilities.getLocalAddress();

                // A tree may arrive without a partitioner; fall back to the node's own.
                if (ltree.partitioner() == null) {
                    ltree.partitioner(StorageService.getPartitioner());
                }
                if (rtree.partitioner() == null) {
                    rtree.partitioner(StorageService.getPartitioner());
                }

                differences.addAll(MerkleTree.difference(ltree, rtree));

                // Plain concatenation on purpose: addresses, names and ranges are runtime values
                // and must never be used as a format string (a '%' in them would throw).
                String messageHead = "Endpoints " + local + " and " + remote + " ";
                String messageTail = " for " + cfname + " on " + RepairSession.this.range;

                if (differences.isEmpty()) {
                    logger.info(messageHead + "are consistent" + messageTail);
                    RepairSession.this.completed(remote, cfname);
                    return;
                }

                logger.info(messageHead + "have " + differences.size() + " range(s) out of sync" + messageTail);
                try {
                    performStreamingRepair();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            /**
             * Streams the differing ranges of the local sstables to the remote endpoint and
             * requests the same ranges back from it. Both transfers share one callback.
             *
             * @throws IOException if setting up either transfer fails
             */
            void performStreamingRepair() throws IOException {
                logger.info("Performing streaming repair of " + differences.size() + " ranges with " + remote + " for " + RepairSession.this.range);
                ColumnFamilyStore cfstore = Table.open(RepairSession.this.tablename).getColumnFamilyStore(cfname);
                try {
                    Collection<SSTableReader> sstables = cfstore.getSSTables();
                    Callback callback = new Callback();
                    StreamOutSession outsession = StreamOutSession.create(RepairSession.this.tablename, remote, callback);
                    StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
                    StreamIn.requestRanges(remote, RepairSession.this.tablename, differences, callback, OperationType.AES);
                } catch (Exception e) {
                    throw new IOException("Streaming repair failed.", e);
                }
            }

            @Override
            public String toString() {
                return "#<Differencer " + remote + "/" + RepairSession.this.range + ">";
            }

            /**
             * Completion callback given to both the outgoing and the incoming stream. The
             * synchronization is reported as completed on the second invocation.
             */
            class Callback extends WrappedRunnable {
                /** Invocations still expected before the synchronization counts as finished. */
                private final AtomicInteger outstanding = new AtomicInteger(2);

                Callback() {
                }

                @Override
                protected void runMayThrow() throws Exception {
                    if (outstanding.decrementAndGet() > 0) {
                        return;
                    }
                    RepairSession.this.completed(Differencer.this.remote, Differencer.this.cfname);
                    logger.info(String.format("Finished streaming repair with %s for %s", Differencer.this.remote, RepairSession.this.range));
                }
            }
        }

        /**
         * Repair of a single column family within the session: requests merkle trees from every
         * neighbor and from this node, collects them, and then submits one differencer per
         * remote endpoint.
         */
        class RepairJob {
            private final String cfname;
            /** Endpoints whose tree is still awaited. */
            private final Set<InetAddress> requestedEndpoints = new HashSet<InetAddress>();
            /** Trees received so far, by the endpoint that sent them. */
            private final Map<InetAddress, MerkleTree> trees = new HashMap<InetAddress, MerkleTree>();
            /** Remote endpoints whose synchronization has been submitted but not yet completed. */
            private final Set<InetAddress> syncJobs = new HashSet<InetAddress>();
            /** Signalled once all tree requests are out; addTree() waits on it. */
            private final Condition requestsSent = new SimpleCondition();

            public RepairJob(String cfname) {
                this.cfname = cfname;
            }

            /**
             * Sends a tree request to every neighbor of the session and to the local node,
             * then releases any addTree() call waiting for the requests to be out.
             */
            public void sendTreeRequests() {
                requestedEndpoints.addAll(RepairSession.this.endpoints);
                requestedEndpoints.add(FBUtilities.getLocalAddress());
                for (InetAddress endpoint : requestedEndpoints) {
                    instance.request(RepairSession.this.getName(), endpoint, RepairSession.this.range, RepairSession.this.tablename, cfname);
                }
                requestsSent.signalAll();
            }

            /**
             * Stores the tree received from {@code request.endpoint}. Blocks until
             * sendTreeRequests() has finished so the set of awaited endpoints is complete.
             *
             * @return the number of endpoints whose tree is still missing
             */
            public synchronized int addTree(TreeRequest request, MerkleTree tree) {
                try {
                    requestsSent.await();
                } catch (InterruptedException e) {
                    throw new AssertionError((Object)"Interrupted while waiting for requests to be sent");
                }
                assert ((String)request.cf.right).equals(cfname);
                trees.put(request.endpoint, tree);
                requestedEndpoints.remove(request.endpoint);
                return requestedEndpoints.size();
            }

            /**
             * Submits one differencer per remote endpoint, each comparing that endpoint's tree
             * with the local one. Must only be called once every requested tree has arrived.
             */
            public void submitDifferencers() {
                assert requestedEndpoints.size() == 0;
                InetAddress local = FBUtilities.getLocalAddress();
                List<Differencer> differencers = new ArrayList<Differencer>();

                // Register every remote before any differencer is dispatched, and do it under
                // the lock completedSynchronizationJob() takes. Otherwise a differencer that
                // finishes early could observe an empty syncJobs set while later ones were still
                // being added, and report the whole job as done.
                synchronized (this) {
                    MerkleTree localTree = trees.get(local);
                    assert localTree != null;
                    for (Map.Entry<InetAddress, MerkleTree> entry : trees.entrySet()) {
                        InetAddress remote = entry.getKey();
                        if (remote.equals(local)) {
                            continue;
                        }
                        differencers.add(new Differencer(cfname, remote, entry.getValue(), localTree));
                        syncJobs.add(remote);
                    }
                    // The differencers hold their own references, the map is no longer needed.
                    trees.clear();
                }

                for (Differencer differencer : differencers) {
                    logger.debug("Queueing comparison " + differencer);
                    StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
                }
            }

            /**
             * Marks the synchronization with {@code remote} as finished.
             *
             * @return true when no synchronization is outstanding for this job any more
             */
            synchronized boolean completedSynchronizationJob(InetAddress remote) {
                syncJobs.remove(remote);
                return syncJobs.isEmpty();
            }
        }
    }

    /**
     * Immutable description of a merkle tree request: the session it belongs to, the endpoint
     * it is associated with, the range to cover and the (table, column family) pair.
     * Equality and hash code are based on all four fields.
     */
    public static class TreeRequest {
        public final String sessionid;
        public final InetAddress endpoint;
        public final Range range;
        public final CFPair cf;

        public TreeRequest(String sessionid, InetAddress endpoint, Range range, CFPair cf) {
            this.sessionid = sessionid;
            this.endpoint = endpoint;
            this.range = range;
            this.cf = cf;
        }

        @Override
        public final int hashCode() {
            return Objects.hashCode(sessionid, endpoint, cf, range);
        }

        @Override
        public final boolean equals(Object o) {
            if (!(o instanceof TreeRequest)) {
                return false;
            }
            TreeRequest other = (TreeRequest)o;
            if (!Objects.equal(sessionid, other.sessionid)) {
                return false;
            }
            if (!Objects.equal(endpoint, other.endpoint)) {
                return false;
            }
            if (!Objects.equal(cf, other.cf)) {
                return false;
            }
            return Objects.equal(range, other.range);
        }

        @Override
        public String toString() {
            return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf + ", " + range + ">";
        }
    }

    /** Pair of merkle trees: the local one on the left, the remote one on the right. */
    static class TreePair extends Pair<MerkleTree, MerkleTree> {
        /**
         * @param local  tree stored as the left element
         * @param remote tree stored as the right element
         */
        public TreePair(MerkleTree local, MerkleTree remote) {
            super(local, remote);
        }
    }

    /** Pair naming a column family: table name on the left, column family name on the right. */
    static class CFPair extends Pair<String, String> {
        /**
         * @param table table name; asserted to be non-null
         * @param cf    column family name; asserted to be non-null
         */
        public CFPair(String table, String cf) {
            super(table, cf);
            boolean bothPresent = table != null && cf != null;
            assert bothPresent;
        }
    }

    /**
     * Handles TREE_RESPONSE messages and (de)serializes their payload, a {@link Validator}
     * written as its request followed by its merkle tree.
     */
    public static class TreeResponseVerbHandler implements IVerbHandler, ICompactSerializer<Validator> {
        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();

        /**
         * Builds the response message carrying the validator's request and tree.
         *
         * @param local     address used as the sender of the message
         * @param validator holder of the request and the finished tree
         * @return the message, serialized for the version gossiped by the requesting endpoint
         * @throws RuntimeException wrapping any IOException raised while serializing
         */
        static Message makeVerb(InetAddress local, Validator validator) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                // Look the peer version up once: the payload encoding and the version stamped
                // on the message must agree even if gossip state changes in between.
                int version = Gossiper.instance.getVersion(validator.request.endpoint);
                SERIALIZER.serialize(validator, dos, version);
                return new Message(local, StorageService.Verb.TREE_RESPONSE, bos.toByteArray(), version);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** Writes the request, then the tree, and flushes the stream. */
        @Override
        public void serialize(Validator v, DataOutputStream dos, int version) throws IOException {
            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos, version);
            MerkleTree.serializer.serialize(v.tree, dos, version);
            dos.flush();
        }

        /**
         * Reads a request followed by a tree.
         *
         * @throws IOException      if the request cannot be read
         * @throws RuntimeException wrapping any exception raised while reading the tree or
         *                          building the validator
         */
        @Override
        public Validator deserialize(DataInputStream dis, int version) throws IOException {
            TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis, version);
            try {
                return new Validator(request, MerkleTree.serializer.deserialize(dis, version));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Decodes the response and hands its tree to the service. The request is rebuilt with
         * the message's sender as endpoint so the tree is attributed to the node that sent it.
         */
        @Override
        public void doVerb(Message message, String id) {
            byte[] bytes = message.getMessageBody();
            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                Validator response = deserialize(buffer, message.getVersion());
                TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.range, response.request.cf);
                instance.rendezvous(request, response.tree);
            } catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    /**
     * Handles TREE_REQUEST messages and (de)serializes {@link TreeRequest}s. The range is only
     * part of the wire format for versions above 1.
     */
    public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<TreeRequest> {
        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();

        /**
         * Builds the request message, sent from the local address.
         *
         * @throws RuntimeException wrapping any IOException raised while serializing
         */
        static Message makeVerb(TreeRequest request, int version) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            try {
                SERIALIZER.serialize(request, out, version);
                return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bytes.toByteArray(), version);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** Writes session id, endpoint, table name, column family name and, above version 1, the range. */
        @Override
        public void serialize(TreeRequest request, DataOutputStream dos, int version) throws IOException {
            dos.writeUTF(request.sessionid);
            CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
            dos.writeUTF((String)request.cf.left);
            dos.writeUTF((String)request.cf.right);
            if (version <= 1) {
                return;
            }
            AbstractBounds.serializer().serialize(request.range, dos);
        }

        /**
         * Reads a request in the order it was written. For versions up to 1, which carry no
         * range, a range from the partitioner's minimum token to itself is used.
         */
        @Override
        public TreeRequest deserialize(DataInputStream dis, int version) throws IOException {
            String sessId = dis.readUTF();
            InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
            String table = dis.readUTF();
            String columnFamily = dis.readUTF();
            CFPair cfpair = new CFPair(table, columnFamily);

            Range range;
            if (version > 1) {
                range = (Range)AbstractBounds.serializer().deserialize(dis);
            } else {
                Token left = (Token)StorageService.getPartitioner().getMinimumToken();
                Token right = (Token)StorageService.getPartitioner().getMinimumToken();
                range = new Range(left, right);
            }
            return new TreeRequest(sessId, endpoint, range, cfpair);
        }

        /**
         * Decodes the request and queues a validation compaction for its column family. The
         * request is rebuilt with the message's sender as endpoint, which is where the
         * validator later sends the tree.
         */
        @Override
        public void doVerb(Message message, String id) {
            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(message.getMessageBody()));
            try {
                TreeRequest remotereq = deserialize(buffer, message.getVersion());
                TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.range, remotereq.cf);
                ColumnFamilyStore store = Table.open((String)request.cf.left).getColumnFamilyStore((String)request.cf.right);
                Validator validator = new Validator(request);
                logger.debug("Queueing validation compaction for " + request);
                CompactionManager.instance.submitValidation(store, validator);
            } catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    /**
     * Builds the merkle tree answering a {@link TreeRequest}. The tree is laid out by
     * {@link #prepare}, rows are hashed into it in key order through {@link #add}, and
     * {@link #complete} finishes the tree and schedules this object on the anti-entropy stage,
     * where {@link #run} sends the tree back.
     */
    public static class Validator implements Runnable {
        public final TreeRequest request;
        public final MerkleTree tree;
        private transient List<MerkleTree.RowHash> minrows;
        /** Number of rows hashed so far. */
        private transient long validated;
        /** Tree range currently being filled; null until the first row is added. */
        private transient MerkleTree.TreeRange range;
        /** Iterator over the tree ranges still to be filled; set by prepare(). */
        private transient MerkleTree.TreeRangeIterator ranges;
        /** Key of the last row added, used to assert that rows arrive in order. */
        private transient DecoratedKey lastKey;
        /** Hash added to close a range and to fill ranges that received no row. */
        public static final MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);

        /** Creates a validator with a new, empty tree covering the requested range. */
        Validator(TreeRequest request) {
            this(request, new MerkleTree(DatabaseDescriptor.getPartitioner(), request.range, 126, (int)Math.pow(2.0, 15.0)));
        }

        /** Creates a validator around an existing tree, whose full range is set to the requested range. */
        Validator(TreeRequest request, MerkleTree tree) {
            this.request = request;
            this.tree = tree;
            tree.fullRange = request.range;
            this.minrows = new ArrayList<MerkleTree.RowHash>();
            this.validated = 0L;
            this.range = null;
            this.ranges = null;
        }

        /**
         * Lays out the tree before rows are added. With a RandomPartitioner, or when the
         * store has no key samples in the range, the tree is initialised directly; otherwise
         * it is split at randomly picked sample keys until a split is refused.
         *
         * @param cfs store providing the key samples
         */
        public void prepare(ColumnFamilyStore cfs) {
            if (tree.partitioner() instanceof RandomPartitioner) {
                tree.init();
            } else {
                List<DecoratedKey> samples = new ArrayList<DecoratedKey>();
                for (DecoratedKey sample : cfs.keySamples(request.range)) {
                    assert request.range.contains((Token)sample.token) : "Token " + sample.token + " is not within range " + request.range;
                    samples.add(sample);
                }

                if (samples.isEmpty()) {
                    tree.init();
                } else {
                    int sampleCount = samples.size();
                    Random random = new Random();
                    boolean splitAccepted = true;
                    while (splitAccepted) {
                        DecoratedKey pick = samples.get(random.nextInt(sampleCount));
                        splitAccepted = tree.split((Token)pick.token);
                    }
                }
            }
            logger.debug("Prepared AEService tree of size " + tree.size() + " for " + request);
            ranges = tree.invalids();
        }

        /**
         * Hashes a row into the tree range that contains its token. Rows must be added in
         * strictly increasing key order; every range skipped on the way receives
         * {@link #EMPTY_ROW}.
         *
         * @param row row to hash; its token must lie within the requested range
         */
        public void add(AbstractCompactedRow row) {
            assert request.range.contains((Token)row.key.token) : row.key.token + " is not contained in " + request.range;
            assert lastKey == null || lastKey.compareTo(row.key) < 0 : "row " + row.key + " received out of order wrt " + lastKey;
            lastKey = row.key;

            if (range == null) {
                range = (MerkleTree.TreeRange)ranges.next();
            }
            // Advance to the range holding this row, closing each range that is passed.
            while (!range.contains((Token)row.key.token)) {
                range.addHash(EMPTY_ROW);
                range = (MerkleTree.TreeRange)ranges.next();
            }
            range.addHash(rowHash(row));
        }

        /** Counts the row and returns its SHA-256 digest paired with its token. */
        private MerkleTree.RowHash rowHash(AbstractCompactedRow row) {
            validated++;
            MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
            row.update(digest);
            byte[] hash = digest.digest();
            return new MerkleTree.RowHash((Token)row.key.token, hash);
        }

        /** Finishes the tree and schedules this validator on the anti-entropy stage. */
        public void complete() {
            completeTree();
            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
            logger.debug("Validated " + validated + " rows into AEService tree for " + request);
        }

        /** Closes the current range, if any, and adds {@link #EMPTY_ROW} to every range not yet visited. */
        void completeTree() {
            assert ranges != null : "Validator was not prepared()";
            if (range != null) {
                range.addHash(EMPTY_ROW);
            }
            while (ranges.hasNext()) {
                range = (MerkleTree.TreeRange)ranges.next();
                range.addHash(EMPTY_ROW);
            }
        }

        /** Sends the finished tree back through the service, with the local address as sender. */
        @Override
        public void run() {
            instance.respond(this, FBUtilities.getLocalAddress());
        }
    }
}

