/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.distribution.DataLocality;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.InvertedLeaveTask;
import org.infinispan.distribution.JoinTask;
import org.infinispan.distribution.RemoteTransactionLogger;
import org.infinispan.distribution.TransactionLogger;
import org.infinispan.distribution.TransactionLoggerImpl;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashHelper;
import org.infinispan.distribution.ch.NodeTopologyInfo;
import org.infinispan.distribution.ch.TopologyInfo;
import org.infinispan.distribution.ch.UnionConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.InboundInvocationHandler;
import org.infinispan.remoting.MembershipArithmetic;
import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.rhq.helpers.pluginAnnotations.agent.DataType;
import org.rhq.helpers.pluginAnnotations.agent.Metric;
import org.rhq.helpers.pluginAnnotations.agent.Operation;
import org.rhq.helpers.pluginAnnotations.agent.Parameter;

@MBean(objectName="DistributionManager", description="Component that handles distribution of content across a cluster")
public class DistributionManagerImpl
implements DistributionManager {
    // Class-wide logger; the trace flag is sampled once, when the class is initialised.
    private static final Log log = LogFactory.getLog(DistributionManagerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    // Assigned in init().
    private Configuration configuration;
    // Hash consulted by locate()/getLocality(); swapped during joins and leaves.
    private volatile ConsistentHash consistentHash;
    // Hash that was in effect before the current topology change; reset to null by
    // informRehashOnJoin() when a union hash is collapsed.
    private volatile ConsistentHash oldConsistentHash;
    // This node's address, assigned in join().
    private Address self;
    // Assigned in init().
    private CacheLoaderManager cacheLoaderManager;
    RpcManager rpcManager;
    private CacheManagerNotifier notifier;
    // Registered with the notifier in start() and removed again in stop().
    private ViewChangeListener listener;
    private CommandsFactory cf;
    // Single-threaded executor (created in the constructor) that runs join and leave tasks.
    private final ExecutorService rehashExecutor;
    // Created in init(); enabled in rehash() when this node participates in a leave rehash.
    private TransactionLogger transactionLogger;
    TopologyInfo topologyInfo = new TopologyInfo();
    // Set through setRehashInProgress(); combined with the leavers list in isRehashInProgress().
    volatile boolean rehashInProgress = false;
    // Fair read/write lock taken around reads and updates of the leavers list.
    private final ReentrantReadWriteLock chSwitchLock = new ReentrantReadWriteLock(true);
    // Node currently being allowed to join; claimed via JOINER_CAS in requestPermissionToJoin().
    private volatile Address joiner;
    private static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS = AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
    // Assigned in init().
    private DataContainer dataContainer;
    private InterceptorChain interceptorChain;
    private InvocationContextContainer icc;
    // Updated only through setJoinComplete().
    @ManagedAttribute(description="If true, the node has successfully joined the grid and is considered to hold state.  If false, the join process is still in progress.")
    @Metric(displayName="Is join completed?", dataType=DataType.TRAIT)
    private volatile boolean joinComplete = false;
    // Handle on the submitted JoinTask; stays null when join() completes synchronously.
    private Future<Void> joinFuture;
    // Nodes that have left and whose departure is still being handled.
    final List<Address> leavers = new CopyOnWriteArrayList<Address>();
    // Handle on the most recently submitted InvertedLeaveTask.
    private volatile Future<Void> leaveTaskFuture;
    // Closed while join() runs; the view-change listener waits on it before rehashing.
    private final ReclosableLatch startLatch = new ReclosableLatch(false);
    // Lock/condition pair guarding leaveRehashAcks; signalled from informRehashOnLeave().
    private final Lock leaveAcksLock = new ReentrantLock();
    private final Condition acksArrived = this.leaveAcksLock.newCondition();
    private final Set<Address> leaveRehashAcks = new CopyOnWriteArraySet<Address>();
    // Counted down the first time setJoinComplete(true) is called.
    final CountDownLatch finalJoinPhaseLatch = new CountDownLatch(1);
    // Set to true in drainLocalTransactionLog() before the locked drain.
    volatile boolean enteredFinalJoinPhase = false;
    // Assigned in init(); handed to JoinTask.
    InboundInvocationHandler inboundInvocationHandler;

    /**
     * Creates the manager together with its single-threaded rehash executor.
     * Worker threads are daemon threads at minimum priority, named after the
     * transport address that is current when the thread is created.
     */
    public DistributionManagerImpl() {
        ThreadFactory rehasherThreads = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable task) {
                Thread rehasher = new Thread(task);
                rehasher.setDaemon(true);
                rehasher.setPriority(Thread.MIN_PRIORITY);
                // Resolved lazily: rpcManager is only injected after construction.
                rehasher.setName("Rehasher-" + DistributionManagerImpl.this.rpcManager.getTransport().getAddress());
                return rehasher;
            }
        };
        this.rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), rehasherThreads);
    }

    /**
     * Injection point: stores the collaborating components and creates the
     * transaction logger from the supplied commands factory.
     */
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf, DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc, CacheLoaderManager cacheLoaderManager, InboundInvocationHandler inboundInvocationHandler) {
        // Configuration and cluster plumbing.
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.notifier = notifier;
        this.inboundInvocationHandler = inboundInvocationHandler;

        // Command construction and logging of in-flight writes.
        this.cf = cf;
        this.transactionLogger = new TransactionLoggerImpl(cf);

        // Local data access.
        this.dataContainer = dataContainer;
        this.cacheLoaderManager = cacheLoaderManager;
        this.interceptorChain = interceptorChain;
        this.icc = icc;
    }

    /**
     * Registers the view-change listener, publishes this node's topology
     * information when the global configuration carries any, and then joins.
     *
     * @throws Exception propagated from {@code join()}
     */
    @Start(priority=20)
    public void start() throws Exception {
        if (trace) {
            log.trace("Starting distribution manager on " + getMyAddress());
        }
        listener = new ViewChangeListener();
        notifier.addListener(listener);

        GlobalConfiguration globalConfig = configuration.getGlobalConfiguration();
        if (globalConfig.hasTopologyInfo()) {
            Address localAddress = rpcManager.getTransport().getAddress();
            NodeTopologyInfo localInfo = new NodeTopologyInfo(globalConfig.getMachineId(), globalConfig.getRackId(), globalConfig.getSiteId(), localAddress);
            topologyInfo.addNodeTopologyInfo(localAddress, localInfo);
        }

        join();
    }

    /** @return the number of owners per key, as read from the configuration */
    private int getReplCount() {
        return configuration.getNumOwners();
    }

    /** @return the address reported by the RPC manager, or {@code null} if none has been injected */
    private Address getMyAddress() {
        if (rpcManager == null) {
            return null;
        }
        return rpcManager.getAddress();
    }

    /**
     * Blocks until the asynchronous join task, if one was submitted, has finished.
     *
     * @throws Throwable the cause of a failed join (or the {@link ExecutionException}
     *                   itself when it has no cause); {@link InterruptedException}
     *                   if the wait is interrupted
     */
    @Start(priority=1000)
    public void waitForJoinToComplete() throws Throwable {
        if (joinFuture == null) {
            // join() completed synchronously; nothing to wait for.
            return;
        }
        try {
            joinFuture.get();
        }
        catch (ExecutionException failure) {
            // Surface the real failure rather than the executor's wrapper.
            Throwable cause = failure.getCause();
            throw cause != null ? cause : failure;
        }
    }

    /**
     * Builds a consistent hash from the current member list and either marks
     * the join as complete (single member, or this node is the coordinator)
     * or submits a {@code JoinTask} to the rehash executor.
     * The start latch is closed for the duration of this method.
     * NOTE(review): if anything here throws, the latch is left closed — confirm
     * whether that is intended.
     */
    private void join() throws Exception {
        startLatch.close();
        setJoinComplete(false);

        Transport transport = rpcManager.getTransport();
        List<Address> currentMembers = transport.getMembers();
        consistentHash = ConsistentHashHelper.createConsistentHash(configuration, currentMembers, topologyInfo);
        self = transport.getAddress();

        boolean nothingToFetch = currentMembers.size() <= 1 || transport.getCoordinator().equals(self);
        if (nothingToFetch) {
            setJoinComplete(true);
        } else {
            JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this, inboundInvocationHandler);
            joinFuture = rehashExecutor.submit(joinTask);
        }

        startLatch.open();
    }

    /**
     * Unregisters the view-change listener, shuts the rehash executor down
     * immediately and flags the join as not complete.
     */
    @Stop(priority=20)
    public void stop() {
        notifier.removeListener(listener);
        rehashExecutor.shutdownNow();
        setJoinComplete(false);
    }

    /**
     * Reacts to a cluster view change.
     * <p>
     * When the new view is larger than the old one the change is treated as a
     * join and only logged. Otherwise it is treated as a leave: the leaver is
     * recorded, the consistent hash is updated, and if this node provides or
     * receives the leaver's state an {@code InvertedLeaveTask} is submitted
     * (cancelling a still-running previous leave task first).
     *
     * @param newMembers members of the new view
     * @param oldMembers members of the previous view
     * @throws CacheException if the leaver cannot be recorded
     */
    public void rehash(List<Address> newMembers, List<Address> oldMembers) {
        boolean join = oldMembers.size() < newMembers.size();
        log.info((Object)"Detected a view change.  Member list changed from %s to %s", oldMembers, newMembers);
        if (join) {
            Address newJoiner = MembershipArithmetic.getMemberJoined(oldMembers, newMembers);
            log.info("This is a JOIN event!  Wait for notification from new joiner " + newJoiner);
            return;
        }

        Address leaver = MembershipArithmetic.getMemberLeft(oldMembers, newMembers);
        log.info((Object)"This is a LEAVE event!  Node %s has just left", leaver);
        try {
            // Remember the hash that was valid before the leave; for a union hash that is its "new" half.
            ConsistentHash current = this.consistentHash;
            this.oldConsistentHash = current instanceof UnionConsistentHash ? ((UnionConsistentHash)current).getNewConsistentHash() : current;
            this.addLeaverAndUpdatedConsistentHash(leaver);
        }
        catch (Exception e) {
            log.fatal((Object)"Unable to process leaver!!", e);
            throw new CacheException(e);
        }

        List<Address> stateProviders = this.holdersOfLeaversState(leaver);
        List<Address> receiversOfLeaverState = this.receiversOfLeaverState(stateProviders);
        boolean willReceiveLeaverState = receiversOfLeaverState.contains(this.self);
        boolean willProvideState = stateProviders.contains(this.self);
        if (!willReceiveLeaverState && !willProvideState) {
            log.info("Not in same subspace, so ignoring leave event");
            this.topologyInfo.removeNodeInfo(leaver);
            this.removeLeaver(leaver);
            return;
        }

        log.info((Object)"I %s am participating in rehash, state providers %s, state receivers %s", this.rpcManager.getTransport().getAddress(), stateProviders, receiversOfLeaverState);
        this.transactionLogger.enable();
        // Only a task that has not finished yet is worth cancelling; isDone() is also
        // true for cancelled tasks, so one check covers both cases.
        Future<Void> previousLeaveTask = this.leaveTaskFuture;
        if (previousLeaveTask != null && !previousLeaveTask.isDone()) {
            if (trace) {
                log.trace("Canceling running leave task!");
            }
            previousLeaveTask.cancel(true);
        }
        InvertedLeaveTask task = new InvertedLeaveTask(this, this.rpcManager, this.configuration, this.cf, this.dataContainer, stateProviders, receiversOfLeaverState, willReceiveLeaverState);
        this.leaveTaskFuture = this.rehashExecutor.submit(task);
    }

    /**
     * @return a read-only view of the leavers list, obtained while holding the
     *         read side of the hash-switch lock
     */
    public List<Address> getLeavers() {
        Lock readLock = chSwitchLock.readLock();
        readLock.lock();
        try {
            return Collections.unmodifiableList(leavers);
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Records {@code leaver} in the leavers list and replaces the consistent
     * hash with one that no longer contains it, all under the write lock.
     */
    private void addLeaverAndUpdatedConsistentHash(Address leaver) {
        Lock writeLock = chSwitchLock.writeLock();
        writeLock.lock();
        try {
            leavers.add(leaver);
            if (trace) {
                log.trace((Object)"Added new leaver %s, leavers list is %s", leaver, leavers);
            }
            consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration, topologyInfo);
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes {@code leaver} from the leavers list under the write lock.
     *
     * @return {@code true} if the list contained the address
     */
    public boolean removeLeaver(Address leaver) {
        chSwitchLock.writeLock().lock();
        try {
            return leavers.remove(leaver);
        }
        finally {
            // Logged on every exit path, while the write lock is still held.
            if (trace) {
                log.trace((Object)"After removing leaver[ %s ] leavers list is %s", leaver, leavers);
            }
            chSwitchLock.writeLock().unlock();
        }
    }

    /**
     * Works out, from the old consistent hash, which nodes need a new backup
     * now that {@code leaver} is gone: the leaver's own first backup (index 1
     * of its backup list) plus every node that listed the leaver as a backup.
     */
    List<Address> holdersOfLeaversState(Address leaver) {
        List<Address> providers = new ArrayList<Address>();
        for (Address node : oldConsistentHash.getCaches()) {
            List<Address> backups = oldConsistentHash.getBackupsForNode(node, getReplCount());
            if (node.equals(leaver)) {
                // The leaver itself: its main backup takes over, if there is one.
                if (backups.size() > 1) {
                    Address mainBackup = backups.get(1);
                    providers.add(mainBackup);
                    if (trace) {
                        log.trace((Object)"Leaver %s main backup %s is looking for another backup as well.", leaver, mainBackup);
                    }
                }
            } else if (backups.contains(leaver)) {
                if (trace) {
                    log.trace((Object)"%s is looking for a new backup to replace leaver %s", node, leaver);
                }
                providers.add(node);
            }
        }
        if (trace) {
            log.trace((Object)"Nodes that need new backups are: %s", providers);
        }
        return providers;
    }

    /**
     * For every state provider, picks the last entry of its backup list in the
     * current consistent hash as the node that will receive the leaver's state.
     * Providers for which the hash reports no backups contribute no receiver.
     *
     * @param stateProviders nodes returned by {@code holdersOfLeaversState}
     * @return one receiver per provider that has a non-empty backup list
     */
    List<Address> receiversOfLeaverState(List<Address> stateProviders) {
        ArrayList<Address> result = new ArrayList<Address>(stateProviders.size());
        for (Address addr : stateProviders) {
            List<Address> addressList = this.consistentHash.getBackupsForNode(addr, this.getReplCount());
            if (addressList.isEmpty()) {
                // Nothing to index into; previously this raised IndexOutOfBoundsException.
                if (trace) {
                    log.trace((Object)"No backups found for state provider %s", addr);
                }
                continue;
            }
            result.add(addressList.get(addressList.size() - 1));
        }
        if (trace) {
            // The old message claimed "This node won't receive state" regardless of the result.
            log.trace((Object)"Nodes that will receive state are: %s", result);
        }
        return result;
    }

    /**
     * Classifies {@code key} relative to this node. Without a consistent hash
     * every key is {@code LOCAL}; otherwise the result reflects ownership
     * according to the hash, downgraded to the {@code *_UNCERTAIN} variants
     * while a rehash is in progress.
     */
    @Override
    public DataLocality getLocality(Object key) {
        this.chSwitchLock.readLock().lock();
        try {
            // Read the volatile field once: setConsistentHash() does not take this lock,
            // so a null check followed by a second read could observe different values.
            ConsistentHash ch = this.consistentHash;
            if (ch == null) {
                return DataLocality.LOCAL;
            }
            boolean local = ch.isKeyLocalToAddress(this.self, key, this.getReplCount());
            if (this.isRehashInProgress()) {
                return local ? DataLocality.LOCAL_UNCERTAIN : DataLocality.NOT_LOCAL_UNCERTAIN;
            }
            return local ? DataLocality.LOCAL : DataLocality.NOT_LOCAL;
        }
        finally {
            this.chSwitchLock.readLock().unlock();
        }
    }

    /**
     * @return the owners of {@code key} according to the current consistent
     *         hash, or just this node when no hash has been installed
     */
    @Override
    public List<Address> locate(Object key) {
        // Single read of the volatile field so the null check and the call see the same hash.
        ConsistentHash ch = this.consistentHash;
        if (ch == null) {
            return Collections.singletonList(this.self);
        }
        return ch.locate(key, this.getReplCount());
    }

    /** Locates all {@code keys} using the configured number of owners. */
    @Override
    public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
        int owners = getReplCount();
        return locateAll(keys, owners);
    }

    /**
     * Locates the owners of every key.
     *
     * @param keys      keys to look up
     * @param numOwners number of owners to return per key
     * @return a map from key to owners; when no consistent hash is installed
     *         every key maps to a singleton list holding this node
     */
    @Override
    public Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners) {
        // Single read of the volatile field so the null check and the call see the same hash.
        ConsistentHash ch = this.consistentHash;
        if (ch != null) {
            return ch.locateAll(keys, numOwners);
        }
        HashMap<Object, List<Address>> m = new HashMap<Object, List<Address>>(keys.size());
        List<Address> selfList = Collections.singletonList(this.self);
        for (Object k : keys) {
            m.put(k, selfList);
        }
        return m;
    }

    /**
     * Caps the entry's lifespan at the configured L1 lifespan: a negative
     * lifespan or one larger than the L1 value is replaced by the L1 value.
     */
    @Override
    public void transformForL1(CacheEntry entry) {
        boolean negativeLifespan = entry.getLifespan() < 0L;
        if (negativeLifespan || entry.getLifespan() > configuration.getL1Lifespan()) {
            entry.setLifespan(configuration.getL1Lifespan());
        }
    }

    /**
     * Fetches {@code key} from its owners with a synchronous clustered get.
     *
     * @param key key to fetch
     * @param ctx invocation context whose flags are copied onto the command
     * @return the entry built from the first successful response, or
     *         {@code null} if no owner answered successfully
     * @throws Exception propagated from the remote invocation
     */
    @Override
    public InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx) throws Exception {
        ClusteredGetCommand get = this.cf.buildClusteredGetCommand(key, ctx.getFlags());
        // Resolve the owners once so the response filter and the RPC target list agree even
        // if the consistent hash is swapped concurrently.
        // NOTE(review): assumes the filter does not mutate the list it is given — confirm.
        List<Address> targets = this.locate(key);
        ClusteredGetResponseValidityFilter filter = new ClusteredGetResponseValidityFilter(targets);
        Map<Address, Response> responses = this.rpcManager.invokeRemotely(targets, get, ResponseMode.SYNCHRONOUS, this.configuration.getSyncReplTimeout(), false, filter);
        for (Response r : responses.values()) {
            if (r instanceof SuccessfulResponse) {
                InternalCacheValue cacheValue = (InternalCacheValue)((SuccessfulResponse)r).getResponseValue();
                return cacheValue.toInternalCacheEntry(key);
            }
        }
        return null;
    }

    /** @return the consistent hash currently installed (may be {@code null}) */
    @Override
    public ConsistentHash getConsistentHash() {
        return consistentHash;
    }

    /** Installs {@code consistentHash} as the current hash, tracing the switch. */
    @Override
    public void setConsistentHash(ConsistentHash consistentHash) {
        if (trace) {
            log.trace((Object)"Installing new consistent hash %s", consistentHash);
        }
        DistributionManagerImpl.this.consistentHash = consistentHash;
    }

    /** Replaces the remembered pre-rehash consistent hash. */
    public void setOldConsistentHash(ConsistentHash oldConsistentHash) {
        DistributionManagerImpl.this.oldConsistentHash = oldConsistentHash;
    }

    /**
     * Reports whether {@code key} may be affected by an ongoing rehash: the
     * transaction logger is enabled, an old consistent hash is present, and
     * this node was not an owner of the key under that old hash.
     */
    @Override
    @ManagedOperation(description="Determines whether a given key is affected by an ongoing rehash, if any.")
    @Operation(displayName="Could key be affected by rehash?")
    public boolean isAffectedByRehash(@Parameter(name="key", description="Key to check") Object key) {
        if (!this.transactionLogger.isEnabled()) {
            return false;
        }
        // informRehashOnJoin() can null the field concurrently; snapshot it so the
        // null check and the lookup use the same reference.
        ConsistentHash previous = this.oldConsistentHash;
        return previous != null && !previous.locate(key, this.getReplCount()).contains(this.self);
    }

    /** @return the transaction logger created in {@code init} */
    @Override
    public TransactionLogger getTransactionLogger() {
        return transactionLogger;
    }

    /**
     * Tries to claim the single joiner slot for {@code a}.
     *
     * @return a copy of the caches in the current consistent hash when the
     *         slot was free, or {@code null} when another join is in progress
     */
    @Override
    public List<Address> requestPermissionToJoin(Address a) {
        boolean slotClaimed = JOINER_CAS.compareAndSet(this, null, a);
        if (!slotClaimed) {
            if (trace) {
                log.trace((Object)"Not alowing %s to join since there is a join already in progress for node %s", a, this.joiner);
            }
            return null;
        }
        if (trace) {
            log.trace((Object)"Allowing %s to join", a);
        }
        return new LinkedList<Address>(consistentHash.getCaches());
    }

    /**
     * Handles a joiner's notification.
     * <p>
     * When {@code starting} is true the joiner's topology info is recorded and
     * a union of the current hash and a hash that includes the joiner is
     * installed. When false, a union hash (if present) is collapsed to its new
     * half and the joiner slot is released.
     *
     * @return this node's own topology info
     * @throws RuntimeException if a join starts while a union hash is installed
     */
    @Override
    public NodeTopologyInfo informRehashOnJoin(Address a, boolean starting, NodeTopologyInfo nodeTopologyInfo) {
        if (trace) {
            log.trace((Object)"Informed of a JOIN by %s.  Starting? %s", a, starting);
        }
        if (starting) {
            topologyInfo.addNodeTopologyInfo(a, nodeTopologyInfo);
            if (trace) {
                log.trace((Object)"Node topology info added(%s).  Topology info is %s", nodeTopologyInfo, topologyInfo);
            }
            ConsistentHash beforeJoin = consistentHash;
            if (beforeJoin instanceof UnionConsistentHash) {
                throw new RuntimeException("Not expecting a union CH!");
            }
            oldConsistentHash = beforeJoin;
            joiner = a;
            ConsistentHash afterJoin = ConsistentHashHelper.createConsistentHash(configuration, beforeJoin.getCaches(), topologyInfo, a);
            consistentHash = new UnionConsistentHash(beforeJoin, afterJoin);
        } else {
            if (consistentHash instanceof UnionConsistentHash) {
                UnionConsistentHash union = (UnionConsistentHash)consistentHash;
                consistentHash = union.getNewConsistentHash();
                oldConsistentHash = null;
            }
            joiner = null;
        }
        if (trace) {
            log.trace((Object)"New CH is %s", consistentHash);
        }
        return topologyInfo.getNodeTopologyInfo(rpcManager.getAddress());
    }

    /**
     * Records that {@code sender} has acknowledged a leave rehash and wakes
     * every thread waiting for acknowledgements.
     */
    @Override
    public void informRehashOnLeave(Address sender) {
        leaveAcksLock.lock();
        try {
            leaveRehashAcks.add(sender);
            if (trace) {
                log.trace((Object)"%s has been informed that %s has completed applying state sent from %s as a part of a LEAVE_REHASH.", self, sender, self);
            }
            acksArrived.signalAll();
        }
        finally {
            leaveAcksLock.unlock();
        }
    }

    /**
     * Writes every entry of {@code state} that this node owns (per the given
     * hash) through the interceptor chain as a local-only put.
     *
     * @param withRetry when true, failed entries are collected and returned;
     *                  when false, failures are logged at warn level
     * @return the failed entries when {@code withRetry} is true, otherwise {@code null}
     */
    private Map<Object, InternalCacheValue> applyStateMap(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state, boolean withRetry) {
        HashMap<Object, InternalCacheValue> failed = null;
        if (withRetry) {
            failed = new HashMap<Object, InternalCacheValue>();
        }
        for (Map.Entry<Object, InternalCacheValue> entry : state.entrySet()) {
            Object key = entry.getKey();
            boolean ownedHere = consistentHash.locate(key, configuration.getNumOwners()).contains(self);
            if (ownedHere) {
                InternalCacheValue value = entry.getValue();
                InvocationContext ctx = icc.createInvocationContext();
                ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_LOCKING);
                try {
                    PutKeyValueCommand put = cf.buildPutKeyValueCommand(key, value.getValue(), value.getLifespan(), value.getMaxIdle(), ctx.getFlags());
                    interceptorChain.invoke(ctx, put);
                }
                catch (Exception problem) {
                    if (withRetry) {
                        if (trace) {
                            log.trace((Object)"Problem %s encountered when applying state for key %s. Adding entry to retry queue.", problem.getMessage(), key);
                        }
                        failed.put(key, value);
                    } else {
                        log.warn((Object)"Problem %s encountered when applying state for key %s!", problem.getMessage(), key);
                    }
                }
            }
        }
        return failed;
    }

    /**
     * Applies transferred state locally. Entries that fail are retried, for up
     * to three retry-collecting passes, followed by one final pass whose
     * failures are only logged. Unless the transfer is for a leave, the
     * transaction log is drained afterwards.
     */
    @Override
    public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state, RemoteTransactionLogger tlog, boolean forLeave) {
        if (trace) {
            log.trace((Object)"Applying the following keys: %s", state.keySet());
        }
        final int maxRetryingPasses = 3;
        Map<Object, InternalCacheValue> pending = state;
        int pass = 0;
        while (pass < maxRetryingPasses) {
            pending = applyStateMap(consistentHash, pending, true);
            if (pending.isEmpty()) {
                break;
            }
            pass++;
        }
        if (!pending.isEmpty()) {
            // Last attempt: failures are logged instead of collected.
            applyStateMap(consistentHash, pending, false);
        }
        if (!forLeave) {
            drainLocalTransactionLog(tlog);
        }
        if (trace) {
            log.trace((Object)"%s has completed applying state", self);
        }
    }

    /** Sets the flag that {@code isRehashInProgress()} combines with the leavers list. */
    @Override
    public void setRehashInProgress(boolean value) {
        rehashInProgress = value;
    }

    /**
     * @return the cache store when a cache loader manager is present, enabled
     *         and not shared; {@code null} otherwise
     */
    @Override
    public CacheStore getCacheStoreForRehashing() {
        boolean privateStoreAvailable = cacheLoaderManager != null && cacheLoaderManager.isEnabled() && !cacheLoaderManager.isShared();
        if (privateStoreAvailable) {
            return cacheLoaderManager.getCacheStore();
        }
        return null;
    }

    /**
     * @return {@code true} when the leavers list is non-empty (checked under
     *         the read lock) or the rehash-in-progress flag is set
     */
    @Override
    @ManagedAttribute(description="Checks whether the node is involved in a rehash.")
    @Metric(displayName="Is rehash in progress?", dataType=DataType.TRAIT)
    public boolean isRehashInProgress() {
        boolean nodeLeaving;
        Lock readLock = chSwitchLock.readLock();
        readLock.lock();
        try {
            nodeLeaving = !leavers.isEmpty();
            if (trace) {
                log.trace((Object)"Node leaving? %s RehashInProgress? %s Leavers = %s", nodeLeaving, rehashInProgress, leavers);
            }
        }
        finally {
            readLock.unlock();
        }
        return nodeLeaving || rehashInProgress;
    }

    /**
     * Removes {@code leaver} from the leavers list and from the topology
     * info, under the write lock.
     */
    public void markLeaverAsHandled(Address leaver) {
        Lock writeLock = chSwitchLock.writeLock();
        writeLock.lock();
        try {
            leavers.remove(leaver);
            topologyInfo.removeNodeInfo(leaver);
        }
        finally {
            writeLock.unlock();
        }
    }

    /** @return the current value of the join-complete flag */
    @Override
    public boolean isJoinComplete() {
        return joinComplete;
    }

    /**
     * If the final join phase has been entered, blocks until the final-join
     * latch is released. An interrupt ends the wait and is re-asserted on the
     * current thread.
     */
    @Override
    public void waitForFinalJoin() {
        if (!enteredFinalJoinPhase) {
            return;
        }
        try {
            finalJoinPhaseLatch.await();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** @return whether the final join phase has been entered */
    @Override
    public boolean isInFinalJoinPhase() {
        return enteredFinalJoinPhase;
    }

    /**
     * Updates the join-complete flag; setting it to {@code true} also releases
     * the final-join latch.
     */
    @Override
    public void setJoinComplete(boolean joinComplete) {
        this.joinComplete = joinComplete;
        if (!joinComplete) {
            return;
        }
        finalJoinPhaseLatch.countDown();
    }

    /**
     * Replays the remote transaction log locally in two phases: repeated
     * unlocked drains while the log says that is appropriate, then one locked
     * drain plus the pending prepares. The log is always unlocked and disabled
     * at the end.
     *
     * @param tlog the transaction log to drain
     */
    void drainLocalTransactionLog(RemoteTransactionLogger tlog) {
        List<WriteCommand> c;
        // Phase 1: drain without locking for as long as the log asks for it.
        while (tlog.shouldDrainWithoutLock()) {
            c = tlog.drain();
            if (trace) {
                log.trace((Object)"Draining %s entries from transaction log", c.size());
            }
            this.applyRemoteTxLog(c);
        }
        // Phase 2 proceeds even when the processing lock could not be acquired;
        // 'unlock' only records whether it has to be released afterwards.
        boolean unlock = this.acquireDistSyncLock();
        try {
            // Not reset anywhere in this method.
            this.enteredFinalJoinPhase = true;
            c = tlog.drainAndLock(null);
            if (trace) {
                log.trace((Object)"Locked and draining %s entries from transaction log", c.size());
            }
            this.applyRemoteTxLog(c);
            Collection<PrepareCommand> pendingPrepares = tlog.getPendingPrepares();
            if (trace) {
                log.trace((Object)"Applying %s pending prepares", pendingPrepares.size());
            }
            for (PrepareCommand pc : pendingPrepares) {
                this.cf.initializeReplicableCommand(pc, true);
                try {
                    // Performed with a null context.
                    pc.perform(null);
                }
                catch (Throwable throwable) {
                    // Best effort: a failing prepare is logged and the remaining ones still run.
                    log.warn((Object)("Unable to apply prepare " + pc), throwable);
                }
            }
        }
        finally {
            tlog.unlockAndDisable(null);
            if (unlock) {
                this.rpcManager.getTransport().getDistributedSync().releaseProcessingLock(true);
            }
        }
    }

    /**
     * Tries to take the distributed-sync processing lock, waiting up to 100 days.
     *
     * @return {@code true} if the lock was acquired; {@code false} on timeout
     *         or interruption (the interrupt flag is restored)
     */
    private boolean acquireDistSyncLock() {
        try {
            rpcManager.getTransport().getDistributedSync().acquireProcessingLock(true, 100L, TimeUnit.DAYS);
        }
        catch (TimeoutException timedOut) {
            log.info("Couldn't acquire shared lock");
            return false;
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /**
     * @return the distinct owners of all {@code affectedKeys}; an empty list
     *         when the key set is {@code null} or empty
     */
    @Override
    public List<Address> getAffectedNodes(Set<Object> affectedKeys) {
        boolean nothingAffected = affectedKeys == null || affectedKeys.isEmpty();
        if (nothingAffected) {
            if (log.isTraceEnabled()) {
                log.trace("Affected keys are empty");
            }
            return Collections.emptyList();
        }
        // Collect into a set first so each owner is reported once.
        Set<Address> distinctOwners = new HashSet<Address>();
        for (List<Address> owners : locateAll(affectedKeys).values()) {
            distinctOwners.addAll(owners);
        }
        return new ArrayList<Address>(distinctOwners);
    }

    /**
     * Replays each logged write command through the interceptor chain as a
     * local-only operation. A failing command is logged at warn level and the
     * remaining commands are still applied.
     */
    @Override
    public void applyRemoteTxLog(List<WriteCommand> commands) {
        for (WriteCommand logged : commands) {
            try {
                cf.initializeReplicableCommand(logged, true);
                InvocationContext replayCtx = icc.createInvocationContext();
                replayCtx.setFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.CACHE_MODE_LOCAL, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_LOCKING);
                interceptorChain.invoke(replayCtx, logged);
            }
            catch (Exception failure) {
                log.warn((Object)"Caught exception replaying %s", failure, logged);
            }
        }
    }

    /** Management operation: whether the locality computed for {@code key} counts as local. */
    @ManagedOperation(description="Tells you whether a given key is local to this instance of the cache.  Only works with String keys.")
    @Operation(displayName="Is key local?")
    public boolean isLocatedLocally(@Parameter(name="key", description="Key to query") String key) {
        DataLocality locality = getLocality(key);
        return locality.isLocal();
    }

    /** Management operation: the owners of {@code key}, rendered as strings. */
    @ManagedOperation(description="Locates an object in a cluster.  Only works with String keys.")
    @Operation(displayName="Locate key")
    public List<String> locateKey(@Parameter(name="key", description="Key to locate") String key) {
        List<Address> owners = locate(key);
        LinkedList<String> ownerNames = new LinkedList<String>();
        for (Address owner : owners) {
            ownerNames.add(owner.toString());
        }
        return ownerNames;
    }

    /** @return a short description holding the rehash flag and the current consistent hash */
    public String toString() {
        StringBuilder description = new StringBuilder("DistributionManagerImpl[rehashInProgress=");
        description.append(rehashInProgress);
        description.append(", consistentHash=").append(consistentHash);
        description.append("]");
        return description.toString();
    }

    /** @return the topology info instance held by this manager */
    @Override
    public TopologyInfo getTopologyInfo() {
        return topologyInfo;
    }

    /**
     * Waits until the set of received leave-rehash acknowledgements equals
     * {@code stateReceivers}, or until {@code timeout} elapses. The recorded
     * acknowledgements are cleared before returning, whatever the outcome.
     *
     * @param stateReceivers nodes whose acknowledgement is expected
     * @param timeout        maximum time to wait, in milliseconds
     * @return {@code true} if all acknowledgements arrived in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitLeaveRehashAcks(Set<Address> stateReceivers, long timeout) throws InterruptedException {
        // Elapsed time is measured with nanoTime: it is monotonic, so wall-clock
        // adjustments cannot shorten or extend the wait.
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
        final long maxSliceNanos = TimeUnit.SECONDS.toNanos(1L);
        final long startNanos = System.nanoTime();
        this.leaveAcksLock.lock();
        try {
            // The acks are re-checked after every wake-up, including the last one, so an
            // ack arriving during the final wait is no longer reported as a timeout.
            while (!stateReceivers.equals(this.leaveRehashAcks)) {
                long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                if (remainingNanos <= 0L) {
                    return false;
                }
                // Wake at least once a second, but never sleep past the deadline.
                this.acksArrived.awaitNanos(Math.min(remainingNanos, maxSliceNanos));
            }
            return true;
        }
        finally {
            this.leaveRehashAcks.clear();
            this.leaveAcksLock.unlock();
        }
    }

    /**
     * Listener registered with the cache-manager notifier. On a view change it
     * either re-joins (when the event says a re-join is needed) or triggers a
     * rehash once the start latch has opened.
     */
    @Listener
    public class ViewChangeListener {
        /**
         * Handles one view-change event.
         *
         * @param e the event describing the old and new membership
         */
        @ViewChanged
        public void handleViewChange(ViewChangedEvent e) {
            if (trace) {
                log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
            }
            if (e.isNeedsToRejoin()) {
                try {
                    DistributionManagerImpl.this.join();
                }
                catch (Exception e1) {
                    log.fatal((Object)"Unable to recover from a partition merge!", e1);
                }
                return;
            }
            try {
                // Do not rehash until join() has finished setting up the consistent hash.
                boolean started = DistributionManagerImpl.this.startLatch.await(5L, TimeUnit.MINUTES);
                if (started) {
                    DistributionManagerImpl.this.rehash(e.getNewMembers(), e.getOldMembers());
                } else {
                    log.warn("DistributionManager not started after waiting up to 5 minutes!  Not rehashing!");
                }
            }
            catch (InterruptedException ie) {
                log.warn("View change interrupted; not rehashing!");
                // Restore the flag so the notifying thread can still observe the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }
}

