/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.distribution.ConsistentHash;
import org.infinispan.distribution.ConsistentHashHelper;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.JoinTask;
import org.infinispan.distribution.LeaveTask;
import org.infinispan.distribution.TransactionLogger;
import org.infinispan.distribution.TransactionLoggerImpl;
import org.infinispan.distribution.UnionConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@MBean(objectName="DistributionManager", description="Component that handles distribution of content across a cluster")
public class DistributionManagerImpl
implements DistributionManager {
    // Logger for this component (one per instance).
    private final Log log = LogFactory.getLog(DistributionManagerImpl.class);
    // Trace flag sampled once at construction time.
    private final boolean trace = this.log.isTraceEnabled();
    // Cache configuration; injected through init().
    Configuration configuration;
    // Hash currently used to locate key owners. Created in start() and replaced by
    // rehash(), setConsistentHash() and informRehashOnJoin().
    volatile ConsistentHash consistentHash;
    // Hash that was in effect before the most recent leave handled by rehash().
    volatile ConsistentHash oldConsistentHash;
    // Address of the local node; assigned in start().
    Address self;
    // Injected through init(); consulted by getCacheStoreForRehashing().
    CacheLoaderManager cacheLoaderManager;
    // Injected through init(); used for remote calls and to read the cluster membership.
    RpcManager rpcManager;
    // Injected through init(); the view-change listener is registered with it in start().
    CacheManagerNotifier notifier;
    // Number of owners per key, read from the configuration in start().
    int replCount;
    // Listener instance registered in start() and removed again in stop().
    ViewChangeListener listener;
    // Injected through init(); builds the commands used for remote gets and state application.
    CommandsFactory cf;
    // Work queue backing rehashExecutor.
    LinkedBlockingQueue<Runnable> rehashQueue = new LinkedBlockingQueue();
    // Creates daemon threads at priority 1 named "Rehasher-<local address>".
    // The address is looked up when a thread is created, not when this field is initialized.
    ThreadFactory tf = new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setPriority(1);
            t.setName("Rehasher-" + DistributionManagerImpl.this.rpcManager.getTransport().getAddress());
            return t;
        }
    };
    // Pool with exactly one thread, so submitted join/leave tasks run one at a time.
    // Must be declared after rehashQueue and tf because it is built from both.
    ExecutorService rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, this.rehashQueue, this.tf);
    // Logger of write commands that is enabled in rehash() and drained in drainTransactionLog().
    TransactionLogger transactionLogger = new TransactionLoggerImpl();
    // Set to true/false by informRehashOnJoin() when a join-driven rehash starts/ends.
    volatile boolean rehashInProgress = false;
    // Node currently holding permission to join, or null when no join is pending.
    volatile Address joiner;
    // Atomic updater over the 'joiner' field; used by requestPermissionToJoin().
    static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS = AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
    // Remaining collaborators injected through init().
    private DataContainer dataContainer;
    private InterceptorChain interceptorChain;
    private InvocationContextContainer icc;
    // Set to true in start() when this node is the only member and reset to false in stop().
    // NOTE(review): presumably JoinTask flips this to true once a join finishes - confirm in JoinTask.
    @ManagedAttribute(description="If true, the node has successfully joined the grid and is considered to hold state.  If false, the join process is still in progress.")
    volatile boolean joinComplete = false;
    // Nodes whose departure this node is handling; appended to in rehash() and shared with LeaveTask.
    final List<Address> leavers = new CopyOnWriteArrayList<Address>();
    // Handle of the most recently submitted LeaveTask, if any.
    volatile Future<Void> leaveTaskFuture;
    // Released at the end of start(); the view-change listener waits on it before rehashing.
    final CountDownLatch startLatch = new CountDownLatch(1);

    /**
     * Receives the collaborating components and stores each of them in the matching field.
     * No other work is done here; initialization that needs these components happens in
     * {@link #start()}.
     */
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf, DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc, CacheLoaderManager cacheLoaderManager) {
        // Every parameter shadows a field of the same name, hence the explicit 'this.'.
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.notifier = notifier;
        this.cf = cf;
        this.dataContainer = dataContainer;
        this.interceptorChain = interceptorChain;
        this.icc = icc;
        this.cacheLoaderManager = cacheLoaderManager;
    }

    /**
     * Starts the distribution manager: reads the owner count, builds the initial consistent
     * hash from the current members, records the local address and registers the view-change
     * listener.
     *
     * <p>If other members are present a {@link JoinTask} is queued on the rehash executor;
     * otherwise the join is marked complete immediately. The start latch is released last so
     * that view changes received meanwhile are only processed once setup has finished.
     *
     * @throws Exception if any of the setup steps fails
     */
    @Start(priority=20)
    public void start() throws Exception {
        replCount = configuration.getNumOwners();
        consistentHash = ConsistentHashHelper.createConsistentHash(configuration, rpcManager.getTransport().getMembers());
        self = rpcManager.getTransport().getAddress();

        listener = new ViewChangeListener();
        notifier.addListener(listener);

        boolean onlyMember = rpcManager.getTransport().getMembers().size() <= 1;
        if (onlyMember) {
            joinComplete = true;
        } else {
            JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
            rehashExecutor.submit(joinTask);
        }
        startLatch.countDown();
    }

    /**
     * Stops the distribution manager: unregisters the view-change listener, shuts the rehash
     * executor down immediately and clears the join-complete flag.
     */
    @Stop(priority=20)
    public void stop() {
        notifier.removeListener(listener);
        rehashExecutor.shutdownNow();
        joinComplete = false;
    }

    /**
     * Returns the members of the larger list that do not appear in the smaller one. When both
     * lists have the same size, {@code l2} is treated as the larger list.
     *
     * @param l1 first member list
     * @param l2 second member list
     * @return a new list holding the difference; neither argument is modified
     */
    final List<Address> diffAll(List<Address> l1, List<Address> l2) {
        List<Address> bigger;
        List<Address> smaller;
        if (l1.size() > l2.size()) {
            bigger = l1;
            smaller = l2;
        } else {
            bigger = l2;
            smaller = l1;
        }
        ArrayList<Address> remainder = new ArrayList<Address>(bigger);
        remainder.removeAll(smaller);
        return remainder;
    }

    /**
     * Returns the first element of {@link #diffAll(List, List)} for the two lists.
     *
     * @param l1 first member list
     * @param l2 second member list
     * @return the first differing address, or {@code null} when the difference is empty
     */
    final Address diff(List<Address> l1, List<Address> l2) {
        List<Address> difference = diffAll(l1, l2);
        if (difference.isEmpty()) {
            return null;
        }
        return difference.get(0);
    }

    /**
     * Reacts to a cluster view change.
     *
     * <p>A view that grew is treated as a join: it is only logged, since the rest of the join
     * handling is driven by the joiner's own notifications. Any other view is treated as a
     * leave: the leaver is removed from the consistent hash (the previous hash is kept in
     * {@code oldConsistentHash}), transaction logging is enabled if this node expects to
     * receive the leaver's state, and a {@link LeaveTask} is queued if this node has to send
     * state.
     *
     * @param newMembers membership after the change
     * @param oldMembers membership before the change
     * @throws CacheException if the leaver cannot be removed from the consistent hash
     */
    public void rehash(List<Address> newMembers, List<Address> oldMembers) {
        boolean join = oldMembers.size() < newMembers.size();
        this.log.info((Object)"Detected a view change.  Member list changed from {0} to {1}", oldMembers, newMembers);
        if (join) {
            Address newJoiner = this.diff(newMembers, oldMembers);
            this.log.info("This is a JOIN event!  Wait for notification from new joiner " + newJoiner);
            return;
        }

        Address leaver = this.diff(newMembers, oldMembers);
        if (leaver == null) {
            // Both views contain the same members, so there is nobody to remove from the hash.
            this.log.warn("View change did not add or remove any member; not rehashing!");
            return;
        }
        this.log.info((Object)"This is a LEAVE event!  Node {0} has just left", leaver);

        // Both decisions must be taken against the hash that still contains the leaver.
        boolean willReceiveLeaverState = this.willReceiveLeaverState(leaver);
        boolean willSendLeaverState = this.willSendLeaverState(leaver);
        try {
            this.oldConsistentHash = this.consistentHash;
            this.consistentHash = ConsistentHashHelper.removeAddress(this.consistentHash, leaver, this.configuration);
        }
        catch (Exception e) {
            this.log.fatal((Object)"Unable to process leaver!!", e);
            throw new CacheException(e);
        }

        if (willReceiveLeaverState) {
            this.log.info("Starting transaction logging; expecting state from someone!");
            this.transactionLogger.enable();
        }

        if (willSendLeaverState) {
            // Read the volatile field once so the null check and the cancel see the same task.
            Future<Void> previousTask = this.leaveTaskFuture;
            // Cancelling a task that has already finished is a no-op, so only a task that
            // is still pending or running needs to be cancelled.
            if (previousTask != null && !previousTask.isDone()) {
                previousTask.cancel(true);
            }
            this.leavers.add(leaver);
            LeaveTask task = new LeaveTask(this, this.rpcManager, this.configuration, this.leavers, this.transactionLogger, this.cf, this.dataContainer);
            this.leaveTaskFuture = this.rehashExecutor.submit(task);
            this.log.info("Need to rehash");
        } else {
            this.log.info("Not in same subspace, so ignoring leave event");
        }
    }

    /**
     * Tells whether this node has to send state on behalf of the given leaver, which is the
     * case when the current consistent hash reports the leaver and this node as adjacent.
     *
     * @param leaver the node that left
     * @return {@code true} if the leaver is adjacent to this node
     */
    boolean willSendLeaverState(Address leaver) {
        boolean adjacent = consistentHash.isAdjacent(leaver, self);
        return adjacent;
    }

    /**
     * Tells whether this node expects to receive state because of the given leaver, which is
     * the case when the hash distance from the leaver to this node does not exceed the number
     * of owners.
     *
     * @param leaver the node that left
     * @return {@code true} if the distance is at most {@code replCount}
     */
    boolean willReceiveLeaverState(Address leaver) {
        return consistentHash.getDistance(leaver, self) <= replCount;
    }

    /**
     * Checks whether this node is one of the owners the current consistent hash assigns to
     * the key.
     *
     * @param key the key to look up
     * @return {@code true} if the local address is among the key's owners
     */
    @Override
    public boolean isLocal(Object key) {
        List<Address> owners = consistentHash.locate(key, replCount);
        return owners.contains(self);
    }

    /**
     * Looks up the owners of a key in the current consistent hash, asking for
     * {@code replCount} owners.
     *
     * @param key the key to look up
     * @return the addresses returned by the consistent hash for this key
     */
    @Override
    public List<Address> locate(Object key) {
        List<Address> owners = consistentHash.locate(key, replCount);
        return owners;
    }

    /**
     * Looks up the owners of several keys at once in the current consistent hash, asking for
     * {@code replCount} owners per key.
     *
     * @param keys the keys to look up
     * @return the key-to-owners mapping returned by the consistent hash
     */
    @Override
    public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
        Map<Object, List<Address>> ownersByKey = consistentHash.locateAll(keys, replCount);
        return ownersByKey;
    }

    /**
     * Caps the lifespan of an entry at the configured L1 lifespan. An entry whose lifespan is
     * negative or longer than the L1 lifespan gets the L1 lifespan; any other entry is left
     * unchanged.
     *
     * @param entry the entry to adjust in place
     */
    @Override
    public void transformForL1(CacheEntry entry) {
        boolean negativeLifespan = entry.getLifespan() < 0L;
        boolean needsCap = negativeLifespan || entry.getLifespan() > configuration.getL1Lifespan();
        if (!needsCap) {
            return;
        }
        entry.setLifespan(configuration.getL1Lifespan());
    }

    /**
     * Fetches the entry for a key from its owners with a synchronous clustered get.
     *
     * <p>The owner list is computed once and used both for the response filter and as the
     * target list of the remote call, so the two cannot disagree if the consistent hash is
     * replaced while this method runs.
     *
     * @param key the key to fetch
     * @return the entry built from the first successful response, or {@code null} if no
     *         response was successful
     * @throws Exception if the remote invocation fails
     */
    @Override
    public InternalCacheEntry retrieveFromRemoteSource(Object key) throws Exception {
        ClusteredGetCommand get = this.cf.buildClusteredGetCommand(key);
        List<Address> owners = this.locate(key);
        ClusteredGetResponseValidityFilter filter = new ClusteredGetResponseValidityFilter(owners);
        List<Response> responses = this.rpcManager.invokeRemotely(owners, get, ResponseMode.SYNCHRONOUS, this.configuration.getSyncReplTimeout(), false, filter);
        for (Response r : responses) {
            if (!(r instanceof SuccessfulResponse)) continue;
            InternalCacheValue cacheValue = (InternalCacheValue)((SuccessfulResponse)r).getResponseValue();
            return cacheValue.toInternalCacheEntry(key);
        }
        return null;
    }

    /**
     * @return the consistent hash currently in use
     */
    @Override
    public ConsistentHash getConsistentHash() {
        return consistentHash;
    }

    /**
     * Replaces the consistent hash in use, logging the new hash at trace level first.
     *
     * @param consistentHash the hash to install
     */
    @Override
    public void setConsistentHash(ConsistentHash consistentHash) {
        log.trace((Object)"Installing new consistent hash {0}", consistentHash);
        // The parameter shadows the field, so the assignment needs the explicit qualifier.
        this.consistentHash = consistentHash;
    }

    /**
     * Reports whether a key is affected by an ongoing rehash. That is the case when
     * transaction logging is enabled, a previous consistent hash is recorded, and this node
     * was not among the key's owners under that previous hash.
     *
     * @param key the key to check
     * @return {@code true} if all three conditions hold
     */
    @Override
    @ManagedOperation(description="Determines whether a given key is affected by an ongoing rehash, if any.")
    public boolean isAffectedByRehash(Object key) {
        if (!transactionLogger.isEnabled()) {
            return false;
        }
        if (oldConsistentHash == null) {
            return false;
        }
        boolean ownedBefore = oldConsistentHash.locate(key, replCount).contains(self);
        return !ownedBefore;
    }

    /**
     * @return the transaction logger owned by this distribution manager
     */
    @Override
    public TransactionLogger getTransactionLogger() {
        return transactionLogger;
    }

    /**
     * Grants join permission to one node at a time. Permission is granted only if no joiner
     * is currently recorded, in which case the requester is atomically recorded as the joiner.
     *
     * @param joiner the node asking to join
     * @return a copy of the addresses in the current consistent hash when permission is
     *         granted, or {@code null} when another joiner is already recorded
     */
    @Override
    public List<Address> requestPermissionToJoin(Address joiner) {
        boolean granted = JOINER_CAS.compareAndSet(this, null, joiner);
        if (!granted) {
            return null;
        }
        return new LinkedList<Address>(consistentHash.getCaches());
    }

    /**
     * Clears the recorded joiner once that node reports that its join has completed.
     * Notifications for any node other than the recorded joiner are ignored.
     *
     * @param joiner the node reporting that its join is complete
     */
    @Override
    public void notifyJoinComplete(Address joiner) {
        // Read the volatile field once: reading it again after the null check could observe
        // null if another thread cleared it in between.
        Address current = this.joiner;
        this.log.trace((Object)"Received notification that {0} has completed a join.  Current 'joiner' flag is {1}, setting this to null.", joiner, current);
        if (current != null && current.equals(joiner)) {
            // Clear only if the field still holds the instance that was just compared, so a
            // joiner recorded concurrently is not wiped by mistake.
            JOINER_CAS.compareAndSet(this, current, null);
        }
    }

    /**
     * Handles the start or end of a rehash triggered by a joining node.
     *
     * <p>On start, a new consistent hash containing the current members plus the joiner is
     * created and installed together with the current hash as a {@link UnionConsistentHash};
     * the joiner is recorded and the rehash flag is raised. On end, a union hash (if one is
     * installed) is replaced by its new hash and the rehash flag is cleared.
     *
     * @param joiner   the node that is joining
     * @param starting {@code true} when the rehash begins, {@code false} when it has finished
     * @throws RuntimeException if a rehash is started while a union hash is already installed
     * @throws CacheException   if the configured consistent hash class cannot be instantiated
     */
    @Override
    public void informRehashOnJoin(Address joiner, boolean starting) {
        this.log.trace((Object)"Informed of a JOIN by {0}.  Starting? {1}", joiner, starting);
        // Work on a single snapshot of the volatile field so that the type check and the
        // cast below are guaranteed to look at the same object.
        ConsistentHash current = this.consistentHash;
        if (!starting) {
            if (current instanceof UnionConsistentHash) {
                UnionConsistentHash uch = (UnionConsistentHash)current;
                this.consistentHash = uch.getNewConsistentHash();
            }
            this.rehashInProgress = false;
        } else {
            if (current instanceof UnionConsistentHash) {
                throw new RuntimeException("Not expecting a union CH!");
            }
            ConsistentHash chNew;
            try {
                chNew = (ConsistentHash)Util.getInstance(this.configuration.getConsistentHashClass());
            }
            catch (Exception e) {
                throw new CacheException("Unable to create instance of " + this.configuration.getConsistentHashClass(), e);
            }
            LinkedList<Address> newAddresses = new LinkedList<Address>(current.getCaches());
            newAddresses.add(joiner);
            chNew.setCaches(newAddresses);
            // Publish the rehash state only once the new hash has been built, so a failure
            // above does not leave the joiner and the in-progress flag set without a union hash.
            this.joiner = joiner;
            this.rehashInProgress = true;
            this.consistentHash = new UnionConsistentHash(current, chNew);
        }
        this.log.trace((Object)"New CH is {0}", this.consistentHash);
    }

    /**
     * Applies received state locally. For every entry whose key the given consistent hash
     * assigns to this node, a put command is built from the value, lifespan and max-idle and
     * sent through the interceptor chain with the {@code CACHE_MODE_LOCAL} and
     * {@code SKIP_REMOTE_LOOKUP} flags. Entries for keys owned elsewhere are skipped.
     *
     * @param consistentHash the hash used to decide which keys belong to this node
     * @param state          the received entries, keyed by cache key
     */
    public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
        for (Map.Entry<Object, InternalCacheValue> entry : state.entrySet()) {
            Object key = entry.getKey();
            boolean ownedLocally = consistentHash.locate(key, configuration.getNumOwners()).contains(self);
            if (ownedLocally) {
                InternalCacheValue value = entry.getValue();
                PutKeyValueCommand put = cf.buildPutKeyValueCommand(entry.getKey(), value.getValue(), value.getLifespan(), value.getMaxIdle());
                InvocationContext ctx = icc.createInvocationContext();
                ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_REMOTE_LOOKUP);
                interceptorChain.invoke(ctx, put);
            }
        }
    }

    /**
     * Returns the cache store to include when rehashing.
     *
     * @return the cache store when a cache loader manager is present, enabled and not shared;
     *         {@code null} otherwise
     */
    @Override
    public CacheStore getCacheStoreForRehashing() {
        boolean usable = cacheLoaderManager != null && cacheLoaderManager.isEnabled() && !cacheLoaderManager.isShared();
        if (usable) {
            return cacheLoaderManager.getCacheStore();
        }
        return null;
    }

    /**
     * @return {@code true} if at least one leaver is still recorded or the rehash flag is set
     */
    @Override
    @ManagedAttribute(description="Checks whether the node is involved in a rehash.")
    public boolean isRehashInProgress() {
        if (!leavers.isEmpty()) {
            return true;
        }
        return rehashInProgress;
    }

    /**
     * Applies state received from another node using the current consistent hash and then
     * drains the transaction log.
     *
     * <p>If draining fails part-way, the transaction logger is still unlocked and disabled
     * before the failure propagates. A successful drain has already done so itself.
     *
     * @param state the received entries, keyed by cache key
     */
    @Override
    public void applyReceivedState(Map<Object, InternalCacheValue> state) {
        applyState(consistentHash, state);
        boolean drained = false;
        try {
            drainTransactionLog();
            drained = true;
        }
        finally {
            // Only needed on failure; drainTransactionLog() unlocks and disables on success.
            if (!drained) {
                transactionLogger.unlockAndDisable();
            }
        }
    }

    /**
     * Replays the logged write commands and then turns transaction logging off.
     *
     * <p>While more than ten commands are pending they are drained and applied in batches.
     * The remainder is drained with the logger locked, applied, and the logger is then
     * unlocked and disabled.
     */
    void drainTransactionLog() {
        final int lockThreshold = 10;
        while (transactionLogger.size() > lockThreshold) {
            List<WriteCommand> batch = transactionLogger.drain();
            apply(batch);
        }
        List<WriteCommand> remainder = transactionLogger.drainAndLock();
        apply(remainder);
        transactionLogger.unlockAndDisable();
    }

    /**
     * Sends each command through the interceptor chain, in list order, using a fresh
     * invocation context flagged with {@code SKIP_REMOTE_LOOKUP}.
     *
     * @param c the write commands to replay
     */
    private void apply(List<WriteCommand> c) {
        for (WriteCommand command : c) {
            InvocationContext context = icc.createInvocationContext();
            context.setFlags(Flag.SKIP_REMOTE_LOOKUP);
            interceptorChain.invoke(context, command);
        }
    }

    /**
     * @return the number of members reported by the transport, as a decimal string
     */
    @ManagedAttribute(description="Size of the cluster in number of nodes")
    public String getClusterSize() {
        int memberCount = rpcManager.getTransport().getMembers().size();
        return String.valueOf(memberCount);
    }

    /**
     * Management variant of {@link #isLocal(Object)} that takes a string key.
     *
     * @param key the key to look up
     * @return {@code true} if this node is among the key's owners
     */
    @ManagedOperation(description="Tells you whether a given key is local to this instance of the cache.  Only works with String keys.")
    public boolean isLocatedLocally(String key) {
        boolean local = isLocal(key);
        return local;
    }

    /**
     * Management variant of {@link #locate(Object)} that takes a string key and returns the
     * owners as strings.
     *
     * @param key the key to look up
     * @return the string form of each owner address, in the order returned by {@code locate}
     */
    @ManagedOperation(description="Locates an object in a cluster.  Only works with String keys.")
    public List<String> locateKey(String key) {
        List<Address> owners = locate(key);
        LinkedList<String> names = new LinkedList<String>();
        for (Address owner : owners) {
            names.add(owner.toString());
        }
        return names;
    }

    /**
     * Listener that forwards cluster view changes to
     * {@link DistributionManagerImpl#rehash(List, List)}.
     */
    @Listener
    public class ViewChangeListener {
        /**
         * Waits up to two minutes for the distribution manager to finish starting and then
         * triggers a rehash for the new view. If the manager has not started in time, or the
         * wait is interrupted, a warning is logged and no rehash takes place.
         *
         * @param e the view change event carrying the old and new member lists
         */
        @ViewChanged
        public void handleViewChange(ViewChangedEvent e) {
            try {
                boolean started = DistributionManagerImpl.this.startLatch.await(2L, TimeUnit.MINUTES);
                if (started) {
                    DistributionManagerImpl.this.rehash(e.getNewMembers(), e.getOldMembers());
                } else {
                    DistributionManagerImpl.this.log.warn("DistributionManager not started after waiting up to 2 minutes!  Not rehashing!");
                }
            }
            catch (InterruptedException ie) {
                // Restore the interrupt status so code further up the stack can still see
                // that this thread was asked to stop.
                Thread.currentThread().interrupt();
                DistributionManagerImpl.this.log.warn("View change interrupted; not rehashing!");
            }
        }
    }
}

