/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite.irac;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.RequestUUID;
import org.infinispan.commands.irac.IracCleanupKeysCommand;
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.XSiteStateTransferConfiguration;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.irac.IracTombstoneManager;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.jmx.JmxStatisticsExposer;
import org.infinispan.jmx.annotations.DataType;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.XSiteResponse;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.ExponentialBackOffImpl;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.commands.remote.IracClearKeysRequest;
import org.infinispan.xsite.commands.remote.IracPutManyRequest;
import org.infinispan.xsite.commands.remote.IracTouchKeyRequest;
import org.infinispan.xsite.commands.remote.XSiteCacheRequest;
import org.infinispan.xsite.irac.IracBatchSendResult;
import org.infinispan.xsite.irac.IracClearResponseCollector;
import org.infinispan.xsite.irac.IracExecutor;
import org.infinispan.xsite.irac.IracManager;
import org.infinispan.xsite.irac.IracManagerKeyChangedState;
import org.infinispan.xsite.irac.IracManagerKeyInfo;
import org.infinispan.xsite.irac.IracManagerKeyState;
import org.infinispan.xsite.irac.IracManagerStateTransferState;
import org.infinispan.xsite.irac.IracResponseCollector;
import org.infinispan.xsite.irac.IracXSiteBackup;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.status.SiteState;
import org.infinispan.xsite.status.TakeOfflineManager;

