/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.backend;

import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.InvocationStage;
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.WithinThreadExecutor;

/**
 * Transactional companion of {@link QueryInterceptor}: once a transaction's
 * one-phase prepare or its commit has been passed down the interceptor chain,
 * every key touched by the transaction is handed to
 * {@link QueryInterceptor#processChange} together with the value recorded for
 * it in {@code txOldValues}.
 *
 * <p>NOTE(review): the two {@code visit*} methods presumably override the
 * visitor hooks of {@link DDAsyncInterceptor}; the superclass is not visible
 * here, so {@code @Override} was not added - confirm and annotate.
 */
public final class TxQueryInterceptor extends DDAsyncInterceptor {
    /**
     * Limit handed to {@code CompletionStages.performConcurrently}; presumably
     * the maximum number of keys processed concurrently - TODO confirm.
     */
    private static final int MAX_CONCURRENT = 100;

    /** Per-transaction map of key to recorded value; the entry is removed when the transaction is processed. */
    private final ConcurrentMap<GlobalTransaction, Map<Object, Object>> txOldValues;
    /** Delegate that receives each individual key change. */
    private final QueryInterceptor queryInterceptor;
    /** Callback shared by both visitor methods, kept in a field so it is created once per interceptor. */
    private final InvocationSuccessFunction<VisitableCommand> commitModificationsToIndex = this::commitModificationsToIndexFuture;

    /**
     * @param txOldValues      map from global transaction to the values recorded per key for that transaction
     * @param queryInterceptor interceptor whose {@code processChange} is called for every affected key
     */
    public TxQueryInterceptor(ConcurrentMap<GlobalTransaction, Map<Object, Object>> txOldValues, QueryInterceptor queryInterceptor) {
        this.txOldValues = txOldValues;
        this.queryInterceptor = queryInterceptor;
    }

    /**
     * Passes the prepare down the chain. For a one-phase commit no separate
     * commit command is visited by this class, so the modifications are
     * processed right after the prepare; otherwise nothing extra is done here.
     *
     * @param ctx     transactional invocation context
     * @param command the prepare command being visited
     * @return whatever the chain invocation returns
     */
    public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) {
        if (command.isOnePhaseCommit()) {
            return invokeNextThenApply(ctx, command, commitModificationsToIndex);
        }
        return invokeNext(ctx, command);
    }

    /**
     * Passes the commit down the chain and then processes the transaction's
     * modifications.
     *
     * @param ctx     transactional invocation context
     * @param command the commit command being visited
     * @return whatever the chain invocation returns
     */
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) {
        return invokeNextThenApply(ctx, command, commitModificationsToIndex);
    }

    /**
     * Forwards every key affected by the transaction's modifications (except
     * those flagged {@code SKIP_INDEXING}) to the query interceptor.
     *
     * @param ctx invocation context; must be a {@link TxInvocationContext}
     * @param cmd the command that triggered the callback (unused)
     * @param rv  the value returned by the rest of the chain (unused)
     * @return a stage built from the completion of all per-key calls
     */
    private InvocationStage commitModificationsToIndexFuture(InvocationContext ctx, VisitableCommand cmd, Object rv) {
        TxInvocationContext<?> txCtx = (TxInvocationContext<?>) ctx;

        // remove() rather than get(): the recorded values are consumed exactly once per transaction.
        Map<Object, Object> removed = txOldValues.remove(txCtx.getGlobalTransaction());
        Map<Object, Object> oldValues = removed == null ? Collections.emptyMap() : removed;

        AbstractCacheTransaction transaction = txCtx.getCacheTransaction();
        return asyncValue(CompletionStages.performConcurrently(
                transaction.getAllModifications().stream()
                        .filter(mod -> !mod.hasAnyFlag(FlagBitSets.SKIP_INDEXING))
                        .flatMap(mod -> mod.getAffectedKeys().stream()),
                MAX_CONCURRENT,
                // NOTE(review): WithinThreadExecutor presumably runs tasks on the calling thread - confirm.
                Schedulers.from(new WithinThreadExecutor()),
                key -> {
                    CacheEntry<?, ?> entry = txCtx.lookupEntry(key);
                    if (entry == null) {
                        // Key is not present in the transaction context: nothing to report.
                        return CompletableFutures.completedNull();
                    }
                    // UNKNOWN marks keys for which no value was recorded in txOldValues.
                    Object oldValue = oldValues.getOrDefault(key, QueryInterceptor.UNKNOWN);
                    return queryInterceptor.processChange(ctx, null, key, oldValue, entry.getValue());
                }));
    }
}

