/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.affinity;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.infinispan.Cache;
import org.infinispan.affinity.KeyAffinityService;
import org.infinispan.affinity.KeyGenerator;
import org.infinispan.affinity.ListenerRegistration;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link KeyAffinityService} implementation that keeps one bounded queue of pre-generated keys
 * per cluster address and hands those keys out on request.
 *
 * <p>A single {@link KeyGeneratorWorker}, submitted to the supplied {@link Executor}, fills the
 * queues. {@code maxNumberInvariant} ties together the queue map and the two key counters:
 * the set of queues and the counters are only reset under the write lock, while keys are added
 * to or taken from the queues (and the counters adjusted) under the read lock.
 */
@ThreadSafe
public class KeyAffinityServiceImpl
implements KeyAffinityService {
    /** Factor applied to {@code maxNumberOfKeys} when deciding whether to wake the key producer. */
    private static final float THRESHOLD = 0.5f;
    private static final Log log = LogFactory.getLog(KeyAffinityServiceImpl.class);
    /** Addresses keys are generated for; {@code null} means every address is of interest. */
    private final Set<Address> filter;
    @GuardedBy(value="maxNumberInvariant")
    private final Map<Address, BlockingQueue> address2key = new ConcurrentHashMap<Address, BlockingQueue>();
    private final Executor executor;
    private final Cache cache;
    private final KeyGenerator keyGenerator;
    /** Capacity of each per-address queue. */
    private final int bufferSize;
    /** Number of queues multiplied by {@code bufferSize}; recomputed by {@link #resetNumberOfKeys()}. */
    private final AtomicInteger maxNumberOfKeys = new AtomicInteger();
    /** Number of keys currently counted as sitting in the queues. */
    private final AtomicInteger exitingNumberOfKeys = new AtomicInteger();
    private volatile boolean started;
    private final ReadWriteLock maxNumberInvariant = new ReentrantReadWriteLock();
    /** The key producer waits on this latch between generation rounds. */
    private final ReclosableLatch keyProducerStartLatch = new ReclosableLatch();
    private volatile KeyGeneratorWorker keyGenWorker;
    private volatile ListenerRegistration listenerRegistration;

    /**
     * Creates the service and optionally starts it right away.
     *
     * @param executor     runs the key generator worker
     * @param cache        cache whose membership and consistent hash are used
     * @param keyGenerator source of candidate keys
     * @param bufferSize   capacity of each per-address key queue
     * @param filter       addresses to generate keys for, or {@code null} for all addresses
     * @param start        when {@code true}, {@link #start()} is invoked from the constructor
     */
    public KeyAffinityServiceImpl(Executor executor, Cache cache, KeyGenerator keyGenerator, int bufferSize, Collection<Address> filter, boolean start) {
        this.executor = executor;
        this.cache = cache;
        this.keyGenerator = keyGenerator;
        this.bufferSize = bufferSize;
        if (filter != null) {
            this.filter = new ConcurrentHashSet<Address>();
            for (Address address : filter) {
                this.filter.add(address);
            }
        } else {
            this.filter = null;
        }
        if (start) {
            this.start();
        }
    }

    /**
     * Returns a key for the address the consistent hash reports first for {@code otherKey}.
     *
     * @param otherKey key whose first located address is used as the target
     * @return a pre-generated key, or {@code null} if the calling thread is interrupted while waiting
     * @throws IllegalStateException if the service is not started, no distribution manager is
     *         available, the consistent hash returns no address, or the address has no key queue
     */
    public Object getCollocatedKey(Object otherKey) {
        Address address = this.getAddressForKey(otherKey);
        return this.getKeyForAddress(address);
    }

    /**
     * Takes a pre-generated key from the queue of the given address, blocking until one is available.
     *
     * @param address address to obtain a key for
     * @return the key, or {@code null} if the calling thread is interrupted while waiting (the
     *         interrupt status is restored in that case)
     * @throws IllegalStateException if the service is not started or there is no key queue for
     *         {@code address}
     * @throws NullPointerException if {@code address} is {@code null}
     */
    public Object getKeyForAddress(Address address) {
        if (!this.started) {
            throw new IllegalStateException("You have to start the service first!");
        }
        if (address == null) {
            throw new NullPointerException("Null address not supported!");
        }
        BlockingQueue queue = null;
        try {
            this.maxNumberInvariant.readLock().lock();
            try {
                // Look the queue up and adjust the counter while holding the read lock: a view
                // change (write lock) replaces all queues and zeroes the counter, so doing either
                // step outside the lock could leave the counter out of sync with the queues and
                // make the producer loop without ever reaching its target.
                queue = this.address2key.get(address);
                if (queue == null) {
                    throw new IllegalStateException("No key queue available for address " + address);
                }
                Object result = queue.take();
                this.exitingNumberOfKeys.decrementAndGet();
                return result;
            }
            finally {
                this.maxNumberInvariant.readLock().unlock();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            // NOTE(review): this compares the size of one queue against the total across all
            // queues, so with several queues the latch is opened on almost every call - confirm
            // whether a per-queue threshold was intended.
            if (queue != null && (float)queue.size() < (float)this.maxNumberOfKeys.get() * THRESHOLD + 1.0f) {
                this.keyProducerStartLatch.open();
            }
        }
    }

    /**
     * Starts the service: rebuilds the per-address queues from the current members, submits a new
     * key generator worker, registers the listener on the cache manager and the cache, and opens
     * the producer latch. Does nothing apart from logging when the service is already started.
     */
    @Override
    public void start() {
        if (this.started) {
            log.info("Service already started, ignoring call to start!");
            return;
        }
        List<Address> existingNodes = this.getExistingNodes();
        this.maxNumberInvariant.writeLock().lock();
        try {
            // Drop queues left over from a previous start/stop cycle, exactly as a view change
            // does; otherwise queues of nodes that are gone would still count towards
            // maxNumberOfKeys and the producer could never reach that number.
            this.address2key.clear();
            this.addQueuesForAddresses(existingNodes);
            this.resetNumberOfKeys();
        }
        finally {
            this.maxNumberInvariant.writeLock().unlock();
        }
        this.keyGenWorker = new KeyGeneratorWorker();
        this.executor.execute(this.keyGenWorker);
        this.listenerRegistration = new ListenerRegistration(this);
        this.cache.getCacheManager().addListener(this.listenerRegistration);
        this.cache.addListener(this.listenerRegistration);
        this.keyProducerStartLatch.open();
        this.started = true;
    }

    /**
     * Stops the service: unregisters the listener and stops the key generator worker. Does
     * nothing apart from logging when the service is not started.
     *
     * @throws IllegalStateException if the listener is not registered with the cache manager;
     *         the worker is still stopped in that case
     */
    @Override
    public void stop() {
        if (!this.started) {
            log.info("Ignoring call to stop as service is not started.");
            return;
        }
        this.started = false;
        try {
            EmbeddedCacheManager cacheManager = this.cache.getCacheManager();
            if (!cacheManager.getListeners().contains(this.listenerRegistration)) {
                throw new IllegalStateException("Listener must have been registered!");
            }
            cacheManager.removeListener(this.listenerRegistration);
            if (this.cache.getListeners().contains(this.listenerRegistration)) {
                this.cache.removeListener(this.listenerRegistration);
            }
        }
        finally {
            // Always stop the worker, even when listener removal fails, so it is not left
            // running for a service that already reports itself as stopped.
            this.keyGenWorker.stop();
        }
    }

    /**
     * Reacts to a membership change by replacing all queues with fresh ones for the new members,
     * resetting the key counters and waking the key producer.
     *
     * @param vce the view change event carrying the new member list
     */
    public void handleViewChange(ViewChangedEvent vce) {
        if (log.isTraceEnabled()) {
            log.trace("ViewChange received: " + vce);
        }
        this.maxNumberInvariant.writeLock().lock();
        try {
            this.address2key.clear();
            this.addQueuesForAddresses(vce.getNewMembers());
            this.resetNumberOfKeys();
            this.keyProducerStartLatch.open();
        }
        finally {
            this.maxNumberInvariant.writeLock().unlock();
        }
    }

    /**
     * @return {@code true} while the current worker's {@code run()} is executing; {@code false}
     *         if no worker has been created yet
     */
    public boolean isKeyGeneratorThreadAlive() {
        KeyGeneratorWorker worker = this.keyGenWorker;
        return worker != null && worker.isAlive();
    }

    /**
     * Stops the service when the cache is stopped.
     *
     * @param cse the cache stopped event (only logged)
     */
    public void handleCacheStopped(CacheStoppedEvent cse) {
        if (log.isTraceEnabled()) {
            log.trace("Cache stopped, stopping the service: " + cse);
        }
        this.stop();
    }

    /** Recomputes the target key count from the current queues and zeroes the existing count. */
    private void resetNumberOfKeys() {
        this.maxNumberOfKeys.set(this.address2key.keySet().size() * this.bufferSize);
        this.exitingNumberOfKeys.set(0);
        if (log.isTraceEnabled()) {
            log.trace("resetNumberOfKeys ends with: maxNumberOfKeys=" + this.maxNumberOfKeys + ", exitingNumberOfKeys=" + this.exitingNumberOfKeys);
        }
    }

    /** Creates an empty bounded queue for every address that passes the filter. */
    private void addQueuesForAddresses(Collection<Address> addresses) {
        for (Address address : addresses) {
            if (this.interestedInAddress(address)) {
                this.address2key.put(address, new ArrayBlockingQueue(this.bufferSize));
                continue;
            }
            if (!log.isTraceEnabled()) continue;
            log.trace("Skipping address: " + address);
        }
    }

    /** @return {@code true} if there is no filter or the filter contains {@code address} */
    private boolean interestedInAddress(Address address) {
        return this.filter == null || this.filter.contains(address);
    }

    /** @return the members reported by the cache's transport */
    private List<Address> getExistingNodes() {
        return this.cache.getAdvancedCache().getRpcManager().getTransport().getMembers();
    }

    /**
     * @return the first address the consistent hash locates for {@code key}
     * @throws IllegalStateException if no distribution manager is available or the consistent
     *         hash returns an empty list
     */
    private Address getAddressForKey(Object key) {
        DistributionManager distributionManager = this.getDistributionManager();
        ConsistentHash hash = distributionManager.getConsistentHash();
        List<Address> addressList = hash.locate(key, 1);
        if (addressList.size() == 0) {
            throw new IllegalStateException("Empty address list returned by consistent hash " + hash + " for key " + key);
        }
        return addressList.get(0);
    }

    /**
     * @return the cache's distribution manager
     * @throws IllegalStateException if the cache has no distribution manager
     */
    private DistributionManager getDistributionManager() {
        DistributionManager distributionManager = this.cache.getAdvancedCache().getDistributionManager();
        if (distributionManager == null) {
            throw new IllegalStateException("Null distribution manager. Is this an distributed(v.s. replicated) cache?");
        }
        return distributionManager;
    }

    /** @return an unmodifiable view of the address-to-queue map (the queues themselves are live) */
    public Map<Address, BlockingQueue> getAddress2KeysMapping() {
        return Collections.unmodifiableMap(this.address2key);
    }

    /** @return the current target number of keys across all queues */
    public int getMaxNumberOfKeys() {
        return this.maxNumberOfKeys.intValue();
    }

    /** @return the number of keys currently counted as present in the queues */
    public int getExitingNumberOfKeys() {
        return this.exitingNumberOfKeys.intValue();
    }

    /**
     * @return {@code true} while the current worker is inside a generation round; {@code false}
     *         if no worker has been created yet
     */
    public boolean isKeyGeneratorThreadActive() {
        KeyGeneratorWorker worker = this.keyGenWorker;
        return worker != null && worker.isActive();
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Producer task: waits on the start latch, then generates keys until the counted number of
     * keys equals the target, and repeats until interrupted or stopped.
     */
    public class KeyGeneratorWorker
    implements Runnable {
        private volatile boolean isActive;
        // Written by the worker thread and read through isAlive() from other threads.
        private volatile boolean isAlive;
        // Set by stop(); lets a stop request take effect even before run() has begun and
        // while the generation loop (which never blocks) is running.
        private volatile boolean stopRequested;
        private volatile Thread runner;

        @Override
        public void run() {
            this.runner = Thread.currentThread();
            this.isAlive = true;
            try {
                while (!this.stopRequested && !this.waitToBeWakenUp()) {
                    this.isActive = true;
                    if (log.isTraceEnabled()) {
                        log.trace("KeyGeneratorWorker marked as ACTIVE");
                    }
                    this.generateKeys();
                    this.isActive = false;
                    if (!log.isTraceEnabled()) continue;
                    log.trace("KeyGeneratorWorker marked as INACTIVE");
                }
            }
            finally {
                // Reset the flags even if key generation threw, and forget the thread so a
                // later stop() does not interrupt it while it runs something else.
                this.isActive = false;
                this.isAlive = false;
                this.runner = null;
            }
        }

        /**
         * Generates keys under the read lock until the counted number of keys equals the target
         * or a stop was requested, then closes the start latch.
         */
        private void generateKeys() {
            KeyAffinityServiceImpl.this.maxNumberInvariant.readLock().lock();
            try {
                while (!this.stopRequested && this.keysMissing()) {
                    Object key = KeyAffinityServiceImpl.this.keyGenerator.getKey();
                    Address addressForKey = KeyAffinityServiceImpl.this.getAddressForKey(key);
                    if (!KeyAffinityServiceImpl.this.interestedInAddress(addressForKey)) continue;
                    this.tryAddKey(addressForKey, key);
                }
                KeyAffinityServiceImpl.this.keyProducerStartLatch.close();
                // A consumer may have taken a key and opened the latch between the last loop
                // check and close(); re-open so that wake-up is not lost.
                if (!this.stopRequested && this.keysMissing()) {
                    KeyAffinityServiceImpl.this.keyProducerStartLatch.open();
                }
            }
            finally {
                KeyAffinityServiceImpl.this.maxNumberInvariant.readLock().unlock();
            }
        }

        /** @return {@code true} while the counted number of keys differs from the target */
        private boolean keysMissing() {
            return KeyAffinityServiceImpl.this.maxNumberOfKeys.get() != KeyAffinityServiceImpl.this.exitingNumberOfKeys.get();
        }

        /**
         * Blocks on the start latch.
         *
         * @return {@code true} if the wait was interrupted (the worker should terminate),
         *         {@code false} if the latch was passed normally
         */
        private boolean waitToBeWakenUp() {
            try {
                KeyAffinityServiceImpl.this.keyProducerStartLatch.await();
            }
            catch (InterruptedException e) {
                if (log.isInfoEnabled()) {
                    log.info("Shutting down KeyAffinity service for key set: " + KeyAffinityServiceImpl.this.filter);
                }
                return true;
            }
            return false;
        }

        /** Offers the key to the queue of the given address and counts it if it was accepted. */
        private void tryAddKey(Address address, Object key) {
            BlockingQueue queue = (BlockingQueue)KeyAffinityServiceImpl.this.address2key.get(address);
            if (queue == null) {
                // The consistent hash returned an address we hold no queue for; discard the key
                // instead of failing the worker with a NullPointerException.
                if (log.isTraceEnabled()) {
                    log.trace("No queue for the address(" + address + "), discarding key(" + key + ").");
                }
                return;
            }
            boolean added = queue.offer(key);
            if (added) {
                KeyAffinityServiceImpl.this.exitingNumberOfKeys.incrementAndGet();
            }
            if (log.isTraceEnabled()) {
                log.trace((added ? "Successfully" : "Not") + " added key(" + key + ") to the address(" + address + ").");
                if (added) {
                    log.trace("maxNumberOfKeys==" + KeyAffinityServiceImpl.this.maxNumberOfKeys + ", exitingNumberOfKeys==" + KeyAffinityServiceImpl.this.exitingNumberOfKeys);
                }
            }
        }

        public boolean isActive() {
            return this.isActive;
        }

        public boolean isAlive() {
            return this.isAlive;
        }

        /**
         * Requests termination of the worker. Safe to call before the executor has started
         * running the worker: the request is then picked up when {@code run()} begins.
         */
        public void stop() {
            this.stopRequested = true;
            Thread thread = this.runner;
            if (thread != null) {
                thread.interrupt();
            }
        }
    }
}

