/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.loaders.decorators;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.AbstractDelegatingStore;
import org.infinispan.loaders.decorators.AsyncStoreConfig;
import org.infinispan.loaders.modifications.Clear;
import org.infinispan.loaders.modifications.Commit;
import org.infinispan.loaders.modifications.Modification;
import org.infinispan.loaders.modifications.Prepare;
import org.infinispan.loaders.modifications.PurgeExpired;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
import org.infinispan.marshall.Marshaller;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class AsyncStore
extends AbstractDelegatingStore {
    private static final Log log = LogFactory.getLog(AsyncStore.class);
    // Trace flag is read once when the class is initialised and reused for every log guard below.
    private static final boolean trace = log.isTraceEnabled();
    // Source of the numeric suffix used in processor thread names (shared by all AsyncStore instances).
    private static final AtomicInteger threadId = new AtomicInteger(0);
    // Starts out true; start() clears it and stop() sets it again. enqueue() rejects work while it is true.
    private final AtomicBoolean stopped = new AtomicBoolean(true);
    // Supplies the processor thread pool size and the timeout used when acquiring the map lock.
    private final AsyncStoreConfig asyncStoreConfig;
    // Incremented by enqueue() when a key was not already pending; decreased by a processor by the size
    // of the batch it picked up. awaitNotEmpty() blocks while this is 0.
    private final AtomicInteger count = new AtomicInteger(0);
    // Lock/condition pair used only to signal processors waiting in awaitNotEmpty().
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = this.lock.newCondition();
    // Created in start(), set back to null in stop(). Package-private visibility.
    ExecutorService executor;
    // One future per submitted processor; stop() cancels each of them with interruption.
    private List<Future> processorFutures;
    // enqueue() holds the read lock while putting into 'state'; a processor holds the write lock while
    // it replaces 'state' with a fresh map.
    private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
    private final Lock read = this.mapLock.readLock();
    private final Lock write = this.mapLock.writeLock();
    // Set in init(); used to size both the state map and the per-key lock container.
    private int concurrencyLevel;
    // Pending modifications. Keyed by the entry key for store/remove, and by the modification object
    // itself for clear, purge-expired, prepare and commit (see the corresponding public methods).
    @GuardedBy(value="mapLock")
    protected ConcurrentMap<Object, Modification> state;
    // Per-key locks a processor takes for the keys of the batch it is applying. Created in init().
    private ReleaseAllLockContainer lockContainer;

    /**
     * Creates an asynchronous decorator around the given store.
     *
     * @param delegate         the store handed to the superclass constructor
     * @param asyncStoreConfig configuration later read for the thread pool size and the lock timeout
     */
    public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
        super(delegate);
        this.asyncStoreConfig = asyncStoreConfig;
    }

    /**
     * Initialises the superclass and then derives the concurrency level used for the
     * pending-modification map and the per-key lock container.
     *
     * @param config loader configuration, forwarded to the superclass
     * @param cache  owning cache; may be {@code null}, in which case a concurrency level of 16 is used
     * @param m      marshaller, forwarded to the superclass
     * @throws CacheLoaderException if the superclass initialisation fails
     */
    @Override
    public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
        super.init(config, cache, m);
        boolean hasConfiguration = cache != null && cache.getConfiguration() != null;
        if (hasConfiguration) {
            this.concurrencyLevel = cache.getConfiguration().getConcurrencyLevel();
        } else {
            // Fallback used when no cache (or no cache configuration) is available.
            this.concurrencyLevel = 16;
        }
        this.lockContainer = new ReleaseAllLockContainer(this.concurrencyLevel);
    }

    /**
     * Queues a store of the given entry, keyed by the entry's key, instead of writing it immediately.
     *
     * @param ed the entry to store
     */
    @Override
    public void store(InternalCacheEntry ed) {
        Object entryKey = ed.getKey();
        Modification storeMod = new Store(ed);
        enqueue(entryKey, storeMod);
    }

    /**
     * Queues a removal for the given key.
     *
     * @param key key to remove
     * @return always {@code true}; the removal has only been queued when this method returns
     */
    @Override
    public boolean remove(Object key) {
        Modification removal = new Remove(key);
        enqueue(key, removal);
        return true;
    }

    /**
     * Queues a clear operation. The modification object doubles as its own key in the pending map.
     */
    @Override
    public void clear() {
        Clear clearMod = new Clear();
        enqueue(clearMod, clearMod);
    }

    /**
     * Queues a purge-expired operation. The modification object doubles as its own key in the pending map.
     */
    @Override
    public void purgeExpired() {
        PurgeExpired purgeMod = new PurgeExpired();
        enqueue(purgeMod, purgeMod);
    }

    /**
     * Queues a transactional prepare. The modification object doubles as its own key in the pending map.
     *
     * @param list       modifications that belong to the transaction
     * @param tx         the transaction being prepared
     * @param isOnePhase one-phase flag, carried along unchanged in the queued modification
     */
    @Override
    public void prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) {
        Prepare prepareMod = new Prepare(list, tx, isOnePhase);
        enqueue(prepareMod, prepareMod);
    }

    /**
     * Queues a transactional commit. The modification object doubles as its own key in the pending map.
     *
     * @param tx the transaction to commit
     * @throws CacheLoaderException declared by the overridden method
     */
    @Override
    public void commit(GlobalTransaction tx) throws CacheLoaderException {
        Commit commitMod = new Commit(tx);
        enqueue(commitMod, commitMod);
    }

    /**
     * Starts the store: creates a fresh pending-modification map, marks the store as accepting work,
     * starts the superclass, and launches one processor per configured pool thread.
     *
     * @throws CacheLoaderException if the superclass fails to start
     */
    @Override
    public void start() throws CacheLoaderException {
        this.state = newStateMap();
        log.info((Object)"Async cache loader starting {0}", this);
        this.stopped.set(false);
        super.start();

        final int poolSize = this.asyncStoreConfig.getThreadPoolSize();
        // Daemon threads with a numbered name.
        ThreadFactory daemonFactory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                String threadName = "CoalescedAsyncStore-" + threadId.getAndIncrement();
                Thread worker = new Thread(r, threadName);
                worker.setDaemon(true);
                return worker;
            }
        };
        this.executor = Executors.newFixedThreadPool(poolSize, daemonFactory);

        // Keep the futures so stop() can cancel (interrupt) every processor.
        this.processorFutures = new ArrayList<Future>(poolSize);
        int submitted = 0;
        while (submitted < poolSize) {
            Future processorFuture = this.executor.submit(createAsyncProcessor());
            this.processorFutures.add(processorFuture);
            submitted++;
        }
    }

    /**
     * Stops the store: refuses further enqueues, interrupts every processor, waits for the executor
     * to terminate, and finally stops the superclass.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored and the
     * wait is abandoned; the superclass is still stopped.
     *
     * @throws CacheLoaderException if the superclass fails to stop
     */
    @Override
    public void stop() throws CacheLoaderException {
        this.stopped.set(true);
        if (this.executor != null) {
            for (Future processorFuture : this.processorFutures) {
                processorFuture.cancel(true);
            }
            this.executor.shutdown();
            try {
                if (!this.executor.isTerminated()) {
                    // Wait in 60-second slices until the pool reports termination.
                    while (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                        // keep waiting
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.executor = null;
        super.stop();
    }

    /**
     * Applies every modification in the given map synchronously through the superclass.
     *
     * <p>Each entry is dispatched on its modification type: store, remove, clear, purge-expired,
     * prepare (whose modification list is coalesced first) and commit. A type not listed here is
     * skipped without error.
     *
     * @param mods modifications to apply; for removals the map key is used as the key to remove
     * @throws CacheLoaderException if the superclass fails to apply a modification
     */
    protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
        // Typed entry set: iterating a raw Set as Map.Entry does not compile.
        for (Map.Entry<Object, Modification> entry : mods.entrySet()) {
            Modification mod = entry.getValue();
            switch (mod.getType()) {
                case STORE: {
                    super.store(((Store)mod).getStoredEntry());
                    break;
                }
                case REMOVE: {
                    super.remove(entry.getKey());
                    break;
                }
                case CLEAR: {
                    super.clear();
                    break;
                }
                case PURGE_EXPIRED: {
                    super.purgeExpired();
                    break;
                }
                case PREPARE: {
                    Prepare prepare = (Prepare)mod;
                    List<? extends Modification> coalesced = this.coalesceModificationList(prepare.getList());
                    super.prepare(coalesced, prepare.getTx(), prepare.isOnePhase());
                    break;
                }
                case COMMIT: {
                    super.commit(((Commit)mod).getTx());
                    break;
                }
            }
        }
    }

    /**
     * Factory for the task each pool thread runs; subclasses may override it to supply their own.
     *
     * @return a new {@link AsyncProcessor} bound to this store
     */
    protected Runnable createAsyncProcessor() {
        Runnable processor = new AsyncProcessor();
        return processor;
    }

    /**
     * Collapses a list of store/remove/clear modifications so that each key appears at most once.
     *
     * <p>Rules, applied in list order:
     * <ul>
     *   <li>a store replaces any earlier pending modification for its key;</li>
     *   <li>a clear discards all pending per-key modifications and is itself kept;</li>
     *   <li>a remove seen after a clear cancels a pending modification for the same key and is
     *       otherwise dropped; a remove seen before any clear is kept as the key's pending
     *       modification.</li>
     * </ul>
     * The result holds the kept clears first, followed by the surviving per-key modifications.
     *
     * @param mods modifications to coalesce
     * @return the coalesced list
     * @throws IllegalArgumentException if a modification is neither a store, a clear nor a remove
     */
    private List<? extends Modification> coalesceModificationList(List<? extends Modification> mods) {
        Map<Object, Modification> pendingByKey = new HashMap<Object, Modification>();
        List<Modification> result = new ArrayList<Modification>();
        for (Modification current : mods) {
            switch (current.getType()) {
                case STORE:
                    pendingByKey.put(((Store)current).getStoredEntry().getKey(), current);
                    break;
                case CLEAR:
                    pendingByKey.clear();
                    result.add(current);
                    break;
                case REMOVE: {
                    Object removedKey = ((Remove)current).getKey();
                    // 'result' only ever receives clears inside this loop, so non-empty means a clear was seen.
                    boolean clearSeen = !result.isEmpty();
                    if (!clearSeen) {
                        pendingByKey.put(removedKey, current);
                    } else if (pendingByKey.containsKey(removedKey)) {
                        pendingByKey.remove(removedKey);
                    }
                    break;
                }
                default:
                    throw new IllegalArgumentException("Unknown modification type " + current.getType());
            }
        }
        result.addAll(pendingByKey.values());
        return result;
    }

    /**
     * Records a modification in the pending map, replacing any modification already pending for the
     * same key, and wakes a waiting processor when the pending count was zero before this call.
     *
     * <p>The counter is updated after the read lock has been released.
     *
     * @param key key under which the modification is queued
     * @param mod the modification to queue
     * @throws CacheException if the store is stopped or anything else fails while enqueuing; every
     *                        failure is wrapped with the message "Unable to enqueue asynchronous task"
     */
    private void enqueue(Object key, Modification mod) {
        try {
            if (this.stopped.get()) {
                throw new CacheException("AsyncStore stopped; no longer accepting more entries.");
            }
            if (trace) {
                log.trace((Object)"Enqueuing modification {0}", mod);
            }

            Modification previous;
            boolean readLocked = false;
            try {
                this.acquireLock(this.read);
                readLocked = true;
                previous = this.state.put(key, mod);
            }
            finally {
                if (readLocked) {
                    this.read.unlock();
                }
            }

            // Only a newly pending key bumps the counter; signal when it moved away from zero.
            if (previous == null && this.count.getAndIncrement() == 0) {
                this.signalNotEmpty();
            }
        }
        catch (Exception e) {
            throw new CacheException("Unable to enqueue asynchronous task", e);
        }
    }

    /**
     * Acquires the given lock, waiting at most the configured flush lock timeout (milliseconds).
     *
     * <p>When this method returns normally the lock is held by the calling thread. An interrupted
     * wait is reported as a failure rather than a silent return, because callers unlock afterwards
     * and must not do so for a lock they never obtained.
     *
     * @param lock the lock to acquire
     * @throws CacheException if the lock is not obtained within the timeout, or if the thread is
     *                        interrupted while waiting (the interrupt status is restored first)
     */
    private void acquireLock(Lock lock) {
        boolean acquired;
        try {
            acquired = lock.tryLock(this.asyncStoreConfig.getFlushLockTimeout(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie) {
            // Restore the flag for code further up the stack, then fail: the lock is NOT held.
            Thread.currentThread().interrupt();
            throw new CacheException("Interrupted while acquiring lock on update map", ie);
        }
        if (!acquired) {
            throw new CacheException("Unable to acquire lock on update map");
        }
    }

    /**
     * Wakes one thread waiting on the {@code notEmpty} condition.
     */
    private void signalNotEmpty() {
        final ReentrantLock signalLock = this.lock;
        signalLock.lock();
        try {
            this.notEmpty.signal();
        }
        finally {
            signalLock.unlock();
        }
    }

    /**
     * Blocks the calling thread until the pending count is non-zero.
     *
     * <p>If the wait is interrupted, another waiter is signalled before the interruption is
     * propagated.
     *
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or waiting
     */
    private void awaitNotEmpty() throws InterruptedException {
        this.lock.lockInterruptibly();
        try {
            while (this.count.get() == 0) {
                this.notEmpty.await();
            }
        }
        catch (InterruptedException ie) {
            // Hand the wake-up on to another waiter before leaving.
            this.notEmpty.signal();
            throw ie;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Atomically subtracts {@code delta} from the pending count.
     *
     * @param delta amount to subtract
     * @return the count after the subtraction
     */
    private int decrementAndGet(int delta) {
        return this.count.addAndGet(-delta);
    }

    /**
     * Creates an empty map for pending modifications.
     *
     * @return a new concurrent map sized with capacity 64, load factor 0.75 and this store's
     *         concurrency level
     */
    private ConcurrentMap<Object, Modification> newStateMap() {
        final int initialCapacity = 64;
        final float loadFactor = 0.75f;
        return new ConcurrentHashMap<Object, Modification>(initialCapacity, loadFactor, this.concurrencyLevel);
    }

    /**
     * Per-entry lock container with a convenience method for releasing a whole set of keys.
     */
    private static class ReleaseAllLockContainer
    extends ReentrantPerEntryLockContainer {
        private ReleaseAllLockContainer(int concurrencyLevel) {
            super(concurrencyLevel);
        }

        /**
         * Releases the lock of every key in the given set, in the set's iteration order.
         *
         * @param keys keys whose locks are released
         */
        void releaseLocks(Set<Object> keys) {
            final boolean tracing = trace;
            for (Object lockedKey : keys) {
                if (tracing) {
                    log.trace((Object)"Release lock for key {0}", lockedKey);
                }
                releaseLock(lockedKey);
            }
        }
    }

    /**
     * Task run by each pool thread. It repeatedly detaches the store's pending-modification map,
     * takes per-key locks for the detached batch, and applies the batch through
     * {@link AsyncStore#applyModificationsSync}.
     */
    class AsyncProcessor
    implements Runnable {
        /** The batch most recently detached from the store's pending map. */
        private ConcurrentMap<Object, Modification> swap;
        /** Keys whose per-key lock this processor currently holds. */
        private final Set<Object> lockedKeys;

        AsyncProcessor() {
            this.swap = AsyncStore.this.newStateMap();
            this.lockedKeys = new HashSet<Object>();
        }

        /**
         * Processes batches until the thread is interrupted, then applies the current batch once
         * more and keeps processing until the pending map is empty or the thread is interrupted again.
         */
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    this.run0();
                }
                catch (InterruptedException e) {
                    // Interruption ends normal processing; fall through to the drain phase.
                    break;
                }
            }

            try {
                if (trace) {
                    log.trace((Object)"Process remaining batch {0}", this.swap.size());
                }
                this.put(this.swap);
                if (trace) {
                    log.trace((Object)"Process remaining queued {0}", AsyncStore.this.state.size());
                }
                while (!AsyncStore.this.state.isEmpty()) {
                    this.run0();
                }
            }
            catch (InterruptedException e) {
                if (trace) {
                    log.trace("Remaining interrupted");
                }
            }
        }

        /**
         * Runs one processing cycle.
         *
         * <p>Under the write lock, the pending map is replaced with a fresh one and the old map
         * becomes this processor's batch. For every key in the batch a per-key lock is attempted
         * with a zero timeout; keys whose lock is not obtained are moved back to the new pending
         * map. An empty batch makes the processor wait for new work; otherwise the pending count is
         * reduced by the batch size and the batch is applied, with up to three retries on failure.
         *
         * <p>Per-key locks are released when the cycle ends, however it ends. This includes a
         * failure while the locks are still being collected, so an exception in that phase cannot
         * leave earlier keys locked.
         *
         * @throws InterruptedException if the thread is interrupted while locking keys or waiting
         */
        void run0() throws InterruptedException {
            if (trace) {
                log.trace("Checking for modifications");
            }
            try {
                this.detachBatchAndLockKeys();
                int size = this.swap.size();
                if (this.swap.isEmpty()) {
                    AsyncStore.this.awaitNotEmpty();
                } else {
                    AsyncStore.this.decrementAndGet(size);
                    if (trace) {
                        log.trace((Object)"Apply {0} modifications", size);
                    }
                    this.applyBatchWithRetries();
                }
            }
            finally {
                AsyncStore.this.lockContainer.releaseLocks(this.lockedKeys);
                this.lockedKeys.clear();
            }
        }

        /** Swaps the pending map for a fresh one and locks the keys of the detached batch. */
        private void detachBatchAndLockKeys() throws InterruptedException {
            boolean unlock = false;
            try {
                AsyncStore.this.acquireLock(AsyncStore.this.write);
                unlock = true;
                this.swap = AsyncStore.this.state;
                AsyncStore.this.state = AsyncStore.this.newStateMap();
                for (Object key : this.swap.keySet()) {
                    boolean acquired = AsyncStore.this.lockContainer.acquireLock(key, 0L, TimeUnit.NANOSECONDS) != null;
                    if (trace) {
                        log.trace((Object)"Lock for key {0} was acquired={1}", key, acquired);
                    }
                    if (!acquired) {
                        // Lock not obtained: hand the modification back to the pending map for a later cycle.
                        Modification prev = this.swap.remove(key);
                        AsyncStore.this.state.put(key, prev);
                        continue;
                    }
                    this.lockedKeys.add(key);
                }
            }
            finally {
                if (unlock) {
                    AsyncStore.this.write.unlock();
                }
            }
        }

        /** Applies the current batch, retrying up to three more times, and warns if all attempts fail. */
        private void applyBatchWithRetries() {
            final int maxRetries = 3;
            int attemptNumber = 0;
            boolean successful;
            do {
                if (attemptNumber > 0 && log.isDebugEnabled()) {
                    log.debug((Object)"Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
                }
                successful = this.put(this.swap);
            } while (!successful && ++attemptNumber <= maxRetries);
            if (!successful) {
                log.warn("Unable to process some async modifications after " + maxRetries + " retries!");
            }
        }

        /**
         * Applies the given modifications, converting any exception into a {@code false} result.
         *
         * @param mods modifications to apply
         * @return {@code true} if all were applied, {@code false} if an exception was thrown
         */
        boolean put(ConcurrentMap<Object, Modification> mods) {
            try {
                AsyncStore.this.applyModificationsSync(mods);
                return true;
            }
            catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)"Failed to process async modifications", e);
                }
                return false;
            }
        }
    }
}

