/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.IncrementableEntryVersion;
import org.infinispan.container.versioning.InequalVersionComparisonResult;
import org.infinispan.container.versioning.VersionGenerator;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.distribution.TxDistributionInterceptor;
import org.infinispan.interceptors.distribution.VersionedResult;
import org.infinispan.interceptors.distribution.VersionedResults;
import org.infinispan.remoting.responses.PrepareResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.WriteSkewHelper;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class VersionedDistributionInterceptor
extends TxDistributionInterceptor {
    private static final Log log = LogFactory.getLog(VersionedDistributionInterceptor.class);
    @Inject
    VersionGenerator versionGenerator;

    @Override
    protected Log getLog() {
        return log;
    }

    @Override
    protected void wrapRemoteEntry(InvocationContext ctx, Object key, CacheEntry ice, boolean isWrite) {
        Object cacheTransaction;
        EntryVersion seenVersion;
        if (ctx.isInTxScope() && (seenVersion = (EntryVersion)((AbstractCacheTransaction)(cacheTransaction = ((TxInvocationContext)ctx).getCacheTransaction())).getVersionsRead().get(key)) != null) {
            IncrementableEntryVersion newVersion = WriteSkewHelper.versionFromEntry(ice);
            if (newVersion == null) {
                throw new IllegalStateException("Wrapping entry without version");
            }
            if (seenVersion.compareTo(newVersion) != InequalVersionComparisonResult.EQUAL) {
                if (ctx.lookupEntry(key) == null) {
                    throw Log.CONTAINER.writeSkewOnRead(key, key, seenVersion, newVersion);
                }
                return;
            }
        }
        super.wrapRemoteEntry(ctx, key, ice, isWrite);
    }

    @Override
    protected Object wrapFunctionalResultOnNonOriginOnReturn(Object rv, CacheEntry entry) {
        IncrementableEntryVersion version = WriteSkewHelper.versionFromEntry(entry);
        return new VersionedResult(rv, version == null ? this.versionGenerator.nonExistingVersion() : version);
    }

    @Override
    protected Object wrapFunctionalManyResultOnNonOrigin(InvocationContext ctx, Collection<?> keys, Object[] values) {
        EntryVersion[] versions = new EntryVersion[keys.size()];
        int i = 0;
        for (Object key : keys) {
            IncrementableEntryVersion version = WriteSkewHelper.versionFromEntry(ctx.lookupEntry(key));
            versions[i++] = version == null ? this.versionGenerator.nonExistingVersion() : version;
        }
        return new VersionedResults(values, versions);
    }

    @Override
    protected Object[] unwrapFunctionalManyResultOnOrigin(InvocationContext ctx, List<Object> keys, Object responseValue) {
        if (responseValue instanceof VersionedResults) {
            VersionedResults vrs = (VersionedResults)responseValue;
            if (ctx.isInTxScope()) {
                Object tx = ((TxInvocationContext)ctx).getCacheTransaction();
                for (int i = 0; i < vrs.versions.length; ++i) {
                    this.checkAndAddReadVersion((AbstractCacheTransaction)tx, keys.get(i), vrs.versions[i]);
                }
            }
            return vrs.values;
        }
        return null;
    }

    @Override
    protected Object unwrapFunctionalResultOnOrigin(InvocationContext ctx, Object key, Object responseValue) {
        VersionedResult vr = (VersionedResult)responseValue;
        if (ctx.isInTxScope()) {
            Object tx = ((TxInvocationContext)ctx).getCacheTransaction();
            this.checkAndAddReadVersion((AbstractCacheTransaction)tx, key, vr.version);
        }
        return vr.result;
    }

    private void checkAndAddReadVersion(AbstractCacheTransaction tx, Object key, EntryVersion version) {
        EntryVersion lastVersionSeen = tx.getVersionsRead().get(key);
        if (lastVersionSeen != null && lastVersionSeen.compareTo(version) != InequalVersionComparisonResult.EQUAL) {
            throw Log.CONTAINER.writeSkewOnRead(key, key, lastVersionSeen, version);
        }
        tx.addVersionRead(key, version);
    }

    @Override
    protected CompletionStage<Object> prepareOnAffectedNodes(TxInvocationContext<?> ctx, PrepareCommand command, Collection<Address> recipients) {
        CompletionStage<Map<Address, Response>> remoteInvocation;
        if (recipients != null) {
            MapResponseCollector collector = MapResponseCollector.ignoreLeavers(recipients.size());
            remoteInvocation = this.rpcManager.invokeCommand(recipients, (CacheRpcCommand)command, collector, this.rpcManager.getSyncRpcOptions());
        } else {
            MapResponseCollector collector = MapResponseCollector.ignoreLeavers();
            remoteInvocation = this.rpcManager.invokeCommandOnAll(command, collector, this.rpcManager.getSyncRpcOptions());
        }
        return remoteInvocation.handle((responses, t) -> {
            VersionedDistributionInterceptor.transactionRemotelyPrepared(ctx);
            CompletableFutures.rethrowExceptionIfPresent((Throwable)t);
            PrepareResponse prepareResponse = new PrepareResponse();
            this.checkTxCommandResponses((Map<Address, Response>)responses, command, (TxInvocationContext<LocalTransaction>)ctx, recipients, prepareResponse);
            Object ct = ctx.getCacheTransaction();
            ct.setUpdatedEntryVersions(prepareResponse.mergeEntryVersions(ct.getUpdatedEntryVersions()));
            return prepareResponse;
        });
    }
}

