/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.InvocationFinallyFunction;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.AllOwnersLostException;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Common base for state-transfer interceptors.
 *
 * <p>It stamps topology-affected commands with the topology id they are executed in and
 * retries read commands and {@link GetKeysInGroupCommand}s when the outcome of the
 * invocation indicates that the topology changed underneath them
 * ({@link OutdatedTopologyException}, {@link SuspectException},
 * {@link AllOwnersLostException}, or a topology id mismatch after the command ran).
 */
public abstract class BaseStateTransferInterceptor extends DDAsyncInterceptor {
    // Callbacks are cached in fields so that a new method-reference instance is not
    // created for every invocation.
    private final InvocationFinallyFunction<VisitableCommand> handleReadCommandReturn = this::handleReadCommandReturn;
    private final InvocationFinallyFunction<GetKeysInGroupCommand> handleLocalGetKeysInGroupReturn = this::handleLocalGetKeysInGroupReturn;

    @Inject
    Configuration configuration;
    @Inject
    protected StateTransferLock stateTransferLock;
    @Inject
    @ComponentName(value="org.infinispan.executors.non-blocking")
    Executor nonBlockingExecutor;
    @Inject
    DistributionManager distributionManager;
    @Inject
    @ComponentName(value="org.infinispan.executors.timeout")
    ScheduledExecutorService timeoutExecutor;

    /** How long a retry may wait for the requested topology, in milliseconds. */
    private long transactionDataTimeout;
    /** Whether the cache mode reported by the configuration is scattered. */
    private boolean isScattered;

    /**
     * Reads the settings this interceptor needs from the injected configuration:
     * the clustering remote timeout (used as the retry wait timeout) and whether the
     * cache mode is scattered.
     */
    @Start
    public void start() {
        transactionDataTimeout = configuration.clustering().remoteTimeout();
        isScattered = configuration.clustering().cacheMode().isScattered();
    }

    /**
     * Invokes the command and checks afterwards whether the topology changed while it ran.
     *
     * <p>For a locally originated command the check (and the retry) is done by
     * {@link #handleLocalGetKeysInGroupReturn}. For a remotely originated command this
     * node does not retry itself; it throws
     * {@link OutdatedTopologyException#RETRY_NEXT_TOPOLOGY} instead.
     */
    @Override
    public Object visitGetKeysInGroupCommand(InvocationContext ctx, GetKeysInGroupCommand command) throws Throwable {
        updateTopologyId(command);
        if (ctx.isOriginLocal()) {
            return invokeNextAndHandle(ctx, command, handleLocalGetKeysInGroupReturn);
        }
        return invokeNextThenAccept(ctx, command, (rCtx, cmd, rv) -> {
            int commandTopologyId = cmd.getTopologyId();
            boolean topologyChanged = currentTopologyId() != commandTopologyId;
            // Only an owner of the group needs to reject the result after a topology change.
            // This mirrors the local-origin handler, which likewise only retries when this
            // node owns the group.
            if (topologyChanged && distributionManager.getCacheTopology().isReadOwner(cmd.getGroupName())) {
                throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY;
            }
        });
    }

    /**
     * Completion callback for a locally originated {@link GetKeysInGroupCommand}.
     *
     * <p>The command is retried when it failed with an {@link OutdatedTopologyException}
     * or {@link SuspectException} (possibly wrapped in {@link RemoteException}s), or when
     * it succeeded but the topology id changed in the meantime and this node is a write
     * owner of the group. Otherwise the original value or exception is propagated.
     */
    private Object handleLocalGetKeysInGroupReturn(InvocationContext ctx, GetKeysInGroupCommand cmd, Object rv, Throwable throwable) throws Throwable {
        int commandTopologyId = cmd.getTopologyId();
        boolean shouldRetry;
        if (throwable != null) {
            Throwable cause = unwrapRemoteException(throwable);
            shouldRetry = cause instanceof OutdatedTopologyException || cause instanceof SuspectException;
        } else {
            // The topology id only matters if this node owns the group.
            shouldRetry = currentTopologyId() != commandTopologyId
                    && distributionManager.getCacheTopology().isWriteOwner(cmd.getGroupName());
        }
        if (!shouldRetry) {
            return valueOrException(rv, throwable);
        }
        logRetry(currentTopologyId(), cmd);
        // Always move at least one topology forward so the retry waits for a newer topology
        // than the one the command just ran in.
        int newTopologyId = Math.max(currentTopologyId(), commandTopologyId + 1);
        cmd.setTopologyId(newTopologyId);
        CompletableFuture<Void> transactionDataFuture = stateTransferLock.transactionDataFuture(newTopologyId);
        return retryWhenDone(transactionDataFuture, newTopologyId, ctx, cmd, handleLocalGetKeysInGroupReturn);
    }

