/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan.serial;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation;
import org.apache.geode.internal.cache.wan.serial.SerialSecondaryGatewayListener;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

public class SerialGatewaySenderQueue
implements RegionQueue {
    private static final Logger logger = LogService.getLogger();
    private long headKey = -1L;
    private final AtomicLong tailKey = new AtomicLong();
    private long currentKey;
    private final Deque<Long> peekedIds = new LinkedBlockingDeque<Long>();
    private final String regionName;
    private Region<Long, AsyncEvent> region;
    private String diskStoreName;
    private int batchSize;
    private int maximumQueueMemory;
    private boolean enableConflation;
    private boolean enablePersistence;
    private boolean isDiskSynchronous;
    private final Map<String, Map<Object, Long>> indexes;
    private final GatewaySenderStats stats;
    private static final long MAXIMUM_KEY = Long.MAX_VALUE;
    private static final boolean NO_ACK = Boolean.getBoolean("gemfire.gateway-queue-no-ack");
    private volatile long lastDispatchedKey = -1L;
    private volatile long lastDestroyedKey = -1L;
    public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 1;
    private static volatile int messageSyncInterval = 1;
    private BatchRemovalThread removalThread = null;
    private final boolean keyPutNoSync;
    private final int maxPendingPuts;
    private final PriorityQueue<Long> pendingPuts;
    private AbstractGatewaySender sender = null;

    public SerialGatewaySenderQueue(AbstractGatewaySender abstractSender, String regionName, CacheListener listener) {
        this.regionName = regionName;
        this.headKey = -1L;
        this.tailKey.set(-1L);
        this.currentKey = -1L;
        this.indexes = new HashMap<String, Map<Object, Long>>();
        this.enableConflation = abstractSender.isBatchConflationEnabled();
        this.diskStoreName = abstractSender.getDiskStoreName();
        this.batchSize = abstractSender.getBatchSize();
        this.enablePersistence = abstractSender.isPersistenceEnabled();
        this.isDiskSynchronous = this.enablePersistence ? abstractSender.isDiskSynchronous() : false;
        if (Boolean.getBoolean("gemfire.gateway-queue-sync")) {
            this.keyPutNoSync = false;
            this.maxPendingPuts = 0;
            this.pendingPuts = null;
        } else {
            this.keyPutNoSync = true;
            this.maxPendingPuts = Math.max(this.batchSize, 100);
            this.pendingPuts = new PriorityQueue(this.maxPendingPuts + 5);
        }
        this.maximumQueueMemory = abstractSender.getMaximumMemeoryPerDispatcherQueue();
        this.stats = abstractSender.getStatistics();
        this.initializeRegion(abstractSender, listener);
        this.stats.incQueueSize(this.region.size());
        this.removalThread = new BatchRemovalThread(abstractSender.getCache());
        this.removalThread.start();
        this.sender = abstractSender;
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Contains {} elements", (Object)this, (Object)this.size());
        }
    }

    @Override
    public Region<Long, AsyncEvent> getRegion() {
        return this.region;
    }

    public void destroy() {
        this.getRegion().localDestroyRegion();
    }

    @Override
    public synchronized boolean put(Object event) throws CacheException {
        GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl)event;
        Region<?, ?> r = eventImpl.getRegion();
        boolean isPDXRegion = r instanceof DistributedRegion && r.getName().equals("PdxTypes");
        boolean isWbcl = this.regionName.startsWith("AsyncEventQueue_");
        if (!isPDXRegion || !isWbcl) {
            this.putAndGetKey(event);
            return true;
        }
        return false;
    }

    private long putAndGetKey(Object object) throws CacheException {
        Long key = this.getTailKey();
        this.region.put(key, (AsyncEvent)object);
        this.incrementTailKey();
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Inserted {} -> {}", (Object)this, (Object)key, object);
        }
        if (object instanceof Conflatable) {
            this.removeOldEntry((Conflatable)object, key);
        }
        return key;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private long putAndGetKeyNoSync(Object object) throws CacheException {
        var3_2 = this;
        synchronized (var3_2) {
            this.initializeKeys();
            ckey = this.currentKey;
            if (SerialGatewaySenderQueue.logger.isTraceEnabled()) {
                SerialGatewaySenderQueue.logger.trace("{}: Determined current key: {}", (Object)this, (Object)ckey);
            }
            key = ckey;
            this.currentKey = this.inc(ckey);
        }
        try {
            this.region.put(key, (AsyncEvent)object);
            if (SerialGatewaySenderQueue.logger.isDebugEnabled()) {
                SerialGatewaySenderQueue.logger.debug("{}: Inserted {} -> {}", (Object)this, (Object)key, object);
            }
            var4_4 = sync = this.pendingPuts;
        }
        catch (Throwable var10_13) {
            var12_15 = sync = this.pendingPuts;
            synchronized (var12_15) {
                while (true) lbl-1000:
                // 4 sources

                {
                    if (key.longValue() == this.tailKey.get()) {
                        this.incrementTailKey();
                        notifyWaiters = false;
                        if (this.pendingPuts.size() > 0) {
                            itr = this.pendingPuts.iterator();
                            while (itr.hasNext() && (k = itr.next()).longValue() == this.tailKey.get()) {
                                this.incrementTailKey();
                                if (!notifyWaiters) {
                                    notifyWaiters = this.pendingPuts.size() >= this.maxPendingPuts;
                                }
                                itr.remove();
                            }
                        }
                        if (!notifyWaiters) ** break;
                        sync.notifyAll();
                        ** break;
                    }
                    if (this.pendingPuts.size() < this.maxPendingPuts) {
                        this.pendingPuts.add(key);
                        ** break;
                    }
                    interrupted = Thread.interrupted();
                    t = null;
                    try {
                        sync.wait(5L);
                    }
                    catch (InterruptedException ie) {
                        t = ie;
                        interrupted = true;
                    }
                    finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                        ((LocalRegion)this.region).getCancelCriterion().checkCancelInProgress(t);
                        continue;
                    }
                    break;
                }
                ** GOTO lbl-1000
lbl96:
                // 3 sources

            }
            throw var10_13;
        }
        synchronized (var4_4) {
            while (true) lbl-1000:
            // 4 sources

            {
                if (key.longValue() == this.tailKey.get()) {
                    this.incrementTailKey();
                    notifyWaiters = false;
                    if (this.pendingPuts.size() > 0) {
                        itr = this.pendingPuts.iterator();
                        while (itr.hasNext() && (k = itr.next()).longValue() == this.tailKey.get()) {
                            this.incrementTailKey();
                            if (!notifyWaiters) {
                                notifyWaiters = this.pendingPuts.size() >= this.maxPendingPuts;
                            }
                            itr.remove();
                        }
                    }
                    if (!notifyWaiters) ** break;
                    sync.notifyAll();
                    ** break;
                }
                if (this.pendingPuts.size() < this.maxPendingPuts) {
                    this.pendingPuts.add(key);
                    ** break;
                }
                interrupted = Thread.interrupted();
                t = null;
                try {
                    sync.wait(5L);
                }
                catch (InterruptedException ie) {
                    t = ie;
                    interrupted = true;
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                    ((LocalRegion)this.region).getCancelCriterion().checkCancelInProgress(t);
                    continue;
                }
                break;
            }
            ** GOTO lbl-1000
lbl55:
            // 3 sources

        }
        if (object instanceof Conflatable) {
            this.removeOldEntry((Conflatable)object, key);
        }
        return key;
    }

    @Override
    public synchronized AsyncEvent take() throws CacheException {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<AsyncEvent> take(int batchSize) throws CacheException {
        throw new UnsupportedOperationException();
    }

    @Override
    public synchronized void remove() throws CacheException {
        Long key;
        block5: {
            if (this.peekedIds.isEmpty()) {
                return;
            }
            key = this.peekedIds.remove();
            try {
                this.updateHeadKey(key);
                this.removeIndex(key);
                this.region.localDestroy(key, "WAN_QUEUE_TOKEN");
                this.stats.decQueueSize();
            }
            catch (EntryNotFoundException ok) {
                if (!logger.isDebugEnabled()) break block5;
                logger.debug("{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", (Object)this, (Object)key);
            }
        }
        boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
        this.lastDispatchedKey = key;
        if (wasEmpty) {
            this.notifyAll();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}", (Object)this, (Object)key, (Object)this.lastDispatchedKey, (Object)this.lastDestroyedKey);
        }
    }

    @Override
    public void remove(int size) throws CacheException {
        for (int i = 0; i < size; ++i) {
            this.remove();
        }
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Removed a batch of {} entries", (Object)this, (Object)size);
        }
    }

    @Override
    public Object peek() throws CacheException {
        AsyncEvent object = this.peekAhead();
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Peeked {} -> {}", (Object)this, this.peekedIds, (Object)object);
        }
        return object;
    }

    @Override
    public List<AsyncEvent> peek(int size) throws CacheException {
        return this.peek(size, -1);
    }

    @Override
    public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
        boolean isTraceEnabled = logger.isTraceEnabled();
        long start = System.currentTimeMillis();
        long end = start + (long)timeToWait;
        if (isTraceEnabled) {
            logger.trace("{}: Peek start time={} end time={} time to wait={}", (Object)this, (Object)start, (Object)end, (Object)timeToWait);
        }
        ArrayList<AsyncEvent> batch = new ArrayList<AsyncEvent>(size * 2);
        while (batch.size() < size) {
            AsyncEvent object = this.peekAhead();
            if (object != null) {
                batch.add(object);
                continue;
            }
            long currentTime = System.currentTimeMillis();
            if (isTraceEnabled) {
                logger.trace("{}: Peek current time: {}", (Object)this, (Object)currentTime);
            }
            if (timeToWait == -1 || end <= currentTime) {
                if (!isTraceEnabled) break;
                logger.trace("{}: Peek breaking", (Object)this);
                break;
            }
            if (isTraceEnabled) {
                logger.trace("{}: Peek continuing", (Object)this);
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (isTraceEnabled) {
            logger.trace("{}: Peeked a batch of {} entries", (Object)this, (Object)batch.size());
        }
        return batch;
    }

    public String toString() {
        return "SerialGatewaySender queue :" + this.regionName;
    }

    @Override
    public int size() {
        int size = ((LocalRegion)this.region).entryCount();
        return size + this.sender.getTmpQueuedEventSize();
    }

    @Override
    public void addCacheListener(CacheListener listener) {
        AttributesMutator<Long, AsyncEvent> mutator = this.region.getAttributesMutator();
        mutator.addCacheListener(listener);
    }

    @Override
    public void removeCacheListener() {
        AttributesMutator<Long, AsyncEvent> mutator = this.region.getAttributesMutator();
        CacheListener<Long, AsyncEvent>[] listeners = this.region.getAttributes().getCacheListeners();
        for (int i = 0; i < listeners.length; ++i) {
            if (!(listeners[i] instanceof SerialSecondaryGatewayListener)) continue;
            mutator.removeCacheListener(listeners[i]);
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean removeOldEntry(Conflatable object, Long tailKey) throws CacheException {
        boolean isDebugEnabled = logger.isDebugEnabled();
        boolean keepOldEntry = true;
        if (this.enableConflation && object.shouldBeConflated()) {
            Long previousIndex;
            if (isDebugEnabled) {
                logger.debug("{}: Conflating {} at queue index={} queue size={} head={} tail={}", (Object)this, (Object)object, (Object)tailKey, (Object)this.size(), (Object)this.headKey, (Object)tailKey);
            }
            String rName = object.getRegionToConflate();
            Object key = object.getKeyToConflate();
            SerialGatewaySenderQueue serialGatewaySenderQueue = this;
            synchronized (serialGatewaySenderQueue) {
                Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
                if (latestIndexesForRegion == null) {
                    latestIndexesForRegion = new HashMap<Object, Long>();
                    this.indexes.put(rName, latestIndexesForRegion);
                }
                previousIndex = latestIndexesForRegion.put(key, tailKey);
            }
            if (isDebugEnabled) {
                logger.debug("{}: Adding index key={}->index={} for {} head={} tail={}", (Object)this, key, (Object)tailKey, (Object)object, (Object)this.headKey, (Object)tailKey);
            }
            if (previousIndex != null) {
                if (isDebugEnabled) {
                    logger.debug("{}: Indexes contains index={} for key={} head={} tail={} and it can be used.", (Object)this, (Object)previousIndex, key, (Object)this.headKey, (Object)tailKey);
                }
                keepOldEntry = false;
            } else {
                if (isDebugEnabled) {
                    logger.debug("{}: No old entry for key={} head={} tail={} not removing old entry.", (Object)this, key, (Object)this.headKey, (Object)tailKey);
                }
                this.stats.incConflationIndexesMapSize();
                keepOldEntry = true;
            }
            if (!keepOldEntry) {
                Conflatable previous = (Conflatable)((Object)this.region.remove(previousIndex));
                this.stats.decQueueSize(1);
                if (isDebugEnabled) {
                    logger.debug("{}: Previous conflatable at key={} head={} tail={}: {}", (Object)this, (Object)previousIndex, (Object)this.headKey, (Object)tailKey, (Object)previous);
                    logger.debug("{}: Current conflatable at key={} head={} tail={}: {}", (Object)this, (Object)tailKey, (Object)this.headKey, (Object)tailKey, (Object)object);
                    if (previous != null) {
                        logger.debug("{}: Removed {} and added {} for key={} head={} tail={} in queue for region={} old event={}", (Object)this, previous.getValueToConflate(), object.getValueToConflate(), key, (Object)this.headKey, (Object)tailKey, (Object)rName, (Object)previous);
                    }
                }
            }
        } else if (isDebugEnabled) {
            logger.debug("{}: Not conflating {} queue size: {} head={} tail={}", (Object)this, (Object)object, (Object)this.size(), (Object)this.headKey, (Object)tailKey);
        }
        return keepOldEntry;
    }

    private AsyncEvent optimalGet(Long k) {
        LocalRegion lr = (LocalRegion)this.region;
        Object o = null;
        try {
            o = lr.getValueInVMOrDiskWithoutFaultIn(k);
            if (o != null && o instanceof CachedDeserializable) {
                o = ((CachedDeserializable)o).getDeserializedValue(lr, lr.getRegionEntry(k));
            }
        }
        catch (EntryNotFoundException entryNotFoundException) {
            // empty catch block
        }
        if (o == Token.TOMBSTONE) {
            o = null;
        }
        return (AsyncEvent)o;
    }

    private void removeIndex(Long qkey) {
        Conflatable object;
        AsyncEvent o;
        if (this.enableConflation && (o = this.optimalGet(qkey)) instanceof Conflatable && (object = (Conflatable)((Object)o)).shouldBeConflated()) {
            String rName = object.getRegionToConflate();
            Object key = object.getKeyToConflate();
            Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
            if (latestIndexesForRegion != null) {
                Long index = latestIndexesForRegion.remove(key);
                if (index != null) {
                    this.stats.decConflationIndexesMapSize();
                }
                if (logger.isDebugEnabled() && index != null) {
                    logger.debug("{}: Removed index {} for {}", (Object)this, (Object)index, (Object)object);
                }
            }
        }
    }

    private boolean before(long a, long b) {
        return a < b ^ a - b > 0x3FFFFFFFFFFFFFFFL;
    }

    private long inc(long value) {
        long val = value + 1L;
        val = val == Long.MAX_VALUE ? 0L : val;
        return val;
    }

    public void resetLastPeeked() {
        this.peekedIds.clear();
    }

    private Long getCurrentKey() {
        long currentKey;
        if (this.peekedIds.isEmpty()) {
            currentKey = this.getHeadKey();
        } else {
            Long lastPeek = this.peekedIds.peekLast();
            if (lastPeek == null) {
                return null;
            }
            currentKey = lastPeek + 1L;
        }
        return currentKey;
    }

    private AsyncEvent getObjectInSerialSenderQueue(Long currentKey) {
        AsyncEvent object = this.optimalGet(currentKey);
        if (null != object && logger.isDebugEnabled()) {
            logger.debug("{}: Peeked {}->{}", (Object)this, (Object)currentKey, (Object)object);
        }
        if (object != null && object instanceof GatewaySenderEventImpl) {
            GatewaySenderEventImpl copy = ((GatewaySenderEventImpl)object).makeHeapCopyIfOffHeap();
            if (copy == null) {
                logger.debug("Unable to make heap copy and will not be added to peekedIds for object : {} ", (Object)object.toString());
            }
            object = copy;
        }
        return object;
    }

    private AsyncEvent peekAhead() throws CacheException {
        AsyncEvent object = null;
        Long currentKey = this.getCurrentKey();
        if (currentKey == null) {
            return null;
        }
        while (this.before(currentKey, this.getTailKey()) && null == (object = this.getObjectInSerialSenderQueue(currentKey))) {
            if (logger.isTraceEnabled()) {
                logger.trace("{}: Trying head key + offset: {}", (Object)this, (Object)currentKey);
            }
            currentKey = this.inc(currentKey);
            if (this.stats == null) continue;
            this.stats.incEventsNotQueuedConflated();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Peeked {}->{}", (Object)this, (Object)currentKey, (Object)object);
        }
        if (object != null) {
            this.peekedIds.add(currentKey);
        }
        return object;
    }

    private long getTailKey() throws CacheException {
        this.initializeKeys();
        long tlKey = this.tailKey.get();
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Determined tail key: {}", (Object)this, (Object)tlKey);
        }
        return tlKey;
    }

    private void incrementTailKey() throws CacheException {
        this.tailKey.set(this.inc(this.tailKey.get()));
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Incremented TAIL_KEY for region {} to {}", (Object)this, (Object)this.region.getName(), (Object)this.tailKey);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeKeys() throws CacheException {
        if (this.tailKey.get() != -1L) {
            return;
        }
        SerialGatewaySenderQueue serialGatewaySenderQueue = this;
        synchronized (serialGatewaySenderQueue) {
            long largestKey = -1L;
            long largestKeyLessThanHalfMax = -1L;
            long smallestKey = -1L;
            long smallestKeyGreaterThanHalfMax = -1L;
            Set<Long> keySet = this.region.keySet();
            for (Long key : keySet) {
                long k = key;
                if (k > largestKey) {
                    largestKey = k;
                }
                if (k > largestKeyLessThanHalfMax && k < 0x3FFFFFFFFFFFFFFFL) {
                    largestKeyLessThanHalfMax = k;
                }
                if (k < smallestKey || smallestKey == -1L) {
                    smallestKey = k;
                }
                if (k >= smallestKeyGreaterThanHalfMax && smallestKeyGreaterThanHalfMax != -1L || k <= 0x3FFFFFFFFFFFFFFFL) continue;
                smallestKeyGreaterThanHalfMax = k;
            }
            if (smallestKeyGreaterThanHalfMax != -1L && largestKeyLessThanHalfMax != -1L && smallestKeyGreaterThanHalfMax - largestKeyLessThanHalfMax > 0x3FFFFFFFFFFFFFFFL) {
                this.headKey = smallestKeyGreaterThanHalfMax;
                this.tailKey.set(this.inc(largestKeyLessThanHalfMax));
                logger.info((Message)LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_0_DURING_FAILOVER_DETECTED_THAT_KEYS_HAVE_WRAPPED, new Object[]{this, this.tailKey, this.headKey}));
            } else {
                this.headKey = smallestKey == -1L ? 0L : smallestKey;
                this.tailKey.set(this.inc(largestKey));
            }
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Initialized tail key to: {}, head key to: {}", (Object)this, (Object)this.tailKey, (Object)this.headKey);
            }
        }
    }

    private long getHeadKey() throws CacheException {
        this.initializeKeys();
        long hKey = this.headKey;
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Determined head key: {}", (Object)this, (Object)hKey);
        }
        return hKey;
    }

    private void updateHeadKey(long destroyedKey) throws CacheException {
        this.headKey = this.inc(destroyedKey);
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Incremented HEAD_KEY for region {} to {}", (Object)this, (Object)this.region.getName(), (Object)this.headKey);
        }
    }

    private void initializeRegion(AbstractGatewaySender sender, CacheListener listener) {
        InternalCache gemCache = sender.getCache();
        this.region = gemCache.getRegion(this.regionName);
        if (this.region == null) {
            AttributesFactory factory = new AttributesFactory();
            factory.setScope(NO_ACK ? Scope.DISTRIBUTED_NO_ACK : Scope.DISTRIBUTED_ACK);
            factory.setDataPolicy(this.enablePersistence ? DataPolicy.PERSISTENT_REPLICATE : DataPolicy.REPLICATE);
            if (logger.isDebugEnabled()) {
                logger.debug("The policy of region is {}", (Object)(this.enablePersistence ? DataPolicy.PERSISTENT_REPLICATE : DataPolicy.REPLICATE));
            }
            if (listener != null) {
                factory.addCacheListener(listener);
            }
            EvictionAttributes ea = EvictionAttributes.createLIFOMemoryAttributes(this.maximumQueueMemory, EvictionAction.OVERFLOW_TO_DISK);
            factory.setEvictionAttributes(ea);
            factory.setConcurrencyChecksEnabled(false);
            factory.setDiskStoreName(this.diskStoreName);
            factory.setDiskSynchronous(this.isDiskSynchronous);
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Attempting to create queue region: {}", (Object)this, (Object)this.regionName);
            }
            RegionAttributes ra = factory.create();
            try {
                SerialGatewaySenderQueueMetaRegion meta = new SerialGatewaySenderQueueMetaRegion(this.regionName, ra, null, gemCache, sender);
                try {
                    this.region = gemCache.createVMRegion(this.regionName, ra, new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true).setSnapshotInputStream(null).setImageTarget(null).setIsUsedForSerialGatewaySenderQueue(true).setInternalRegion(true).setSerialGatewaySender(sender));
                }
                catch (IOException veryUnLikely) {
                    logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, this.getClass()), (Throwable)veryUnLikely);
                }
                catch (ClassNotFoundException alsoUnlikely) {
                    logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, this.getClass()), (Throwable)alsoUnlikely);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Created queue region: {}", (Object)this, this.region);
                }
            }
            catch (CacheException e) {
                logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_0_THE_QUEUE_REGION_NAMED_1_COULD_NOT_BE_CREATED, new Object[]{this, this.regionName}), (Throwable)e);
            }
        } else {
            throw new IllegalStateException("Queue region " + this.region.getFullPath() + " already exists.");
        }
    }

    public void cleanUp() {
        if (this.removalThread != null) {
            this.removalThread.shutdown();
        }
    }

    public boolean isRemovalThreadAlive() {
        if (this.removalThread != null) {
            return this.removalThread.isAlive();
        }
        return false;
    }

    @Override
    public void close() {
        Region<Long, AsyncEvent> r = this.getRegion();
        if (r != null && !r.isDestroyed()) {
            try {
                r.close();
            }
            catch (RegionDestroyedException regionDestroyedException) {
                // empty catch block
            }
        }
    }

    public static class SerialGatewaySenderQueueMetaRegion
    extends DistributedRegion {
        AbstractGatewaySender sender = null;

        protected SerialGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender sender) {
            super(regionName, attrs, parentRegion, cache, new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false).setSnapshotInputStream(null).setImageTarget(null).setIsUsedForSerialGatewaySenderQueue(true).setSerialGatewaySender(sender));
            this.sender = sender;
        }

        @Override
        public boolean supportsConcurrencyChecks() {
            return false;
        }

        @Override
        protected boolean isCopyOnRead() {
            return false;
        }

        @Override
        public boolean isSecret() {
            return true;
        }

        @Override
        public void createEventTracker() {
        }

        @Override
        protected boolean shouldNotifyBridgeClients() {
            return false;
        }

        @Override
        public boolean generateEventID() {
            return false;
        }

        @Override
        public boolean isUsedForSerialGatewaySenderQueue() {
            return true;
        }

        @Override
        public AbstractGatewaySender getSerialGatewaySender() {
            return this.sender;
        }

        @Override
        public void closeEntries() {
            OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable(){

                @Override
                public void run() {
                    SerialGatewaySenderQueueMetaRegion.super.closeEntries();
                }
            });
        }

        @Override
        public Set<VersionSource> clearEntries(final RegionVersionVector rvv) {
            final AtomicReference result = new AtomicReference();
            OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable(){

                @Override
                public void run() {
                    result.set(SerialGatewaySenderQueueMetaRegion.super.clearEntries(rvv));
                }
            });
            return (Set)result.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void basicDestroy(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue) throws EntryNotFoundException, CacheWriterException, TimeoutException {
            try {
                super.basicDestroy(event, cacheWrite, expectedOldValue);
            }
            finally {
                GatewaySenderEventImpl.release(event.getRawOldValue());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean overwriteDestroyed) throws TimeoutException, CacheWriterException {
            try {
                boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);
                if (!success) {
                    GatewaySenderEventImpl.release(event.getRawNewValue());
                }
                boolean bl = success;
                return bl;
            }
            finally {
                GatewaySenderEventImpl.release(event.getRawOldValue());
            }
        }
    }

    private class BatchRemovalThread
    extends Thread {
        private volatile boolean shutdown = false;
        private final InternalCache cache;

        public BatchRemovalThread(InternalCache c) {
            this.setDaemon(true);
            this.cache = c;
        }

        private boolean checkCancelled() {
            if (this.shutdown) {
                return true;
            }
            return this.cache.getCancelCriterion().isCancelInProgress();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block32: {
                InternalDistributedSystem ids = this.cache.getInternalDistributedSystem();
                try {
                    while (true) {
                        try {
                            while (true) {
                                long temp;
                                if (this.checkCancelled()) {
                                    break block32;
                                }
                                boolean interrupted = Thread.interrupted();
                                try {
                                    BatchRemovalThread batchRemovalThread = this;
                                    synchronized (batchRemovalThread) {
                                        this.wait(messageSyncInterval * 1000);
                                    }
                                }
                                catch (InterruptedException e) {
                                    interrupted = true;
                                    if (this.checkCancelled()) {
                                        // empty if block
                                    }
                                    break block32;
                                }
                                finally {
                                    if (interrupted) {
                                        Thread.currentThread().interrupt();
                                    }
                                }
                                if (logger.isDebugEnabled()) {
                                    logger.debug("BatchRemovalThread about to send the last Dispatched key {}", (Object)SerialGatewaySenderQueue.this.lastDispatchedKey);
                                }
                                SerialGatewaySenderQueue serialGatewaySenderQueue = SerialGatewaySenderQueue.this;
                                synchronized (serialGatewaySenderQueue) {
                                    boolean wasEmpty;
                                    temp = SerialGatewaySenderQueue.this.lastDispatchedKey;
                                    boolean bl = wasEmpty = temp == SerialGatewaySenderQueue.this.lastDestroyedKey;
                                    while (SerialGatewaySenderQueue.this.lastDispatchedKey == SerialGatewaySenderQueue.this.lastDestroyedKey) {
                                        SerialGatewaySenderQueue.this.wait();
                                        temp = SerialGatewaySenderQueue.this.lastDispatchedKey;
                                    }
                                    if (wasEmpty) {
                                        continue;
                                    }
                                }
                                EntryEventImpl event = EntryEventImpl.create((LocalRegion)SerialGatewaySenderQueue.this.region, Operation.DESTROY, (Object)(SerialGatewaySenderQueue.this.lastDestroyedKey + 1L), null, null, false, this.cache.getMyId());
                                event.disallowOffHeapValues();
                                event.setTailKey(temp);
                                BatchDestroyOperation op = new BatchDestroyOperation(event);
                                op.distribute();
                                if (logger.isDebugEnabled()) {
                                    logger.debug("BatchRemovalThread completed destroy of keys from {} to {}", (Object)SerialGatewaySenderQueue.this.lastDestroyedKey, (Object)temp);
                                }
                                SerialGatewaySenderQueue.this.lastDestroyedKey = temp;
                            }
                        }
                        catch (CancelException e) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("BatchRemovalThread is exiting due to cancellation");
                            }
                        }
                        catch (VirtualMachineError err) {
                            SystemFailure.initiateFailure(err);
                            throw err;
                        }
                        catch (Throwable t) {
                            SystemFailure.checkFailure();
                            if (this.checkCancelled()) {
                                break;
                            }
                            if (!logger.isDebugEnabled()) continue;
                            logger.debug("BatchRemovalThread: ignoring exception", t);
                            continue;
                        }
                        break;
                    }
                }
                catch (CancelException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("BatchRemovalThread exiting due to cancellation: " + e);
                    }
                }
                finally {
                    logger.info((Message)LocalizedMessage.create(LocalizedStrings.HARegionQueue_THE_QUEUEREMOVALTHREAD_IS_DONE));
                }
            }
        }

        public void shutdown() {
            this.shutdown = true;
            this.interrupt();
            boolean interrupted = Thread.interrupted();
            try {
                this.join(15000L);
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (this.isAlive()) {
                logger.warn((Message)LocalizedMessage.create(LocalizedStrings.HARegionQueue_QUEUEREMOVALTHREAD_IGNORED_CANCELLATION));
            }
        }
    }
}

