/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallbackAdapter;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.CustomAsyncEventListener;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
import org.apache.geode.internal.cache.wan.MyAsyncEventListener2;
import org.apache.geode.internal.cache.wan.MyAsyncEventListener_CacheLoader;
import org.apache.geode.internal.cache.wan.MyCacheLoader;
import org.apache.geode.internal.cache.wan.MyGatewayEventSubstitutionFilter;
import org.apache.geode.internal.cache.wan.MyGatewaySenderEventListener;
import org.apache.geode.internal.cache.wan.SizeableGatewayEventSubstitutionFilter;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.awaitility.Awaitility;
import org.junit.Assert;

public class AsyncEventQueueTestBase
extends JUnit4DistributedTestCase {
    protected static Cache cache;
    protected static VM vm0;
    protected static VM vm1;
    protected static VM vm2;
    protected static VM vm3;
    protected static VM vm4;
    protected static AsyncEventListener eventListener1;
    private static final long MAX_WAIT = 60000L;
    protected static GatewayEventFilter eventFilter;
    protected static boolean destroyFlag;
    protected static List<Integer> dispatcherThreads;
    protected static int numDispatcherThreadsForTheRun;

    @Override
    public final void preSetUp() throws Exception {
        Host host = Host.getHost(0);
        vm0 = host.getVM(0);
        vm1 = host.getVM(1);
        vm2 = host.getVM(2);
        vm3 = host.getVM(3);
        vm4 = host.getVM(4);
    }

    @Override
    public final void postSetUp() throws Exception {
        AsyncEventQueueTestBase.shuffleNumDispatcherThreads();
        Invoke.invokeInEveryVM(() -> AsyncEventQueueTestBase.setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0)));
    }

    public static void shuffleNumDispatcherThreads() {
        Collections.shuffle(dispatcherThreads);
    }

    public static void setNumDispatcherThreadsForTheRun(int numThreads) {
        numDispatcherThreadsForTheRun = numThreads;
    }

    public static Integer createFirstLocatorWithDSId(int dsId) {
        if (Locator.hasLocator()) {
            Locator.getLocator().stop();
        }
        AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        Properties props = test.getDistributedSystemProperties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "localhost[" + port + "]");
        props.setProperty("start-locator", "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
        test.startLocatorDistributedSystem(props);
        return port;
    }

    public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
        AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        Properties props = test.getDistributedSystemProperties();
        props.setProperty("mcast-port", "0");
        props.setProperty("distributed-system-id", "" + dsId);
        props.setProperty("locators", "localhost[" + port + "]");
        props.setProperty("start-locator", "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
        props.setProperty("remote-locators", "localhost[" + remoteLocPort + "]");
        test.startLocatorDistributedSystem(props);
        return port;
    }

    private void startLocatorDistributedSystem(Properties props) {
        System.setProperty("Locator.forceLocatorDMType", "true");
        try {
            this.getSystem(props);
        }
        finally {
            System.clearProperty("Locator.forceLocatorDMType");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createReplicatedRegionWithAsyncEventQueue(String regionName, String asyncQueueIds, Boolean offHeap) {
        IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        try {
            AttributesFactory fact = new AttributesFactory();
            AsyncEventQueueTestBase.addAsyncEventQueueIds(fact, asyncQueueIds);
            fact.setDataPolicy(DataPolicy.REPLICATE);
            fact.setOffHeap(offHeap.booleanValue());
            RegionFactory regionFactory = cache.createRegionFactory(fact.create());
            Region r = regionFactory.create(regionName);
            Assert.assertNotNull((Object)r);
        }
        finally {
            exp1.remove();
        }
    }

    public static void createReplicatedRegionWithCacheLoaderAndAsyncEventQueue(String regionName, String asyncQueueIds) {
        AttributesFactory fact = new AttributesFactory();
        AsyncEventQueueTestBase.addAsyncEventQueueIds(fact, asyncQueueIds);
        fact.setDataPolicy(DataPolicy.REPLICATE);
        fact.setCacheLoader((CacheLoader)new MyCacheLoader());
        RegionFactory regionFactory = cache.createRegionFactory(fact.create());
        Region r = regionFactory.create(regionName);
        Assert.assertNotNull((Object)r);
    }

    private static void addAsyncEventQueueIds(AttributesFactory fact, String asyncQueueIds) {
        if (asyncQueueIds != null) {
            StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
            while (tokenizer.hasMoreTokens()) {
                String asyncQueueId = tokenizer.nextToken();
                fact.addAsyncEventQueueId(asyncQueueId);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createReplicatedRegionWithSenderAndAsyncEventQueue(String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        try {
            AttributesFactory fact = new AttributesFactory();
            if (senderIds != null) {
                StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
                while (tokenizer.hasMoreTokens()) {
                    String senderId = tokenizer.nextToken();
                    fact.addGatewaySenderId(senderId);
                }
            }
            fact.setDataPolicy(DataPolicy.REPLICATE);
            fact.setOffHeap(offHeap.booleanValue());
            fact.setScope(Scope.DISTRIBUTED_ACK);
            RegionFactory regionFactory = cache.createRegionFactory(fact.create());
            regionFactory.addAsyncEventQueueId(asyncChannelId);
            Region r = regionFactory.create(regionName);
            Assert.assertNotNull((Object)r);
        }
        finally {
            exp.remove();
        }
    }

    public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) {
        AsyncEventQueueTestBase.createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent, diskStoreName, isDiskSynchronous, (AsyncEventListener)new MyAsyncEventListener());
    }

    public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, AsyncEventListener asyncEventListener) {
        AsyncEventQueueTestBase.createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent, diskStoreName, isDiskSynchronous, numDispatcherThreadsForTheRun, asyncEventListener);
    }

    public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, int numDispatcherThreads, AsyncEventListener asyncEventListener) {
        AsyncEventQueueTestBase.createDiskStore(asyncChannelId, diskStoreName);
        AsyncEventQueueFactory factory = AsyncEventQueueTestBase.getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
        factory.setDiskSynchronous(isDiskSynchronous);
        factory.setBatchConflationEnabled(isConflation);
        factory.setDispatcherThreads(numDispatcherThreads);
        AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
    }

    private static void createDiskStore(String asyncChannelId, String diskStoreName) {
        if (diskStoreName != null) {
            File directory = new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
            directory.mkdir();
            File[] dirs1 = new File[]{directory};
            DiskStoreFactory dsf = cache.createDiskStoreFactory();
            dsf.setDiskDirs(dirs1);
            DiskStore diskStore = dsf.create(diskStoreName);
        }
    }

    public static void createAsyncEventQueueWithListener2(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) {
        AsyncEventQueueTestBase.createDiskStore(asyncChannelId, diskStoreName);
        MyAsyncEventListener2 asyncEventListener = new MyAsyncEventListener2();
        AsyncEventQueueFactory factory = AsyncEventQueueTestBase.getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
        factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
        AsyncEventQueue asyncChannel = factory.create(asyncChannelId, (AsyncEventListener)asyncEventListener);
    }

    public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
        AsyncEventQueueTestBase.createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent, diskStoreName, isDiskSynchronous, asyncListenerClass, null);
    }

    public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass, String substitutionFilterClass) throws Exception {
        AsyncEventQueueTestBase.createDiskStore(asyncChannelId, diskStoreName);
        AsyncEventQueueFactory factory = AsyncEventQueueTestBase.getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
        factory.setDiskSynchronous(isDiskSynchronous);
        factory.setBatchConflationEnabled(isConflation);
        if (substitutionFilterClass != null) {
            factory.setGatewayEventSubstitutionListener((GatewayEventSubstitutionFilter)AsyncEventQueueTestBase.getClass(substitutionFilterClass).newInstance());
        }
        factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
        AsyncEventQueue asyncChannel = factory.create(asyncChannelId, (AsyncEventListener)AsyncEventQueueTestBase.getClass(asyncListenerClass).newInstance());
    }

    private static Class getClass(String simpleClassName) throws Exception {
        String packagePrefix = "org.apache.geode.internal.cache.wan.";
        String className = packagePrefix + simpleClassName;
        Class<?> clazz = null;
        clazz = Class.forName(className);
        return clazz;
    }

    public static void createAsyncEventQueueWithCustomListener(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) {
        AsyncEventQueueTestBase.createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent, diskStoreName, isDiskSynchronous, 5);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createAsyncEventQueueWithCustomListener(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        try {
            AsyncEventQueueTestBase.createDiskStore(asyncChannelId, diskStoreName);
            CustomAsyncEventListener asyncEventListener = new CustomAsyncEventListener();
            AsyncEventQueueFactory factory = AsyncEventQueueTestBase.getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
            factory.setDispatcherThreads(nDispatchers);
            AsyncEventQueue asyncEventQueue = factory.create(asyncChannelId, (AsyncEventListener)asyncEventListener);
        }
        finally {
            exp.remove();
        }
    }

    private static AsyncEventQueueFactory getInitialAsyncEventQueueFactory(boolean isParallel, Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) {
        AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
        factory.setBatchSize(batchSize.intValue());
        factory.setPersistent(isPersistent);
        factory.setDiskStoreName(diskStoreName);
        factory.setMaximumQueueMemory(maxMemory.intValue());
        factory.setParallel(isParallel);
        return factory;
    }

    public static void createConcurrentAsyncEventQueue(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, int dispatcherThreads, GatewaySender.OrderPolicy policy) {
        AsyncEventQueueTestBase.createDiskStore(asyncChannelId, diskStoreName);
        MyAsyncEventListener asyncEventListener = new MyAsyncEventListener();
        AsyncEventQueueFactory factory = AsyncEventQueueTestBase.getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
        factory.setDiskSynchronous(isDiskSynchronous);
        factory.setBatchConflationEnabled(isConflation);
        factory.setOrderPolicy(policy);
        AsyncEventQueue asyncChannel = factory.create(asyncChannelId, (AsyncEventListener)asyncEventListener);
    }

    public static String createAsyncEventQueueWithDiskStore(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) {
        MyAsyncEventListener asyncEventListener = new MyAsyncEventListener();
        File persistentDirectory = null;
        persistentDirectory = diskStoreName == null ? new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()) : new File(diskStoreName);
        LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
        persistentDirectory.mkdir();
        DiskStoreFactory dsf = cache.createDiskStoreFactory();
        File[] dirs1 = new File[]{persistentDirectory};
        AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
        factory.setBatchSize(batchSize.intValue());
        factory.setParallel(isParallel);
        if (isPersistent) {
            factory.setPersistent(isPersistent);
            factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId).getName());
        }
        factory.setMaximumQueueMemory(maxMemory.intValue());
        factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
        AsyncEventQueue asyncChannel = factory.create(asyncChannelId, (AsyncEventListener)asyncEventListener);
        return persistentDirectory.getName();
    }

    public static void pauseAsyncEventQueue(String asyncChannelId) {
        AsyncEventQueue theChannel = null;
        Set asyncEventChannels = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncChannel : asyncEventChannels) {
            if (!asyncChannelId.equals(asyncChannel.getId())) continue;
            theChannel = asyncChannel;
        }
        ((AsyncEventQueueImpl)theChannel).getSender().pause();
    }

    public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
        AsyncEventQueue theChannel = null;
        Set asyncEventChannels = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncChannel : asyncEventChannels) {
            if (!asyncChannelId.equals(asyncChannel.getId())) continue;
            theChannel = asyncChannel;
            break;
        }
        ((AsyncEventQueueImpl)theChannel).getSender().pause();
        ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause();
    }

    public static void resumeAsyncEventQueue(String asyncQueueId) {
        AsyncEventQueue theQueue = null;
        Set asyncEventChannels = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncChannel : asyncEventChannels) {
            if (!asyncQueueId.equals(asyncChannel.getId())) continue;
            theQueue = asyncChannel;
        }
        ((AsyncEventQueueImpl)theQueue).getSender().resume();
    }

    public static void waitForAsyncEventQueueSize(String senderId, int numQueueEntries, boolean localSize) {
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).untilAsserted(() -> AsyncEventQueueTestBase.checkAsyncEventQueueSize(senderId, numQueueEntries, localSize));
    }

    public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
        AsyncEventQueueTestBase.checkAsyncEventQueueSize(asyncQueueId, numQueueEntries, false);
    }

    public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries, boolean localSize) {
        AsyncEventQueueImpl aeq = (AsyncEventQueueImpl)cache.getAsyncEventQueue(asyncQueueId);
        InternalGatewaySender sender = aeq.getSender();
        if (sender.isParallel()) {
            Set queues = ((AbstractGatewaySender)sender).getQueues();
            Region queueRegion = queues.toArray(new RegionQueue[queues.size()])[0].getRegion();
            if (localSize) {
                queueRegion = PartitionRegionHelper.getLocalData((Region)queueRegion);
            }
            Assert.assertEquals((long)numQueueEntries, (long)queueRegion.size());
        } else {
            Set queues = ((AbstractGatewaySender)sender).getQueues();
            int size = 0;
            for (RegionQueue q : queues) {
                size += q.size();
            }
            Assert.assertEquals((long)numQueueEntries, (long)size);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
        try {
            AttributesFactory fact = new AttributesFactory();
            if (senderIds != null) {
                StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
                while (tokenizer.hasMoreTokens()) {
                    String senderId = tokenizer.nextToken();
                    fact.addGatewaySenderId(senderId);
                }
            }
            PartitionAttributesFactory pfact = new PartitionAttributesFactory();
            pfact.setTotalNumBuckets(totalNumBuckets.intValue());
            pfact.setRedundantCopies(redundantCopies.intValue());
            pfact.setRecoveryDelay(0L);
            fact.setPartitionAttributes(pfact.create());
            Region r = cache.createRegionFactory(fact.create()).create(regionName);
            Assert.assertNotNull((Object)r);
        }
        finally {
            exp.remove();
            exp1.remove();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createPartitionedRegionWithAsyncEventQueue(String regionName, String asyncEventQueueId, Boolean offHeap) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
        try {
            AttributesFactory fact = new AttributesFactory();
            PartitionAttributesFactory pfact = new PartitionAttributesFactory();
            pfact.setTotalNumBuckets(16);
            fact.setPartitionAttributes(pfact.create());
            fact.setOffHeap(offHeap.booleanValue());
            Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId).create(regionName);
            Assert.assertNotNull((Object)r);
        }
        finally {
            exp.remove();
            exp1.remove();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createFixedPartitionedRegionWithAsyncEventQueue(String regionName, String asyncEventQueueId, String partitionName, List<String> allPartitions, boolean offHeap) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
        try {
            AttributesFactory fact = new AttributesFactory();
            PartitionAttributesFactory pfact = new PartitionAttributesFactory();
            pfact.setTotalNumBuckets(16);
            pfact.addFixedPartitionAttributes(FixedPartitionAttributes.createFixedPartition((String)partitionName, (boolean)true));
            pfact.setPartitionResolver((PartitionResolver)new MyFixedPartitionResolver(allPartitions));
            fact.setPartitionAttributes(pfact.create());
            fact.setOffHeap(offHeap);
            Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId).create(regionName);
            Assert.assertNotNull((Object)r);
        }
        finally {
            exp.remove();
            exp1.remove();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createColocatedPartitionedRegionWithAsyncEventQueue(String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
        try {
            AttributesFactory fact = new AttributesFactory();
            PartitionAttributesFactory pfact = new PartitionAttributesFactory();
            pfact.setTotalNumBuckets(totalNumBuckets.intValue());
            pfact.setColocatedWith(colocatedWith);
            fact.setPartitionAttributes(pfact.create());
            Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId).create(regionName);
            Assert.assertNotNull((Object)r);
        }
        finally {
            exp.remove();
            exp1.remove();
        }
    }

    public static void createPartitionedRegionWithCacheLoaderAndAsyncQueue(String regionName, String asyncEventQueueId) {
        AttributesFactory fact = new AttributesFactory();
        PartitionAttributesFactory pfact = new PartitionAttributesFactory();
        pfact.setTotalNumBuckets(16);
        fact.setPartitionAttributes(pfact.create());
        fact.setCacheLoader((CacheLoader)new MyCacheLoader());
        Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId).create(regionName);
        Assert.assertNotNull((Object)r);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createPRWithRedundantCopyWithAsyncEventQueue(String regionName, String asyncEventQueueId, Boolean offHeap) throws InterruptedException {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        final CountDownLatch recoveryDone = new CountDownLatch(2);
        InternalResourceManager.ResourceObserverAdapter observer = new InternalResourceManager.ResourceObserverAdapter(){

            public void recoveryFinished(Region region) {
                recoveryDone.countDown();
            }
        };
        InternalResourceManager.setResourceObserver((InternalResourceManager.ResourceObserver)observer);
        try {
            AttributesFactory fact = new AttributesFactory();
            PartitionAttributesFactory pfact = new PartitionAttributesFactory();
            pfact.setTotalNumBuckets(16);
            pfact.setRedundantCopies(1);
            fact.setPartitionAttributes(pfact.create());
            fact.setOffHeap(offHeap.booleanValue());
            Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId).create(regionName);
            Assert.assertNotNull((Object)r);
            recoveryDone.await();
        }
        finally {
            exp.remove();
        }
    }

    public static void createPartitionedRegionAccessorWithAsyncEventQueue(String regionName, String asyncEventQueueId) {
        AttributesFactory fact = new AttributesFactory();
        PartitionAttributesFactory pfact = new PartitionAttributesFactory();
        pfact.setTotalNumBuckets(16);
        pfact.setLocalMaxMemory(0);
        fact.setPartitionAttributes(pfact.create());
        Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId).create(regionName);
        Assert.assertNotNull((Object)r);
    }

    protected static void createCache(Integer locPort) {
        AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
        Properties props = test.getDistributedSystemProperties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "localhost[" + locPort + "]");
        InternalDistributedSystem ds = test.getSystem(props);
        cache = CacheFactory.create((DistributedSystem)ds);
    }

    public static void createCacheWithoutLocator(Integer mCastPort) {
        AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
        Properties props = test.getDistributedSystemProperties();
        props.setProperty("mcast-port", "" + mCastPort);
        InternalDistributedSystem ds = test.getSystem(props);
        cache = CacheFactory.create((DistributedSystem)ds);
    }

    public static void checkAsyncEventQueueStats(String queueId, int queueSize, int secondaryQueueSize, int eventsReceived, int eventsQueued, int eventsDistributed) {
        Set asyncQueues = cache.getAsyncEventQueues();
        AsyncEventQueue queue = null;
        boolean isParallel = false;
        for (AsyncEventQueue q : asyncQueues) {
            isParallel = q.isParallel();
            if (!q.getId().equals(queueId)) continue;
            queue = q;
            break;
        }
        AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
        Awaitility.await().atMost(120L, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals((String)("Expected queue entries: " + queueSize + " but actual entries: " + statistics.getEventQueueSize()), (long)queueSize, (long)statistics.getEventQueueSize()));
        if (isParallel) {
            Awaitility.await().atMost(60L, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals((String)("Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is " + statistics.getSecondaryEventQueueSize()), (long)secondaryQueueSize, (long)statistics.getSecondaryEventQueueSize()));
        } else {
            Assert.assertEquals((long)0L, (long)statistics.getSecondaryEventQueueSize());
        }
        Assert.assertEquals((long)queueSize, (long)statistics.getEventQueueSize());
        Assert.assertEquals((long)eventsReceived, (long)statistics.getEventsReceived());
        Assert.assertEquals((long)eventsQueued, (long)statistics.getEventsQueued());
        assert (statistics.getEventsDistributed() >= eventsDistributed);
    }

    public static void checkAsyncEventQueueConflatedStats(String asyncEventQueueId, int eventsConflated) {
        Set queues = cache.getAsyncEventQueues();
        AsyncEventQueue queue = null;
        for (AsyncEventQueue q : queues) {
            if (!q.getId().equals(asyncEventQueueId)) continue;
            queue = q;
            break;
        }
        AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
        Assert.assertEquals((long)eventsConflated, (long)statistics.getEventsNotQueuedConflated());
    }

    public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId, int eventsReceived) {
        Set asyncEventQueues = cache.getAsyncEventQueues();
        AsyncEventQueue queue = null;
        for (AsyncEventQueue q : asyncEventQueues) {
            if (!q.getId().equals(asyncEventQueueId)) continue;
            queue = q;
            break;
        }
        AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
        Assert.assertEquals((long)eventsReceived, (long)statistics.getEventsReceived());
        Assert.assertEquals((long)eventsReceived, (long)(statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary() + statistics.getUnprocessedEventsRemovedByPrimary()));
    }

    public static void checkAsyncEventQueueBatchStats(String asyncQueueId, int batches) {
        Set queues = cache.getAsyncEventQueues();
        AsyncEventQueue queue = null;
        for (AsyncEventQueue q : queues) {
            if (!q.getId().equals(asyncQueueId)) continue;
            queue = q;
            break;
        }
        AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
        assert (statistics.getBatchesDistributed() >= batches);
        Assert.assertEquals((long)0L, (long)statistics.getBatchesRedistributed());
    }

    public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
        Set asyncQueues = cache.getAsyncEventQueues();
        AsyncEventQueue queue = null;
        for (AsyncEventQueue q : asyncQueues) {
            if (!q.getId().equals(asyncQueueId)) continue;
            queue = q;
            break;
        }
        AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
        Assert.assertEquals((long)events, (long)(statistics.getUnprocessedEventsAddedBySecondary() + statistics.getUnprocessedTokensRemovedBySecondary()));
        Assert.assertEquals((long)events, (long)(statistics.getUnprocessedEventsRemovedByPrimary() + statistics.getUnprocessedTokensAddedByPrimary()));
    }

    public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue) {
        Set senders = cache.getGatewaySenders();
        GatewaySender sender = null;
        for (GatewaySender s : senders) {
            if (!s.getId().equals(senderId)) continue;
            sender = s;
            break;
        }
        Assert.assertNotNull(sender);
        ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(removeFromQueue);
    }

    public static void unsetRemoveFromQueueOnException(String senderId) {
        Set senders = cache.getGatewaySenders();
        GatewaySender sender = null;
        for (GatewaySender s : senders) {
            if (!s.getId().equals(senderId)) continue;
            sender = s;
            break;
        }
        Assert.assertNotNull(sender);
        ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false);
    }

    public static void waitForSenderToBecomePrimary(String senderId) {
        Set senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
        final GatewaySender sender = AsyncEventQueueTestBase.getGatewaySenderById(senders, senderId);
        WaitCriterion wc = new WaitCriterion(){

            @Override
            public boolean done() {
                return sender != null && ((AbstractGatewaySender)sender).isPrimary();
            }

            @Override
            public String description() {
                return "Expected sender primary state to be true but is false";
            }
        };
        Wait.waitForCriterion(wc, 10000L, 1000L, true);
    }

    private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
        for (GatewaySender s : senders) {
            if (!s.getId().equals(senderId)) continue;
            return s;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, boolean isManulaStart) {
        IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
        try {
            File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
            persistentDirectory.mkdir();
            DiskStoreFactory dsf = cache.createDiskStoreFactory();
            File[] dirs1 = new File[]{persistentDirectory};
            if (isParallel) {
                GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
                gateway.setParallel(true);
                gateway.setMaximumQueueMemory(maxMemory.intValue());
                gateway.setBatchSize(batchSize.intValue());
                gateway.setManualStart(isManulaStart);
                gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
                ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback((LocatorDiscoveryCallback)new MyLocatorCallback());
                if (filter != null) {
                    eventFilter = filter;
                    gateway.addGatewayEventFilter(filter);
                }
                if (isPersistent) {
                    gateway.setPersistenceEnabled(true);
                    gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
                } else {
                    DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
                    gateway.setDiskStoreName(store.getName());
                }
                gateway.setBatchConflationEnabled(isConflation);
                gateway.create(dsName, remoteDsId);
            } else {
                GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
                gateway.setMaximumQueueMemory(maxMemory.intValue());
                gateway.setBatchSize(batchSize.intValue());
                gateway.setManualStart(isManulaStart);
                gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
                ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback((LocatorDiscoveryCallback)new MyLocatorCallback());
                if (filter != null) {
                    eventFilter = filter;
                    gateway.addGatewayEventFilter(filter);
                }
                gateway.setBatchConflationEnabled(isConflation);
                if (isPersistent) {
                    gateway.setPersistenceEnabled(true);
                    gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
                } else {
                    DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
                    gateway.setDiskStoreName(store.getName());
                }
                gateway.create(dsName, remoteDsId);
            }
        }
        finally {
            exln.remove();
        }
    }

    public static void pauseWaitCriteria(final long millisec) {
        WaitCriterion wc = new WaitCriterion(){

            @Override
            public boolean done() {
                return false;
            }

            @Override
            public String description() {
                return "Expected to wait for " + millisec + " millisec.";
            }
        };
        Wait.waitForCriterion(wc, millisec, 500L, false);
    }

    public static int createReceiver(int locPort) {
        AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
        Properties props = test.getDistributedSystemProperties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "localhost[" + locPort + "]");
        InternalDistributedSystem ds = test.getSystem(props);
        cache = CacheFactory.create((DistributedSystem)ds);
        GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        fact.setStartPort(port);
        fact.setEndPort(port);
        fact.setManualStart(true);
        GatewayReceiver receiver = fact.create();
        try {
            receiver.start();
        }
        catch (IOException e) {
            e.printStackTrace();
            Assert.fail((String)("Test " + test.getName() + " failed to start GatewayRecevier on port " + port));
        }
        return port;
    }

    public static String makePath(String[] strings) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < strings.length; ++i) {
            sb.append(strings[i]);
            sb.append(File.separator);
        }
        return sb.toString();
    }

    public static void doRebalance() {
        ResourceManager resMan = cache.getResourceManager();
        boolean heapEviction = resMan.getEvictionHeapPercentage() > 0.0f;
        RebalanceFactory factory = resMan.createRebalanceFactory();
        try {
            RebalanceResults simulateResults = null;
            if (!heapEviction) {
                LogWriterUtils.getLogWriter().info("Calling rebalance simulate");
                RebalanceOperation simulateOp = factory.simulate();
                simulateResults = simulateOp.getResults();
            }
            LogWriterUtils.getLogWriter().info("Starting rebalancing");
            RebalanceOperation rebalanceOp = factory.start();
            RebalanceResults rebalanceResults = rebalanceOp.getResults();
        }
        catch (InterruptedException e) {
            org.apache.geode.test.dunit.Assert.fail("Interrupted", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void doPuts(String regionName, int numPuts) {
        IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class.getName());
        IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
        try {
            Region r = cache.getRegion("/" + regionName);
            Assert.assertNotNull((Object)r);
            for (long i = 0L; i < (long)numPuts; ++i) {
                r.put((Object)i, (Object)i);
            }
        }
        finally {
            exp1.remove();
            exp2.remove();
        }
    }

    public static void doHeavyPuts(String regionName, int numPuts) {
        Region r = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)r);
        for (long i = 0L; i < (long)numPuts; ++i) {
            r.put((Object)i, (Object)new byte[0x100000]);
        }
    }

    public static void doGets(String regionName, int numGets) {
        Region r = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)r);
        for (long i = 0L; i < (long)numGets; ++i) {
            r.get((Object)i);
        }
    }

    public static void doPutsFrom(String regionName, int from, int numPuts) {
        Region r = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)r);
        for (long i = (long)from; i < (long)numPuts; ++i) {
            r.put((Object)i, (Object)i);
        }
    }

    public static void doPutAll(String regionName, int numPuts, int size) {
        Region r = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)r);
        for (long i = 0L; i < (long)numPuts; ++i) {
            HashMap<Long, Long> putAllMap = new HashMap<Long, Long>();
            for (long j = 0L; j < (long)size; ++j) {
                putAllMap.put((long)size * i + j, i);
            }
            r.putAll(putAllMap, (Object)"putAllCallback");
            putAllMap.clear();
        }
    }

    public static void putGivenKeyValue(String regionName, Map keyValues) {
        Region r = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)r);
        for (Object key : keyValues.keySet()) {
            r.put(key, keyValues.get(key));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void doNextPuts(String regionName, int start, int numPuts) {
        IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class.getName());
        try {
            Region r = cache.getRegion("/" + regionName);
            Assert.assertNotNull((Object)r);
            for (long i = (long)start; i < (long)numPuts; ++i) {
                r.put((Object)i, (Object)i);
            }
        }
        finally {
            exp.remove();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void validateRegionSize(String regionName, final int regionSize) {
        IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class.getName());
        try {
            final Region r = cache.getRegion("/" + regionName);
            Assert.assertNotNull((Object)r);
            WaitCriterion wc = new WaitCriterion(){

                @Override
                public boolean done() {
                    return r.keySet().size() == regionSize;
                }

                @Override
                public String description() {
                    return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet();
                }
            };
            Wait.waitForCriterion(wc, 240000L, 500L, true);
        }
        finally {
            exp.remove();
            exp1.remove();
        }
    }

    public static void validateAsyncEventQueueAttributes(String asyncChannelId, int maxQueueMemory, int batchSize, int batchTimeInterval, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, boolean batchConflationEnabled) {
        AsyncEventQueue theChannel = null;
        Set asyncEventChannels = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncChannel : asyncEventChannels) {
            if (!asyncChannelId.equals(asyncChannel.getId())) continue;
            theChannel = asyncChannel;
        }
        InternalGatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
        Assert.assertEquals((String)"maxQueueMemory", (long)maxQueueMemory, (long)theSender.getMaximumQueueMemory());
        Assert.assertEquals((String)"batchSize", (long)batchSize, (long)theSender.getBatchSize());
        Assert.assertEquals((String)"batchTimeInterval", (long)batchTimeInterval, (long)theSender.getBatchTimeInterval());
        Assert.assertEquals((String)"isPersistent", (Object)isPersistent, (Object)theSender.isPersistenceEnabled());
        Assert.assertEquals((String)"diskStoreName", (Object)diskStoreName, (Object)theSender.getDiskStoreName());
        Assert.assertEquals((String)"isDiskSynchronous", (Object)isDiskSynchronous, (Object)theSender.isDiskSynchronous());
        Assert.assertEquals((String)"batchConflation", (Object)batchConflationEnabled, (Object)theSender.isBatchConflationEnabled());
    }

    public static void validateConcurrentAsyncEventQueueAttributes(String asyncChannelId, int maxQueueMemory, int batchSize, int batchTimeInterval, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, boolean batchConflationEnabled, int dispatcherThreads, GatewaySender.OrderPolicy policy) {
        AsyncEventQueue theChannel = null;
        Set asyncEventChannels = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncChannel : asyncEventChannels) {
            if (!asyncChannelId.equals(asyncChannel.getId())) continue;
            theChannel = asyncChannel;
        }
        InternalGatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
        Assert.assertEquals((String)"maxQueueMemory", (long)maxQueueMemory, (long)theSender.getMaximumQueueMemory());
        Assert.assertEquals((String)"batchSize", (long)batchSize, (long)theSender.getBatchSize());
        Assert.assertEquals((String)"batchTimeInterval", (long)batchTimeInterval, (long)theSender.getBatchTimeInterval());
        Assert.assertEquals((String)"isPersistent", (Object)isPersistent, (Object)theSender.isPersistenceEnabled());
        Assert.assertEquals((String)"diskStoreName", (Object)diskStoreName, (Object)theSender.getDiskStoreName());
        Assert.assertEquals((String)"isDiskSynchronous", (Object)isDiskSynchronous, (Object)theSender.isDiskSynchronous());
        Assert.assertEquals((String)"batchConflation", (Object)batchConflationEnabled, (Object)theSender.isBatchConflationEnabled());
        Assert.assertEquals((String)"dispatcherThreads", (long)dispatcherThreads, (long)theSender.getDispatcherThreads());
        Assert.assertEquals((String)"orderPolicy", (Object)policy, (Object)theSender.getOrderPolicy());
    }

    public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
        AsyncEventListener theListener = null;
        Set asyncEventQueues = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncQueue : asyncEventQueues) {
            if (!asyncQueueId.equals(asyncQueue.getId())) continue;
            theListener = asyncQueue.getAsyncEventListener();
        }
        final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
        Assert.assertNotNull((Object)eventsMap);
        WaitCriterion wc = new WaitCriterion(){

            @Override
            public boolean done() {
                return eventsMap.size() == expectedSize;
            }

            @Override
            public String description() {
                return "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size();
            }
        };
        Wait.waitForCriterion(wc, 60000L, 500L, true);
    }

    public static void validateAsyncEventForOperationDetail(String asyncQueueId, final int expectedSize, boolean isLoad, boolean isPutAll) {
        AsyncEventListener theListener = null;
        Set asyncEventQueues = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncQueue : asyncEventQueues) {
            if (!asyncQueueId.equals(asyncQueue.getId())) continue;
            theListener = asyncQueue.getAsyncEventListener();
        }
        final Map eventsMap = ((MyAsyncEventListener_CacheLoader)theListener).getEventsMap();
        Assert.assertNotNull((Object)eventsMap);
        WaitCriterion wc = new WaitCriterion(){

            @Override
            public boolean done() {
                return eventsMap.size() == expectedSize;
            }

            @Override
            public String description() {
                return "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size();
            }
        };
        Wait.waitForCriterion(wc, 60000L, 500L, true);
        Collection values = eventsMap.values();
        for (AsyncEvent asyncEvent : values) {
            if (isLoad) {
                Assert.assertTrue((boolean)asyncEvent.getOperation().isLoad());
            }
            if (!isPutAll) continue;
            Assert.assertTrue((boolean)asyncEvent.getOperation().isPutAll());
        }
    }

    public static void validateCustomAsyncEventListener(String asyncQueueId, final int expectedSize) {
        AsyncEventListener theListener = null;
        Set asyncEventQueues = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncQueue : asyncEventQueues) {
            if (!asyncQueueId.equals(asyncQueue.getId())) continue;
            theListener = asyncQueue.getAsyncEventListener();
        }
        final Map eventsMap = ((CustomAsyncEventListener)theListener).getEventsMap();
        Assert.assertNotNull((Object)eventsMap);
        WaitCriterion wc = new WaitCriterion(){

            @Override
            public boolean done() {
                return eventsMap.size() == expectedSize;
            }

            @Override
            public String description() {
                return "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size();
            }
        };
        Wait.waitForCriterion(wc, 60000L, 500L, true);
        for (AsyncEvent event : eventsMap.values()) {
            Assert.assertTrue((String)("possibleDuplicate should be true for event: " + event), (boolean)event.getPossibleDuplicate());
        }
    }

    public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
        AsyncEventQueue theAsyncEventQueue = cache.getAsyncEventQueue(asyncQueueId);
        InternalGatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender();
        if (sender.isParallel()) {
            final Set queues = ((AbstractGatewaySender)sender).getQueues();
            WaitCriterion wc = new WaitCriterion(){

                @Override
                public boolean done() {
                    int size = 0;
                    for (RegionQueue q : queues) {
                        size += q.size();
                    }
                    return size == 0;
                }

                @Override
                public String description() {
                    int size = 0;
                    for (RegionQueue q : queues) {
                        size += q.size();
                    }
                    return "Expected queue size to be : 0 but actual entries: " + size;
                }
            };
            Wait.waitForCriterion(wc, 60000L, 500L, true);
        } else {
            WaitCriterion wc = new WaitCriterion((GatewaySender)sender){
                final /* synthetic */ GatewaySender val$sender;
                {
                    this.val$sender = gatewaySender;
                }

                @Override
                public boolean done() {
                    Set queues = ((AbstractGatewaySender)this.val$sender).getQueues();
                    int size = 0;
                    for (RegionQueue q : queues) {
                        size += q.size();
                    }
                    return size == 0;
                }

                @Override
                public String description() {
                    Set queues = ((AbstractGatewaySender)this.val$sender).getQueues();
                    int size = 0;
                    for (RegionQueue q : queues) {
                        size += q.size();
                    }
                    return "Expected queue size to be : 0 but actual entries: " + size;
                }
            };
            Wait.waitForCriterion(wc, 60000L, 500L, true);
        }
    }

    public static void verifyAsyncEventListenerForPossibleDuplicates(String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
        AsyncEventListener theListener = null;
        Set asyncEventQueues = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncQueue : asyncEventQueues) {
            if (!asyncEventQueueId.equals(asyncQueue.getId())) continue;
            theListener = asyncQueue.getAsyncEventListener();
        }
        Map bucketToEventsMap = ((MyAsyncEventListener2)theListener).getBucketToEventsMap();
        Assert.assertNotNull((Object)bucketToEventsMap);
        Assert.assertTrue((bucketIds.size() > 1 ? 1 : 0) != 0);
        for (int bucketId : bucketIds) {
            List eventsForBucket = (List)bucketToEventsMap.get(bucketId);
            LogWriterUtils.getLogWriter().info("Events for bucket: " + bucketId + " is " + eventsForBucket);
            Assert.assertNotNull((Object)eventsForBucket);
            for (int i = 0; i < batchSize; ++i) {
                GatewaySenderEventImpl senderEvent = (GatewaySenderEventImpl)eventsForBucket.get(i);
                Assert.assertTrue((boolean)senderEvent.getPossibleDuplicate());
            }
        }
    }

    public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int expectedNumInvocations) {
        AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
        Assert.assertNotNull((Object)queue);
        MyGatewayEventSubstitutionFilter filter = (MyGatewayEventSubstitutionFilter)queue.getGatewayEventSubstitutionFilter();
        Assert.assertNotNull((Object)filter);
        Assert.assertEquals((long)expectedNumInvocations, (long)filter.getNumInvocations());
        MyAsyncEventListener listener = (MyAsyncEventListener)queue.getAsyncEventListener();
        Map eventsMap = listener.getEventsMap();
        Assert.assertNotNull((Object)eventsMap);
        Assert.assertEquals((long)expectedNumInvocations, (long)eventsMap.size());
        for (Map.Entry entry : eventsMap.entrySet()) {
            Assert.assertEquals((Object)("substituted_" + entry.getKey()), entry.getValue());
        }
    }

    public static void verifySubstitutionFilterToDataInvocations(String asyncEventQueueId, int expectedToDataInvoations) {
        AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
        Assert.assertNotNull((Object)queue);
        SizeableGatewayEventSubstitutionFilter filter = (SizeableGatewayEventSubstitutionFilter)queue.getGatewayEventSubstitutionFilter();
        Assert.assertNotNull((Object)filter);
        Assert.assertEquals((long)expectedToDataInvoations, (long)filter.getNumToDataInvocations());
    }

    public static AsyncEventListener getAsyncEventListener(String asyncEventQueueId) {
        Object theListener = null;
        Set asyncEventQueues = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncQueue : asyncEventQueues) {
            if (!asyncEventQueueId.equals(asyncQueue.getId())) continue;
            return asyncQueue.getAsyncEventListener();
        }
        return null;
    }

    public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
        AsyncEventListener theListener = AsyncEventQueueTestBase.getAsyncEventListener(asyncEventQueueId);
        Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
        Assert.assertNotNull((Object)eventsMap);
        LogWriterUtils.getLogWriter().info("The events map size is " + eventsMap.size());
        return eventsMap.size();
    }

    public static int getAsyncEventQueueSize(String asyncEventQueueId) {
        AsyncEventQueue theQueue = null;
        Set asyncEventQueues = cache.getAsyncEventQueues();
        for (AsyncEventQueue asyncQueue : asyncEventQueues) {
            if (!asyncEventQueueId.equals(asyncQueue.getId())) continue;
            theQueue = asyncQueue;
        }
        Assert.assertNotNull(theQueue);
        return theQueue.size();
    }

    public static String getRegionFullPath(String regionName) {
        Region r = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)r);
        return r.getFullPath();
    }

    public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
        PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
        return region.getDataStore().getAllLocalPrimaryBucketIds();
    }

    public static void addCacheListenerAndCloseCache(String regionName) {
        Region region = cache.getRegion("/" + regionName);
        Assert.assertNotNull((Object)region);
        CacheListenerAdapter cl = new CacheListenerAdapter(){

            public void afterCreate(EntryEvent event) {
                if ((Long)event.getKey() == 900L) {
                    cache.getLogger().fine(" Gateway sender is killed by a test");
                    cache.close();
                    cache.getDistributedSystem().disconnect();
                }
            }
        };
        region.getAttributesMutator().addCacheListener((CacheListener)cl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Boolean killSender(String senderId) {
        IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
        IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class.getName());
        IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
        try {
            Object object;
            Set senders = cache.getGatewaySenders();
            AbstractGatewaySender sender = null;
            for (GatewaySender s : senders) {
                if (!s.getId().equals(senderId)) continue;
                sender = (AbstractGatewaySender)s;
                break;
            }
            if (sender.isPrimary()) {
                LogWriterUtils.getLogWriter().info("Gateway sender is killed by a test");
                cache.getDistributedSystem().disconnect();
                object = Boolean.TRUE;
                return object;
            }
            object = Boolean.FALSE;
            return object;
        }
        finally {
            exp.remove();
            exp1.remove();
            exln.remove();
        }
    }

    public static Boolean killAsyncEventQueue(String asyncQueueId) {
        Set queues = cache.getAsyncEventQueues();
        AsyncEventQueueImpl queue = null;
        for (AsyncEventQueue q : queues) {
            if (!q.getId().equals(asyncQueueId)) continue;
            queue = (AsyncEventQueueImpl)q;
            break;
        }
        if (queue.isPrimary()) {
            LogWriterUtils.getLogWriter().info("AsyncEventQueue is killed by a test");
            cache.getDistributedSystem().disconnect();
            return Boolean.TRUE;
        }
        return Boolean.FALSE;
    }

    public static void killSender() {
        LogWriterUtils.getLogWriter().info("Gateway sender is going to be killed by a test");
        cache.close();
        cache.getDistributedSystem().disconnect();
        LogWriterUtils.getLogWriter().info("Gateway sender is killed by a test");
    }

    @Override
    public final void postTearDown() throws Exception {
        AsyncEventQueueTestBase.cleanupVM();
        vm0.invoke(() -> AsyncEventQueueTestBase.cleanupVM());
        vm1.invoke(() -> AsyncEventQueueTestBase.cleanupVM());
        vm2.invoke(() -> AsyncEventQueueTestBase.cleanupVM());
        vm3.invoke(() -> AsyncEventQueueTestBase.cleanupVM());
        vm4.invoke(() -> AsyncEventQueueTestBase.cleanupVM());
    }

    public static void cleanupVM() throws IOException {
        AsyncEventQueueTestBase.closeCache();
        JUnit4DistributedTestCase.cleanDiskDirs();
    }

    public static void closeCache() throws IOException {
        if (cache != null && !cache.isClosed()) {
            cache.close();
            cache.getDistributedSystem().disconnect();
            cache = null;
        } else {
            AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
            if (test.isConnectedToDS()) {
                test.getSystem().disconnect();
            }
        }
    }

    public static void shutdownLocator() {
        AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
        test.getSystem().disconnect();
    }

    public static void printEventListenerMap() {
        ((MyGatewaySenderEventListener)eventListener1).printMap();
    }

    @Override
    public Properties getDistributedSystemProperties() {
        Properties props = new Properties();
        props.setProperty("off-heap-memory-size", "300m");
        return props;
    }

    public boolean isOffHeap() {
        return false;
    }

    static {
        destroyFlag = false;
        dispatcherThreads = new ArrayList<Integer>(Arrays.asList(1, 3, 5));
        numDispatcherThreadsForTheRun = 1;
    }

    private static class MyFixedPartitionResolver
    implements FixedPartitionResolver {
        private final List<String> allPartitions;

        public MyFixedPartitionResolver(List<String> allPartitions) {
            this.allPartitions = allPartitions;
        }

        public String getPartitionName(EntryOperation opDetails, @Deprecated Set targetPartitions) {
            int hash = Math.abs(opDetails.getKey().hashCode() % this.allPartitions.size());
            return this.allPartitions.get(hash);
        }

        public Object getRoutingObject(EntryOperation opDetails) {
            return opDetails.getKey();
        }

        public String getName() {
            return this.getClass().getName();
        }

        public void close() {
        }
    }

    public static class MyLocatorCallback
    extends LocatorDiscoveryCallbackAdapter {
        private final Set discoveredLocators = new HashSet();
        private final Set removedLocators = new HashSet();

        public synchronized void locatorsDiscovered(List locators) {
            this.discoveredLocators.addAll(locators);
            ((Object)((Object)this)).notifyAll();
        }

        public synchronized void locatorsRemoved(List locators) {
            this.removedLocators.addAll(locators);
            ((Object)((Object)this)).notifyAll();
        }

        public boolean waitForDiscovery(InetSocketAddress locator, long time) throws InterruptedException {
            return this.waitFor(this.discoveredLocators, locator, time);
        }

        public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException {
            return this.waitFor(this.removedLocators, locator, time);
        }

        private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time) throws InterruptedException {
            long remaining = time;
            long endTime = System.currentTimeMillis() + time;
            while (!set.contains(locator) && remaining >= 0L) {
                ((Object)((Object)this)).wait(remaining);
                remaining = endTime - System.currentTimeMillis();
            }
            return set.contains(locator);
        }

        public synchronized Set getDiscovered() {
            return new HashSet(this.discoveredLocators);
        }

        public synchronized Set getRemoved() {
            return new HashSet(this.removedLocators);
        }
    }
}

