/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan.parallel;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

public class ParallelQueueRemovalMessage
extends PooledDistributionMessage {
    private static final Logger logger = LogService.getLogger();
    private HashMap regionToDispatchedKeysMap;

    public ParallelQueueRemovalMessage() {
    }

    public ParallelQueueRemovalMessage(HashMap rgnToDispatchedKeysMap) {
        this.regionToDispatchedKeysMap = rgnToDispatchedKeysMap;
    }

    @Override
    public int getDSFID() {
        return 2161;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void process(DistributionManager dm) {
        boolean isDebugEnabled = logger.isDebugEnabled();
        GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
        if (cache != null) {
            int oldLevel = LocalRegion.setThreadInitLevelRequirement(1);
            try {
                for (Object name : this.regionToDispatchedKeysMap.keySet()) {
                    String regionName = (String)name;
                    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
                    if (region == null) continue;
                    AbstractGatewaySender abstractSender = region.getParallelGatewaySender();
                    Map bucketIdToDispatchedKeys = (Map)this.regionToDispatchedKeysMap.get(regionName);
                    for (Object bId : bucketIdToDispatchedKeys.keySet()) {
                        List dispatchedKeys;
                        String bucketFullPath = "/__PR/" + region.getBucketName((Integer)bId);
                        AbstractBucketRegionQueue brq = (AbstractBucketRegionQueue)cache.getRegionByPath(bucketFullPath);
                        if (isDebugEnabled) {
                            logger.debug("ParallelQueueRemovalMessage : The bucket in the cache is bucketRegionName : {} bucket: {}", (Object)bucketFullPath, (Object)brq);
                        }
                        if ((dispatchedKeys = (List)bucketIdToDispatchedKeys.get((Integer)bId)) == null) continue;
                        for (Object key : dispatchedKeys) {
                            abstractSender.removeFromTempQueueEvents(key);
                            if (brq != null) {
                                if (brq.isInitialized()) {
                                    if (isDebugEnabled) {
                                        logger.debug("ParallelQueueRemovalMessage : The bucket {} is initialized. Destroying the key {} from BucketRegionQueue.", (Object)bucketFullPath, key);
                                    }
                                    this.afterAckForSecondary_EventInBucket(abstractSender, brq, key);
                                    this.destroyKeyFromBucketQueue(brq, key, region);
                                    continue;
                                }
                                boolean isDestroyed = false;
                                if (isDebugEnabled) {
                                    logger.debug("ParallelQueueRemovalMessage : The bucket {} is not yet initialized.", (Object)bucketFullPath);
                                }
                                brq.getInitializationLock().readLock().lock();
                                try {
                                    if (brq.containsKey(key)) {
                                        this.afterAckForSecondary_EventInBucket(abstractSender, brq, key);
                                        this.destroyKeyFromBucketQueue(brq, key, region);
                                        isDestroyed = true;
                                    }
                                    this.destroyFromTempQueue(brq.getPartitionedRegion(), (Integer)bId, key);
                                    brq.addToFailedBatchRemovalMessageKeys(key);
                                    continue;
                                }
                                finally {
                                    brq.getInitializationLock().readLock().unlock();
                                    continue;
                                }
                            }
                            this.destroyFromTempQueue(region, (Integer)bId, key);
                        }
                    }
                }
            }
            finally {
                LocalRegion.setThreadInitLevelRequirement(oldLevel);
            }
        }
    }

    private void afterAckForSecondary_EventInBucket(AbstractGatewaySender abstractSender, AbstractBucketRegionQueue brq, Object key) {
        for (GatewayEventFilter filter : abstractSender.getGatewayEventFilters()) {
            GatewayQueueEvent eventForFilter = (GatewayQueueEvent)brq.get(key);
            try {
                if (eventForFilter == null) continue;
                filter.afterAcknowledgement(eventForFilter);
            }
            catch (Exception e) {
                logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.GatewayEventFilter_EXCEPTION_OCCURRED_WHILE_HANDLING_CALL_TO_0_AFTER_ACKNOWLEDGEMENT_FOR_EVENT_1, new Object[]{filter.toString(), eventForFilter}), (Throwable)e);
            }
        }
    }

    private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key, PartitionedRegion prQ) {
        boolean isDebugEnabled = logger.isDebugEnabled();
        try {
            brq.destroyKey(key);
            if (isDebugEnabled) {
                logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, (Object)prQ.getName(), (Object)brq.getId());
            }
        }
        catch (EntryNotFoundException e) {
            if (isDebugEnabled) {
                logger.debug("Got EntryNotFoundException while destroying the key {} for bucket {}", key, (Object)brq.getId());
            }
            brq.addToFailedBatchRemovalMessageKeys(key);
        }
        catch (ForceReattemptException fe) {
            if (isDebugEnabled) {
                logger.debug("Got ForceReattemptException while getting bucket {} to destroyLocally the keys.", (Object)brq.getId());
            }
        }
        catch (CancelException e) {
            return;
        }
        catch (CacheException e) {
            logger.error((Message)LocalizedMessage.create(LocalizedStrings.ParallelQueueRemovalMessage_QUEUEREMOVALMESSAGEPROCESSEXCEPTION_IN_PROCESSING_THE_LAST_DISPTACHED_KEY_FOR_A_SHADOWPR_THE_PROBLEM_IS_WITH_KEY__0_FOR_SHADOWPR_WITH_NAME_1, new Object[]{key, prQ.getName()}), (Throwable)e);
        }
    }

    private boolean destroyFromTempQueue(PartitionedRegion qPR, int bId, Object key) {
        ConcurrentParallelGatewaySenderQueue prq;
        BlockingQueue<GatewaySenderEventImpl> tempQueue;
        boolean isDestroyed = false;
        Set<RegionQueue> queues = qPR.getParallelGatewaySender().getQueues();
        if (queues != null && (tempQueue = (prq = (ConcurrentParallelGatewaySenderQueue)queues.toArray()[0]).getBucketTmpQueue(bId)) != null) {
            Iterator itr = tempQueue.iterator();
            while (itr.hasNext()) {
                GatewaySenderEventImpl eventForFilter = (GatewaySenderEventImpl)itr.next();
                this.afterAckForSecondary_EventInTempQueue(qPR.getParallelGatewaySender(), eventForFilter);
                if (!eventForFilter.getShadowKey().equals(key)) continue;
                itr.remove();
                eventForFilter.release();
                isDestroyed = true;
            }
        }
        return isDestroyed;
    }

    private void afterAckForSecondary_EventInTempQueue(AbstractGatewaySender parallelGatewaySenderImpl, GatewaySenderEventImpl eventForFilter) {
        for (GatewayEventFilter filter : parallelGatewaySenderImpl.getGatewayEventFilters()) {
            try {
                if (eventForFilter == null) continue;
                filter.afterAcknowledgement(eventForFilter);
            }
            catch (Exception e) {
                logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.GatewayEventFilter_EXCEPTION_OCCURRED_WHILE_HANDLING_CALL_TO_0_AFTER_ACKNOWLEDGEMENT_FOR_EVENT_1, new Object[]{filter.toString(), eventForFilter}), (Throwable)e);
            }
        }
    }

    @Override
    public void toData(DataOutput out) throws IOException {
        super.toData(out);
        DataSerializer.writeHashMap(this.regionToDispatchedKeysMap, out);
    }

    @Override
    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
        super.fromData(in);
        this.regionToDispatchedKeysMap = DataSerializer.readHashMap(in);
    }
}

