/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tserver.replication;

import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientExecReturn;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.ReplicationClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.replication.thrift.WalEdits;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.replication.ReplicaSystem;
import org.apache.accumulo.server.replication.ReplicaSystemHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.htrace.impl.ProbabilitySampler;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccumuloReplicaSystem
implements ReplicaSystem {
    private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class);
    private static final String RFILE_SUFFIX = ".rf";
    private String instanceName;
    private String zookeepers;
    private AccumuloConfiguration conf;
    private VolumeManager fs;

    protected void setConf(AccumuloConfiguration conf) {
        this.conf = conf;
    }

    public static String buildConfiguration(String instanceName, String zookeepers) {
        return instanceName + "," + zookeepers;
    }

    public void configure(ServerContext context, String configuration) {
        Objects.requireNonNull(configuration);
        int index = configuration.indexOf(44);
        if (index == -1) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalArgumentException("Expected comma in configuration string");
        }
        this.instanceName = configuration.substring(0, index);
        this.zookeepers = configuration.substring(index + 1);
        this.conf = context.getConfiguration();
        try {
            this.fs = VolumeManagerImpl.get((AccumuloConfiguration)this.conf, (Configuration)context.getHadoopConf());
        }
        catch (IOException e) {
            log.error("Could not connect to filesystem", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    @SuppressFBWarnings(value={"PATH_TRAVERSAL_IN"}, justification="path provided by admin")
    public Replication.Status replicate(Path p, Replication.Status status, ReplicationTarget target, ReplicaSystemHelper helper) {
        String password;
        File keytab;
        AccumuloConfiguration localConf = this.conf;
        log.debug("Replication RPC timeout is {}", (Object)localConf.get(Property.REPLICATION_RPC_TIMEOUT.getKey()));
        String principal = this.getPrincipal(localConf, target);
        if (localConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
            String keytabPath = this.getKeytab(localConf, target);
            keytab = new File(keytabPath);
            if (!keytab.exists() || !keytab.isFile()) {
                log.error("{} is not a regular file. Cannot login to replicate", (Object)keytabPath);
                return status;
            }
            password = null;
        } else {
            keytab = null;
            password = this.getPassword(localConf, target);
        }
        if (keytab != null) {
            try {
                UserGroupInformation accumuloUgi = UserGroupInformation.getCurrentUser();
                UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI((String)principal, (String)keytab.getAbsolutePath());
                return (Replication.Status)ugi.doAs(() -> {
                    KerberosToken token;
                    try {
                        token = new KerberosToken(principal, keytab);
                    }
                    catch (IOException e) {
                        log.error("Failed to create KerberosToken", (Throwable)e);
                        return status;
                    }
                    ClientContext peerContext = this.getContextForPeer(localConf, target, principal, (AuthenticationToken)token);
                    return this._replicate(p, status, target, helper, localConf, peerContext, accumuloUgi);
                });
            }
            catch (IOException e) {
                log.error("Failed to perform local login", (Throwable)e);
                return status;
            }
        }
        PasswordToken token = new PasswordToken((CharSequence)password);
        ClientContext peerContext = this.getContextForPeer(localConf, target, principal, (AuthenticationToken)token);
        return this._replicate(p, status, target, helper, localConf, peerContext, null);
    }

    private Replication.Status _replicate(Path p, Replication.Status status, ReplicationTarget target, ReplicaSystemHelper helper, AccumuloConfiguration localConf, ClientContext peerContext, UserGroupInformation accumuloUgi) {
        double tracePercent = localConf.getFraction(Property.REPLICATION_TRACE_PERCENT);
        ProbabilitySampler sampler = TraceUtil.probabilitySampler((double)tracePercent);
        try (TraceScope replicaSpan = Trace.startSpan((String)"AccumuloReplicaSystem", (Sampler)sampler);){
            String remoteTableId = target.getRemoteIdentifier();
            int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
            for (int i = 0; i < numAttempts; ++i) {
                String peerTserverStr;
                log.debug("Attempt {}", (Object)i);
                log.debug("Fetching peer tserver address");
                try (TraceScope span = Trace.startSpan((String)"Fetch peer tserver");){
                    peerTserverStr = (String)ReplicationClient.executeCoordinatorWithReturn((ClientContext)peerContext, client -> client.getServicerAddress(remoteTableId, peerContext.rpcCreds()));
                }
                catch (AccumuloException | AccumuloSecurityException e) {
                    log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", (Object)target, (Object)e);
                    continue;
                }
                if (peerTserverStr == null) {
                    log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", (Object)target);
                    continue;
                }
                HostAndPort peerTserver = HostAndPort.fromString((String)peerTserverStr);
                long timeout = localConf.getTimeInMillis(Property.REPLICATION_RPC_TIMEOUT);
                long sizeLimit = this.conf.getAsBytes(Property.REPLICATION_MAX_UNIT_SIZE);
                try {
                    Replication.Status finalStatus;
                    TraceScope span;
                    if (p.getName().endsWith(RFILE_SUFFIX)) {
                        span = Trace.startSpan((String)"RFile replication");
                        try {
                            finalStatus = this.replicateRFiles(peerContext, peerTserver, target, p, status, timeout);
                        }
                        finally {
                            if (span != null) {
                                span.close();
                            }
                        }
                    }
                    span = Trace.startSpan((String)"WAL replication");
                    try {
                        finalStatus = this.replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi, timeout);
                    }
                    finally {
                        if (span != null) {
                            span.close();
                        }
                    }
                    log.debug("New status for {} after replicating to {} is {}", new Object[]{p, peerContext.getInstanceName(), ProtobufUtil.toString((GeneratedMessage)finalStatus)});
                    span = finalStatus;
                    return span;
                }
                catch (AccumuloException | AccumuloSecurityException | TTransportException e) {
                    log.warn("Could not connect to remote server {}, will retry", (Object)peerTserverStr, (Object)e);
                    UtilWaitThread.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
                    continue;
                }
            }
            log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", (Object)numAttempts, (Object)p);
            Replication.Status status2 = status;
            return status2;
        }
    }

    protected Replication.Status replicateRFiles(ClientContext peerContext, HostAndPort peerTserver, ReplicationTarget target, Path p, Replication.Status status, long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException {
        Replication.Status lastStatus = status;
        Replication.Status currentStatus = status;
        while (true) {
            ReplicationStats replResult = (ReplicationStats)ReplicationClient.executeServicerWithReturn((ClientContext)peerContext, (HostAndPort)peerTserver, (ClientExecReturn)new RFileClientExecReturn(), (long)timeout);
            long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
            if (newBegin < 0L) {
                newBegin = Long.MAX_VALUE;
            }
            currentStatus = Replication.Status.newBuilder((Replication.Status)currentStatus).setBegin(newBegin).build();
            log.debug("Sent batch for replication of {} to {}, with new Status {}", new Object[]{p, target, ProtobufUtil.toString((GeneratedMessage)currentStatus)});
            if (currentStatus.equals((Object)lastStatus)) break;
            if (!StatusUtil.isWorkRequired((Replication.Status)currentStatus)) {
                return currentStatus;
            }
            lastStatus = currentStatus;
        }
        log.debug("Did not replicate any new data for {} to {}, (state was {})", new Object[]{p, target, ProtobufUtil.toString((GeneratedMessage)lastStatus)});
        return status;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected Replication.Status replicateLogs(ClientContext peerContext, HostAndPort peerTserver, ReplicationTarget target, Path p, Replication.Status status, long sizeLimit, String remoteTableId, TCredentials tcreds, ReplicaSystemHelper helper, UserGroupInformation accumuloUgi, long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException {
        log.debug("Replication WAL to peer tserver");
        try {
            FSDataInputStream fsinput = this.fs.open(p);
            try {
                DataInputStream input = this.getWalStream(p, fsinput);
                try {
                    Replication.Status status2;
                    Set<Integer> tids;
                    log.debug("Skipping unwanted data in WAL");
                    try (TraceScope span = Trace.startSpan((String)"Consume WAL prefix");){
                        if (span.getSpan() != null) {
                            span.getSpan().addKVAnnotation("file", p.toString());
                        }
                        tids = this.consumeWalPrefix(target, input, status);
                    }
                    catch (IOException e) {
                        log.warn("Unexpected error consuming file.");
                        Replication.Status status3 = status;
                        if (input != null) {
                            input.close();
                        }
                        if (fsinput == null) return status3;
                        fsinput.close();
                        return status3;
                    }
                    log.debug("Sending batches of data to peer tserver");
                    Replication.Status lastStatus = status;
                    Replication.Status currentStatus = status;
                    AtomicReference exceptionRef = new AtomicReference();
                    while (true) {
                        ReplicationStats replResult;
                        try (TraceScope span = Trace.startSpan((String)"Replicate WAL batch");){
                            if (span.getSpan() != null) {
                                span.getSpan().addKVAnnotation("Batch size (bytes)", Long.toString(sizeLimit));
                                span.getSpan().addKVAnnotation("File", p.toString());
                                span.getSpan().addKVAnnotation("Peer instance name", peerContext.getInstanceName());
                                span.getSpan().addKVAnnotation("Peer tserver", peerTserver.toString());
                                span.getSpan().addKVAnnotation("Remote table ID", remoteTableId);
                            }
                            replResult = (ReplicationStats)ReplicationClient.executeServicerWithReturn((ClientContext)peerContext, (HostAndPort)peerTserver, (ClientExecReturn)new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids), (long)timeout);
                        }
                        catch (Exception e) {
                            log.error("Caught exception replicating data to {} at {}", new Object[]{peerContext.getInstanceName(), peerTserver, e});
                            throw e;
                        }
                        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
                        if (newBegin < 0L) {
                            newBegin = Long.MAX_VALUE;
                        }
                        currentStatus = Replication.Status.newBuilder((Replication.Status)currentStatus).setBegin(newBegin).build();
                        log.debug("Sent batch for replication of {} to {}, with new Status {}", new Object[]{p, target, ProtobufUtil.toString((GeneratedMessage)currentStatus)});
                        if (currentStatus.equals((Object)lastStatus)) break;
                        try (TraceScope span = Trace.startSpan((String)"Update replication table");){
                            if (accumuloUgi != null) {
                                Replication.Status copy = currentStatus;
                                accumuloUgi.doAs(() -> {
                                    try {
                                        helper.recordNewStatus(p, copy, target);
                                    }
                                    catch (Exception e) {
                                        exceptionRef.set(e);
                                    }
                                    return null;
                                });
                                Exception e = (Exception)exceptionRef.get();
                                if (e != null) {
                                    if (e instanceof TableNotFoundException) {
                                        throw (TableNotFoundException)((Object)e);
                                    }
                                    if (e instanceof AccumuloSecurityException) {
                                        throw (AccumuloSecurityException)((Object)e);
                                    }
                                    if (!(e instanceof AccumuloException)) throw new RuntimeException("Received unexpected exception", e);
                                    throw (AccumuloException)((Object)e);
                                }
                            } else {
                                helper.recordNewStatus(p, currentStatus, target);
                            }
                        }
                        catch (TableNotFoundException e) {
                            log.error("Tried to update status in replication table for {} as {}, but the table did not exist", new Object[]{p, ProtobufUtil.toString((GeneratedMessage)currentStatus), e});
                            throw new RuntimeException("Replication table did not exist, will retry", e);
                        }
                        log.debug("Recorded updated status for {}: {}", (Object)p, (Object)ProtobufUtil.toString((GeneratedMessage)currentStatus));
                        if (!StatusUtil.isWorkRequired((Replication.Status)currentStatus)) {
                            status2 = currentStatus;
                            return status2;
                        }
                        lastStatus = currentStatus;
                    }
                    log.debug("Did not replicate any new data for {} to {}, (state was {})", new Object[]{p, target, ProtobufUtil.toString((GeneratedMessage)lastStatus)});
                    status2 = status;
                    return status2;
                }
                finally {
                    if (input != null) {
                        try {
                            input.close();
                        }
                        catch (Throwable throwable) {
                            Throwable lastStatus;
                            lastStatus.addSuppressed(throwable);
                        }
                    }
                }
            }
            finally {
                if (fsinput != null) {
                    try {
                        fsinput.close();
                    }
                    catch (Throwable lastStatus) {
                        Throwable input;
                        input.addSuppressed(lastStatus);
                    }
                }
            }
        }
        catch (DfsLogger.LogHeaderIncompleteException e) {
            log.warn("Could not read header from {}, assuming that there is no data present in the WAL, therefore replication is complete", (Object)p);
            Replication.Status newStatus = status.getInfiniteEnd() ? Replication.Status.newBuilder((Replication.Status)status).setBegin(Long.MAX_VALUE).build() : Replication.Status.newBuilder((Replication.Status)status).setBegin(status.getEnd()).build();
            try (TraceScope span = Trace.startSpan((String)"Update replication table");){
                helper.recordNewStatus(p, newStatus, target);
                return newStatus;
            }
            catch (TableNotFoundException tnfe) {
                log.error("Tried to update status in replication table for {} as {}, but the table did not exist", new Object[]{p, ProtobufUtil.toString((GeneratedMessage)newStatus), e});
                throw new RuntimeException("Replication table did not exist, will retry", e);
            }
        }
        catch (IOException e) {
            log.error("Could not create stream for WAL", (Throwable)e);
            return status;
        }
    }

    protected String getPassword(AccumuloConfiguration localConf, ReplicationTarget target) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        Map peerPasswords = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
        String password = (String)peerPasswords.get(Property.REPLICATION_PEER_PASSWORD.getKey() + target.getPeerName());
        if (password == null) {
            throw new IllegalArgumentException("Cannot get password for " + target.getPeerName());
        }
        return password;
    }

    protected String getKeytab(AccumuloConfiguration localConf, ReplicationTarget target) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        Map peerKeytabs = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_KEYTAB);
        String keytab = (String)peerKeytabs.get(Property.REPLICATION_PEER_KEYTAB.getKey() + target.getPeerName());
        if (keytab == null) {
            throw new IllegalArgumentException("Cannot get keytab for " + target.getPeerName());
        }
        return keytab;
    }

    protected String getPrincipal(AccumuloConfiguration localConf, ReplicationTarget target) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        String peerName = target.getPeerName();
        String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName;
        Map peerUsers = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
        String user = (String)peerUsers.get(userKey);
        if (user == null) {
            throw new IllegalArgumentException("Cannot get user for " + target.getPeerName());
        }
        return user;
    }

    protected ClientContext getContextForPeer(AccumuloConfiguration localConf, ReplicationTarget target, String principal, AuthenticationToken token) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        Objects.requireNonNull(principal);
        Objects.requireNonNull(token);
        Properties properties = new Properties();
        properties.setProperty(ClientProperty.INSTANCE_NAME.getKey(), this.instanceName);
        properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), this.zookeepers);
        properties.setProperty(ClientProperty.AUTH_PRINCIPAL.getKey(), principal);
        ClientProperty.setAuthenticationToken((Properties)properties, (AuthenticationToken)token);
        return new ClientContext(ClientInfo.from((Properties)properties, (AuthenticationToken)token), localConf);
    }

    protected Set<Integer> consumeWalPrefix(ReplicationTarget target, DataInputStream wal, Replication.Status status) throws IOException {
        LogFileKey key = new LogFileKey();
        LogFileValue value = new LogFileValue();
        HashSet<Integer> desiredTids = new HashSet<Integer>();
        block3: for (long i = 0L; i < status.getBegin(); ++i) {
            key.readFields(wal);
            value.readFields(wal);
            switch (key.event) {
                case DEFINE_TABLET: {
                    if (!target.getSourceTableId().equals((Object)key.tablet.getTableId())) continue block3;
                    desiredTids.add(key.tabletId);
                    continue block3;
                }
            }
        }
        return desiredTids;
    }

    public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException {
        try (TraceScope span = Trace.startSpan((String)"Read WAL header");){
            if (span.getSpan() != null) {
                span.getSpan().addKVAnnotation("file", p.toString());
            }
            DfsLogger.DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, this.conf);
            DataInputStream dataInputStream = streams.getDecryptingInputStream();
            return dataInputStream;
        }
    }

    protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Replication.Status status, long sizeLimit, Set<Integer> desiredTids) throws IOException {
        WalEdits edits = new WalEdits();
        edits.edits = new ArrayList();
        long size = 0L;
        long entriesConsumed = 0L;
        long numUpdates = 0L;
        LogFileKey key = new LogFileKey();
        LogFileValue value = new LogFileValue();
        block6: while (size < sizeLimit) {
            try {
                key.readFields(wal);
                value.readFields(wal);
            }
            catch (EOFException e) {
                log.debug("Caught EOFException reading {}", (Object)p);
                if (!status.getInfiniteEnd() || !status.getClosed()) break;
                log.debug("{} is closed and has unknown length, assuming entire file has been consumed", (Object)p);
                entriesConsumed = Long.MAX_VALUE;
                break;
            }
            ++entriesConsumed;
            switch (key.event) {
                case DEFINE_TABLET: {
                    if (!target.getSourceTableId().equals((Object)key.tablet.getTableId())) continue block6;
                    desiredTids.add(key.tabletId);
                    continue block6;
                }
                case MUTATION: 
                case MANY_MUTATIONS: {
                    if (!desiredTids.contains(key.tabletId)) continue block6;
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    DataOutputStream out = new DataOutputStream(baos);
                    key.write(out);
                    numUpdates += this.writeValueAvoidingReplicationCycles(out, value, target);
                    out.flush();
                    byte[] data = baos.toByteArray();
                    size += (long)data.length;
                    edits.addToEdits(ByteBuffer.wrap(data));
                    continue block6;
                }
            }
            log.trace("Ignorning WAL entry which doesn't contain mutations, should not have received such entries");
        }
        return new WalReplication(edits, size, entriesConsumed, numUpdates);
    }

    protected long writeValueAvoidingReplicationCycles(DataOutputStream out, LogFileValue value, ReplicationTarget target) throws IOException {
        String name;
        int mutationsToSend = 0;
        for (Mutation m : value.mutations) {
            if (m.getReplicationSources().contains(target.getPeerName())) continue;
            ++mutationsToSend;
        }
        int mutationsRemoved = value.mutations.size() - mutationsToSend;
        if (mutationsRemoved > 0) {
            log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", (Object)mutationsRemoved, (Object)target.getPeerName());
        }
        if (StringUtils.isBlank((CharSequence)(name = this.conf.get(Property.REPLICATION_NAME)))) {
            throw new IllegalArgumentException("Local system has no replication name configured");
        }
        out.writeInt(mutationsToSend);
        for (Mutation m : value.mutations) {
            if (m.getReplicationSources().contains(target.getPeerName())) continue;
            m.addReplicationSource(name);
            m.write((DataOutput)out);
        }
        return mutationsToSend;
    }

    public static class WalReplication
    extends ReplicationStats {
        public WalEdits walEdits;
        public long numUpdates;

        public WalReplication(WalEdits edits, long size, long entriesConsumed, long numMutations) {
            super(size, edits.getEditsSize(), entriesConsumed);
            this.walEdits = edits;
            this.numUpdates = numMutations;
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Objects.hashCode(this.walEdits) + Objects.hashCode(this.numUpdates);
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof WalReplication) {
                WalReplication other = (WalReplication)o;
                return super.equals(other) && this.walEdits.equals(other.walEdits) && this.numUpdates == other.numUpdates;
            }
            return false;
        }
    }

    public static class ReplicationStats {
        public long sizeInBytes;
        public long sizeInRecords;
        public long entriesConsumed;

        public ReplicationStats(long sizeInBytes, long sizeInRecords, long entriesConsumed) {
            this.sizeInBytes = sizeInBytes;
            this.sizeInRecords = sizeInRecords;
            this.entriesConsumed = entriesConsumed;
        }

        public int hashCode() {
            return Objects.hashCode(this.sizeInBytes + this.sizeInRecords + this.entriesConsumed);
        }

        public boolean equals(Object o) {
            if (o != null && ReplicationStats.class.isAssignableFrom(o.getClass())) {
                ReplicationStats other = (ReplicationStats)o;
                return this.sizeInBytes == other.sizeInBytes && this.sizeInRecords == other.sizeInRecords && this.entriesConsumed == other.entriesConsumed;
            }
            return false;
        }
    }

    protected class RFileClientExecReturn
    implements ClientExecReturn<ReplicationStats, ReplicationServicer.Client> {
        protected RFileClientExecReturn() {
        }

        public ReplicationStats execute(ReplicationServicer.Client client) {
            return new ReplicationStats(0L, 0L, 0L);
        }
    }

    protected class WalClientExecReturn
    implements ClientExecReturn<ReplicationStats, ReplicationServicer.Client> {
        private ReplicationTarget target;
        private DataInputStream input;
        private Path p;
        private Replication.Status status;
        private long sizeLimit;
        private String remoteTableId;
        private TCredentials tcreds;
        private Set<Integer> tids;

        public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Replication.Status status, long sizeLimit, String remoteTableId, TCredentials tcreds, Set<Integer> tids) {
            this.target = target;
            this.input = input;
            this.p = p;
            this.status = status;
            this.sizeLimit = sizeLimit;
            this.remoteTableId = remoteTableId;
            this.tcreds = tcreds;
            this.tids = tids;
        }

        public ReplicationStats execute(ReplicationServicer.Client client) throws Exception {
            WalReplication edits = AccumuloReplicaSystem.this.getWalEdits(this.target, this.input, this.p, this.status, this.sizeLimit, this.tids);
            log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", new Object[]{edits.entriesConsumed == Long.MAX_VALUE ? "all remaining" : Long.valueOf(edits.entriesConsumed), edits.sizeInBytes, this.p});
            if (edits.walEdits.getEditsSize() > 0) {
                log.debug("Sending {} edits", (Object)edits.walEdits.getEditsSize());
                long entriesReplicated = client.replicateLog(this.remoteTableId, edits.walEdits, this.tcreds);
                if (entriesReplicated != edits.numUpdates) {
                    log.warn("Sent {} WAL entries for replication but {} were reported as replicated", (Object)edits.numUpdates, (Object)entriesReplicated);
                } else {
                    log.debug("Replicated {} edits", (Object)entriesReplicated);
                }
                return edits;
            }
            if (edits.entriesConsumed > 0L) {
                return edits;
            }
            return new ReplicationStats(0L, 0L, 0L);
        }
    }
}