    /**
     * Strips every enclosing {@link RemoteException} and returns the first cause that is
     * not one (which is {@code null} if the chain ends in a cause-less RemoteException).
     */
    private static Throwable unwrapRemoteException(Throwable throwable) {
        Throwable cause = throwable;
        while (cause instanceof RemoteException) {
            cause = cause.getCause();
        }
        return cause;
    }

    /**
     * Logs, at trace level, that a command is being retried because of a topology change.
     *
     * @param currentTopologyId the topology id to report as current
     * @param cmd the command being retried; its own topology id is logged as well
     */
    protected final void logRetry(int currentTopologyId, TopologyAffectedCommand cmd) {
        Log logger = getLog();
        if (logger.isTraceEnabled()) {
            logger.tracef("Retrying command because of topology change, current topology is %d, command topology %d: %s", currentTopologyId, cmd.getTopologyId(), cmd);
        }
    }

    /**
     * @return the topology id of the distribution manager's cache topology, or {@code -1}
     *         when no cache topology is available
     */
    protected final int currentTopologyId() {
        LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology();
        if (cacheTopology == null) {
            return -1;
        }
        return cacheTopology.getTopologyId();
    }

    /**
     * Assigns the current topology id to a command that does not have one yet
     * (topology id {@code -1}). When no cache topology is available, {@code 0} is used.
     * Commands that already carry a topology id are left untouched.
     */
    protected final void updateTopologyId(TopologyAffectedCommand command) {
        if (command.getTopologyId() != -1) {
            return;
        }
        LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology();
        int topologyId = cacheTopology == null ? 0 : cacheTopology.getTopologyId();
        if (getLog().isTraceEnabled()) {
            getLog().tracef("Setting command topology to %d", topologyId);
        }
        command.setTopologyId(topologyId);
    }

    /**
     * Re-invokes {@code command} once {@code future} has completed.
     *
     * <p>If the future is already done the command is invoked right away. Otherwise the
     * invocation is delayed until the future completes (continuing on the non-blocking
     * executor), and a timeout task is scheduled that fails the delayed invocation with a
     * {@link TimeoutException} if the future does not complete within the configured
     * timeout.
     *
     * @param future completes when the retry may proceed
     * @param topologyId the topology id being waited for (used for logging and the timeout message)
     * @param ctx the invocation context
     * @param command the command to retry
     * @param callback invoked with the outcome of the retried command
     * @return the result of the retried invocation, possibly asynchronous
     */
    protected <T extends VisitableCommand> Object retryWhenDone(CompletableFuture<Void> future, int topologyId, InvocationContext ctx, T command, InvocationFinallyFunction<T> callback) {
        if (future.isDone()) {
            getLog().tracef("Retrying command %s for topology %d", command, topologyId);
            return invokeNextAndHandle(ctx, command, callback);
        }
        CancellableRetry<T> cancellableRetry = new CancellableRetry<T>(command, topologyId);
        // handleAsync (rather than a compose-style method) lets the handler run on the
        // non-blocking executor even when `future` completes exceptionally.
        CompletableFuture<Void> retryFuture = future.handleAsync(cancellableRetry, nonBlockingExecutor);
        // The retry future must be set before the timeout task is scheduled, because the
        // timeout task completes it.
        cancellableRetry.setRetryFuture(retryFuture);
        ScheduledFuture<?> timeoutFuture = timeoutExecutor.schedule(cancellableRetry, transactionDataTimeout, TimeUnit.MILLISECONDS);
        cancellableRetry.setTimeoutFuture(timeoutFuture);
        return makeStage(asyncInvokeNext(ctx, command, retryFuture)).andHandle(ctx, command, callback);
    }

    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    @Override
    public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    @Override
    public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    /**
     * Common entry point for read commands: stamps the command with a topology id and
     * invokes it, with {@link #handleReadCommandReturn} handling the outcome.
     *
     * <p>In a scattered cache, a command whose topology id is ahead of the current cache
     * topology is only invoked after the state transfer lock reports that topology's
     * transaction data as available.
     */
    protected <C extends VisitableCommand & TopologyAffectedCommand> Object handleReadCommand(InvocationContext ctx, C command) {
        updateTopologyId(command);
        if (isScattered && command.getTopologyId() > distributionManager.getCacheTopology().getTopologyId()) {
            CompletableFuture<Void> topologyReady = stateTransferLock.transactionDataFuture(command.getTopologyId());
            Object delayedInvocation = asyncInvokeNext(ctx, command, topologyReady);
            return makeStage(delayedInvocation).andHandle(ctx, command, handleReadCommandReturn);
        }
        return invokeNextAndHandle(ctx, command, handleReadCommandReturn);
    }

