/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan.parallel;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.logging.log4j.Logger;

/**
 * Event processor for a parallel gateway sender. Events are wrapped in
 * {@link GatewaySenderEventImpl} instances and placed on a
 * {@link ParallelGatewaySenderQueue}; most of the public operations of this
 * class simply delegate to that queue.
 */
public class ParallelGatewaySenderEventProcessor
extends AbstractGatewaySenderEventProcessor {
    private static final Logger logger = LogService.getLogger();

    /** Index of this processor, passed through to the queue it creates. */
    final int index;

    /** Dispatcher count, passed through to the queue it creates. */
    final int nDispatcher;

    /**
     * Creates a processor with index {@code 0} and a dispatcher count of {@code 1}.
     *
     * @param sender the gateway sender this processor works for
     * @param tMonitoring thread monitoring handle forwarded to the superclass
     */
    protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender, ThreadsMonitoring tMonitoring) {
        super("Event Processor for GatewaySender_" + sender.getId(), sender, tMonitoring);
        this.index = 0;
        this.nDispatcher = 1;
        this.initializeMessageQueue(sender.getId());
    }

    /**
     * Creates a processor with an explicit index and dispatcher count.
     *
     * @param sender the gateway sender this processor works for
     * @param userRegions not used by this constructor; the target regions are
     *        recomputed in {@link #initializeMessageQueue(String)}
     * @param id index of this processor; also appended to the processor name
     * @param nDispatcher dispatcher count handed to the queue
     * @param tMonitoring thread monitoring handle forwarded to the superclass
     */
    protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender, Set<Region> userRegions, int id, int nDispatcher, ThreadsMonitoring tMonitoring) {
        super("Event Processor for GatewaySender_" + sender.getId() + "_" + id, sender, tMonitoring);
        this.index = id;
        this.nDispatcher = nDispatcher;
        this.initializeMessageQueue(sender.getId());
    }

    /**
     * Builds and starts the {@link ParallelGatewaySenderQueue} for this processor.
     * The queue is created over every application region of the sender's cache
     * whose gateway sender ids contain {@code id}.
     *
     * @param id the gateway sender id used to select the target regions
     */
    @Override
    protected void initializeMessageQueue(String id) {
        Set<Region> targetRs = new HashSet<>();
        for (InternalRegion region : this.sender.getCache().getApplicationRegions()) {
            if (region.getAllGatewaySenderIds().contains(id)) {
                targetRs.add(region);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("The target Regions are(PGSEP) {}", targetRs);
        }
        ParallelGatewaySenderQueue queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
        queue.start();
        this.queue = queue;
        // The queue may already report local entries right after start; wake the processor if so.
        if (queue.localSize() > 0) {
            queue.notifyEventProcessorIfRequired();
        }
    }

    /**
     * Wraps the entry event in a {@link GatewaySenderEventImpl} and enqueues it.
     * The event is skipped when its region is not a {@link DistributedRegion}
     * and its tail key is {@code -1}.
     *
     * @param operation the listener operation that produced the event
     * @param event the entry event; must be an {@link EntryEventImpl}
     * @param substituteValue substitute value handed to the gateway event
     * @throws IOException declared by the overridden method
     * @throws CacheException declared by the overridden method
     */
    @Override
    public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) throws IOException, CacheException {
        Region region = event.getRegion();
        EntryEventImpl entryEvent = (EntryEventImpl)event;
        if (!(region instanceof DistributedRegion) && entryEvent.getTailKey() == -1L) {
            if (logger.isDebugEnabled()) {
                logger.debug("ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. {}", event);
            }
            return;
        }
        EventID eventID = entryEvent.getEventId();
        GatewaySenderEventImpl gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID());
        this.enqueueEvent(gatewayQueueEvent);
    }

    /**
     * Puts the event on the queue if the sender's {@code beforeEnqueue} check
     * accepts it; otherwise counts it as filtered. In every case where the
     * event did not end up queued (filtered, rejected by the queue, interrupted
     * or failed with an exception) the event is released.
     *
     * @param gatewayQueueEvent the event to enqueue; must be a {@link GatewaySenderEventImpl}
     */
    @Override
    protected void enqueueEvent(GatewayQueueEvent gatewayQueueEvent) {
        boolean queuedEvent = false;
        try {
            if (this.getSender().beforeEnqueue(gatewayQueueEvent)) {
                long start = this.getSender().getStatistics().startTime();
                try {
                    queuedEvent = this.queue.put(gatewayQueueEvent);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can
                    // still observe the interruption, and report through the logger
                    // instead of printing to stderr.
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while putting an event on the parallel gateway sender queue", e);
                }
                this.getSender().getStatistics().endPut(start);
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("The Event {} is filtered.", gatewayQueueEvent);
                }
                this.getSender().getStatistics().incEventsFiltered();
            }
        }
        finally {
            if (!queuedEvent) {
                // The event was not queued, so nothing else will release it.
                ((GatewaySenderEventImpl)gatewayQueueEvent).release();
            }
        }
    }

    /**
     * Only logs the dropped event; this processor does not record it anywhere.
     *
     * @param droppedEvent the event that was dropped
     */
    @Override
    protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
        logger.info("ParallelGatewaySenderEventProcessor should not process dropped event {}", droppedEvent);
    }

    /**
     * Delegates to {@link ParallelGatewaySenderQueue#clear}.
     *
     * @param pr the partitioned region passed to the queue
     * @param bucketId the bucket id passed to the queue
     */
    @Override
    public void clear(PartitionedRegion pr, int bucketId) {
        this.parallelQueue().clear(pr, bucketId);
    }

    /**
     * Asks the queue to notify the event processor if required.
     *
     * @param bucketId not used by this implementation
     */
    @Override
    public void notifyEventProcessorIfRequired(int bucketId) {
        this.parallelQueue().notifyEventProcessorIfRequired();
    }

    /**
     * Looks up the temporary queue registered for a bucket in the queue's
     * bucket-to-temp-queue map.
     *
     * @param bucketId the bucket id used as the map key
     * @return the value the map holds for {@code bucketId}
     */
    @Override
    public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
        return this.parallelQueue().getBucketToTempQueueMap().get(bucketId);
    }

    /**
     * Delegates to {@link ParallelGatewaySenderQueue#getRegion}.
     *
     * @param prRegionName the region name passed to the queue
     * @return whatever the queue returns for that name
     */
    @Override
    public PartitionedRegion getRegion(String prRegionName) {
        return this.parallelQueue().getRegion(prRegionName);
    }

    /**
     * Delegates to {@link ParallelGatewaySenderQueue#removeShadowPR}.
     *
     * @param prRegionName the region name passed to the queue
     */
    @Override
    public void removeShadowPR(String prRegionName) {
        this.parallelQueue().removeShadowPR(prRegionName);
    }

    /**
     * Delegates to {@link ParallelGatewaySenderQueue#conflateEvent}.
     *
     * @param conflatableObject the object passed to the queue
     * @param bucketId the bucket id passed to the queue
     * @param tailKey the tail key passed to the queue
     */
    @Override
    public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) {
        this.parallelQueue().conflateEvent(conflatableObject, bucketId, tailKey);
    }

    /**
     * Delegates to {@link ParallelGatewaySenderQueue#addShadowPartitionedRegionForUserPR}.
     *
     * @param pr the user partitioned region passed to the queue
     */
    @Override
    public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
        this.parallelQueue().addShadowPartitionedRegionForUserPR(pr);
    }

    /**
     * Delegates to {@link ParallelGatewaySenderQueue#addShadowPartitionedRegionForUserRR}.
     *
     * @param userRegion the user distributed region passed to the queue
     */
    @Override
    public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
        this.parallelQueue().addShadowPartitionedRegionForUserRR(userRegion);
    }

    /** Intentionally empty: this processor performs no work on rebalance. */
    @Override
    protected void rebalance() {
    }

    /** Installs a {@link GatewaySenderEventCallbackDispatcher} as this processor's dispatcher. */
    @Override
    public void initializeEventDispatcher() {
        if (logger.isDebugEnabled()) {
            logger.debug(" Creating the GatewayEventCallbackDispatcher");
        }
        this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
    }

    /** Returns the inherited queue field viewed as the parallel queue type the delegating methods need. */
    private ParallelGatewaySenderQueue parallelQueue() {
        return (ParallelGatewaySenderQueue)this.queue;
    }
}

