/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.logging.LogService;
import org.apache.logging.log4j.Logger;

public class GatewaySenderQueueEntrySynchronizationOperation {
    private InternalDistributedMember recipient;
    private InternalRegion region;
    private List<GatewaySenderQueueEntrySynchronizationEntry> entriesToSynchronize;
    private static final Logger logger = LogService.getLogger();

    protected GatewaySenderQueueEntrySynchronizationOperation(InternalDistributedMember recipient, InternalRegion internalRegion, List<InitialImageOperation.Entry> giiEntriesToSynchronize) {
        this.recipient = recipient;
        this.region = internalRegion;
        this.initializeEntriesToSynchronize(giiEntriesToSynchronize);
    }

    protected void synchronizeEntries() {
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Requesting synchronization from member={}; regionPath={}; entriesToSynchronize={}", (Object)this.getClass().getSimpleName(), (Object)this.recipient, (Object)this.region.getFullPath(), this.entriesToSynchronize);
        }
        DistributionManager dm = this.region.getDistributionManager();
        GatewaySenderQueueEntrySynchronizationReplyProcessor processor = new GatewaySenderQueueEntrySynchronizationReplyProcessor(dm, this.recipient, this);
        GatewaySenderQueueEntrySynchronizationMessage message = new GatewaySenderQueueEntrySynchronizationMessage(this.recipient, processor.getProcessorId(), this);
        dm.putOutgoing(message);
        try {
            processor.waitForReplies();
        }
        catch (ReplyException e) {
            e.handleCause();
        }
        catch (InterruptedException e) {
            dm.getCancelCriterion().checkCancelInProgress(e);
            Thread.currentThread().interrupt();
        }
    }

    protected GemFireCacheImpl getCache() {
        return (GemFireCacheImpl)CacheFactory.getAnyInstance();
    }

    private void initializeEntriesToSynchronize(List<InitialImageOperation.Entry> giiEntriesToSynchronize) {
        this.entriesToSynchronize = new ArrayList<GatewaySenderQueueEntrySynchronizationEntry>();
        for (InitialImageOperation.Entry entry : giiEntriesToSynchronize) {
            this.entriesToSynchronize.add(new GatewaySenderQueueEntrySynchronizationEntry(entry.getKey(), entry.getVersionTag()));
        }
    }

    public static class GatewaySenderQueueEntrySynchronizationEntry
    implements DataSerializableFixedID {
        private Object key;
        private VersionTag entryVersion;

        public GatewaySenderQueueEntrySynchronizationEntry() {
        }

        public GatewaySenderQueueEntrySynchronizationEntry(Object key, VersionTag entryVersion) {
            this.key = key;
            this.entryVersion = entryVersion;
        }

        @Override
        public int getDSFID() {
            return 2182;
        }

        @Override
        public Version[] getSerializationVersions() {
            return null;
        }

        @Override
        public void toData(DataOutput out) throws IOException {
            DataSerializer.writeObject(this.key, out);
            DataSerializer.writeObject(this.entryVersion, out);
        }

        @Override
        public void fromData(DataInput in) throws IOException, ClassNotFoundException {
            this.key = DataSerializer.readObject(in);
            this.entryVersion = (VersionTag)DataSerializer.readObject(in);
        }

        public String toString() {
            return this.getClass().getSimpleName() + "[" + "key=" + this.key + "; entryVersion=" + this.entryVersion + "]";
        }
    }

    public static class GatewaySenderQueueEntrySynchronizationMessage
    extends PooledDistributionMessage
    implements MessageWithReply {
        private int processorId;
        private String regionPath;
        private List<GatewaySenderQueueEntrySynchronizationEntry> entriesToSynchronize;

        public GatewaySenderQueueEntrySynchronizationMessage() {
        }

        protected GatewaySenderQueueEntrySynchronizationMessage(InternalDistributedMember recipient, int processorId, GatewaySenderQueueEntrySynchronizationOperation operation) {
            this.setRecipient(recipient);
            this.processorId = processorId;
            this.regionPath = operation.region.getFullPath();
            this.entriesToSynchronize = operation.entriesToSynchronize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void process(ClusterDistributionManager dm) {
            Object result = null;
            ReplyException replyException = null;
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}", (Object)this.getClass().getSimpleName(), (Object)this.regionPath, this.entriesToSynchronize);
                }
                result = this.getSynchronizationEvents();
            }
            catch (Throwable t) {
                replyException = new ReplyException(t);
            }
            finally {
                ReplyMessage replyMsg = new ReplyMessage();
                replyMsg.setRecipient(this.getSender());
                replyMsg.setProcessorId(this.processorId);
                if (replyException == null) {
                    replyMsg.setReturnValue(result);
                } else {
                    replyMsg.setException(replyException);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Sending synchronization reply returnValue={}; exception={}", (Object)this.getClass().getSimpleName(), replyMsg.getReturnValue(), (Object)replyMsg.getException());
                }
                dm.putOutgoing(replyMsg);
            }
        }

        private Object getSynchronizationEvents() {
            ArrayList results = new ArrayList();
            GemFireCacheImpl gfci = (GemFireCacheImpl)this.getCache();
            LocalRegion region = (LocalRegion)gfci.getRegion(this.regionPath);
            Set<String> allGatewaySenderIds = region.getAllGatewaySenderIds();
            for (GatewaySender sender : gfci.getAllGatewaySenders()) {
                if (!allGatewaySenderIds.contains(sender.getId())) continue;
                for (GatewaySenderQueueEntrySynchronizationEntry entry : this.entriesToSynchronize) {
                    HashMap<String, GatewayQueueEvent> resultForOneEntry = new HashMap<String, GatewayQueueEvent>();
                    GatewayQueueEvent event = ((AbstractGatewaySender)sender).getSynchronizationEvent(entry.key, entry.entryVersion.getVersionTimeStamp());
                    if (event != null) {
                        resultForOneEntry.put(sender.getId(), event);
                    }
                    results.add(resultForOneEntry);
                }
            }
            return results;
        }

        private Cache getCache() {
            return CacheFactory.getAnyInstance();
        }

        @Override
        public int getDSFID() {
            return 2181;
        }

        @Override
        public void toData(DataOutput out) throws IOException {
            super.toData(out);
            out.writeInt(this.processorId);
            DataSerializer.writeString(this.regionPath, out);
            DataSerializer.writeArrayList((ArrayList)this.entriesToSynchronize, out);
        }

        @Override
        public void fromData(DataInput in) throws IOException, ClassNotFoundException {
            super.fromData(in);
            this.processorId = in.readInt();
            this.regionPath = DataSerializer.readString(in);
            this.entriesToSynchronize = DataSerializer.readArrayList(in);
        }
    }

    public static class GatewaySenderQueueEntrySynchronizationReplyProcessor
    extends ReplyProcessor21 {
        private GatewaySenderQueueEntrySynchronizationOperation operation;

        public GatewaySenderQueueEntrySynchronizationReplyProcessor(DistributionManager dm, InternalDistributedMember recipient, GatewaySenderQueueEntrySynchronizationOperation operation) {
            super(dm, recipient);
            this.operation = operation;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void process(DistributionMessage msg) {
            try {
                ReplyMessage reply;
                if (msg instanceof ReplyMessage && (reply = (ReplyMessage)msg).getException() == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: Processing reply from member={}; regionPath={}; key={}; entriesToSynchronize={}", (Object)this.getClass().getSimpleName(), (Object)reply.getSender(), (Object)this.operation.region.getFullPath(), (Object)this.operation.entriesToSynchronize, reply.getReturnValue());
                    }
                    List events = (List)reply.getReturnValue();
                    for (int i = 0; i < events.size(); ++i) {
                        Map eventsForOneEntry = (Map)events.get(i);
                        if (events.isEmpty()) {
                            GatewaySenderQueueEntrySynchronizationEntry entry = (GatewaySenderQueueEntrySynchronizationEntry)this.operation.entriesToSynchronize.get(i);
                            logger.info("Synchronization event reply from member={}; regionPath={}; key={}; entryVersion={} is empty", new Object[]{reply.getSender(), this.operation.region.getFullPath(), entry.key, entry.entryVersion});
                            continue;
                        }
                        this.putSynchronizationEvents(eventsForOneEntry);
                    }
                }
            }
            finally {
                super.process(msg);
            }
        }

        private void putSynchronizationEvents(Map<String, GatewayQueueEvent> senderIdsAndEvents) {
            for (Map.Entry<String, GatewayQueueEvent> senderIdAndEvent : senderIdsAndEvents.entrySet()) {
                AbstractGatewaySender sender = (AbstractGatewaySender)this.getCache().getGatewaySender(senderIdAndEvent.getKey());
                sender.putSynchronizationEvent(senderIdAndEvent.getValue());
            }
        }

        private Cache getCache() {
            return CacheFactory.getAnyInstance();
        }
    }
}

