/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.management.internal.cli.functions;

import java.io.IOException;
import java.io.Serializable;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.AllConnectionsInUseException;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ExecutablePool;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.DefaultEntryEventFactory;
import org.apache.geode.internal.cache.DestroyedEntry;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.NonTXEntry;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.internal.functions.CliFunctionResult;
import org.apache.geode.management.internal.i18n.CliStrings;
import org.apache.logging.log4j.Logger;

public class WanCopyRegionFunctionDelegate
implements Serializable {
    /** Number of times sending a batch is retried after a connection error. */
    private static final int MAX_BATCH_SEND_RETRIES = 1;
    /** Default wait, in milliseconds, applied before the copy starts iterating over entries. */
    private static final int WAIT_BEFORE_COPY_MS = 500;
    /** Identifier handed to the dispatcher with each batch; advanced by getAndIncrementBatchId(). */
    private int batchId = 0;
    /** Time source used to measure elapsed time for rate limiting. */
    private final Clock clock;
    /** Performs the sleeps needed to honor the requested maximum rate. */
    private final ThreadSleeper threadSleeper;
    /** Builds the gateway sender events out of region entries. */
    private final EventCreator eventCreator;
    /**
     * Cache time taken when wanCopyRegion starts; passed to the event creator as the newest
     * timestamp allowed for an entry to be copied.
     */
    private long functionStartTimestamp = 0L;
    /** Milliseconds wanCopyRegion sleeps before it starts reading entries. */
    private final int waitBeforeCopyMs;
    private static final Logger logger = LogService.getLogger();
    // Result messages; {0} and {1} are placeholders filled in through CliStrings.format.
    public static final String WAN_COPY_REGION__MSG__NO__CONNECTION__POOL = "No connection pool available to receiver";
    public static final String WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE = "Command not supported at remote site.";
    public static final String WAN_COPY_REGION__MSG__NO__CONNECTION = "No connection available to receiver after having copied {0} entries";
    public static final String WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED = "Error ({0}) in operation after having copied {1} entries";
    public static final String WAN_COPY_REGION__MSG__COPIED__ENTRIES = "Entries copied: {0}";

    /**
     * Creates a delegate wired with the system clock, a real thread sleeper, the default event
     * creator and the default wait before copying ({@link #WAIT_BEFORE_COPY_MS}).
     */
    WanCopyRegionFunctionDelegate() {
        // Use the named constant instead of repeating its value as a magic number.
        this(Clock.systemDefaultZone(), new ThreadSleeperImpl(), new EventCreatorImpl(), WAIT_BEFORE_COPY_MS);
    }

    /**
     * Creates a delegate with explicit collaborators so that tests can control time, sleeping
     * and event creation.
     *
     * @param clock time source used for rate limiting
     * @param threadSleeper performs the rate-limiting sleeps
     * @param eventCreator builds gateway sender events from region entries
     * @param waitBeforeCopyMs milliseconds to wait before the copy starts reading entries
     */
    @VisibleForTesting
    WanCopyRegionFunctionDelegate(Clock clock, ThreadSleeper threadSleeper, EventCreator eventCreator, int waitBeforeCopyMs) {
        this.waitBeforeCopyMs = waitBeforeCopyMs;
        this.eventCreator = eventCreator;
        this.threadSleeper = threadSleeper;
        this.clock = clock;
    }

    /**
     * Copies the entries of {@code region} to the remote site reachable through {@code sender},
     * sending them in batches of at most {@code batchSize} events.
     *
     * <p>The connection used for the copy is always released before this method returns.
     *
     * @param cache cache owning the region
     * @param memberName name of the member reported in the returned result
     * @param region region whose entries are copied
     * @param sender gateway sender whose dispatcher and connection pool are used
     * @param maxRate maximum number of entries per second to copy; {@code 0} disables rate limiting
     * @param batchSize maximum number of events per batch; must be greater than zero
     * @return an OK result carrying the number of copied entries, or an ERROR result describing
     *         the failure
     * @throws InterruptedException if the thread is interrupted while sleeping, or is found
     *         interrupted after a batch has been sent
     */
    public CliFunctionResult wanCopyRegion(InternalCache cache, String memberName, Region<?, ?> region, GatewaySender sender, long maxRate, int batchSize) throws InterruptedException {
        if (batchSize <= 0) {
            // With a non-positive batch size createBatch never consumes the iterator, so the
            // loop below would spin forever on empty batches. Fail fast instead.
            return new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR,
                    CliStrings.format(WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, "Invalid batch size " + batchSize, 0));
        }

        // Entries whose version timestamp is newer than this are skipped by the event creator.
        functionStartTimestamp = ((InternalRegion) region).getCache().cacheTimeMillis();
        // Wait the configured time before starting to read entries.
        Thread.sleep(waitBeforeCopyMs);

        ConnectionState connectionState = new ConnectionState();
        int copiedEntries = 0;
        Iterator<?> entriesIter = getEntries(region, sender).iterator();
        long startTime = clock.millis();
        try {
            while (entriesIter.hasNext()) {
                List<GatewayQueueEvent<?, ?>> batch = createBatch((InternalRegion) region, sender, batchSize, cache, entriesIter);
                if (batch.isEmpty()) {
                    // Every remaining entry was discarded; nothing to send.
                    continue;
                }
                Optional<CliFunctionResult> connectionError = connectionState.connectIfNeeded(memberName, sender);
                if (connectionError.isPresent()) {
                    return connectionError.get();
                }
                Optional<CliFunctionResult> error = sendBatch(memberName, sender, batch, connectionState, copiedEntries);
                if (error.isPresent()) {
                    return error.get();
                }
                copiedEntries += batch.size();
                doPostSendBatchActions(startTime, copiedEntries, maxRate);
            }
        } finally {
            connectionState.close();
        }

        if (region.isDestroyed()) {
            return new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR,
                    CliStrings.format(WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, "Region destroyed", copiedEntries));
        }
        return new CliFunctionResult(memberName, CliFunctionResult.StatusState.OK,
                CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, copiedEntries));
    }

    /**
     * Sends one batch through the sender's dispatcher, reconnecting and retrying when the
     * connection fails.
     *
     * @param memberName name of the member reported in an error result
     * @param sender gateway sender whose event dispatcher is used
     * @param batch events to send
     * @param connectionState holder of the connection and pool used for sending
     * @param copiedEntries number of entries copied so far, reported in error messages
     * @return empty when the batch was sent; otherwise the error result to hand back to the caller
     */
    private Optional<CliFunctionResult> sendBatch(String memberName, GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch, ConnectionState connectionState, int copiedEntries) {
        GatewaySenderEventDispatcher dispatcher = ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
        // No loop condition: every path either returns or retries, so no statement after the
        // try block is reachable (the previous trailing "break" did not compile).
        for (int retries = 0;; retries++) {
            try {
                dispatcher.sendBatch(batch, connectionState.getConnection(), connectionState.getSenderPool(), getAndIncrementBatchId(), true);
                return Optional.empty();
            } catch (BatchException70 e) {
                // Report the cause of the first exception carried by the batch exception.
                BatchException70 first = (BatchException70) e.getExceptions().get(0);
                return Optional.of(new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR,
                        CliStrings.format(WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, first.getCause(), copiedEntries)));
            } catch (ServerConnectivityException | ConnectionDestroyedException e) {
                Optional<CliFunctionResult> error = connectionState.reconnect(memberName, retries, copiedEntries, e);
                if (error.isPresent()) {
                    return error;
                }
                // Reconnected: loop around and send the same batch again.
            }
        }
    }

    /**
     * Builds the next batch of events by consuming entries from {@code iter}.
     *
     * <p>Entries for which the event creator returns {@code null} are skipped and do not count
     * towards the batch size.
     *
     * @param region region the entries belong to
     * @param sender gateway sender the events are created for
     * @param batchSize maximum number of events in the returned batch
     * @param cache cache owning the region
     * @param iter iterator over the region entries still to be copied
     * @return the events created, possibly empty when no remaining entry produced an event
     */
    private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, GatewaySender sender, int batchSize, InternalCache cache, Iterator<?> iter) {
        List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
        while (iter.hasNext() && batch.size() < batchSize) {
            Region.Entry<?, ?> entry = (Region.Entry<?, ?>) iter.next();
            GatewayQueueEvent<?, ?> event = eventCreator.createGatewaySenderEvent(cache, region, sender, entry, functionStartTimestamp);
            if (event != null) {
                batch.add(event);
            }
        }
        return batch;
    }

    /**
     * Selects the entries to copy: only the local primary data for a partitioned region served
     * by a parallel sender, the whole entry set otherwise.
     *
     * @param region region to read the entries from
     * @param sender gateway sender used for the copy
     * @return the collection of region entries to iterate over
     */
    private Collection<?> getEntries(Region<?, ?> region, GatewaySender sender) {
        if (!(region instanceof PartitionedRegion) || !sender.isParallel()) {
            return region.entrySet();
        }
        return PartitionRegionHelper.getLocalPrimaryData(region).entrySet();
    }

    /**
     * Runs after a batch has been sent: sleeps as long as needed to stay within
     * {@code maxRate}, or, when no sleep is needed, checks whether the thread was interrupted.
     *
     * @param startTime clock time, in milliseconds, at which the copy started
     * @param copiedEntries number of entries copied so far
     * @param maxRate maximum number of entries per second; {@code 0} means unlimited
     * @throws InterruptedException if the sleep is interrupted or the thread's interrupt flag is set
     */
    @VisibleForTesting
    void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate) throws InterruptedException {
        final long pauseMs = getTimeToSleep(startTime, copiedEntries, maxRate);
        if (pauseMs <= 0L) {
            // Not sleeping, so there is no blocking call to notice an interruption for us.
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            return;
        }
        logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate", getClass().getSimpleName(), pauseMs);
        threadSleeper.sleep(pauseMs);
    }

    /**
     * Returns the batch id to use for the next batch and advances the counter, wrapping back
     * to zero just before it would reach {@link Integer#MAX_VALUE}.
     *
     * @return the batch id for the batch about to be sent
     */
    private int getAndIncrementBatchId() {
        if (batchId == Integer.MAX_VALUE - 1) {
            batchId = 0;
        }
        final int current = batchId;
        batchId = current + 1;
        return current;
    }

    /**
     * Computes how long to sleep so that the average copy rate since {@code startTime} does not
     * exceed {@code maxRate} entries per second.
     *
     * @param startTime clock time, in milliseconds, at which the copy started
     * @param copiedEntries number of entries copied so far
     * @param maxRate maximum number of entries per second; {@code 0} means unlimited
     * @return milliseconds to sleep; {@code 0} when rate limiting is disabled or the current
     *         rate is within the limit
     */
    @VisibleForTesting
    long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
        if (maxRate == 0L) {
            return 0L;
        }
        final long elapsedMs = clock.millis() - startTime;
        if (elapsedMs != 0L) {
            final double currentRate = copiedEntries * 1000.0 / elapsedMs;
            if (currentRate <= maxRate) {
                return 0L;
            }
        }
        // Time the copy should have taken at maxRate, minus the time it actually took.
        return copiedEntries * 1000L / maxRate - elapsedMs;
    }

    /** {@link ThreadSleeper} that delegates to {@link Thread#sleep(long)}. */
    static class ThreadSleeperImpl implements ThreadSleeper {
        /**
         * Puts the current thread to sleep.
         *
         * @param millis time to sleep, in milliseconds
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Override
        public void sleep(long millis) throws InterruptedException {
            Thread.sleep(millis);
        }
    }

    /**
     * Default {@link EventCreator}: turns a region entry into a gateway sender event, skipping
     * entries that are destroyed or, when concurrency checks are enabled, whose version
     * timestamp is newer than the allowed one.
     */
    static class EventCreatorImpl implements EventCreator {

        /**
         * Creates the gateway sender event for {@code entry}.
         *
         * @param cache cache owning the region
         * @param region region the entry belongs to
         * @param sender gateway sender the event is created for
         * @param entry region entry to convert
         * @param newestTimestampAllowed entries with a newer version timestamp are discarded
         * @return the event, or {@code null} when the entry must be skipped or the event could
         *         not be created
         */
        @Override
        @VisibleForTesting
        public GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache, InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry, long newestTimestampAllowed) {
            EntryEventImpl event;
            if (region instanceof PartitionedRegion) {
                event = createEventForPartitionedRegion(sender, cache, region, entry, newestTimestampAllowed);
            } else {
                event = createEventForReplicatedRegion(cache, region, entry, newestTimestampAllowed);
            }
            if (event == null) {
                return null;
            }
            try {
                return new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS, event, null, GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE);
            } catch (IOException e) {
                // Pass the exception as the trailing argument so the stack trace is logged too.
                logger.error("Error when creating event in wan-copy: {}", e.getMessage(), e);
                return null;
            }
        }

        /** Creates the entry event for an entry of a region that is not partitioned. */
        private EntryEventImpl createEventForReplicatedRegion(InternalCache cache, InternalRegion region, Region.Entry<?, ?> entry, long newestTimestampAllowed) {
            return createEvent(cache, region, entry, newestTimestampAllowed);
        }

        /**
         * Creates the entry event for an entry of a partitioned region and, when the bucket of
         * the entry is hosted locally, lets that bucket region handle the event as a WAN event.
         */
        private EntryEventImpl createEventForPartitionedRegion(GatewaySender sender, InternalCache cache, InternalRegion region, Region.Entry<?, ?> entry, long newestTimestampAllowed) {
            EntryEventImpl event = createEvent(cache, region, entry, newestTimestampAllowed);
            if (event == null) {
                return null;
            }
            BucketRegion bucketRegion = ((PartitionedRegion) event.getRegion()).getDataStore().getLocalBucketById(Integer.valueOf(event.getKeyInfo().getBucketId()));
            if (bucketRegion != null) {
                bucketRegion.handleWANEvent(event);
            }
            return event;
        }

        /**
         * Builds an UPDATE entry event for {@code entry}, carrying the entry's version tag when
         * the region has concurrency checks enabled.
         *
         * @return the event, or {@code null} when the entry is destroyed or must be discarded
         */
        private EntryEventImpl createEvent(InternalCache cache, InternalRegion region, Region.Entry<?, ?> entry, long newestTimestampAllowed) {
            if (entry instanceof DestroyedEntry) {
                return null;
            }
            EntryEventImpl event;
            try {
                if (mustDiscardEntry(entry, newestTimestampAllowed, region)) {
                    return null;
                }
                event = new DefaultEntryEventFactory().create(region, Operation.UPDATE, entry.getKey(), entry.getValue(), null, false, cache.getInternalDistributedSystem().getDistributedMember(), false);
            } catch (EntryDestroyedException e) {
                // The entry was destroyed while being read: there is nothing to copy.
                return null;
            }
            if (region.getAttributes().getConcurrencyChecksEnabled()) {
                if (entry instanceof NonTXEntry) {
                    event.setVersionTag(((NonTXEntry) entry).getRegionEntry().getVersionStamp().asVersionTag());
                } else {
                    event.setVersionTag(((EntrySnapshot) entry).getVersionTag());
                }
            }
            event.setNewEventId(cache.getInternalDistributedSystem());
            return event;
        }

        /**
         * Tells whether {@code entry} must be left out of the copy: it is destroyed, or the
         * region has concurrency checks enabled and the entry's version timestamp is newer than
         * {@code newestTimestampAllowed}.
         */
        private boolean mustDiscardEntry(Region.Entry<?, ?> entry, long newestTimestampAllowed, InternalRegion region) {
            if (entry instanceof DestroyedEntry) {
                return true;
            }
            if (!region.getAttributes().getConcurrencyChecksEnabled()) {
                // Without concurrency checks the timestamp is not consulted.
                return false;
            }
            long timestamp;
            if (entry instanceof NonTXEntry) {
                timestamp = ((NonTXEntry) entry).getRegionEntry().getVersionStamp().getVersionTimeStamp();
            } else {
                timestamp = ((EntrySnapshot) entry).getVersionTag().getVersionTimeStamp();
            }
            return timestamp > newestTimestampAllowed;
        }
    }

    /** Abstraction over sleeping so that callers can be tested without real waits. */
    @FunctionalInterface
    static interface ThreadSleeper extends Serializable {
        /**
         * Sleeps for the given time.
         *
         * @param millis time to sleep, in milliseconds
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        void sleep(long millis) throws InterruptedException;
    }

    /** Factory of gateway sender events for region entries. */
    @FunctionalInterface
    static interface EventCreator extends Serializable {
        /**
         * Creates the gateway sender event for a region entry.
         *
         * @param cache cache owning the region
         * @param region region the entry belongs to
         * @param sender gateway sender the event is created for
         * @param entry region entry to convert
         * @param newestTimestampAllowed newest timestamp an entry may have to be copied
         * @return the event created for the entry; the caller skips the entry when this is {@code null}
         */
        GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache, InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry, long newestTimestampAllowed);
    }

    static class ConnectionState {
        private volatile Connection connection = null;
        private volatile PoolImpl senderPool = null;

        ConnectionState() {
        }

        public Connection getConnection() {
            return this.connection;
        }

        public PoolImpl getSenderPool() {
            return this.senderPool;
        }

        public Optional<CliFunctionResult> connectIfNeeded(String memberName, GatewaySender sender) {
            if (this.senderPool == null) {
                this.senderPool = ((AbstractGatewaySender)sender).getProxy();
                if (this.senderPool == null) {
                    return Optional.of(new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR, WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
                }
                this.connection = this.senderPool.acquireConnection();
                if (this.connection.getWanSiteVersion() < KnownVersion.GEODE_1_15_0.ordinal()) {
                    return Optional.of(new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR, WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
                }
            }
            return Optional.empty();
        }

        public Optional<CliFunctionResult> reconnect(String memberName, int retries, int copiedEntries, Exception e) {
            this.close();
            if (retries >= 1) {
                return Optional.of(new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR, CliStrings.format((String)WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, (Object[])new Object[]{"Connection error", copiedEntries})));
            }
            logger.error("Exception {} in sendBatch. Retrying", (Object)e.getClass().getName());
            try {
                this.connection = this.senderPool.acquireConnection();
            }
            catch (AllConnectionsInUseException | NoAvailableServersException e1) {
                return Optional.of(new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR, CliStrings.format((String)WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__NO__CONNECTION, (Object)copiedEntries)));
            }
            return Optional.empty();
        }

        public void close() {
            if (this.senderPool != null && this.connection != null) {
                try {
                    this.connection.close(false);
                }
                catch (Exception e) {
                    logger.error("Error closing the connection used to wan-copy region entries");
                }
                this.senderPool.returnConnection(this.connection);
            }
            this.connection = null;
        }
    }
}

