/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.impl;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import javax.transaction.InvalidTransactionException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.SegmentSpecificCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.FunctionalCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.functional.Param;
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.interceptors.impl.JmxStatsCommandInterceptor;
import org.infinispan.interceptors.impl.TxBatchUpdater;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.support.BatchModification;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@MBean(objectName="CacheStore", description="Component that handles storing of entries to a CacheStore from memory.")
public class CacheWriterInterceptor
extends JmxStatsCommandInterceptor {
    // Class-level logger; instances reach it through getLog().
    private static final Log log = LogFactory.getLog(CacheWriterInterceptor.class);
    // Trace flag read once from getLog() when the instance is constructed.
    private final boolean trace = this.getLog().isTraceEnabled();
    // Target of every write, delete, clear and size call made by this interceptor.
    @Inject
    protected PersistenceManager persistenceManager;
    // Supplies the value stored in the invocation context for a key (see marshalledEntry) and is handed to TxBatchUpdater.
    @Inject
    InternalEntryFactory entryFactory;
    // Used to suspend/resume the running transaction around store(); null-checked before use.
    @Inject
    TransactionManager transactionManager;
    // Resolves the segment of a key when it has to be computed from the key itself.
    @Inject
    KeyPartitioner keyPartitioner;
    // Builds the MarshallableEntry instances that are passed to the persistence manager.
    @Inject
    MarshallableEntryFactory marshalledEntryFactory;
    // Counter behind getWritesToTheStores(); only updated while statistics are enabled.
    final AtomicLong cacheStores = new AtomicLong(0L);
    // Method reference to handlePutMapCommandReturn, created once and reused by visitPutMapCommand.
    protected InvocationSuccessFunction<PutMapCommand> handlePutMapCommandReturn = this::handlePutMapCommandReturn;

    /**
     * Returns the logger this interceptor writes its trace output to.
     *
     * @return the class-level logger of {@code CacheWriterInterceptor}
     */
    protected Log getLog() {
        return CacheWriterInterceptor.log;
    }

    /**
     * Start hook: copies the JMX statistics setting of the cache configuration
     * into this interceptor's statistics flag.
     */
    @Start(priority=15)
    protected void start() {
        boolean statisticsConfigured = cacheConfiguration.jmxStatistics().enabled();
        setStatisticsEnabled(statisticsConfigured);
    }

    /**
     * Flushes the transaction's modifications to the stores via
     * {@link #commitCommand(TxInvocationContext)} and invokes the next
     * interceptor with the resulting stage as the delay.
     *
     * @param ctx     transactional invocation context
     * @param command the commit command being processed
     * @return whatever {@code asyncInvokeNext} returns
     * @throws Throwable if flushing the modifications fails
     */
    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
        CompletionStage<Void> storeStage = commitCommand(ctx);
        return asyncInvokeNext(ctx, command, storeStage);
    }

    /**
     * For a one-phase-commit prepare, the modifications are flushed right away
     * (same path as a commit); otherwise the command is simply passed on.
     *
     * @param ctx     transactional invocation context
     * @param command the prepare command being processed
     * @return the result of the rest of the interceptor chain
     * @throws Throwable if flushing the modifications fails
     */
    @Override
    public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
        if (!command.isOnePhaseCommit()) {
            // Two-phase prepare: nothing is written until the commit arrives.
            return invokeNext(ctx, command);
        }
        return asyncInvokeNext(ctx, command, commitCommand(ctx));
    }

    /**
     * Writes the modifications recorded in the transaction to the stores.
     * <p>
     * The running transaction (if any) is suspended before {@link #store(TxInvocationContext)}
     * is called and resumed in a {@code finally} block as soon as that call returns,
     * i.e. not necessarily after the returned stage has completed.
     *
     * @param ctx transactional invocation context
     * @return the stage returned by {@code store}, or {@code null} when the
     *         transaction has no modifications; callers in this class pass the
     *         value straight to {@code asyncInvokeNext}
     * @throws Throwable if suspending/resuming the transaction or storing fails
     */
    protected CompletionStage<Void> commitCommand(TxInvocationContext ctx) throws Throwable {
        boolean nothingToWrite = ((AbstractCacheTransaction)ctx.getCacheTransaction()).getAllModifications().isEmpty();
        if (nothingToWrite) {
            if (trace) {
                getLog().trace("Commit called with no modifications; ignoring.");
            }
            return null;
        }

        GlobalTransaction tx = ctx.getGlobalTransaction();
        if (trace) {
            getLog().tracef("Calling loader.commit() for transaction %s", tx);
        }

        // Stays null if suspendRunningTx throws, in which case nothing is resumed.
        Transaction suspended = null;
        try {
            suspended = suspendRunningTx(ctx);
            return store(ctx);
        } finally {
            resumeRunningTx(suspended);
        }
    }

    /**
     * Re-associates a previously suspended transaction with the transaction manager.
     * Does nothing when there is no transaction manager or no transaction to resume.
     *
     * @param xaTx the transaction returned by {@code suspendRunningTx}, may be {@code null}
     * @throws InvalidTransactionException propagated from {@code TransactionManager.resume}
     * @throws SystemException             propagated from {@code TransactionManager.resume}
     */
    private void resumeRunningTx(Transaction xaTx) throws InvalidTransactionException, SystemException {
        if (transactionManager == null || xaTx == null) {
            return;
        }
        transactionManager.resume(xaTx);
    }

    /**
     * Suspends the transaction currently associated with the transaction manager, if any.
     *
     * @param ctx transactional invocation context; only its origin is inspected
     * @return the suspended transaction, or {@code null} when there is no transaction
     *         manager or nothing was suspended
     * @throws SystemException       propagated from {@code TransactionManager.suspend}
     * @throws IllegalStateException if a transaction was suspended while the context
     *                               does not originate locally
     */
    private Transaction suspendRunningTx(TxInvocationContext ctx) throws SystemException {
        if (this.transactionManager == null) {
            return null;
        }
        Transaction xaTx = this.transactionManager.suspend();
        if (xaTx != null && !ctx.isOriginLocal()) {
            // NOTE(review): the transaction suspended above is not resumed on this path;
            // confirm that leaving it dissociated is intended.
            throw new IllegalStateException("It is only possible to be in the context of a JTA transaction in the local node.");
        }
        return xaTx;
    }

    /**
     * After the rest of the chain has run, deletes the removed key from all stores.
     * The delete is skipped when the command carries the skip-store flag, runs in a
     * transaction scope, was not successful, or this node is not the proper writer.
     *
     * @param ctx     invocation context
     * @param command the remove command being processed
     * @return the chain's return value, delayed until the store delete completes
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, (rCtx, removeCommand, rv) -> {
            boolean shouldDelete = isStoreEnabled(removeCommand)
                    && !rCtx.isInTxScope()
                    && removeCommand.isSuccessful()
                    && isProperWriter(rCtx, removeCommand, removeCommand.getKey());
            if (!shouldDelete) {
                return rv;
            }

            Object key = removeCommand.getKey();
            CompletionStage<Boolean> deleteStage = persistenceManager.deleteFromAllStores(key, removeCommand.getSegment(), PersistenceManager.AccessMode.BOTH);
            if (!trace) {
                return delayedValue(deleteStage, rv);
            }
            // With tracing on, log the store's response once the delete has finished.
            CompletionStage<Void> tracedStage = deleteStage.thenAccept(removed -> getLog().tracef("Removed entry under key %s and got response %s from CacheStore", key, removed));
            return delayedValue(tracedStage, rv);
        });
    }

    /**
     * Clears all stores before invoking the rest of the chain, unless the command
     * skips the store or runs in a transaction scope. A locally originated clear
     * uses {@code AccessMode.BOTH}; a remote one uses {@code AccessMode.PRIVATE}.
     *
     * @param ctx     invocation context
     * @param command the clear command being processed
     * @return the result of the rest of the interceptor chain
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
        if (!isStoreEnabled(command) || ctx.isInTxScope()) {
            return invokeNext(ctx, command);
        }
        PersistenceManager.AccessMode mode;
        if (ctx.isOriginLocal()) {
            mode = PersistenceManager.AccessMode.BOTH;
        } else {
            mode = PersistenceManager.AccessMode.PRIVATE;
        }
        return asyncInvokeNext(ctx, command, persistenceManager.clearAllStores(mode));
    }

    /**
     * After the rest of the chain has run, writes the entry for the command's key
     * to the stores. Nothing is written when the command skips the store, runs in
     * a transaction scope, was not successful, or this node is not the proper writer.
     *
     * @param ctx     invocation context
     * @param command the put command being processed
     * @return the chain's return value, delayed until the store write completes
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, (rCtx, putKeyValueCommand, rv) -> {
            boolean shouldStore = isStoreEnabled(putKeyValueCommand)
                    && !rCtx.isInTxScope()
                    && putKeyValueCommand.isSuccessful()
                    && isProperWriter(rCtx, putKeyValueCommand, putKeyValueCommand.getKey());
            if (!shouldStore) {
                return rv;
            }
            CompletionStage<Void> writeStage = storeEntry(rCtx, putKeyValueCommand.getKey(), putKeyValueCommand);
            return delayedValue(writeStage, rv);
        });
    }

    /**
     * After the rest of the chain has run, writes the replaced entry to the stores.
     * Nothing is written when the command skips the store, runs in a transaction
     * scope, was not successful, or this node is not the proper writer.
     *
     * @param ctx     invocation context
     * @param command the replace command being processed
     * @return the chain's return value, delayed until the store write completes
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, (rCtx, replaceCommand, rv) -> {
            boolean shouldStore = isStoreEnabled(replaceCommand)
                    && !rCtx.isInTxScope()
                    && replaceCommand.isSuccessful()
                    && isProperWriter(rCtx, replaceCommand, replaceCommand.getKey());
            if (!shouldStore) {
                return rv;
            }
            CompletionStage<Void> writeStage = storeEntry(rCtx, replaceCommand.getKey(), replaceCommand);
            return delayedValue(writeStage, rv);
        });
    }

    /**
     * After the rest of the chain has run, mirrors the compute result in the stores:
     * a {@code null} return value deletes the key from all stores, any other value
     * stores the entry. Nothing happens when the command skips the store, runs in a
     * transaction scope, was not successful, or this node is not the proper writer.
     *
     * @param ctx     invocation context
     * @param command the compute command being processed
     * @return the chain's return value, delayed until the store operation completes
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, (rCtx, computeCommand, rv) -> {
            boolean shouldPersist = isStoreEnabled(computeCommand)
                    && !rCtx.isInTxScope()
                    && computeCommand.isSuccessful()
                    && isProperWriter(rCtx, computeCommand, computeCommand.getKey());
            if (!shouldPersist) {
                return rv;
            }

            Object key = computeCommand.getKey();
            if (rv != null) {
                return delayedValue(storeEntry(rCtx, key, computeCommand), rv);
            }

            // A null result is treated as a removal of the key.
            CompletionStage<Boolean> deleteStage = persistenceManager.deleteFromAllStores(key, computeCommand.getSegment(), PersistenceManager.AccessMode.BOTH);
            if (trace) {
                CompletionStage<Void> tracedStage = deleteStage.thenAccept(removed -> getLog().tracef("Removed entry under key %s and got response %s from CacheStore", key, removed));
                return delayedValue(tracedStage, rv);
            }
            return delayedValue(deleteStage, rv);
        });
    }

    /**
     * After the rest of the chain has run, stores the entry when the command
     * produced a non-null value. Nothing is written when the command skips the
     * store, runs in a transaction scope, was not successful, this node is not the
     * proper writer, or the return value is {@code null}.
     *
     * @param ctx     invocation context
     * @param command the compute-if-absent command being processed
     * @return the chain's return value, delayed until the store write completes
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, (rCtx, computeIfAbsentCommand, rv) -> {
            boolean shouldStore = isStoreEnabled(computeIfAbsentCommand)
                    && !rCtx.isInTxScope()
                    && computeIfAbsentCommand.isSuccessful()
                    && isProperWriter(rCtx, computeIfAbsentCommand, computeIfAbsentCommand.getKey())
                    && rv != null;
            if (!shouldStore) {
                return rv;
            }
            CompletionStage<Void> writeStage = storeEntry(rCtx, computeIfAbsentCommand.getKey(), computeIfAbsentCommand);
            return delayedValue(writeStage, rv);
        });
    }

    /**
     * Runs the rest of the chain and then applies the cached
     * {@code handlePutMapCommandReturn} callback to persist the map's entries.
     *
     * @param ctx     invocation context
     * @param command the put-map command being processed
     * @return the result produced by the callback
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        InvocationSuccessFunction<PutMapCommand> onSuccess = this.handlePutMapCommandReturn;
        return invokeNextThenApply(ctx, command, onSuccess);
    }

    /**
     * Success callback for put-map commands: writes the map's entries to the stores
     * in two batches. Keys for which {@code skipSharedStores} is false go out with
     * {@code AccessMode.BOTH}; the remaining keys go out with {@code AccessMode.PRIVATE}.
     *
     * @param rCtx          invocation context of the completed command
     * @param putMapCommand the completed put-map command
     * @param rv            value returned by the rest of the chain
     * @return {@code rv} unchanged when the store is skipped or a transaction scope
     *         is active; otherwise {@code rv} delayed until both batches complete
     */
    protected Object handlePutMapCommandReturn(InvocationContext rCtx, PutMapCommand putMapCommand, Object rv) {
        if (!isStoreEnabled(putMapCommand) || rCtx.isInTxScope()) {
            return rv;
        }
        Predicate<Object> privateOnly = key -> skipSharedStores(rCtx, key, putMapCommand);
        CompletionStage<Void> sharedBatch = processIterableBatch(rCtx, putMapCommand, PersistenceManager.AccessMode.BOTH, privateOnly.negate());
        CompletionStage<Void> privateBatch = processIterableBatch(rCtx, putMapCommand, PersistenceManager.AccessMode.PRIVATE, privateOnly);
        return delayedValue(CompletionStages.allOf(sharedBatch, privateBatch), rv);
    }

    /**
     * Writes the subset of a put-map command's keys accepted by {@code filter} to
     * the non-transactional stores as one batch.
     * <p>
     * The entries are produced lazily: each iteration over the batch re-streams the
     * command's key set, applies the filter, marshals the context value of each key
     * and drops keys without a value.
     * <p>
     * When statistics are enabled, the store counter grows by the number of keys the
     * filter accepts. Counting the whole map here would count every key once per
     * call, and {@code handlePutMapCommandReturn} calls this method twice per command
     * with complementary filters.
     *
     * @param ctx    invocation context the entry values are read from
     * @param cmd    the put-map command providing the keys and the flags
     * @param mode   which stores the batch is written to
     * @param filter selects the keys that belong to this batch
     * @return the stage returned by the persistence manager for the batch write
     */
    protected CompletionStage<Void> processIterableBatch(InvocationContext ctx, PutMapCommand cmd, PersistenceManager.AccessMode mode, Predicate<Object> filter) {
        if (this.getStatisticsEnabled()) {
            // Count only the keys that belong to this batch, not the whole map.
            long selectedKeys = cmd.getMap().keySet().stream().filter(filter).count();
            this.cacheStores.addAndGet(selectedKeys);
        }
        Iterable<MarshallableEntry> iterable = () -> cmd.getMap().keySet().stream().filter(filter).map(key -> this.marshalledEntry(ctx, key)).filter(StreamMarshalling.nonNullPredicate()).iterator();
        return this.persistenceManager.writeBatchToAllNonTxStores(iterable, mode, cmd.getFlagsBitSet());
    }

    /**
     * Handles the functional read-write key command through the shared
     * single-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteCommand}
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        return visitWriteCommand(ctx, command);
    }

    /**
     * Handles the functional read-write key/value command through the shared
     * single-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteCommand}
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        return visitWriteCommand(ctx, command);
    }

    /**
     * Handles the functional write-only key command through the shared
     * single-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteCommand}
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        return visitWriteCommand(ctx, command);
    }

    /**
     * Handles the functional write-only key/value command through the shared
     * single-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteCommand}
     * @throws Throwable declared by the visitor contract
     */
    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        return visitWriteCommand(ctx, command);
    }

    /**
     * Shared handler for single-key functional write commands.
     * <p>
     * After the rest of the chain has run, and provided the command does not skip
     * the store, is not in a transaction scope, was successful and this node is the
     * proper writer, the command's persistence-mode parameter decides what happens:
     * <ul>
     *   <li>{@code LOAD_PERSIST} / {@code SKIP_LOAD}: the context entry for the key is
     *       looked up; a removed entry is deleted from all stores, a changed entry is
     *       stored, an unchanged or missing entry causes no store access;</li>
     *   <li>{@code SKIP_PERSIST} / {@code SKIP}: no store access.</li>
     * </ul>
     *
     * @param ctx     invocation context
     * @param command the functional write command being processed
     * @return the chain's return value, delayed until any store operation completes
     */
    private <T extends DataWriteCommand & FunctionalCommand> Object visitWriteCommand(InvocationContext ctx, T command) {
        return this.invokeNextThenApply(ctx, command, (rCtx, dataWriteCommand, rv) -> {
            if (!this.isStoreEnabled(dataWriteCommand) || rCtx.isInTxScope() || !dataWriteCommand.isSuccessful() || !this.isProperWriter(rCtx, dataWriteCommand, dataWriteCommand.getKey())) {
                return rv;
            }
            // Default: nothing to wait for.
            CompletionStage<?> stage = CompletableFutures.completedNull();
            // Parameter at index 0 is read as the persistence mode (see the cast below).
            Param persistMode = dataWriteCommand.getParams().get(0);
            switch ((Param.PersistenceMode)persistMode.get()) {
                case LOAD_PERSIST:
                case SKIP_LOAD: {
                    Object key = dataWriteCommand.getKey();
                    CacheEntry entry = rCtx.lookupEntry(key);
                    if (entry == null) {
                        // Only reported when the entry really is missing from the context.
                        log.trace("Skipping cache store since entry was not found in context");
                    } else if (entry.isRemoved()) {
                        CompletionStage<Boolean> deleteStage = this.persistenceManager.deleteFromAllStores(key, dataWriteCommand.getSegment(), PersistenceManager.AccessMode.BOTH);
                        stage = deleteStage;
                        if (this.trace) {
                            stage = deleteStage.thenAccept(removed -> this.getLog().tracef("Removed entry under key %s and got response %s from CacheStore", key, removed));
                        }
                    } else if (entry.isChanged()) {
                        CompletionStage<Void> storeStage = this.storeEntry(rCtx, key, dataWriteCommand);
                        stage = storeStage;
                        if (this.trace) {
                            stage = storeStage.thenAccept(ignore -> this.getLog().tracef("Stored entry for key %s in CacheStore", key));
                        }
                    } else if (this.trace) {
                        // The format string has one %s placeholder, so the key must be supplied.
                        this.getLog().tracef("Skipping write for key %s as entry wasn't changed", key);
                    }
                    break;
                }
                case SKIP_PERSIST:
                case SKIP: {
                    log.trace("Skipping cache store since persistence mode parameter is SKIP");
                    break;
                }
            }
            return CacheWriterInterceptor.delayedValue(stage, rv);
        });
    }

    /**
     * Handles the functional write-only many-keys command through the shared
     * multi-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteManyCommand}
     * @throws Throwable propagated from the shared handler
     */
    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        return visitWriteManyCommand(ctx, command);
    }

    /**
     * Handles the functional write-only many-entries command through the shared
     * multi-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteManyCommand}
     * @throws Throwable propagated from the shared handler
     */
    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        return visitWriteManyCommand(ctx, command);
    }

    /**
     * Handles the functional read-write many-keys command through the shared
     * multi-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteManyCommand}
     * @throws Throwable propagated from the shared handler
     */
    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        return visitWriteManyCommand(ctx, command);
    }

    /**
     * Handles the functional read-write many-entries command through the shared
     * multi-key write path.
     *
     * @param ctx     invocation context
     * @param command the command being processed
     * @return the result of {@code visitWriteManyCommand}
     * @throws Throwable propagated from the shared handler
     */
    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        return visitWriteManyCommand(ctx, command);
    }

    /**
     * Shared handler for multi-key functional write commands.
     * <p>
     * After the rest of the chain has run, and unless the command skips the store
     * or a transaction scope is active, the persistence-mode parameter decides what
     * happens. For {@code LOAD_PERSIST} / {@code SKIP_LOAD} every affected key with
     * an entry in the context is processed: removed entries are deleted from all
     * stores, changed entries are stored when this node is the proper writer. The
     * number of stored entries is added to the store counter when statistics are
     * enabled. For {@code SKIP_PERSIST} / {@code SKIP} nothing is written.
     *
     * @param ctx     invocation context
     * @param command the functional write command being processed
     * @return the chain's return value, delayed until all store operations complete
     * @throws Throwable declared for symmetry with the visitor methods
     */
    private <T extends WriteCommand & FunctionalCommand> Object visitWriteManyCommand(InvocationContext ctx, T command) throws Throwable {
        return invokeNextThenApply(ctx, command, (rCtx, manyEntriesCommand, rv) -> {
            if (!isStoreEnabled(manyEntriesCommand) || rCtx.isInTxScope()) {
                return rv;
            }
            CompletionStage<?> pending = CompletableFutures.completedNull();
            // Parameter at index 0 is read as the persistence mode (see the cast below).
            Param modeParam = manyEntriesCommand.getParams().get(0);
            switch ((Param.PersistenceMode)modeParam.get()) {
                case SKIP_PERSIST:
                case SKIP: {
                    log.trace("Skipping cache store since persistence mode parameter is SKIP");
                    break;
                }
                case LOAD_PERSIST:
                case SKIP_LOAD: {
                    AggregateCompletionStage<Void> allOperations = CompletionStages.aggregateCompletionStage();
                    int written = 0;
                    for (Object key : manyEntriesCommand.getAffectedKeys()) {
                        CacheEntry entry = rCtx.lookupEntry(key);
                        if (entry == null) {
                            continue;
                        }
                        if (entry.isRemoved()) {
                            CompletionStage<Boolean> deleteStage = persistenceManager.deleteFromAllStores(key, keyPartitioner.getSegment(key), PersistenceManager.AccessMode.BOTH);
                            if (trace) {
                                allOperations.dependsOn(deleteStage.thenAccept(removed -> getLog().tracef("Removed entry under key %s and got response %s from CacheStore", key, removed)));
                            } else {
                                allOperations.dependsOn(deleteStage);
                            }
                        } else if (entry.isChanged() && isProperWriter(rCtx, manyEntriesCommand, key)) {
                            // Statistics are updated in bulk below, so the per-entry increment is disabled.
                            allOperations.dependsOn(storeEntry(rCtx, key, manyEntriesCommand, false));
                            written++;
                        }
                    }
                    if (getStatisticsEnabled()) {
                        cacheStores.getAndAdd(written);
                    }
                    pending = allOperations.freeze();
                    break;
                }
            }
            return delayedValue(pending, rv);
        });
    }

    /**
     * Writes all modifications recorded in the transaction to the non-transactional
     * stores.
     * <p>
     * Every modification that does not skip the store is visited by a
     * {@code TxBatchUpdater}, which collects shared and non-shared batches. Four
     * store operations are then started, in this order: shared writes
     * ({@code BOTH}), non-shared writes ({@code PRIVATE}), shared deletes
     * ({@code BOTH}) and non-shared deletes ({@code PRIVATE}). When statistics are
     * enabled the updater's put count is added to the store counter.
     *
     * @param ctx transactional invocation context
     * @return a stage combining the four store operations, or an already completed
     *         stage when the transaction has no modifications
     * @throws Throwable if visiting a modification fails
     */
    protected final CompletionStage<Void> store(TxInvocationContext ctx) throws Throwable {
        List<WriteCommand> modifications = ((AbstractCacheTransaction)ctx.getCacheTransaction()).getAllModifications();
        if (modifications.isEmpty()) {
            if (trace) {
                getLog().trace("Transaction has not logged any modifications!");
            }
            return CompletableFutures.completedNull();
        }
        if (trace) {
            getLog().tracef("Cache loader modification list: %s", modifications);
        }

        TxBatchUpdater modsBuilder = TxBatchUpdater.createNonTxStoreUpdater(this, persistenceManager, entryFactory, marshalledEntryFactory);
        for (WriteCommand cacheCommand : modifications) {
            if (isStoreEnabled(cacheCommand)) {
                cacheCommand.acceptVisitor(ctx, modsBuilder);
            }
        }

        BatchModification sharedMods = modsBuilder.getModifications();
        BatchModification nonSharedMods = modsBuilder.getNonSharedModifications();

        // The four operations are started in the same order as they are combined.
        CompletionStage<?> sharedWrites = persistenceManager.writeBatchToAllNonTxStores(sharedMods.getMarshallableEntries(), PersistenceManager.AccessMode.BOTH, 0L);
        CompletionStage<?> privateWrites = persistenceManager.writeBatchToAllNonTxStores(nonSharedMods.getMarshallableEntries(), PersistenceManager.AccessMode.PRIVATE, 0L);
        CompletionStage<?> sharedDeletes = persistenceManager.deleteBatchFromAllNonTxStores(sharedMods.getKeysToRemove(), PersistenceManager.AccessMode.BOTH, 0L);
        CompletionStage<?> privateDeletes = persistenceManager.deleteBatchFromAllNonTxStores(nonSharedMods.getKeysToRemove(), PersistenceManager.AccessMode.PRIVATE, 0L);
        CompletionStage<Void> writeStage = CompletionStages.allOf(sharedWrites, privateWrites, sharedDeletes, privateDeletes);

        if (trace) {
            getLog().tracef("Writing shared batch with #entries=%d and non-shared batch with #entries=%d", sharedMods.getMarshallableEntries().size(), nonSharedMods.getMarshallableEntries().size());
            getLog().tracef("Deleting shared batch with #entries=%d and non-shared batch with #entries=%d", sharedMods.getKeysToRemove().size(), nonSharedMods.getKeysToRemove().size());
        }
        if (getStatisticsEnabled() && modsBuilder.getPutCount() > 0) {
            cacheStores.getAndAdd(modsBuilder.getPutCount());
        }
        return writeStage;
    }

    /**
     * Tells whether the command may touch the stores.
     *
     * @param command the command whose flags are inspected
     * @return {@code false} (after logging a trace message) when the command carries
     *         {@code SKIP_CACHE_STORE}; {@code true} otherwise
     */
    protected boolean isStoreEnabled(FlagAffectedCommand command) {
        boolean skipRequested = command.hasAnyFlag(FlagBitSets.SKIP_CACHE_STORE);
        if (skipRequested) {
            log.trace("Skipping cache store since the call contain a skip cache store flag");
        }
        return !skipRequested;
    }

    /**
     * Decides whether this node should write the given key to the stores.
     * This implementation accepts every key; the method is {@code protected},
     * so subclasses can presumably narrow the decision.
     *
     * @param ctx     invocation context (unused here)
     * @param command the command performing the write (unused here)
     * @param key     the key about to be written (unused here)
     * @return always {@code true}
     */
    protected boolean isProperWriter(InvocationContext ctx, FlagAffectedCommand command, Object key) {
        return true;
    }

    /**
     * Resets the store-write counter to zero.
     */
    @Override
    public void resetStatistics() {
        cacheStores.set(0L);
    }

    /**
     * Exposes the store-write counter as a managed attribute.
     *
     * @return the current value of the counter
     */
    @ManagedAttribute(description="Number of writes to the store", displayName="Number of writes to the store", measurementType=MeasurementType.TRENDSUP)
    public long getWritesToTheStores() {
        long writes = cacheStores.get();
        return writes;
    }

    /**
     * Exposes the size reported by the persistence manager as a managed attribute.
     * The call waits for the size stage via {@code CompletionStages.join}.
     *
     * @return the number of persisted entries as reported by the persistence manager
     */
    @ManagedAttribute(description="Number of entries currently persisted excluding expired entries", displayName="Number of persisted entries")
    public int getNumberOfPersistedEntries() {
        int persisted = CompletionStages.join(persistenceManager.size());
        return persisted;
    }

    /**
     * Stores the context entry for {@code key} and counts the write in the statistics.
     *
     * @param ctx     invocation context holding the entry
     * @param key     key of the entry to store
     * @param command command that caused the write
     * @return the stage of the store write
     */
    CompletionStage<Void> storeEntry(InvocationContext ctx, Object key, FlagAffectedCommand command) {
        final boolean incrementStats = true;
        return storeEntry(ctx, key, command, incrementStats);
    }

    /**
     * Stores the context entry for {@code key} in the non-transactional stores.
     * <p>
     * Nothing is written when the persistence manager is read-only or the context
     * holds no value for the key. The write uses {@code AccessMode.PRIVATE} when
     * {@code skipSharedStores} is true for the key, {@code AccessMode.BOTH} otherwise.
     *
     * @param ctx            invocation context holding the entry
     * @param key            key of the entry to store
     * @param command        command that caused the write; supplies segment and flags
     * @param incrementStats whether to bump the store counter once the write completes
     *                       (only honoured while statistics are enabled)
     * @return the stage of the store write, or an already completed stage when
     *         nothing is written
     */
    CompletionStage<Void> storeEntry(InvocationContext ctx, Object key, FlagAffectedCommand command, boolean incrementStats) {
        if (persistenceManager.isReadOnly()) {
            return CompletableFutures.completedNull();
        }
        MarshallableEntry entry = marshalledEntry(ctx, key);
        if (entry == null) {
            return CompletableFutures.completedNull();
        }

        int segment = SegmentSpecificCommand.extractSegment(command, key, keyPartitioner);
        PersistenceManager.AccessMode mode;
        if (skipSharedStores(ctx, key, command)) {
            mode = PersistenceManager.AccessMode.PRIVATE;
        } else {
            mode = PersistenceManager.AccessMode.BOTH;
        }

        CompletionStage<Void> stage = persistenceManager.writeToAllNonTxStores(entry, segment, mode, command.getFlagsBitSet());
        if (trace) {
            stage = stage.thenAccept(ignore -> getLog().tracef("Stored entry %s under key %s", entry.getValue(), key));
        }
        if (incrementStats && getStatisticsEnabled()) {
            // Counted only after the write stage has completed.
            stage = stage.thenAccept(ignore -> cacheStores.incrementAndGet());
        }
        return stage;
    }

    /**
     * Builds a marshallable entry from the value the context holds for {@code key}.
     *
     * @param ctx invocation context to read the value from
     * @param key key of the entry
     * @return the marshallable entry (value, metadata, created and last-used
     *         timestamps), or {@code null} when the context yields no value
     */
    MarshallableEntry marshalledEntry(InvocationContext ctx, Object key) {
        InternalCacheValue contextValue = entryFactory.getValueFromCtx(key, ctx);
        if (contextValue == null) {
            return null;
        }
        return marshalledEntryFactory.create(key, contextValue.getValue(), contextValue.getMetadata(), contextValue.getCreated(), contextValue.getLastUsed());
    }

    /**
     * Tells whether a write for {@code key} must bypass the shared stores.
     *
     * @param ctx     invocation context; a non-local origin always skips shared stores
     * @param key     key being written (not inspected by this implementation)
     * @param command command whose flags are checked for {@code SKIP_SHARED_CACHE_STORE}
     * @return {@code true} when the context is not locally originated or the command
     *         carries the skip-shared-store flag
     */
    protected boolean skipSharedStores(InvocationContext ctx, Object key, FlagAffectedCommand command) {
        if (!ctx.isOriginLocal()) {
            return true;
        }
        return command.hasAnyFlag(FlagBitSets.SKIP_SHARED_CACHE_STORE);
    }
}