    /**
     * Decides how to react to a read command that failed.
     *
     * <ul>
     *   <li>{@link SuspectException}: rejected with an {@link IllegalStateException}.</li>
     *   <li>{@link OutdatedTopologyException}: retried in the command's topology plus the
     *       delta carried by the exception.</li>
     *   <li>{@link AllOwnersLostException}: retried in the command's next topology.</li>
     *   <li>anything else: the original throwable is rethrown.</li>
     * </ul>
     *
     * The retry runs immediately if the target topology is the current one, otherwise
     * after the state transfer lock reports that topology's transaction data as available.
     * The command is expected to implement both {@link TopologyAffectedCommand} and
     * {@link FlagAffectedCommand}; it is cast to each.
     */
    private Object handleExceptionOnReadCommandReturn(InvocationContext rCtx, VisitableCommand rCommand, Throwable t) throws Throwable {
        Throwable cause = unwrapRemoteException(t);
        TopologyAffectedCommand cmd = (TopologyAffectedCommand) rCommand;
        LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology();
        int currentTopologyId = cacheTopology.getTopologyId();
        if (cause instanceof SuspectException) {
            throw new IllegalStateException("Read commands must ignore leavers");
        }

        int requestedTopologyId;
        if (cause instanceof OutdatedTopologyException) {
            logRetry(currentTopologyId, cmd);
            requestedTopologyId = cmd.getTopologyId() + ((OutdatedTopologyException) cause).topologyIdDelta;
        } else if (cause instanceof AllOwnersLostException) {
            if (getLog().isTraceEnabled()) {
                getLog().tracef("All owners for command %s have been lost.", cmd);
            }
            requestedTopologyId = cmd.getTopologyId() + 1;
        } else {
            // Not a topology-related failure: propagate the original (still wrapped) throwable.
            throw t;
        }

        // Never retry in a topology older than the current one.
        int retryTopologyId = Math.max(currentTopologyId, requestedTopologyId);
        cmd.setTopologyId(retryTopologyId);
        ((FlagAffectedCommand) cmd).addFlags(FlagBitSets.COMMAND_RETRY);
        if (retryTopologyId == currentTopologyId) {
            return invokeNextAndHandle(rCtx, rCommand, handleReadCommandReturn);
        }
        CompletableFuture<Void> topologyReady = stateTransferLock.transactionDataFuture(retryTopologyId);
        return makeStage(asyncInvokeNext(rCtx, rCommand, topologyReady)).andHandle(rCtx, rCommand, handleReadCommandReturn);
    }

    /**
     * Completion callback for read commands: passes a successful result through and
     * delegates failures to {@link #handleExceptionOnReadCommandReturn}.
     */
    private Object handleReadCommandReturn(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable t) throws Throwable {
        return t == null ? rv : handleExceptionOnReadCommandReturn(rCtx, rCommand, t);
    }

    /**
     * Computes the topology id a command should be retried in.
     *
     * @param ce the (unwrapped) failure; an {@link OutdatedTopologyException} supplies its
     *           own topology id delta, any other throwable counts as a delta of one
     * @param currentTopologyId the current topology id
     * @param command the command about to be retried
     * @return the larger of {@code currentTopologyId} and the command's topology id plus the delta
     */
    protected int getNewTopologyId(Throwable ce, int currentTopologyId, TopologyAffectedCommand command) {
        int requestedDelta;
        if (ce instanceof OutdatedTopologyException) {
            requestedDelta = ((OutdatedTopologyException) ce).topologyIdDelta;
        } else {
            requestedDelta = 1;
        }
        return Math.max(currentTopologyId, command.getTopologyId() + requestedDelta);
    }

    @Override
    public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    @Override
    public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    /** @return the logger of the concrete interceptor */
    protected abstract Log getLog();

