/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.DistributionManagerImpl;
import org.infinispan.distribution.RemoteTransactionLoggerImpl;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Base class for tasks that carry out a rehash.
 *
 * <p>{@link #call()} marks the {@link DistributionManager} as having a rehash in progress,
 * delegates the actual work to {@link #performRehash()}, and always clears the flag again,
 * whether {@code performRehash()} completes normally or throws.
 *
 * <p>The class also offers helpers for subclasses: invalidating keys on nodes that no longer
 * own them, and {@link StateGrabber}, a callable that fetches state from a single node and
 * applies it through the distribution manager.
 */
public abstract class RehashTask
implements Callable<Void> {
    protected DistributionManager distributionManager;
    protected RpcManager rpcManager;
    protected Configuration configuration;
    protected CommandsFactory cf;
    protected DataContainer dataContainer;
    /** Address of this node, as reported by the {@link RpcManager} at construction time. */
    protected final Address self;
    /** Sequence number used to give each worker thread a distinct name. */
    private final AtomicInteger counter = new AtomicInteger(0);
    protected final Log log = LogFactory.getLog(this.getClass());
    protected final boolean trace = this.log.isTraceEnabled();
    /**
     * Cached thread pool whose daemon threads are named
     * {@code Rehasher-<self>-Worker-<n>}.
     */
    protected final ExecutorService statePullExecutor = Executors.newCachedThreadPool(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            // Threads are created lazily by the pool, i.e. after the constructor has assigned 'self'.
            Thread th = new Thread(r, "Rehasher-" + RehashTask.this.self + "-Worker-" + RehashTask.this.counter.getAndIncrement());
            th.setDaemon(true);
            return th;
        }
    });

    /**
     * Stores the collaborators and records this node's address from {@code rpcManager}.
     *
     * @param distributionManager distribution manager whose rehash-in-progress flag is toggled
     * @param rpcManager          used for remote invocations and to obtain this node's address
     * @param configuration       source of the number of owners and the rehash RPC timeout
     * @param cf                  factory for the commands sent to other nodes
     * @param dataContainer       local container whose keys are inspected for invalidation
     */
    protected RehashTask(DistributionManagerImpl distributionManager, RpcManager rpcManager, Configuration configuration, CommandsFactory cf, DataContainer dataContainer) {
        this.distributionManager = distributionManager;
        this.rpcManager = rpcManager;
        this.configuration = configuration;
        this.cf = cf;
        this.dataContainer = dataContainer;
        this.self = rpcManager.getAddress();
    }

    /**
     * Runs {@link #performRehash()} with the distribution manager's rehash-in-progress flag set.
     * The flag is reset in a {@code finally} block, so it is cleared even when the rehash fails.
     *
     * @return always {@code null}
     * @throws Exception whatever {@link #performRehash()} throws
     */
    @Override
    public Void call() throws Exception {
        distributionManager.setRehashInProgress(true);
        try {
            performRehash();
            return null;
        } finally {
            distributionManager.setRehashInProgress(false);
        }
    }

    /**
     * Performs the actual rehash; invoked by {@link #call()}.
     *
     * @throws Exception if the rehash fails
     */
    protected abstract void performRehash() throws Exception;

    /**
     * @return a single-element collection holding the coordinator address reported by the transport
     */
    protected Collection<Address> coordinator() {
        return Collections.singleton(rpcManager.getTransport().getCoordinator());
    }

    /**
     * Sends invalidation commands to nodes that owned locally stored keys under {@code chOld}
     * but do not own them under {@code chNew}.
     *
     * <p>Every key in the local data container is checked with
     * {@link #getInvalidHolders(Object, ConsistentHash, ConsistentHash)}; the keys are grouped
     * per address and one invalidate command per address is sent.
     *
     * @param doNotInvalidate addresses that must not receive an invalidation; {@code null} is
     *                        treated like an empty list
     * @param chOld           consistent hash before the rehash
     * @param chNew           consistent hash after the rehash
     * @throws ExecutionException   declared for callers; not thrown by the code in this method
     * @throws InterruptedException declared for callers; not thrown by the code in this method
     */
    protected void invalidateInvalidHolders(List<Address> doNotInvalidate, ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Invalidating entries that have migrated across");
        }
        // Group the keys to invalidate by the node that should drop them.
        Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
        for (Object key : dataContainer.keySet()) {
            for (Address holder : getInvalidHolders(key, chOld, chNew)) {
                Set<Object> keys = invalidations.get(holder);
                if (keys == null) {
                    keys = new HashSet<Object>();
                    invalidations.put(holder, keys);
                }
                keys.add(key);
            }
        }
        if (doNotInvalidate != null) {
            invalidations.keySet().removeAll(doNotInvalidate);
        }
        for (Map.Entry<Address, Set<Object>> entry : invalidations.entrySet()) {
            InvalidateCommand ic = cf.buildInvalidateFromL1Command(true, entry.getValue().toArray());
            // NOTE(review): the trailing 'false' is presumably the "sync" flag - confirm against RpcManager.
            rpcManager.invokeRemotely(Collections.singletonList(entry.getKey()), ic, false);
        }
    }

    /**
     * Same as {@link #invalidateInvalidHolders(List, ConsistentHash, ConsistentHash)} with no
     * address excluded.
     *
     * @param chOld consistent hash before the rehash
     * @param chNew consistent hash after the rehash
     * @throws ExecutionException   declared for callers; see the three-argument overload
     * @throws InterruptedException declared for callers; see the three-argument overload
     */
    protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
        List<Address> none = Collections.emptyList();
        invalidateInvalidHolders(none, chOld, chNew);
    }

    /**
     * Computes the addresses that {@code chOld} locates for {@code key} but {@code chNew} does not,
     * using the configured number of owners for both lookups.
     *
     * @param key   key to look up
     * @param chOld consistent hash before the rehash
     * @param chNew consistent hash after the rehash
     * @return a new collection with the old owners minus the new owners
     */
    protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
        int numOwners = configuration.getNumOwners();
        List<Address> oldOwners = chOld.locate(key, numOwners);
        List<Address> newOwners = chNew.locate(key, numOwners);
        // Copy first so the list returned by the consistent hash is never modified.
        LinkedList<Address> toInvalidate = new LinkedList<Address>(oldOwners);
        toInvalidate.removeAll(newOwners);
        return toInvalidate;
    }

    /**
     * Callable that sends a command synchronously to one node and, for every successful
     * response, applies the returned state through the distribution manager.
     */
    protected abstract class StateGrabber
    implements Callable<Void> {
        private final Address stateProvider;
        private final ReplicableCommand command;
        private final ConsistentHash newConsistentHash;

        /**
         * @param stateProvider     node the command is sent to
         * @param command           command whose successful response carries the state
         * @param newConsistentHash consistent hash handed to {@code applyState}
         */
        public StateGrabber(Address stateProvider, ReplicableCommand command, ConsistentHash newConsistentHash) {
            this.stateProvider = stateProvider;
            this.command = command;
            this.newConsistentHash = newConsistentHash;
        }

        /**
         * Invokes the command on the state provider using {@link ResponseMode#SYNCHRONOUS} and the
         * configured rehash RPC timeout, then applies the state of each {@link SuccessfulResponse}.
         * Responses of any other type are skipped.
         *
         * @return always {@code null}
         * @throws Exception if the remote invocation or applying the state fails
         */
        @Override
        public Void call() throws Exception {
            Map<Address, Response> resps = rpcManager.invokeRemotely(Collections.singleton(stateProvider), command, ResponseMode.SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
            for (Response r : resps.values()) {
                if (!(r instanceof SuccessfulResponse)) {
                    continue;
                }
                Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
                distributionManager.applyState(newConsistentHash, state, new RemoteTransactionLoggerImpl(cf, stateProvider, rpcManager), isForLeave());
            }
            return null;
        }

        /**
         * @return the flag passed as the last argument to {@code applyState}
         */
        protected abstract boolean isForLeave();

        /**
         * Extracts the response value as a key-to-value map.
         */
        @SuppressWarnings("unchecked") // NOTE(review): assumes the response value is a Map<Object, InternalCacheValue> - confirm with the command's sender side
        private Map<Object, InternalCacheValue> getStateFromResponse(SuccessfulResponse r) {
            return (Map<Object, InternalCacheValue>) r.getResponseValue();
        }
    }
}

