/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.loaders.decorators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.CacheException;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.AbstractDelegatingStore;
import org.infinispan.loaders.decorators.AsyncStoreConfig;
import org.infinispan.loaders.modifications.Clear;
import org.infinispan.loaders.modifications.Modification;
import org.infinispan.loaders.modifications.PurgeExpired;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * A {@link CacheStore} decorator that applies write operations to the delegate store asynchronously.
 * <p>
 * {@link #store(InternalCacheEntry)}, {@link #remove(Object)}, {@link #clear()} and {@link #purgeExpired()}
 * do not touch the delegate directly. Each call wraps the request in a {@link Modification} and places it on
 * a bounded queue (capacity taken from {@link AsyncStoreConfig#getQueueSize()}). A fixed pool of daemon
 * threads (size taken from {@link AsyncStoreConfig#getThreadPoolSize()}) takes modifications off the queue
 * in batches and applies them to the delegate through {@link #applyModificationsSync(List)}.
 * <p>
 * The store only accepts modifications between {@link #start()} and {@link #stop()}; outside of that window
 * the write methods throw a {@link CacheException}.
 */
public class AsyncStore extends AbstractDelegatingStore {
    private static final Log log = LogFactory.getLog(AsyncStore.class);
    private static final boolean trace = log.isTraceEnabled();
    /** Counter used to give every processor thread created by any AsyncStore a distinct name. */
    private static final AtomicInteger threadId = new AtomicInteger(0);

    /** {@code true} while the store is not running; checked before every enqueue. */
    private final AtomicBoolean stopped = new AtomicBoolean(true);
    private final AsyncStoreConfig asyncStoreConfig;
    private ExecutorService executor;
    private BlockingQueue<Modification> queue;
    private List<Future<?>> processorFutures;

    /**
     * Creates a stopped asynchronous decorator around the given store.
     *
     * @param cacheStore       the delegate that eventually receives the modifications
     * @param asyncStoreConfig supplies queue size, thread pool size and batch size
     */
    public AsyncStore(CacheStore cacheStore, AsyncStoreConfig asyncStoreConfig) {
        super(cacheStore);
        this.asyncStoreConfig = asyncStoreConfig;
    }

    /**
     * Queues the entry to be stored in the delegate by a background thread.
     *
     * @param ed the entry to store
     * @throws CacheException if the store is stopped or the calling thread is interrupted while waiting for
     *                        space in the queue
     */
    @Override
    public void store(InternalCacheEntry ed) {
        this.enqueue(new Store(ed));
    }

    /**
     * Queues a request to clear the delegate.
     *
     * @throws CacheException if the store is stopped or the calling thread is interrupted while waiting for
     *                        space in the queue
     */
    @Override
    public void clear() {
        this.enqueue(new Clear());
    }

    /**
     * Queues the removal of the given key from the delegate.
     *
     * @param key the key to remove
     * @return always {@code true}; the removal has not happened yet when this method returns, so the result
     *         says nothing about whether the key was present
     * @throws CacheException if the store is stopped or the calling thread is interrupted while waiting for
     *                        space in the queue
     */
    @Override
    public boolean remove(Object key) {
        this.enqueue(new Remove(key));
        return true;
    }

    /**
     * Queues a request to purge expired entries from the delegate.
     *
     * @throws CacheException if the store is stopped or the calling thread is interrupted while waiting for
     *                        space in the queue
     */
    @Override
    public void purgeExpired() {
        this.enqueue(new PurgeExpired());
    }

    /**
     * Places a modification on the queue, blocking while the queue is full.
     *
     * @param mod the modification to queue
     * @throws CacheException if the store is stopped, or if the calling thread is interrupted while waiting
     *                        (the thread's interrupt status is restored before throwing)
     */
    private void enqueue(Modification mod) {
        // Checked outside the try block so the "stopped" failure reaches the caller as-is
        // instead of being wrapped in a second, generic CacheException.
        if (this.stopped.get()) {
            throw new CacheException("AsyncStore stopped; no longer accepting more entries.");
        }
        log.trace("Enqueuing modification {0}", mod);
        try {
            this.queue.put(mod);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's own interruption handling.
            Thread.currentThread().interrupt();
            throw new CacheException("Unable to enqueue asynchronous task", e);
        }
    }

    /**
     * Starts the delegate, creates the queue and launches the processor threads.
     * The store begins accepting modifications only once all of that has succeeded.
     *
     * @throws CacheLoaderException if the delegate fails to start
     */
    @Override
    public void start() throws CacheLoaderException {
        this.queue = new LinkedBlockingQueue<Modification>(this.asyncStoreConfig.getQueueSize());
        log.info("Async cache loader starting {0}", this);
        super.start();
        int poolSize = this.asyncStoreConfig.getThreadPoolSize();
        this.executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() {

            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AsyncStore-" + threadId.getAndIncrement());
                t.setDaemon(true);
                return t;
            }
        });
        this.processorFutures = new ArrayList<Future<?>>(poolSize);
        for (int i = 0; i < poolSize; ++i) {
            this.processorFutures.add(this.executor.submit(new AsyncProcessor()));
        }
        // Flip the flag last: if anything above throws, the store stays closed rather than
        // accepting entries into a queue that nobody is consuming.
        this.stopped.set(false);
    }

    /**
     * Stops accepting modifications, signals the processor threads to flush what is left in the queue,
     * waits for them to finish and then stops the delegate.
     * <p>
     * If the calling thread is interrupted while waiting, the wait is abandoned, the interrupt status is
     * restored and the delegate is stopped anyway.
     *
     * @throws CacheLoaderException if the delegate fails to stop
     */
    @Override
    public void stop() throws CacheLoaderException {
        this.stopped.set(true);
        if (this.executor != null) {
            // Interrupting a processor moves it from its normal loop into its final flush phase.
            for (Future<?> f : this.processorFutures) {
                f.cancel(true);
            }
            this.executor.shutdown();
            try {
                boolean terminated = this.executor.isTerminated();
                while (!terminated) {
                    terminated = this.executor.awaitTermination(60L, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.executor = null;
        this.processorFutures = null;
        super.stop();
    }

    /**
     * Applies the given modifications to the delegate, in list order, on the calling thread.
     * The {@code super} calls bypass this class's own overrides, which would only re-queue the request.
     *
     * @param mods the modifications to apply
     * @throws CacheLoaderException     if the delegate rejects a modification
     * @throws IllegalArgumentException if a modification has a type this store does not handle
     */
    protected void applyModificationsSync(List<Modification> mods) throws CacheLoaderException {
        for (Modification m : mods) {
            switch (m.getType()) {
                case STORE:
                    super.store(((Store) m).getStoredEntry());
                    break;
                case CLEAR:
                    super.clear();
                    break;
                case REMOVE:
                    super.remove(((Remove) m).getKey());
                    break;
                case PURGE_EXPIRED:
                    super.purgeExpired();
                    break;
                default:
                    throw new IllegalArgumentException("Unknown modification type " + m.getType());
            }
        }
    }

    /**
     * Worker that repeatedly takes a batch of modifications off the queue and applies it to the delegate.
     * It runs until its thread is interrupted, then flushes whatever is still queued and exits.
     */
    private class AsyncProcessor implements Runnable {
        /** Batch buffer owned by this processor; always emptied after a batch has been applied. */
        private final List<Modification> mods;

        private AsyncProcessor() {
            this.mods = new ArrayList<Modification>(AsyncStore.this.asyncStoreConfig.getBatchSize());
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    this.run0();
                } catch (InterruptedException e) {
                    // Interrupted while waiting for work: leave the normal loop and flush.
                    break;
                }
            }
            this.drainRemaining();
        }

        /**
         * Final flush after the interrupt. Uses only non-blocking queue operations: with several
         * processors draining concurrently, another one may empty the queue at any moment, and a
         * blocking take() here would never return because the interrupt has already been consumed.
         */
        private void drainRemaining() {
            if (trace) {
                log.trace("Process remaining batch {0}", this.mods.size());
            }
            this.applyBatch();
            if (trace) {
                log.trace("Process remaining queued {0}", AsyncStore.this.queue.size());
            }
            while (true) {
                int drained = AsyncStore.this.queue.drainTo(this.mods, AsyncStore.this.asyncStoreConfig.getBatchSize());
                if (drained == 0) {
                    // Same single-element fallback as run0(), but without blocking.
                    Modification m = AsyncStore.this.queue.poll();
                    if (m == null) {
                        return;
                    }
                    this.mods.add(m);
                }
                this.applyBatch();
            }
        }

        /**
         * Processes one batch: drains up to the configured batch size from the queue, or blocks for a
         * single modification when the queue is currently empty.
         *
         * @throws InterruptedException if interrupted while waiting for a modification
         */
        private void run0() throws InterruptedException {
            log.trace("Checking for modifications");
            int drained = AsyncStore.this.queue.drainTo(this.mods, AsyncStore.this.asyncStoreConfig.getBatchSize());
            if (drained == 0) {
                this.mods.add(AsyncStore.this.queue.take());
            }
            if (trace) {
                log.trace("Calling put(List) with {0} modifications", this.mods.size());
            }
            this.applyBatch();
        }

        /** Applies the buffered batch and empties the buffer so no modification is applied twice. */
        private void applyBatch() {
            this.put(this.mods);
            this.mods.clear();
        }

        /**
         * Applies the modifications to the delegate. Failures are logged and not rethrown, so one
         * failing batch does not end the processor.
         */
        private void put(List<Modification> mods) {
            try {
                AsyncStore.this.applyModificationsSync(mods);
            } catch (Exception e) {
                if (log.isWarnEnabled()) {
                    log.warn("Failed to process async modifications: " + e);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Exception: ", e);
                }
            }
        }
    }
}