@MBean(objectName="AsyncXSiteStatistics", description="Statistics for Asynchronous cross-site replication")
@Scope(value=Scopes.NAMED_CACHE)
public class DefaultIracManager
implements IracManager,
JmxStatisticsExposer {
    private static final Log log = LogFactory.getLog(DefaultIracManager.class);
    // Used with removeIf in trackClear(): discards each tracked state and always returns true,
    // so every entry is removed from the map.
    private static final Predicate<Map.Entry<Object, IracManagerKeyState>> CLEAR_PREDICATE = e -> {
        ((IracManagerKeyState)e.getValue()).discard();
        return true;
    };
    @Inject
    RpcManager rpcManager;
    @Inject
    TakeOfflineManager takeOfflineManager;
    @Inject
    ClusteringDependentLogic clusteringDependentLogic;
    @Inject
    CommandsFactory commandsFactory;
    @Inject
    IracTombstoneManager iracTombstoneManager;
    // Pending state per key (at most one state per key); backed by a ConcurrentHashMap.
    private final Map<Object, IracManagerKeyState> updatedKeys;
    // Backup sites the pending keys are sent to.
    private final Collection<IracXSiteBackup> asyncBackups;
    // Triggers run(); every tracking method pokes it after changing updatedKeys.
    private final IracExecutor iracExecutor;
    // Largest state-transfer chunk size among the async backups, never below 1.
    private final int batchSize;
    // Written by trackClear()/start()/onClearCompleted(), read by run(); when true run() sends a clear instead of keys.
    private volatile boolean hasClear;
    // NOTE(review): not volatile although it is written through setStatisticsEnabled() — confirm visibility is not a concern.
    private boolean statisticsEnabled;
    private final LongAdder discardCounts = new LongAdder();
    private final LongAdder conflictLocalWinsCount = new LongAdder();
    private final LongAdder conflictRemoteWinsCount = new LongAdder();
    private final LongAdder conflictMergedCount = new LongAdder();

    /**
     * Creates the manager for the given cache configuration.
     *
     * @param config  cache configuration; supplies the statistics flag and the async backups' chunk sizes
     * @param backups the asynchronous backup sites keys are sent to
     */
    public DefaultIracManager(Configuration config, Collection<IracXSiteBackup> backups) {
        updatedKeys = new ConcurrentHashMap<>(64);
        iracExecutor = new IracExecutor(this::run);
        asyncBackups = backups;
        statisticsEnabled = config.statistics().enabled();
        // Batch size is the largest configured chunk size, with a floor of 1.
        batchSize = config.sites()
                .asyncBackupsStream()
                .map(BackupConfiguration::stateTransfer)
                .mapToInt(XSiteStateTransferConfiguration::chunkSize)
                .reduce(1, Math::max);
    }

    /**
     * Wires the executors: the blocking executor runs the IRAC task and the scheduled
     * executor backs the exponential back-off installed on every backup.
     */
    @Inject
    public void inject(@ComponentName(value="org.infinispan.executors.timeout") ScheduledExecutorService executorService, @ComponentName(value="org.infinispan.executors.blocking") Executor blockingExecutor) {
        iracExecutor.setExecutor(blockingExecutor);
        Function<IracXSiteBackup, ExponentialBackOff> backOffFactory = ignored -> new ExponentialBackOffImpl(executorService);
        setBackOff(backOffFactory);
    }

    /** Resets the pending-clear flag when the component starts. */
    @Start
    public void start() {
        hasClear = false;
    }

    /** Tracks a (non-expiration) change of {@code key} so it is sent to the backup sites. */
    @Override
    public void trackUpdatedKey(int segment, Object key, RequestUUID owner) {
        int backupCount = asyncBackups.size();
        IracManagerKeyState changed = new IracManagerKeyChangedState(segment, key, owner, false, backupCount);
        trackState(changed);
    }

    /** Tracks the expiration of {@code key} so it is sent to the backup sites. */
    @Override
    public void trackExpiredKey(int segment, Object key, RequestUUID owner) {
        int backupCount = asyncBackups.size();
        IracManagerKeyState expiredState = new IracManagerKeyChangedState(segment, key, owner, true, backupCount);
        trackState(expiredState);
    }

    /**
     * Tracks the keys of a cross-site state transfer chunk.
     * <p>
     * A key that already has a pending state is left untouched: that pending state is still going
     * to be sent, and replacing it here would drop it from the map without discarding it. Only
     * the states actually inserted contribute to the returned stage.
     *
     * @param stateList the state transfer entries to track
     * @return a stage that completes when every newly tracked state completes
     */
    @Override
    public CompletionStage<Void> trackForStateTransfer(Collection<XSiteState> stateList) {
        if (log.isTraceEnabled()) {
            log.tracef("[IRAC] Tracking state for state transfer: %s", Util.toStr(stateList));
        }
        AggregateCompletionStage<Void> cf = CompletionStages.aggregateCompletionStage();
        LocalizedCacheTopology topology = this.clusteringDependentLogic.getCacheTopology();
        for (XSiteState state : stateList) {
            int segment = topology.getSegment(state.key());
            IracManagerStateTransferState iracState = new IracManagerStateTransferState(segment, state.key(), this.asyncBackups.size());
            // Same policy as receiveState(): never overwrite a pending state for the key.
            if (this.updatedKeys.putIfAbsent(iracState.getKey(), iracState) == null) {
                cf.dependsOn(iracState.getCompletionStage());
            }
        }
        this.iracExecutor.run();
        return cf.freeze();
    }

    /**
     * Discards and removes every tracked state and records whether a clear has to be
     * replicated to the backup sites.
     *
     * @param sendClear {@code true} to trigger sending the clear request
     */
    @Override
    public void trackClear(boolean sendClear) {
        if (log.isTraceEnabled()) {
            log.tracef("Tracking clear request. Replicate to backup sites? %s", sendClear);
        }
        hasClear = sendClear;
        updatedKeys.entrySet().removeIf(CLEAR_PREDICATE);
        if (!sendClear) {
            return;
        }
        iracExecutor.run();
    }

    /** Removes the locally tracked state matching {@code state}, if it is still the current one. */
    @Override
    public void removeState(IracManagerKeyInfo state) {
        removeStateFromLocal(state);
    }

    /**
     * Reacts to a topology change: for every segment this node gained as write owner, asks the
     * segment's primary owner for its pending state, then re-triggers the sender if keys are pending.
     */
    @Override
    public void onTopologyUpdate(CacheTopology oldCacheTopology, CacheTopology newCacheTopology) {
        if (log.isTraceEnabled()) {
            log.trace("[IRAC] Topology Updated. Checking pending keys.");
        }
        Address self = rpcManager.getAddress();
        if (!newCacheTopology.getMembers().contains(self)) {
            // This node is not a member of the new topology.
            return;
        }
        IntSet gainedSegments = IntSets.mutableCopyFrom(newCacheTopology.getWriteConsistentHash().getSegmentsForOwner(self));
        if (oldCacheTopology.getMembers().contains(self)) {
            gainedSegments.removeAll(oldCacheTopology.getWriteConsistentHash().getSegmentsForOwner(self));
        }
        if (!gainedSegments.isEmpty()) {
            Map<Address, IntSet> segmentsByPrimary = new HashMap<>(newCacheTopology.getMembers().size());
            int segmentCount = clusteringDependentLogic.getCacheTopology().getNumSegments();
            Function<Address, IntSet> newSegmentSet = ignored -> IntSets.mutableEmptySet(segmentCount);
            // Group the gained segments by their primary owner in the new topology.
            for (PrimitiveIterator.OfInt iterator = gainedSegments.iterator(); iterator.hasNext(); ) {
                int segment = iterator.nextInt();
                Address primary = newCacheTopology.getWriteConsistentHash().locatePrimaryOwnerForSegment(segment);
                segmentsByPrimary.computeIfAbsent(primary, newSegmentSet).set(segment);
            }
            segmentsByPrimary.forEach(this::sendStateRequest);
        }
        if (updatedKeys.isEmpty()) {
            return;
        }
        iracExecutor.run();
    }

    /** Sends the pending key state, then the tombstones, of {@code segments} to {@code requestor}. */
    @Override
    public void requestState(Address requestor, IntSet segments) {
        Collection<IracManagerKeyState> pending = updatedKeys.values();
        transferStateTo(requestor, segments, pending);
        iracTombstoneManager.sendStateTo(requestor, segments);
    }

    /**
     * Applies state received from another node: stores the tombstone if none exists and
     * tracks the key unless a state for it is already pending.
     */
    @Override
    public void receiveState(int segment, Object key, RequestUUID owner, IracMetadata tombstone) {
        iracTombstoneManager.storeTombstoneIfAbsent(segment, key, tombstone);
        IracManagerKeyState received = new IracManagerKeyChangedState(segment, key, owner, false, asyncBackups.size());
        updatedKeys.putIfAbsent(key, received);
        iracExecutor.run();
    }

    /**
     * Asks every backup site that is not offline whether {@code key} was touched recently.
     * <p>
     * The result starts as {@code true} (expired) and flips to {@code false} as soon as any
     * site answers that the key was touched.
     *
     * @param key the key to check
     * @return a stage holding {@code true} when no contacted site reported the key as touched
     */
    @Override
    public CompletionStage<Boolean> checkAndTrackExpiration(Object key) {
        if (log.isTraceEnabled()) {
            log.tracef("Checking remote backup sites to see if key %s has been touched recently", key);
        }
        IracTouchKeyRequest command = this.commandsFactory.buildIracTouchCommand(key);
        AtomicBoolean expired = new AtomicBoolean(true);
        // Typed collector so that freeze() yields CompletionStage<AtomicBoolean>.
        AggregateCompletionStage<AtomicBoolean> collector = CompletionStages.aggregateCompletionStage(expired);
        for (XSiteBackup backup : this.asyncBackups) {
            if (this.takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
                if (log.isTraceEnabled()) {
                    log.tracef("Skipping %s as it is offline", backup.getSiteName());
                }
                continue;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Sending irac touch key command to %s", backup);
            }
            XSiteResponse<Boolean> response = this.sendToRemoteSite(backup, command);
            // The lambda logs the loop variable; it must be the name declared by the for statement.
            collector.dependsOn(response.thenAccept(touched -> {
                if (touched.booleanValue()) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Key %s was recently touched on a remote site %s", key, backup);
                    }
                    expired.set(false);
                } else if (log.isTraceEnabled()) {
                    log.tracef("Entry %s was expired on remote site %s", key, backup);
                }
            }));
        }
        return collector.freeze().thenApply(AtomicBoolean::get);
    }

    /**
     * Sends to {@code dst}, in batches of {@code batchSize}, every state of {@code stateCollection}
     * that belongs to {@code segments} and is neither a state-transfer nor an expiration state.
     * Completion and failure are only traced.
     */
    void transferStateTo(Address dst, IntSet segments, Collection<? extends IracManagerKeyState> stateCollection) {
        if (log.isTraceEnabled()) {
            log.tracef("Starting state transfer to %s. Segments=%s, %s keys to check", dst, segments, stateCollection.size());
        }
        Flowable.<IracManagerKeyState>fromIterable(stateCollection)
                .filter(state -> !(state.isStateTransfer() || state.isExpiration()) && segments.contains(state.getSegment()))
                .buffer(batchSize)
                .concatMapCompletableDelayError(chunk -> createAndSendBatch(dst, chunk))
                .subscribe(
                        () -> {
                            if (log.isTraceEnabled()) {
                                log.tracef("State transfer to %s finished!", dst);
                            }
                        },
                        failure -> {
                            if (log.isTraceEnabled()) {
                                log.tracef((Throwable)failure, "State transfer to %s failed!", dst);
                            }
                        });
    }

    /**
     * Builds a state response command holding the key info and current tombstone of every state
     * in {@code batch} and sends it to {@code dst}. A failed send is traced and swallowed, so
     * the returned {@link Completable} does not fail because of it.
     */
    private Completable createAndSendBatch(Address dst, Collection<? extends IracManagerKeyState> batch) {
        if (log.isTraceEnabled()) {
            log.tracef("Sending state response to %s. Batch=%s", dst, Util.toStr(batch));
        }
        RpcOptions rpcOptions = rpcManager.getSyncRpcOptions();
        VoidResponseCollector rspCollector = VoidResponseCollector.ignoreLeavers();
        IracStateResponseCommand cmd = commandsFactory.buildIracStateResponseCommand(batch.size());
        batch.forEach(state -> {
            IracMetadata tombstone = iracTombstoneManager.getTombstone(state.getKey());
            cmd.add(state.getKeyInfo(), tombstone);
        });
        return Completable.fromCompletionStage(
                rpcManager.invokeCommand(dst, (CacheRpcCommand)cmd, rspCollector, rpcOptions)
                        .exceptionally(failure -> {
                            if (log.isTraceEnabled()) {
                                log.tracef((Throwable)failure, "Batch sent to %s failed! Batch=%s", dst, Util.toStr((Collection)batch));
                            }
                            return null;
                        }));
    }

    /** Registers {@code state} for its key, discarding any state it replaces, and triggers the sender. */
    private void trackState(IracManagerKeyState state) {
        if (log.isTraceEnabled()) {
            log.tracef("[IRAC] Tracking state %s", state);
        }
        IracManagerKeyState replaced = updatedKeys.put(state.getKey(), state);
        if (replaced != null) {
            replaced.discard();
        }
        iracExecutor.run();
    }

    /**
     * One sending round. When a clear is pending only the clear is sent; otherwise every
     * sendable state is loaded and sent to the backups in batches of {@code batchSize}.
     * Errors in the pipeline are routed to {@link #onUnexpectedThrowable(Throwable)}.
     */
    private CompletionStage<Void> run() {
        if (log.isTraceEnabled()) {
            log.tracef("[IRAC] Sending keys to remote site(s). Is clear? %s, keys: %s", this.hasClear, Util.toStr(this.updatedKeys.keySet()));
        }
        if (hasClear) {
            return sendClearUpdate();
        }
        return Flowable.fromIterable(updatedKeys.values())
                .filter(this::canStateBeSent)
                .concatMapMaybe(this::fetchEntry)
                .buffer(batchSize)
                .concatMapCompletable(this::sendUpdateBatch)
                .onErrorComplete(error -> {
                    onUnexpectedThrowable((Throwable)error);
                    return true;
                })
                .toCompletionStage(null);
    }

    /**
     * Installs on every backup the back-off produced by {@code builder} for that backup.
     *
     * @param builder creates the {@link ExponentialBackOff} for a given backup
     */
    public void setBackOff(Function<IracXSiteBackup, ExponentialBackOff> builder) {
        for (IracXSiteBackup backup : asyncBackups) {
            ExponentialBackOff backOff = builder.apply(backup);
            backup.useBackOff(backOff, iracExecutor);
        }
    }

    /** @return {@code true} when no key state is currently tracked */
    public boolean isEmpty() {
        return updatedKeys.isEmpty();
    }

    /**
     * Decides whether this node should send {@code state} now. A state whose segment this node
     * neither write- nor read-owns is discarded and dropped locally; otherwise the state is
     * sent only by the primary owner and only when the state itself allows it.
     */
    private boolean canStateBeSent(IracManagerKeyState state) {
        DistributionInfo distribution = clusteringDependentLogic.getCacheTopology().getSegmentDistribution(state.getSegment());
        boolean ownsSegment = distribution.isWriteOwner() || distribution.isReadOwner();
        if (ownsSegment) {
            return distribution.isPrimary() && state.canSend();
        }
        state.discard();
        removeStateFromLocal(state.getKeyInfo());
        return false;
    }

    /**
     * Loads the entry of {@code state}'s key and pairs it with the state and the key's tombstone.
     * On a load failure the state is marked for retry and the stage yields {@code null}.
     */
    private Maybe<IracStateData> fetchEntry(IracManagerKeyState state) {
        return Maybe.fromCompletionStage(
                clusteringDependentLogic.getEntryLoader()
                        .loadAndStoreInDataContainer(state.getKey(), state.getSegment())
                        .thenApply(loaded -> new IracStateData(state, (InternalCacheEntry<Object, Object>)loaded, iracTombstoneManager.getTombstone(state.getKey())))
                        .exceptionally(failure -> {
                            log.debugf((Throwable)failure, "[IRAC] Failed to load entry to send to remote sites. It will be retried. State=%s", state);
                            state.retry();
                            return null;
                        }));
    }

    /**
     * Sends one batch of loaded states to every asynchronous backup.
     * <p>
     * For each backup a separate put-many command is built from the states that backup has not
     * yet acknowledged. States that cannot be sent are discarded and removed cluster-wide.
     *
     * @param batch the loaded states (state, entry and tombstone) to send
     * @return a source completing when the response collectors registered for this batch complete
     */
    private CompletableSource sendUpdateBatch(Collection<? extends IracStateData> batch) {
        int size = batch.size();
        boolean trace = log.isTraceEnabled();
        if (trace) {
            log.tracef("[IRAC] Batch ready to send remote site with %d keys", size);
        }
        if (size == 0) {
            if (trace) {
                log.trace("[IRAC] Batch not sent, reason: batch is empty");
            }
            return Completable.complete();
        }
        AggregateCompletionStage aggregation = CompletionStages.aggregateCompletionStage();
        for (IracXSiteBackup backup : this.asyncBackups) {
            // Backup is backing off: nothing is sent to it now, every state is flagged via retry().
            if (backup.isBackOffEnabled()) {
                for (IracStateData iracStateData : batch) {
                    iracStateData.state.retry();
                }
                continue;
            }
            IracPutManyRequest cmd = this.commandsFactory.buildIracPutManyCommand(size);
            // arrayList collects the invalid states (discarded and removed at the end of the iteration);
            // validState is what the response collector is given for this backup.
            ArrayList<IracManagerKeyState> arrayList = new ArrayList<IracManagerKeyState>(size);
            ArrayList<IracManagerKeyState> validState = new ArrayList<IracManagerKeyState>(size);
            for (IracStateData iracStateData : batch) {
                // Neither an entry nor a tombstone exists for the key: there is nothing to send.
                if (iracStateData.entry == null && iracStateData.tombstone == null) {
                    arrayList.add(iracStateData.state);
                    continue;
                }
                // Already applied on this backup.
                if (iracStateData.state.wasSuccessful(backup)) continue;
                validState.add(iracStateData.state);
                if (iracStateData.state.isExpiration()) {
                    // An expiration is only sent when a tombstone exists and no entry is present.
                    // Note the state was already added to validState above.
                    if (iracStateData.tombstone == null || iracStateData.entry != null) {
                        if (log.isTraceEnabled()) {
                            log.tracef("[IRAC] Skipping expiration command. Tombstone not found or entry written by another command. State=%s", iracStateData.state.getKeyInfo());
                        }
                        arrayList.add(iracStateData.state);
                        continue;
                    }
                    cmd.addExpire(iracStateData.state.getKey(), iracStateData.tombstone);
                    continue;
                }
                // No entry but a tombstone: replicate a removal.
                if (iracStateData.entry == null) {
                    cmd.addRemove(iracStateData.state.getKey(), iracStateData.tombstone);
                    continue;
                }
                cmd.addUpdate(iracStateData.state.getKey(), iracStateData.entry.getValue(), iracStateData.entry.getMetadata(), iracStateData.entry.getInternalMetadata().iracMetadata());
            }
            if (!cmd.isEmpty()) {
                IracResponseCollector rspCollector = new IracResponseCollector(this.commandsFactory.getCacheName(), backup, validState, this::onBatchResponse);
                try {
                    // Only a collector whose request was actually sent is added to the aggregation.
                    if (this.takeOfflineManager.getSiteState(backup.getSiteName()) != SiteState.OFFLINE) {
                        this.sendToRemoteSite(backup, cmd).whenCompleteAsync(rspCollector, this.iracExecutor.executor());
                        aggregation.dependsOn((CompletionStage)rspCollector);
                    } else {
                        rspCollector.onSiteOffline();
                    }
                }
                catch (Throwable throwable) {
                    // Sending failed synchronously: report it and flag every state in the batch via retry().
                    this.onUnexpectedThrowable(throwable);
                    for (IracStateData iracStateData : batch) {
                        iracStateData.state.retry();
                    }
                }
            }
            if (arrayList.isEmpty()) continue;
            if (trace) {
                log.tracef("[IRAC] Removing %d invalid state(s)", arrayList.size());
            }
            arrayList.forEach(IracManagerKeyState::discard);
            this.removeStateFromCluster(arrayList);
        }
        return Completable.fromCompletionStage((CompletionStage)aggregation.freeze());
    }

    /**
     * Sends the clear request to every backup that is not offline. A backup that is backing off
     * is not contacted; the collector is told to back off and retry instead.
     */
    private CompletionStage<Void> sendClearUpdate() {
        IracClearKeysRequest cmd = commandsFactory.buildIracClearKeysCommand();
        IracClearResponseCollector collector = new IracClearResponseCollector(commandsFactory.getCacheName());
        for (IracXSiteBackup backup : asyncBackups) {
            boolean offline = takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE;
            if (offline) {
                continue;
            }
            if (!backup.isBackOffEnabled()) {
                collector.dependsOn(backup, sendToRemoteSite(backup, cmd));
            } else {
                collector.forceBackOffAndRetry();
            }
        }
        return collector.freeze().handle(this::onClearCompleted);
    }

    /**
     * Handles the outcome of a clear request. On {@code OK} the pending-clear flag is reset;
     * {@code OK}, {@code RETRY} and {@code BACK_OFF_AND_RETRY} all re-trigger the sender.
     *
     * @return always {@code null}
     */
    private Void onClearCompleted(IracBatchSendResult result, Throwable throwable) {
        if (throwable != null) {
            onUnexpectedThrowable(throwable);
            return null;
        }
        switch (result) {
            case OK: {
                hasClear = false;
                iracExecutor.run();
                break;
            }
            case RETRY:
            case BACK_OFF_AND_RETRY: {
                iracExecutor.run();
                break;
            }
            default: {
                onUnexpectedThrowable(new IllegalStateException("Unknown result: " + String.valueOf((Object)result)));
            }
        }
        return null;
    }

    /** Logs an unexpected failure and re-triggers the sender. */
    private void onUnexpectedThrowable(Throwable throwable) {
        log.unexpectedErrorFromIrac(throwable);
        iracExecutor.run();
    }

    /** Asks {@code primary} for the pending state of {@code segments}. */
    private void sendStateRequest(Address primary, IntSet segments) {
        IracRequestStateCommand request = commandsFactory.buildIracRequestStateCommand(segments);
        rpcManager.sendTo(primary, request, DeliverOrder.NONE);
    }

    /** Invokes {@code cmd} on the remote site and registers the response with the take-offline manager. */
    private <O> XSiteResponse<O> sendToRemoteSite(XSiteBackup backup, XSiteCacheRequest<O> cmd) {
        XSiteResponse<O> response = rpcManager.invokeXSite(backup, cmd);
        takeOfflineManager.registerRequest(response);
        return response;
    }

    /**
     * Removes the given states everywhere: a cleanup command is sent to the write owners of
     * every affected segment and the states are then removed locally.
     */
    private void removeStateFromCluster(Collection<IracManagerKeyState> stateToCleanup) {
        if (stateToCleanup.isEmpty()) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.tracef("[IRAC] Removing states from cluster: %s", Util.toStr(stateToCleanup));
        }
        IntSet visitedSegments = IntSets.mutableEmptySet();
        LocalizedCacheTopology topology = clusteringDependentLogic.getCacheTopology();
        Collection<Address> targets = new HashSet<>(topology.getMembers().size());
        List<IracManagerKeyInfo> keysToCleanup = new ArrayList<>(stateToCleanup.size());
        for (IracManagerKeyState state : stateToCleanup) {
            keysToCleanup.add(state.getKeyInfo());
            int segment = state.getSegment();
            // Look up the owners only the first time a segment is seen.
            if (visitedSegments.add(segment)) {
                targets.addAll(topology.getSegmentDistribution(segment).writeOwners());
            }
        }
        if (visitedSegments.isEmpty()) {
            return;
        }
        IracCleanupKeysCommand cleanup = commandsFactory.buildIracCleanupKeyCommand(keysToCleanup);
        rpcManager.sendToMany(targets, cleanup, DeliverOrder.NONE);
        for (IracManagerKeyInfo info : keysToCleanup) {
            removeStateFromLocal(info);
        }
    }

    /**
     * Removes the tracked state of {@code state}'s key, but only if the tracked state's key info
     * equals {@code state}; a different state for the same key is kept.
     */
    private void removeStateFromLocal(IracManagerKeyInfo state) {
        updatedKeys.computeIfPresent(state.getKey(), (ignoredKey, tracked) -> {
            if (!tracked.getKeyInfo().equals(state)) {
                if (log.isTraceEnabled()) {
                    log.tracef("[IRAC] State removed? false, state=%s", tracked);
                }
                return tracked;
            }
            if (log.isTraceEnabled()) {
                log.tracef("[IRAC] State removed? true, state=%s", tracked);
            }
            // Returning null removes the mapping.
            return null;
        });
    }

    /**
     * Callback for a batch response. On {@code OK} the states reporting done are removed from the
     * cluster; on {@code RETRY} the sender is re-triggered; {@code BACK_OFF_AND_RETRY} does nothing here.
     */
    private void onBatchResponse(IracBatchSendResult result, Collection<? extends IracManagerKeyState> successfulSent) {
        if (log.isTraceEnabled()) {
            log.tracef("[IRAC] Batch completed with %d keys applied. Global result=%s", successfulSent.size(), (Object)result);
        }
        List<IracManagerKeyState> finished = new ArrayList<>(successfulSent.size());
        switch (result) {
            case OK: {
                successfulSent.stream()
                        .filter(IracManagerKeyState::isDone)
                        .forEach(finished::add);
                break;
            }
            case RETRY: {
                iracExecutor.run();
                break;
            }
            case BACK_OFF_AND_RETRY: {
                break;
            }
            default: {
                onUnexpectedThrowable(new IllegalStateException("Unknown result: " + String.valueOf((Object)result)));
            }
        }
        removeStateFromCluster(finished);
    }

    /** @return the number of tracked keys, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="Number of keys that need to be sent to remote site(s)", displayName="Queue size", measurementType=MeasurementType.DYNAMIC)
    public int getQueueSize() {
        if (!statisticsEnabled) {
            return -1;
        }
        return updatedKeys.size();
    }

    /** @return the tombstone manager's size, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="Number of tombstones stored", displayName="Number of tombstones", measurementType=MeasurementType.DYNAMIC)
    public int getNumberOfTombstones() {
        if (!statisticsEnabled) {
            return -1;
        }
        return iracTombstoneManager.size();
    }

    /** @return the sum of all conflict counters, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="The total number of conflicts between local and remote sites.", displayName="Number of conflicts", measurementType=MeasurementType.TRENDSUP)
    public long getNumberOfConflicts() {
        if (!statisticsEnabled) {
            return -1L;
        }
        return sumConflicts();
    }

    /** @return the discard counter, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="The number of updates from remote sites discarded (duplicate or old update).", displayName="Number of discards", measurementType=MeasurementType.TRENDSUP)
    public long getNumberOfDiscards() {
        if (!statisticsEnabled) {
            return -1L;
        }
        return discardCounts.longValue();
    }

    /** @return the local-wins conflict counter, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="The number of conflicts where the merge policy discards the remote update.", displayName="Number of conflicts where local value is used", measurementType=MeasurementType.TRENDSUP)
    public long getNumberOfConflictsLocalWins() {
        if (!statisticsEnabled) {
            return -1L;
        }
        return conflictLocalWinsCount.longValue();
    }

    /** @return the remote-wins conflict counter, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="The number of conflicts where the merge policy applies the remote update.", displayName="Number of conflicts where remote value is used", measurementType=MeasurementType.TRENDSUP)
    public long getNumberOfConflictsRemoteWins() {
        if (!statisticsEnabled) {
            return -1L;
        }
        return conflictRemoteWinsCount.longValue();
    }

    /** @return the merged conflict counter, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="Number of conflicts where the merge policy created a new entry.", displayName="Number of conflicts merged", measurementType=MeasurementType.TRENDSUP)
    public long getNumberOfConflictsMerged() {
        if (!statisticsEnabled) {
            return -1L;
        }
        return conflictMergedCount.longValue();
    }

    /** @return what the tombstone manager reports for its cleanup task; not gated by the statistics flag */
    @ManagedAttribute(description="Is tombstone cleanup task running?", displayName="Tombstone cleanup task running", dataType=DataType.TRAIT)
    public boolean isTombstoneCleanupTaskRunning() {
        boolean running = iracTombstoneManager.isTaskRunning();
        return running;
    }

    /** @return the tombstone manager's current delay in milliseconds; not gated by the statistics flag */
    @ManagedAttribute(description="Current delay in milliseconds between tombstone cleanup tasks", displayName="Delay between tombstone cleanup tasks", measurementType=MeasurementType.DYNAMIC)
    public long getTombstoneCleanupTaskCurrentDelay() {
        long delayMillis = iracTombstoneManager.getCurrentDelayMillis();
        return delayMillis;
    }

    /** @return whether statistics gathering is currently enabled */
    @Override
    @ManagedAttribute(description="Enables or disables the gathering of statistics by this component", writable=true)
    public boolean getStatisticsEnabled() {
        return statisticsEnabled;
    }

    /** Turns statistics reporting on or off; the counters themselves are left untouched. */
    @Override
    public void setStatisticsEnabled(boolean enabled) {
        statisticsEnabled = enabled;
    }

    /** Resets the discard counter and the three conflict counters to zero. */
    @Override
    @ManagedOperation(displayName="Reset Statistics", description="Resets statistics gathered by this component")
    public void resetStatistics() {
        Stream.of(discardCounts, conflictLocalWinsCount, conflictRemoteWinsCount, conflictMergedCount)
                .forEach(LongAdder::reset);
    }

    /** @return local-wins + remote-wins + merged conflict counts */
    private long sumConflicts() {
        long localWins = conflictLocalWinsCount.longValue();
        long remoteWins = conflictRemoteWinsCount.longValue();
        long merged = conflictMergedCount.longValue();
        return localWins + remoteWins + merged;
    }

    /** Adds one to the discard counter (regardless of the statistics flag). */
    @Override
    public void incrementNumberOfDiscards() {
        discardCounts.add(1L);
    }

    /** Adds one to the local-wins conflict counter (regardless of the statistics flag). */
    @Override
    public void incrementNumberOfConflictLocalWins() {
        conflictLocalWinsCount.add(1L);
    }

    /** Adds one to the remote-wins conflict counter (regardless of the statistics flag). */
    @Override
    public void incrementNumberOfConflictRemoteWins() {
        conflictRemoteWinsCount.add(1L);
    }

    /** Adds one to the merged conflict counter (regardless of the statistics flag). */
    @Override
    public void incrementNumberOfConflictMerged() {
        conflictMergedCount.add(1L);
    }

    /** @return {@code true} when a state is currently tracked for {@code key} */
    @Override
    public boolean containsKey(Object key) {
        return updatedKeys.containsKey(key);
    }

    /** @return a stream over the key info of every currently tracked state */
    @Override
    public Stream<IracManagerKeyInfo> pendingKeys() {
        Collection<IracManagerKeyState> tracked = updatedKeys.values();
        return tracked.stream().map(IracManagerKeyState::getKeyInfo);
    }

    /**
     * Tells {@code origin} which of {@code keys} are stale: keys whose segment this node is
     * primary for and for which no state is tracked here. Nothing is sent when none qualify.
     */
    @Override
    public void checkStaleKeys(Address origin, Collection<IracManagerKeyInfo> keys) {
        LocalizedCacheTopology topology = clusteringDependentLogic.getCacheTopology();
        List<IracManagerKeyInfo> staleKeys = keys.stream()
                .filter(info -> topology.getSegmentDistribution(info.getSegment()).isPrimary()
                        && !updatedKeys.containsKey(info.getKey()))
                .toList();
        if (!staleKeys.isEmpty()) {
            IracCleanupKeysCommand cleanup = commandsFactory.buildIracCleanupKeyCommand(staleKeys);
            rpcManager.sendTo(origin, cleanup, DeliverOrder.NONE);
        }
    }

    /**
     * A tracked state together with what was loaded for its key.
     *
     * @param state     the tracked state; never {@code null}
     * @param entry     the loaded cache entry; {@code null} is accepted
     * @param tombstone the key's tombstone; {@code null} is accepted
     */
    private record IracStateData(
            IracManagerKeyState state,
            InternalCacheEntry<Object, Object> entry,
            IracMetadata tombstone) {

        public IracStateData {
            Objects.requireNonNull(state);
        }
    }
}