    /**
     * Visitor that yields {@code null} for {@link GetKeyValueCommand} and
     * {@link GetCacheEntryCommand}, and the result of
     * {@link ReadOnlyKeyCommand#performOnLostData()} for read-only key commands.
     * NOTE(review): judging by the name, this supplies the result of a read whose data
     * has been lost; it is not used within this class, so confirm against subclasses.
     */
    protected static class LostDataVisitor extends AbstractVisitor {
        public static final LostDataVisitor INSTANCE = new LostDataVisitor();

        protected LostDataVisitor() {
        }

        @Override
        public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
            return null;
        }

        @Override
        public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
            return null;
        }

        @Override
        public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable {
            return command.performOnLostData();
        }
    }

    /**
     * Couples a delayed retry with its timeout.
     *
     * <p>As a {@link BiFunction} it is the handler that runs when the awaited future
     * completes; as a {@link Runnable} it is the timeout task. Whichever of the two first
     * moves {@code cancelled} away from {@code null} wins: the handler lets the retry
     * proceed, the timeout task fails the retry future with a {@link TimeoutException}.
     */
    private static class CancellableRetry<T extends VisitableCommand>
            implements BiFunction<Void, Throwable, Void>, Runnable {
        private static final AtomicReferenceFieldUpdater<CancellableRetry, Throwable> cancellableRetryUpdater = AtomicReferenceFieldUpdater.newUpdater(CancellableRetry.class, Throwable.class, "cancelled");
        private static final AtomicReferenceFieldUpdater<CancellableRetry, Object> timeoutFutureUpdater = AtomicReferenceFieldUpdater.newUpdater(CancellableRetry.class, Object.class, "timeoutFuture");
        private static final Log log = LogFactory.getLog(CancellableRetry.class);
        /** Sentinel stored in either atomic field to mark "the handler already ran". */
        private static final Throwable DUMMY = new Throwable("Command is retried");

        private final T command;
        private final int topologyId;
        /** {@code null} while undecided, {@link #DUMMY} once retried, the timeout exception once timed out. */
        private volatile Throwable cancelled = null;
        // Not volatile: only read by run(), and the owner sets it before scheduling run().
        private CompletableFuture<Void> retryFuture;
        // Typed as Object because it holds either the ScheduledFuture or the DUMMY sentinel.
        private volatile Object timeoutFuture;

        CancellableRetry(T command, int topologyId) {
            this.command = command;
            this.topologyId = topologyId;
        }

        /**
         * Handler for the awaited future. Cancels the timeout task if it has been
         * registered, rethrows a failure of the awaited future, and fails with the stored
         * cancellation cause if the timeout task won the race. Returns {@code null} when
         * the retry may proceed.
         */
        @Override
        public Void apply(Void nil, Throwable throwable) {
            // If the CAS fails, setTimeoutFuture() already stored the scheduled task: cancel it.
            // If it succeeds, setTimeoutFuture() will see DUMMY and cancel the task itself.
            if (!timeoutFutureUpdater.compareAndSet(this, null, DUMMY)) {
                ((ScheduledFuture<?>) this.timeoutFuture).cancel(false);
            }
            if (throwable != null) {
                throw CompletableFutures.asCompletionException(throwable);
            }
            if (!cancellableRetryUpdater.compareAndSet(this, null, DUMMY)) {
                log.tracef("Not retrying command %s as it has been cancelled.", this.command);
                throw CompletableFutures.asCompletionException(this.cancelled);
            }
            log.tracef("Retrying command %s for topology %d", this.command, this.topologyId);
            return null;
        }

        /**
         * Timeout task: unless the handler already allowed the retry, fails the retry
         * future with a {@link TimeoutException}.
         */
        @Override
        public void run() {
            TimeoutException timeoutException = new TimeoutException("Timed out waiting for topology " + this.topologyId);
            if (cancellableRetryUpdater.compareAndSet(this, null, timeoutException)) {
                this.retryFuture.completeExceptionally(timeoutException);
            }
        }

        void setRetryFuture(CompletableFuture<Void> retryFuture) {
            this.retryFuture = retryFuture;
        }

        /**
         * Registers the scheduled timeout task, or cancels it right away when the handler
         * has already run (the field then holds {@link #DUMMY}).
         */
        void setTimeoutFuture(ScheduledFuture<?> timeoutFuture) {
            if (!timeoutFutureUpdater.compareAndSet(this, null, timeoutFuture)) {
                timeoutFuture.cancel(false);
            }
        }
    }
}

