/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.GatewayCancelledException;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.GatewayEventFilter;
import org.apache.geode.internal.cache.HasCachePerfStats;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.execute.BucketMovedException;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator;
import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.offheap.Releasable;
import org.apache.logging.log4j.Logger;

public abstract class AbstractGatewaySender
implements InternalGatewaySender,
DistributionAdvisee {
    private static final Logger logger = LogService.getLogger();
    // ---- Configuration; most of these are copied from GatewaySenderAttributes by the
    // two-argument constructor. ----
    protected InternalCache cache;
    protected String id;
    protected long startTime;
    protected PoolImpl proxy;
    protected int remoteDSId;
    protected String locName;
    protected int socketBufferSize;
    protected int socketReadTimeout;
    protected int queueMemory;
    // Computed in the constructor as queueMemory / dispatcherThreads.
    protected int maxMemoryPerDispatcherQueue;
    protected int batchSize;
    protected int batchTimeInterval;
    protected boolean isConflation;
    protected boolean isPersistence;
    protected int alertThreshold;
    protected boolean manualStart;
    protected boolean isParallel;
    protected boolean isForInternalUse;
    protected boolean isDiskSynchronous;
    protected String diskStoreName;
    protected List<org.apache.geode.cache.wan.GatewayEventFilter> eventFilters;
    protected List<GatewayTransportFilter> transFilters;
    protected List<AsyncEventListener> listeners;
    protected boolean forwardExpirationDestroy;
    protected GatewayEventSubstitutionFilter substitutionFilter;
    protected LocatorDiscoveryCallback locatorDiscoveryCallback;
    // Write-locked by destroy/pause/resume; distribute() tries to take the read lock.
    private final ReentrantReadWriteLock lifeCycleLock = new ReentrantReadWriteLock();
    protected GatewaySenderAdvisor senderAdvisor;
    private int serialNumber;
    // Only created by the constructor when the sender is not for internal use.
    protected GatewaySenderStats statistics;
    private Stopper stopper;
    private GatewaySender.OrderPolicy policy;
    private int dispatcherThreads;
    protected boolean isBucketSorted;
    protected boolean isMetaQueue;
    private int parallelismForReplicatedRegion;
    protected AbstractGatewaySenderEventProcessor eventProcessor;
    // Internal filter consulted by distribute() (enqueueEvent) before an event is processed further.
    private GatewayEventFilter filter = DefaultGatewayEventFilter.getInstance();
    private ServerLocation serverLocation;
    // Monitor used by enqueueTempEvents() while it drains tmpQueuedEvents.
    protected Object queuedEventsSync = new Object();
    protected volatile boolean enqueuedAllTempQueueEvents = false;
    // Events parked by distribute() when the life-cycle read lock could not be acquired.
    protected volatile ConcurrentLinkedQueue<TmpQueueEvent> tmpQueuedEvents = new ConcurrentLinkedQueue();
    // Events received by distribute() while this sender was primary but not running.
    protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents = new ConcurrentLinkedQueue();
    // ---- Tunables read once from system properties; the second argument is the default. ----
    @MutableForTesting
    public static int MAXIMUM_SHUTDOWN_WAIT_TIME = Integer.getInteger("GatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME", 0);
    protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer.getInteger("GatewaySender.MAXIMUM_SHUTDOWN_PEEKS", 20);
    public static final int QUEUE_SIZE_THRESHOLD = Integer.getInteger("GatewaySender.QUEUE_SIZE_THRESHOLD", 5000);
    @MutableForTesting
    public static int TOKEN_TIMEOUT = Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 120000);
    public static final String LOCK_SERVICE_NAME = "gatewayEventIdIndexMetaData_lockService";
    protected static final String META_DATA_REGION_NAME = "gatewayEventIdIndexMetaData";
    protected boolean startEventProcessorInPausedState = false;
    // Stays -1 unless the constructor resolves the id from the distribution manager.
    protected int myDSId = -1;
    protected int connectionIdleTimeOut = GATEWAY_CONNECTION_IDLE_TIMEOUT;
    private boolean removeFromQueueOnException = GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION;
    private int eventIdIndex;
    private Region<String, Integer> eventIdIndexMetaDataRegion;
    final Object lockForConcurrentDispatcher = new Object();

    /**
     * No-argument constructor; performs no initialization, so every field keeps its
     * declared default.
     */
    protected AbstractGatewaySender() {
    }

    /**
     * Creates a sender bound to {@code cache}, copying its configuration from {@code attrs}.
     *
     * <p>When {@code cache} is not a {@link CacheCreation}, the constructor additionally resolves
     * the local distributed-system id, creates the stopper and the sender advisor, creates
     * statistics (unless the sender is for internal use) and initializes the event id index.
     *
     * @param cache the cache that owns this sender
     * @param attrs the attributes this sender is configured from
     */
    public AbstractGatewaySender(InternalCache cache, GatewaySenderAttributes attrs) {
        this.cache = cache;
        this.id = attrs.getId();
        this.socketBufferSize = attrs.getSocketBufferSize();
        this.socketReadTimeout = attrs.getSocketReadTimeout();
        this.queueMemory = attrs.getMaximumQueueMemory();
        this.batchSize = attrs.getBatchSize();
        this.batchTimeInterval = attrs.getBatchTimeInterval();
        this.isConflation = attrs.isBatchConflationEnabled();
        this.isPersistence = attrs.isPersistenceEnabled();
        this.alertThreshold = attrs.getAlertThreshold();
        this.manualStart = attrs.isManualStart();
        this.isParallel = attrs.isParallel();
        this.isForInternalUse = attrs.isForInternalUse();
        this.diskStoreName = attrs.getDiskStoreName();
        this.remoteDSId = attrs.getRemoteDSId();
        this.eventFilters = attrs.getGatewayEventFilters();
        this.transFilters = attrs.getGatewayTransportFilters();
        this.listeners = attrs.getAsyncEventListeners();
        this.substitutionFilter = attrs.getGatewayEventSubstitutionFilter();
        this.locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback();
        this.isDiskSynchronous = attrs.isDiskSynchronous();
        this.policy = attrs.getOrderPolicy();
        this.dispatcherThreads = attrs.getDispatcherThreads();
        this.parallelismForReplicatedRegion = attrs.getParallelismForReplicatedRegion();
        // Integer division. NOTE(review): throws ArithmeticException if dispatcherThreads is 0 —
        // presumably the attributes are validated before reaching this point; confirm.
        this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
        this.serialNumber = DistributionAdvisor.createSerialNumber();
        this.isMetaQueue = attrs.isMetaQueue();
        // Runtime-only setup is skipped when the cache is a CacheCreation.
        if (!(this.cache instanceof CacheCreation)) {
            this.myDSId = this.cache.getInternalDistributedSystem().getDistributionManager().getDistributedSystemId();
            this.stopper = new Stopper(cache.getCancelCriterion());
            // The advisor is handed `this` while construction is still in progress
            // (isBucketSorted and forwardExpirationDestroy are assigned further down).
            this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this);
            if (!this.isForInternalUse()) {
                this.statistics = new GatewaySenderStats(cache.getDistributedSystem(), this.id);
            }
            this.initializeEventIdIndex();
        }
        this.isBucketSorted = attrs.isBucketSorted();
        this.forwardExpirationDestroy = attrs.isForwardExpirationDestroy();
    }

    /** Returns the advisor held by this sender; may be {@code null} if it was never created. */
    public GatewaySenderAdvisor getSenderAdvisor() {
        return senderAdvisor;
    }

    /** Returns the statistics object of this sender (the {@code statistics} field). */
    @Override
    public GatewaySenderStats getStatistics() {
        return statistics;
    }

    /** Hook with an empty default implementation; this base class does nothing here. */
    public void initProxy() {
    }

    /** Reports whether the sender advisor considers this sender primary. */
    @Override
    public boolean isPrimary() {
        GatewaySenderAdvisor advisor = getSenderAdvisor();
        return advisor.isPrimary();
    }

    /**
     * Forwards the primary flag to the sender advisor.
     *
     * @param isPrimary the value passed to the advisor's {@code setIsPrimary}
     */
    public void setIsPrimary(boolean isPrimary) {
        GatewaySenderAdvisor advisor = getSenderAdvisor();
        advisor.setIsPrimary(isPrimary);
    }

    /** Returns the cache this sender was constructed with. */
    @Override
    public InternalCache getCache() {
        return cache;
    }

    /** Returns the configured alert threshold. */
    @Override
    public int getAlertThreshold() {
        return alertThreshold;
    }

    /** Returns the configured batch size. */
    @Override
    public int getBatchSize() {
        return batchSize;
    }

    /** Returns the configured batch time interval. */
    @Override
    public int getBatchTimeInterval() {
        return batchTimeInterval;
    }

    /** Returns the configured disk store name. */
    @Override
    public String getDiskStoreName() {
        return diskStoreName;
    }

    /** Returns the live list of gateway event filters (not a copy). */
    @Override
    public List<org.apache.geode.cache.wan.GatewayEventFilter> getGatewayEventFilters() {
        return eventFilters;
    }

    /** Returns the configured event substitution filter. */
    @Override
    public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
        return substitutionFilter;
    }

    /** Returns the id of this sender. */
    @Override
    public String getId() {
        return id;
    }

    /** Returns the value of the {@code startTime} field. */
    public long getStartTime() {
        return startTime;
    }

    /** Returns the id of the remote distributed system this sender targets. */
    @Override
    public int getRemoteDSId() {
        return remoteDSId;
    }

    /** Returns the live list of gateway transport filters (not a copy). */
    @Override
    public List<GatewayTransportFilter> getGatewayTransportFilters() {
        return transFilters;
    }

    /** Returns the live list of async event listeners (not a copy). */
    public List<AsyncEventListener> getAsyncEventListeners() {
        return listeners;
    }

    /** Returns {@code true} when at least one async event listener is registered. */
    public boolean hasListeners() {
        boolean noListeners = listeners.isEmpty();
        return !noListeners;
    }

    /** Returns the configured forward-expiration-destroy flag. */
    @Override
    public boolean isForwardExpirationDestroy() {
        return forwardExpirationDestroy;
    }

    /** Returns the configured manual-start flag. */
    @Override
    public boolean isManualStart() {
        return manualStart;
    }

    /** Returns the configured maximum queue memory. */
    @Override
    public int getMaximumQueueMemory() {
        return queueMemory;
    }

    /**
     * Returns the per-dispatcher share of the queue memory, computed at construction as
     * maximum queue memory divided by the number of dispatcher threads.
     */
    public int getMaximumMemeoryPerDispatcherQueue() {
        return maxMemoryPerDispatcherQueue;
    }

    /** Returns the configured socket buffer size. */
    @Override
    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    /** Returns the configured socket read timeout. */
    @Override
    public int getSocketReadTimeout() {
        return socketReadTimeout;
    }

    /** Returns whether batch conflation is enabled. */
    @Override
    public boolean isBatchConflationEnabled() {
        return isConflation;
    }

    /**
     * Overwrites the batch conflation flag; the method name marks it as a test hook.
     *
     * @param enableConflation the new value of the flag
     */
    public void test_setBatchConflationEnabled(boolean enableConflation) {
        isConflation = enableConflation;
    }

    /** Returns whether persistence is enabled. */
    @Override
    public boolean isPersistenceEnabled() {
        return isPersistence;
    }

    /** Returns the configured disk-synchronous flag. */
    @Override
    public boolean isDiskSynchronous() {
        return isDiskSynchronous;
    }

    /** Returns the configured parallelism for replicated regions. */
    @Override
    public int getMaxParallelismForReplicatedRegion() {
        return parallelismForReplicatedRegion;
    }

    /** Returns the configured locator discovery callback. */
    public LocatorDiscoveryCallback getLocatorDiscoveryCallback() {
        return locatorDiscoveryCallback;
    }

    /** Returns the sender advisor in its role as this advisee's distribution advisor. */
    @Override
    public DistributionAdvisor getDistributionAdvisor() {
        return senderAdvisor;
    }

    /** Returns the distribution manager of the distributed system obtained from {@link #getSystem()}. */
    @Override
    public DistributionManager getDistributionManager() {
        InternalDistributedSystem system = getSystem();
        return system.getDistributionManager();
    }

    /** Returns the sender id; this sender uses its id as its full path. */
    @Override
    public String getFullPath() {
        return getId();
    }

    /** Returns the sender id; this sender uses its id as its name. */
    @Override
    public String getName() {
        return getId();
    }

    /** Always returns {@code null}. */
    @Override
    public DistributionAdvisee getParentAdvisee() {
        return null;
    }

    /** Returns the configured number of dispatcher threads. */
    @Override
    public int getDispatcherThreads() {
        return dispatcherThreads;
    }

    /** Returns the configured order policy. */
    @Override
    public GatewaySender.OrderPolicy getOrderPolicy() {
        return policy;
    }

    /** Returns the profile produced by the sender advisor's {@code createProfile()}. */
    @Override
    public DistributionAdvisor.Profile getProfile() {
        GatewaySenderAdvisor advisor = senderAdvisor;
        return advisor.createProfile();
    }

    /** Returns the serial number assigned to this sender at construction. */
    @Override
    public int getSerialNumber() {
        return serialNumber;
    }

    /** Returns the configured bucket-sorted flag. */
    public boolean getBucketSorted() {
        return isBucketSorted;
    }

    /** Returns the configured meta-queue flag. */
    @Override
    public boolean getIsMetaQueue() {
        return isMetaQueue;
    }

    /** Returns the internal distributed system of this sender's cache. */
    @Override
    public InternalDistributedSystem getSystem() {
        InternalCache owningCache = cache;
        return owningCache.getInternalDistributedSystem();
    }

    /** Returns the value of the {@code eventIdIndex} field. */
    public int getEventIdIndex() {
        return eventIdIndex;
    }

    /**
     * Two gateway senders are equal when their ids are equal.
     *
     * <p>The argument is compared through the {@link GatewaySender} interface, matching the
     * {@code instanceof} guard, so a {@code GatewaySender} that is not an
     * {@code AbstractGatewaySender} no longer triggers a {@link ClassCastException}.
     *
     * @param obj the object to compare with; may be {@code null}
     * @return {@code true} if {@code obj} is a {@code GatewaySender} with the same id
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so this also covers the null case.
        if (!(obj instanceof GatewaySender)) {
            return false;
        }
        GatewaySender other = (GatewaySender)obj;
        return other.getId().equals(this.getId());
    }

    /** Hashes on the sender id, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        String senderId = getId();
        return senderId.hashCode();
    }

    /** Returns the pool held in the {@code proxy} field; may be {@code null}. */
    public PoolImpl getProxy() {
        return proxy;
    }

    /**
     * Removes the given filter from the list of gateway event filters.
     *
     * @param filter the filter to remove
     */
    @Override
    public void removeGatewayEventFilter(org.apache.geode.cache.wan.GatewayEventFilter filter) {
        List<org.apache.geode.cache.wan.GatewayEventFilter> filters = this.eventFilters;
        filters.remove(filter);
    }

    /**
     * Appends a filter to the list of gateway event filters.
     *
     * <p>The argument is validated before any state is touched, so a rejected call leaves the
     * filter list exactly as it was.
     *
     * @param filter the filter to add; must not be {@code null}
     * @throws IllegalStateException if {@code filter} is {@code null}
     */
    @Override
    public void addGatewayEventFilter(org.apache.geode.cache.wan.GatewayEventFilter filter) {
        if (filter == null) {
            throw new IllegalStateException("null value can not be added to gateway-event-filters list");
        }
        // An empty list is swapped for a fresh ArrayList before adding — presumably because the
        // list supplied by the attributes may not be modifiable; confirm against the factory.
        if (this.eventFilters.isEmpty()) {
            this.eventFilters = new ArrayList<org.apache.geode.cache.wan.GatewayEventFilter>();
        }
        this.eventFilters.add(filter);
    }

    /** Returns the configured parallel flag. */
    @Override
    public boolean isParallel() {
        return isParallel;
    }

    /** Returns the configured for-internal-use flag. */
    public boolean isForInternalUse() {
        return isForInternalUse;
    }

    /** Starts this sender; the behaviour is supplied by concrete subclasses. */
    @Override
    public abstract void start();

    /** Stops this sender; the behaviour is supplied by concrete subclasses. */
    @Override
    public abstract void stop();

    /** Destroys this sender as the initiator; equivalent to {@code destroy(true)}. */
    @Override
    public void destroy() {
        final boolean initiator = true;
        destroy(initiator);
    }

    /**
     * Destroys this sender while holding the life-cycle write lock.
     *
     * <p>Steps, in order: refuse if any application region still lists this sender's id; close
     * the sender advisor (if any); remove the sender from the cache; and, when
     * {@code initiator} is {@code true}, destroy the regions backing the sender's queues.
     *
     * <p>The lock is acquired before entering the {@code try} block so that the {@code finally}
     * clause never attempts to release a lock that was not obtained.
     *
     * @param initiator whether the queue regions should be destroyed as well
     * @throws GatewaySenderException if the sender is still attached to a region
     */
    @Override
    public void destroy(boolean initiator) {
        this.getLifeCycleLock().writeLock().lock();
        try {
            // A sender that is still referenced by a region must not be destroyed.
            Set<InternalRegion> regions = this.cache.getApplicationRegions();
            for (InternalRegion region : regions) {
                if (region.getAttributes().getGatewaySenderIds().contains(this.id)) {
                    throw new GatewaySenderException(String.format("The GatewaySender %s could not be destroyed as it is still used by region(s).", this));
                }
            }
            GatewaySenderAdvisor advisor = this.getSenderAdvisor();
            if (advisor != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Stopping the GatewaySender advisor");
                }
                advisor.close();
            }
            this.cache.removeGatewaySender(this);
            if (!initiator) {
                return;
            }
            Set<RegionQueue> regionQueues = this.getQueues();
            if (regionQueues == null) {
                return;
            }
            for (RegionQueue regionQueue : regionQueues) {
                try {
                    if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) {
                        // A concurrent parallel queue exposes several partitioned regions.
                        Set<PartitionedRegion> queueRegions = ((ConcurrentParallelGatewaySenderQueue)regionQueue).getRegions();
                        for (PartitionedRegion queueRegion : queueRegions) {
                            queueRegion.destroyRegion();
                        }
                    } else {
                        regionQueue.getRegion().localDestroyRegion();
                    }
                }
                catch (RegionDestroyedException e) {
                    // Already gone: log and carry on with the remaining queues.
                    logger.info("Region {} that underlies the GatewaySender {} is already destroyed.", (Object)e.getRegionFullPath(), (Object)this);
                }
            }
        }
        finally {
            this.getLifeCycleLock().writeLock().unlock();
        }
    }

    /**
     * Pauses this sender, asks the event processor (if present) to rebalance, and then resumes
     * the sender — the resume happens even when pausing or rebalancing throws.
     */
    @Override
    public void rebalance() {
        try {
            pause();
            AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
            if (processor != null) {
                processor.rebalance();
            }
        }
        finally {
            resume();
        }
        logger.info("GatewaySender {} has been rebalanced", (Object)this);
    }

    /**
     * Runs the event through the gateway event filters in list order, stopping at the first
     * filter that rejects it.
     *
     * @param gatewayEvent the event about to be enqueued
     * @return {@code false} if any filter rejected the event, {@code true} otherwise (including
     *         when there are no filters)
     */
    public boolean beforeEnqueue(GatewayQueueEvent gatewayEvent) {
        for (org.apache.geode.cache.wan.GatewayEventFilter eventFilter : getGatewayEventFilters()) {
            if (!eventFilter.beforeEnqueue(gatewayEvent)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Stops the event processor if it is not already stopped, then shuts down the dispatcher's
     * ack-reader connection when a dispatcher exists. Does nothing without an event processor.
     */
    protected void stopProcessing() {
        AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
        if (processor == null) {
            return;
        }
        if (!processor.isStopped()) {
            processor.stopProcessing();
        }
        if (processor.getDispatcher() != null) {
            processor.getDispatcher().shutDownAckReaderConnection();
        }
    }

    /**
     * Destroys the connection pool held in {@code proxy} on a helper thread and waits for that
     * thread for at most {@code GATEWAY_SENDER_TIMEOUT * 1000L} milliseconds.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored, the helper thread
     * is interrupted, the pool (if there is one) is closed via {@code emergencyClose()} and the
     * {@code proxy} field is cleared. The pool is null-checked on that path because the helper
     * thread already tolerates a missing pool.
     */
    protected void stompProxyDead() {
        Runnable stomper = new Runnable(){

            @Override
            public void run() {
                PoolImpl bpi = AbstractGatewaySender.this.proxy;
                if (bpi != null) {
                    try {
                        bpi.destroy();
                    }
                    catch (Exception e) {
                        // Best-effort shutdown: a failure must not escape the helper thread,
                        // but it is recorded so it can be diagnosed.
                        logger.debug("Ignoring exception while destroying the GatewaySender proxy", (Throwable)e);
                    }
                }
            }
        };
        LoggingThread t = new LoggingThread("GatewaySender Proxy Stomper", stomper);
        t.start();
        try {
            t.join(GATEWAY_SENDER_TIMEOUT * 1000L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Gateway <{}> is not closing cleanly; forcing cancellation.", (Object)this);
            t.interrupt();
            PoolImpl pool = this.proxy;
            if (pool != null) {
                pool.emergencyClose();
            }
            this.proxy = null;
        }
    }

    /** Returns the local distributed-system id, or {@code -1} if it was never resolved. */
    public int getMyDSId() {
        return myDSId;
    }

    /**
     * Sets the remove-from-queue-on-exception flag.
     *
     * @param removeFromQueueOnException the new value of the flag
     */
    public void setRemoveFromQueueOnException(boolean removeFromQueueOnException) {
        final boolean newValue = removeFromQueueOnException;
        this.removeFromQueueOnException = newValue;
    }

    /** Returns the remove-from-queue-on-exception flag. */
    public boolean isRemoveFromQueueOnException() {
        return removeFromQueueOnException;
    }

    /** Returns this sender's stopper; the same object as {@link #getCancelCriterion()}. */
    public CancelCriterion getStopper() {
        return stopper;
    }

    /** Returns this sender's stopper as its cancel criterion. */
    @Override
    public CancelCriterion getCancelCriterion() {
        return stopper;
    }

    /** Returns the stored server location; synchronized on this sender. */
    public synchronized ServerLocation getServerLocation() {
        return serverLocation;
    }

    /**
     * Stores the server location; synchronized on this sender.
     *
     * @param location the location to store
     * @return always {@code true}
     */
    public synchronized boolean setServerLocation(ServerLocation location) {
        serverLocation = location;
        return true;
    }

    /**
     * Returns the event processor's single queue.
     *
     * @return the queue, or {@code null} when there is no event processor
     * @throws IllegalArgumentException if the event processor is a
     *         {@code ConcurrentSerialGatewaySenderEventProcessor}
     */
    public RegionQueue getQueue() {
        if (this.eventProcessor == null) {
            return null;
        }
        if (this.eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
            throw new IllegalArgumentException("getQueue() for concurrent serial gateway sender");
        }
        return this.eventProcessor.getQueue();
    }

    /**
     * Returns the queues of the event processor: the processor's own set for a
     * {@code ConcurrentSerialGatewaySenderEventProcessor}, otherwise a new set holding the
     * processor's single queue.
     *
     * @return the queues, or {@code null} when there is no event processor
     */
    @Override
    public Set<RegionQueue> getQueues() {
        if (this.eventProcessor == null) {
            return null;
        }
        if (this.eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
            return ((ConcurrentSerialGatewaySenderEventProcessor)this.eventProcessor).getQueues();
        }
        HashSet<RegionQueue> singleQueue = new HashSet<RegionQueue>();
        singleQueue.add(this.eventProcessor.getQueue());
        return singleQueue;
    }

    /**
     * Blocks until the event processor either reports an exception or is no longer stopped.
     *
     * <p>The wait is uninterruptible: an interrupt received while waiting is remembered and the
     * thread's interrupt flag is restored on exit. Restoring the flag inside the loop would make
     * every subsequent {@code wait()} throw immediately and turn the loop into a busy spin. The
     * wait is performed on the same lock object the method synchronizes on.
     *
     * @throws GatewaySenderException if the event processor recorded an exception; its cause is
     *         the cause of the recorded exception
     */
    protected void waitForRunningStatus() {
        final AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
        final Object runningStateLock = processor.getRunningStateLock();
        boolean interrupted = false;
        try {
            synchronized (runningStateLock) {
                while (processor.getException() == null && processor.isStopped()) {
                    try {
                        runningStateLock.wait();
                    }
                    catch (InterruptedException e) {
                        // Keep waiting; the flag is re-asserted in the finally block below.
                        interrupted = true;
                    }
                }
                Exception ex = processor.getException();
                if (ex != null) {
                    throw new GatewaySenderException(String.format("Could not start a gateway sender %s because of exception %s", this.getId(), ex.getMessage()), ex.getCause());
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns whether the event processor is flagged to start in the paused state. */
    public boolean isStartEventProcessorInPausedState() {
        return startEventProcessorInPausedState;
    }

    /** Flags the event processor to start in the paused state; the flag is only ever set here. */
    @Override
    public void setStartEventProcessorInPausedState() {
        startEventProcessorInPausedState = true;
    }

    /**
     * Pauses dispatching under the life-cycle write lock without checking whether the event
     * processor is stopped, publishes a {@code GATEWAYSENDER_PAUSE} resource event and drains
     * the temporary event queues. Does nothing when there is no event processor.
     */
    public void pauseEvenIfProcessorStopped() {
        if (this.eventProcessor == null) {
            return;
        }
        getLifeCycleLock().writeLock().lock();
        try {
            this.eventProcessor.pauseDispatching();
            InternalDistributedSystem ds = (InternalDistributedSystem)this.cache.getDistributedSystem();
            ds.handleResourceEvent(ResourceEvent.GATEWAYSENDER_PAUSE, this);
            logger.info("Paused {}", (Object)this);
            enqueueTempEvents();
        }
        finally {
            getLifeCycleLock().writeLock().unlock();
        }
    }

    /**
     * Pauses dispatching under the life-cycle write lock, publishes a
     * {@code GATEWAYSENDER_PAUSE} resource event and drains the temporary event queues.
     * Does nothing when there is no event processor or the processor is stopped.
     */
    @Override
    public void pause() {
        if (this.eventProcessor == null) {
            return;
        }
        getLifeCycleLock().writeLock().lock();
        try {
            if (!this.eventProcessor.isStopped()) {
                this.eventProcessor.pauseDispatching();
                InternalDistributedSystem ds = (InternalDistributedSystem)this.cache.getDistributedSystem();
                ds.handleResourceEvent(ResourceEvent.GATEWAYSENDER_PAUSE, this);
                logger.info("Paused {}", (Object)this);
                enqueueTempEvents();
            }
        }
        finally {
            getLifeCycleLock().writeLock().unlock();
        }
    }

    /**
     * Resumes dispatching under the life-cycle write lock, publishes a
     * {@code GATEWAYSENDER_RESUME} resource event and drains the temporary event queues.
     * Does nothing when there is no event processor or the processor is stopped.
     */
    @Override
    public void resume() {
        if (this.eventProcessor == null) {
            return;
        }
        getLifeCycleLock().writeLock().lock();
        try {
            if (!this.eventProcessor.isStopped()) {
                this.eventProcessor.resumeDispatching();
                InternalDistributedSystem ds = (InternalDistributedSystem)this.cache.getDistributedSystem();
                ds.handleResourceEvent(ResourceEvent.GATEWAYSENDER_RESUME, this);
                logger.info("Resumed {}", (Object)this);
                enqueueTempEvents();
            }
        }
        finally {
            getLifeCycleLock().writeLock().unlock();
        }
    }

    /** Returns {@code true} only when an event processor exists and reports itself paused. */
    @Override
    public boolean isPaused() {
        return this.eventProcessor != null && this.eventProcessor.isPaused();
    }

    /** Returns {@code true} only when an event processor exists and is not stopped. */
    @Override
    public boolean isRunning() {
        return this.eventProcessor != null && !this.eventProcessor.isStopped();
    }

    /** Returns the current event processor; may be {@code null}. */
    @Override
    public AbstractGatewaySenderEventProcessor getEventProcessor() {
        return eventProcessor;
    }

    /**
     * Decides whether an event is eligible for distribution by this sender.
     *
     * <p>Events on regions with data policy {@code NORMAL} are never eligible. Local or
     * expiration operations are eligible only when the operation is an expiration, this sender
     * is an async event queue and expiration-destroy forwarding is enabled. Everything else is
     * eligible.
     *
     * @param event the event to inspect
     * @param stats not used by this method
     * @return {@code true} if the event may be distributed
     */
    private boolean checkForDistribution(EntryEventImpl event, GatewaySenderStats stats) {
        if (event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) {
            return false;
        }
        boolean localOrExpiration = event.getOperation().isLocal() || event.getOperation().isExpiration();
        if (!localOrExpiration) {
            return true;
        }
        return event.getOperation().isExpiration() && isAsyncEventQueue() && isForwardExpirationDestroy();
    }

    /**
     * Offers a cache event to this sender for queueing.
     *
     * <p>The method works on a clone of {@code event}. The clone is released on exit unless it
     * was handed to the temporary queue. Processing order:
     * <ol>
     * <li>Count the event; drop it if {@code checkForDistribution} or the internal filter
     * rejects it.</li>
     * <li>Set the modified event id on the clone and make sure its callback argument is a
     * {@link GatewaySenderEventCallbackArgument}. An event that already carries one is dropped
     * when the remote site is its originator or already a recipient, unless the dispatcher is a
     * {@link GatewaySenderEventCallbackDispatcher}.</li>
     * <li>If the sender is not running, remember the clone in {@code tmpDroppedEvents} when this
     * member is primary, and return.</li>
     * <li>Try to take the life-cycle read lock. If that fails and the temporary events have not
     * all been enqueued yet, park the event in {@code tmpQueuedEvents} while holding
     * {@code queuedEventsSync} — the same monitor {@code enqueueTempEvents()} holds while
     * draining that queue — and return.</li>
     * <li>With the read lock held, hand the clone to the event processor.</li>
     * </ol>
     *
     * @param operation the listener operation being distributed
     * @param event the original event; it is not modified except through the shared callback
     *        argument
     * @param allRemoteDSIds ids of the remote distributed systems receiving this event
     * @throws CancelException if cancellation is in progress or the event processor is gone
     */
    public void distribute(EnumListenerEvent operation, EntryEventImpl event, List<Integer> allRemoteDSIds) {
        final boolean isDebugEnabled = logger.isDebugEnabled();
        EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
        // Cleared only when ownership of the clone passes to the temporary queue.
        boolean freeClonedEvent = true;
        try {
            GatewaySenderStats stats = this.getStatistics();
            stats.incEventsReceived();
            if (!this.checkForDistribution(event, stats)) {
                stats.incEventsNotQueued();
                return;
            }
            if (!this.filter.enqueueEvent(event)) {
                stats.incEventsFiltered();
                return;
            }
            this.setModifiedEventId(clonedEvent);
            Object callbackArg = clonedEvent.getRawCallbackArgument();
            if (isDebugEnabled) {
                logger.debug("{} : About to notify {} to perform operation {} for {} callback arg {}", (Object)this.isPrimary(), (Object)this.getId(), (Object)operation, (Object)clonedEvent, callbackArg);
            }
            if (callbackArg instanceof GatewaySenderEventCallbackArgument) {
                GatewaySenderEventCallbackArgument seca = (GatewaySenderEventCallbackArgument)callbackArg;
                if (isDebugEnabled) {
                    logger.debug("{}: Event originated in {}. My DS id is {}. The remote DS id is {}. The recipients are: {}", (Object)this, (Object)seca.getOriginatingDSId(), (Object)this.getMyDSId(), (Object)this.getRemoteDSId(), (Object)seca.getRecipientDSIds());
                }
                if (seca.getOriginatingDSId() == -1) {
                    // No originator recorded yet: stamp this site as the originator.
                    if (isDebugEnabled) {
                        logger.debug("{}: Event originated in {}. My DS id is {}. The remote DS id is {}. The recipients are: {}", (Object)this, (Object)seca.getOriginatingDSId(), (Object)this.getMyDSId(), (Object)this.getRemoteDSId(), (Object)seca.getRecipientDSIds());
                    }
                    seca.setOriginatingDSId(this.getMyDSId());
                    seca.initializeReceipientDSIds(allRemoteDSIds);
                } else {
                    AbstractGatewaySenderEventProcessor ep = this.getEventProcessor();
                    if (ep == null || !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
                        if (seca.getOriginatingDSId() == this.getRemoteDSId()) {
                            if (isDebugEnabled) {
                                logger.debug("{}: Event originated in {}. My DS id is {}. It is being dropped as remote is originator.", (Object)this, (Object)seca.getOriginatingDSId(), (Object)this.getMyDSId());
                            }
                            return;
                        }
                        if (seca.getRecipientDSIds().contains(this.getRemoteDSId())) {
                            if (isDebugEnabled) {
                                logger.debug("{}: Event originated in {}. My DS id is {}. The remote DS id is {}.. It is being dropped as remote ds is already a recipient. Recipients are: {}", (Object)this, (Object)seca.getOriginatingDSId(), (Object)this.getMyDSId(), (Object)this.getRemoteDSId(), (Object)seca.getRecipientDSIds());
                            }
                            return;
                        }
                    }
                    seca.getRecipientDSIds().addAll(allRemoteDSIds);
                }
            } else {
                GatewaySenderEventCallbackArgument geCallbackArg = new GatewaySenderEventCallbackArgument(callbackArg, this.getMyDSId(), allRemoteDSIds);
                clonedEvent.setCallbackArgument(geCallbackArg);
            }
            if (!this.isRunning()) {
                if (this.isPrimary()) {
                    this.tmpDroppedEvents.add(clonedEvent);
                    if (isDebugEnabled) {
                        logger.debug("add to tmpDroppedEvents for evnet {}", (Object)clonedEvent);
                    }
                }
                if (isDebugEnabled) {
                    logger.debug("Returning back without putting into the gateway sender queue:" + event);
                }
                return;
            }
            if (!this.getLifeCycleLock().readLock().tryLock()) {
                // The flag check and the add must be atomic with respect to
                // enqueueTempEvents(), which drains tmpQueuedEvents under the same monitor.
                synchronized (this.queuedEventsSync) {
                    if (!this.enqueuedAllTempQueueEvents && !this.getLifeCycleLock().readLock().tryLock()) {
                        Object substituteValue = this.getSubstituteValue(clonedEvent, operation);
                        this.tmpQueuedEvents.add(new TmpQueueEvent(operation, clonedEvent, substituteValue));
                        freeClonedEvent = false;
                        stats.incTempQueueSize();
                        if (isDebugEnabled) {
                            logger.debug("Event : {} is added to TempQueue", (Object)clonedEvent);
                        }
                        return;
                    }
                }
                if (this.enqueuedAllTempQueueEvents) {
                    this.getLifeCycleLock().readLock().lock();
                }
            }
            try {
                // Re-check now that the read lock is held.
                if (!this.isRunning()) {
                    if (isDebugEnabled) {
                        logger.debug("Returning back without putting into the gateway sender queue:" + event);
                    }
                    if (this.eventProcessor != null) {
                        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
                    }
                    return;
                }
                try {
                    AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
                    if (ev == null) {
                        // Prefer reporting an in-progress cancellation; otherwise report the
                        // missing processor.
                        this.getStopper().checkCancelInProgress(null);
                        this.getCache().getDistributedSystem().getCancelCriterion().checkCancelInProgress(null);
                        throw new GatewayCancelledException("Event processor thread is gone");
                    }
                    Object substituteValue = this.getSubstituteValue(clonedEvent, operation);
                    ev.enqueueEvent(operation, clonedEvent, substituteValue);
                }
                catch (CancelException e) {
                    logger.debug("caught cancel exception", (Throwable)e);
                    throw e;
                }
                catch (RegionDestroyedException e) {
                    logger.warn(String.format("%s: An Exception occurred while queueing %s to perform operation %s for %s", this, this.getId(), operation, clonedEvent), (Throwable)e);
                }
                catch (Exception e) {
                    logger.fatal(String.format("%s: An Exception occurred while queueing %s to perform operation %s for %s", this, this.getId(), operation, clonedEvent), (Throwable)e);
                }
            }
            finally {
                this.getLifeCycleLock().readLock().unlock();
            }
        }
        finally {
            if (freeClonedEvent) {
                clonedEvent.release();
            }
        }
    }

    /**
     * Reports how many events are currently held in {@code tmpDroppedEvents}.
     *
     * @return the current size of the dropped-events collection
     */
    @VisibleForTesting
    int getTmpDroppedEventSize() {
        return tmpDroppedEvents.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the temporary holding collections into the event processor.
     * <p>
     * Every event polled from {@code tmpDroppedEvents} is registered with the processor as
     * dropped. Then, while holding {@code queuedEventsSync}, every entry polled from
     * {@code tmpQueuedEvents} is enqueued on the processor and released, and
     * {@code enqueuedAllTempQueueEvents} is set to {@code true} once the queue is empty.
     * Does nothing when there is no event processor.
     * <p>
     * The processor field is read exactly once into a local so that a concurrent reset of the
     * field between the null check and a later use cannot raise a {@code NullPointerException};
     * this is the same snapshot pattern {@code getEventQueueSize()} uses.
     * <p>
     * A {@code CacheException} is logged at debug level and an {@code IOException} at fatal
     * level; in both cases draining stops and the flag is left unchanged.
     */
    public void enqueueTempEvents() {
        final AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
        if (processor == null) {
            return;
        }
        EntryEventImpl droppedEvent;
        while ((droppedEvent = this.tmpDroppedEvents.poll()) != null) {
            processor.registerEventDroppedInPrimaryQueue(droppedEvent);
        }
        TmpQueueEvent nextEvent = null;
        final GatewaySenderStats stats = this.getStatistics();
        try {
            synchronized (this.queuedEventsSync) {
                while ((nextEvent = this.tmpQueuedEvents.poll()) != null) {
                    try {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Event :{} is enqueued to GatewaySenderQueue from TempQueue", (Object)nextEvent);
                        }
                        stats.decTempQueueSize();
                        processor.enqueueEvent(nextEvent.getOperation(), nextEvent.getEvent(), nextEvent.getSubstituteValue());
                    } finally {
                        // Released whether or not the enqueue succeeded.
                        nextEvent.release();
                    }
                }
                this.enqueuedAllTempQueueEvents = true;
            }
        } catch (CacheException e) {
            logger.debug("caught cancel exception", (Throwable)e);
        } catch (IOException e) {
            // Only enqueueEvent can raise IOException here, so nextEvent is non-null.
            logger.fatal(String.format("%s: An Exception occurred while queueing %s to perform operation %s for %s", this, this.getId(), nextEvent.getOperation(), nextEvent), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the first temporarily queued event whose tail key equals {@code tailKey}.
     * <p>
     * The scan runs while holding {@code queuedEventsSync}. The matching entry is released
     * and then removed through the iterator.
     *
     * @param tailKey the key compared (via {@code equals}) against each queued event's tail key
     * @return {@code true} if a matching entry was found and removed, {@code false} otherwise
     */
    public boolean removeFromTempQueueEvents(Object tailKey) {
        synchronized (this.queuedEventsSync) {
            for (Iterator<TmpQueueEvent> cursor = this.tmpQueuedEvents.iterator(); cursor.hasNext(); ) {
                TmpQueueEvent candidate = cursor.next();
                if (tailKey.equals(candidate.getEvent().getTailKey())) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Removing from there..", tailKey);
                    }
                    candidate.release();
                    cursor.remove();
                    return true;
                }
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Discards every temporarily queued event and resets the related state.
     * <p>
     * The queue is drained twice: once without synchronization and once more while holding
     * {@code queuedEventsSync}, after which {@code enqueuedAllTempQueueEvents} is cleared.
     * Each drained entry is released. Finally the queue-size, secondary-queue-size,
     * events-processed-by-PQRM and temp-queue-size statistics are set to zero.
     */
    public void clearTempEventsAfterSenderStopped() {
        // First pass: drain without taking the monitor.
        for (TmpQueueEvent pending = this.tmpQueuedEvents.poll(); pending != null; pending = this.tmpQueuedEvents.poll()) {
            pending.release();
        }
        // Second pass under the monitor picks up anything added in the meantime.
        synchronized (this.queuedEventsSync) {
            for (TmpQueueEvent pending = this.tmpQueuedEvents.poll(); pending != null; pending = this.tmpQueuedEvents.poll()) {
                pending.release();
            }
            this.enqueuedAllTempQueueEvents = false;
        }
        this.statistics.setQueueSize(0);
        this.statistics.setSecondaryQueueSize(0);
        this.statistics.setEventsProcessedByPQRM(0);
        this.statistics.setTempQueueSize(0);
    }

    /**
     * Asks the configured substitution filter, if any, for a replacement value.
     *
     * @param clonedEvent the event handed to the substitution filter
     * @param operation the operation, used only in the warning message
     * @return {@code null} when no filter is configured or the filter threw an exception;
     *         {@code GatewaySenderEventImpl.TOKEN_NULL} when the filter returned {@code null};
     *         otherwise the value the filter returned
     */
    public Object getSubstituteValue(EntryEventImpl clonedEvent, EnumListenerEvent operation) {
        if (this.substitutionFilter == null) {
            return null;
        }
        try {
            Object replacement = this.substitutionFilter.getSubstituteValue(clonedEvent);
            if (replacement != null) {
                return replacement;
            }
            return GatewaySenderEventImpl.TOKEN_NULL;
        } catch (Exception e) {
            // A failing filter is logged and treated as "no substitute".
            logger.warn(String.format("%s: An Exception occurred while queueing %s to perform operation %s for %s", this, this.getId(), operation, clonedEvent), (Throwable)e);
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Assigns this sender's {@code eventIdIndex} from the metadata region.
     * <p>
     * While holding the gateway sender lock service lock named {@code META_DATA_REGION_NAME},
     * the method looks up this sender's id in the metadata region. If an entry exists its
     * value is reused; otherwise the current region size becomes the new index and is stored
     * under the sender id. The lock is released in all cases where it was acquired.
     * <p>
     * The debug flag is sampled once and used for every debug statement, so the final message
     * can no longer be emitted with a {@code null} prefix if the log level changes mid-call.
     *
     * @throws IllegalStateException if the lock cannot be obtained, or if the new index would
     *         exceed {@code ThreadIdentifier.Bits.GATEWAY_ID.mask()}
     */
    protected void initializeEventIdIndex() {
        final boolean isDebugEnabled = logger.isDebugEnabled();
        boolean gotLock = false;
        try {
            // NOTE(review): -1/-1 presumably means "wait indefinitely, no lease expiry" - confirm
            // against the lock service contract.
            gotLock = this.getCache().getGatewaySenderLockService().lock(META_DATA_REGION_NAME, -1L, -1L);
            if (!gotLock) {
                throw new IllegalStateException(String.format("%s: Failed to lock gateway event id index metadata region", this));
            }
            if (isDebugEnabled) {
                logger.debug("{}: Locked the metadata region", (Object)this);
            }
            Region<String, Integer> region = this.getEventIdIndexMetaDataRegion();
            final int index;
            final String messagePrefix;
            if (region.containsKey(this.getId())) {
                index = region.get(this.getId());
                messagePrefix = "Using existing";
            } else {
                index = region.size();
                if ((long)index > ThreadIdentifier.Bits.GATEWAY_ID.mask()) {
                    throw new IllegalStateException(String.format("Cannot create GatewaySender %s because the maximum (%s) has been reached", this.getId(), ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1L));
                }
                region.put(this.getId(), index);
                messagePrefix = "Created new";
            }
            this.eventIdIndex = index;
            if (isDebugEnabled) {
                logger.debug("{}: {} event id index: {}", (Object)this, (Object)messagePrefix, (Object)this.eventIdIndex);
            }
        } finally {
            if (gotLock) {
                this.getCache().getGatewaySenderLockService().unlock(META_DATA_REGION_NAME);
                if (isDebugEnabled) {
                    logger.debug("{}: Unlocked the metadata region", (Object)this);
                }
            }
        }
    }

    /**
     * Returns the event id index metadata region, creating it on first use via
     * {@code initializeEventIdIndexMetaDataRegion} and caching it in a field.
     *
     * @return the cached or newly initialized metadata region
     */
    private Region<String, Integer> getEventIdIndexMetaDataRegion() {
        if (this.eventIdIndexMetaDataRegion != null) {
            return this.eventIdIndexMetaDataRegion;
        }
        this.eventIdIndexMetaDataRegion = AbstractGatewaySender.initializeEventIdIndexMetaDataRegion(this);
        return this.eventIdIndexMetaDataRegion;
    }

    /**
     * Looks up the metadata region named {@code META_DATA_REGION_NAME} in the sender's cache
     * and creates it when it does not exist yet.
     * <p>
     * A new region is created with {@code DISTRIBUTED_ACK} scope and {@code REPLICATE} data
     * policy, flagged as a meta region, with a stats holder that builds a
     * {@code CachePerfStats} for the region name. If creation fails because the region already
     * exists, the existing region is looked up again and returned instead.
     * <p>
     * The attributes factory and attributes are declared with explicit type parameters rather
     * than raw types so the key/value types are checked by the compiler.
     *
     * @param sender the sender whose cache hosts the metadata region
     * @return the existing or newly created metadata region
     * @throws IllegalStateException if region creation fails for any reason other than the
     *         region already existing; the original exception is attached as the cause
     */
    private static synchronized Region<String, Integer> initializeEventIdIndexMetaDataRegion(AbstractGatewaySender sender) {
        final InternalCache cache = sender.getCache();
        Region<String, Integer> region = cache.getRegion(META_DATA_REGION_NAME);
        if (region == null) {
            AttributesFactory<String, Integer> factory = new AttributesFactory<>();
            factory.setScope(Scope.DISTRIBUTED_ACK);
            factory.setDataPolicy(DataPolicy.REPLICATE);
            RegionAttributes<String, Integer> ra = factory.create();
            HasCachePerfStats statsHolder = new HasCachePerfStats(){

                @Override
                public CachePerfStats getCachePerfStats() {
                    return new CachePerfStats(cache.getDistributedSystem(), AbstractGatewaySender.META_DATA_REGION_NAME);
                }
            };
            InternalRegionArguments ira = new InternalRegionArguments().setIsUsedForMetaRegion(true).setCachePerfStatsHolder(statsHolder);
            try {
                region = cache.createVMRegion(META_DATA_REGION_NAME, ra, ira);
            } catch (RegionExistsException e) {
                // Another creator won the race; use the region that now exists.
                region = cache.getRegion(META_DATA_REGION_NAME);
            } catch (Exception e) {
                throw new IllegalStateException(String.format("%s: Caught the following exception attempting to create gateway event id index metadata region:", sender), e);
            }
        }
        return region;
    }

    /**
     * Subclass hook operating on the given event.
     * <p>
     * NOTE(review): judging by the name, implementations presumably rewrite the event's id so
     * it reflects this sender - confirm against the concrete subclasses.
     *
     * @param var1 the event to operate on
     */
    public abstract void setModifiedEventId(EntryEventImpl var1);

    /**
     * Reports how many events are waiting in the temporary queue.
     *
     * @return the size of {@code tmpQueuedEvents}, or {@code 0} when that field is {@code null}
     */
    public int getTmpQueuedEventSize() {
        return this.tmpQueuedEvents != null ? this.tmpQueuedEvents.size() : 0;
    }

    /**
     * Reports the event processor's queue size.
     * <p>
     * The processor field is copied to a local before the null check so both the check and
     * the call see the same reference.
     *
     * @return the processor's {@code eventQueueSize()}, or {@code 0} when there is no processor
     */
    public int getEventQueueSize() {
        final AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
        if (processor == null) {
            return 0;
        }
        return processor.eventQueueSize();
    }

    /**
     * Reports the event processor's secondary queue size.
     * <p>
     * The processor field is copied to a local before the null check so both the check and
     * the call see the same reference.
     *
     * @return the processor's {@code secondaryEventQueueSize()}, or {@code 0} when there is no
     *         processor
     */
    public int getSecondaryEventQueueSize() {
        final AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
        if (processor == null) {
            return 0;
        }
        return processor.secondaryEventQueueSize();
    }

    /**
     * Overwrites the {@code enqueuedAllTempQueueEvents} flag.
     * <p>
     * Unlike the other writers of this flag in this class, the assignment here is not made
     * under {@code queuedEventsSync}.
     *
     * @param enqueuedAllTempQueueEvents the new flag value
     */
    public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
        this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
    }

    /**
     * Tells whether this sender has at least one async event listener.
     *
     * @return {@code true} when {@code getAsyncEventListeners()} is non-null and non-empty
     */
    protected boolean isAsyncEventQueue() {
        if (this.getAsyncEventListeners() == null) {
            return false;
        }
        return !this.getAsyncEventListeners().isEmpty();
    }

    /**
     * Exposes the {@code lockForConcurrentDispatcher} object.
     *
     * @return the object stored in {@code lockForConcurrentDispatcher}
     */
    public Object getLockForConcurrentDispatcher() {
        return lockForConcurrentDispatcher;
    }

    /**
     * Exposes the read/write lock stored in {@code lifeCycleLock}.
     *
     * @return this sender's life-cycle lock
     */
    public ReentrantReadWriteLock getLifeCycleLock() {
        return lifeCycleLock;
    }

    /**
     * Waits for a parallel sender to be flushed by delegating to a
     * {@code WaitUntilParallelGatewaySenderFlushedCoordinator}.
     * <p>
     * {@code CancelException}, {@code RegionDestroyedException} and
     * {@code BucketMovedException} are logged and rethrown unchanged. An
     * {@code InterruptedException} is rethrown unchanged as well, so the declared checked
     * exception actually reaches the caller instead of being hidden inside an
     * {@code InternalGemFireError}. Any other throwable is logged and wrapped in an
     * {@code InternalGemFireError}.
     *
     * @param timeout the timeout passed to the coordinator
     * @param unit the unit of {@code timeout}
     * @return the result reported by the coordinator
     * @throws InterruptedException if the wait was interrupted
     * @throws UnsupportedOperationException if this sender is not parallel
     */
    @Override
    public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException {
        if (!this.isParallel()) {
            throw new UnsupportedOperationException("waitUntilFlushed is not currently supported for serial gateway senders");
        }
        try {
            WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = new WaitUntilParallelGatewaySenderFlushedCoordinator(this, timeout, unit, true);
            return coordinator.waitUntilFlushed();
        } catch (CancelException | RegionDestroyedException | BucketMovedException e) {
            logger.warn("Caught the following exception attempting waitUntilFlushed and will retry:", (Throwable)e);
            throw e;
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Honour the method signature: interruption must not be converted to an Error.
                throw (InterruptedException) t;
            }
            logger.warn("Caught the following exception attempting waitUntilFlushed and will return:", t);
            throw new InternalGemFireError(t);
        }
    }

    /**
     * Searches the sender's queues for an event with the given key and version timestamp.
     * <p>
     * For each queue returned by {@code getQueues()}, the values of the queue's region are
     * scanned until the first match in that queue; the match is logged and the
     * synchronization-events-provided statistic is incremented. Scanning then continues with
     * the next queue, so when several queues contain a match the one from the last such
     * queue is returned.
     *
     * @param key the key to compare (via {@code equals}) with each event's key
     * @param timestamp the version timestamp the event must carry
     * @return the matching event, or {@code null} when no queue contains one
     */
    protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
        GatewaySenderEventImpl match = null;
        for (RegionQueue queue : this.getQueues()) {
            Region region = queue.getRegion();
            for (Object value : region.values()) {
                GatewaySenderEventImpl candidate = (GatewaySenderEventImpl) value;
                if (candidate.getKey().equals(key) && candidate.getVersionTimeStamp() == timestamp) {
                    match = candidate;
                    logger.info("{}: Providing synchronization event for key={}; timestamp={}: {}", (Object)this, key, (Object)timestamp, (Object)match);
                    this.statistics.incSynchronizationEventsProvided();
                    // Stop scanning this queue; the outer loop moves on to the next one.
                    break;
                }
            }
        }
        return match;
    }

    /**
     * Enqueues a synchronization event on the event processor.
     * <p>
     * Does nothing when there is no event processor. Otherwise the life-cycle read lock is
     * held for the duration of the enqueue; on success the synchronization-events-enqueued
     * statistic is incremented. Any throwable raised while enqueueing is logged as a warning
     * and not propagated.
     *
     * @param event the event to enqueue
     */
    protected void putSynchronizationEvent(GatewayQueueEvent event) {
        if (this.eventProcessor == null) {
            return;
        }
        this.lifeCycleLock.readLock().lock();
        try {
            logger.info("{}: Enqueueing synchronization event: {}", (Object)this, (Object)event);
            this.eventProcessor.enqueueEvent(event);
            this.statistics.incSynchronizationEventsEnqueued();
        } catch (Throwable t) {
            logger.warn(String.format("%s: Caught the following exception attempting to enqueue synchronization event=%s:", this, event), t);
        } finally {
            this.lifeCycleLock.readLock().unlock();
        }
    }

    /**
     * Immutable holder for an event waiting in the temporary queue: the operation, the event
     * itself and the substitute value computed for it.
     */
    public static class TmpQueueEvent implements Releasable {
        private final EnumListenerEvent operation;
        private final EntryEventImpl event;
        private final Object substituteValue;

        /**
         * @param op the operation associated with the event
         * @param e the queued event
         * @param subValue the substitute value stored alongside the event; may be {@code null}
         */
        public TmpQueueEvent(EnumListenerEvent op, EntryEventImpl e, Object subValue) {
            operation = op;
            event = e;
            substituteValue = subValue;
        }

        /** @return the operation given at construction */
        public EnumListenerEvent getOperation() {
            return operation;
        }

        /** @return the event given at construction */
        public EntryEventImpl getEvent() {
            return event;
        }

        /** @return the substitute value given at construction */
        public Object getSubstituteValue() {
            return substituteValue;
        }

        /** Releases the wrapped event. */
        @Override
        public void release() {
            event.release();
        }
    }

    /**
     * Pairs a gateway sender event with a deadline computed when the wrapper is created.
     */
    public static class EventWrapper {
        /**
         * Offset added to the creation time; read from the {@code Gateway.EVENT_TIMEOUT}
         * system property, defaulting to 300000.
         */
        private static final int EVENT_TIMEOUT = Integer.getInteger("Gateway.EVENT_TIMEOUT", 300000);

        /** {@code System.currentTimeMillis()} at construction plus {@code EVENT_TIMEOUT}. */
        public final long timeout;

        /** The wrapped event. */
        public final GatewaySenderEventImpl event;

        /**
         * @param e the event to wrap
         */
        public EventWrapper(GatewaySenderEventImpl e) {
            event = e;
            final long createdAt = System.currentTimeMillis();
            timeout = createdAt + EVENT_TIMEOUT;
        }
    }

    /**
     * Filter that accepts every event. A single shared instance is handed out by
     * {@link #getInstance()}; the constructor is private.
     */
    public static class DefaultGatewayEventFilter implements GatewayEventFilter {
        @Immutable
        private static final DefaultGatewayEventFilter INSTANCE = new DefaultGatewayEventFilter();

        private DefaultGatewayEventFilter() {
            // not instantiable from outside
        }

        /** @return the shared filter instance */
        public static GatewayEventFilter getInstance() {
            return INSTANCE;
        }

        /**
         * @param event ignored
         * @return always {@code true}
         */
        @Override
        public boolean enqueueEvent(EntryEventImpl event) {
            return true;
        }
    }

    /**
     * Cancel criterion that forwards both of its queries to the criterion it wraps.
     */
    private class Stopper extends CancelCriterion {
        /** The wrapped criterion every call is delegated to. */
        final CancelCriterion stper;

        /**
         * @param stopper the criterion to delegate to
         */
        Stopper(CancelCriterion stopper) {
            stper = stopper;
        }

        /** @return whatever the wrapped criterion's {@code cancelInProgress()} returns */
        @Override
        public String cancelInProgress() {
            return stper.cancelInProgress();
        }

        /**
         * @param e passed through to the wrapped criterion
         * @return whatever the wrapped criterion's {@code generateCancelledException} returns
         */
        @Override
        public RuntimeException generateCancelledException(Throwable e) {
            return stper.generateCancelledException(e);
        }
    }
}

