/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.impl.RemoteTxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.io.UnclosableObjectInputStream;
import org.infinispan.io.UnclosableObjectOutputStream;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.marshall.Marshaller;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.DistributedSync;
import org.infinispan.statetransfer.StateTransferException;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.transaction.TransactionLog;
import org.infinispan.transaction.xa.RemoteTransaction;
import org.infinispan.transaction.xa.TransactionTable;
import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class StateTransferManagerImpl
implements StateTransferManager {
    RpcManager rpcManager;
    AdvancedCache cache;
    Configuration configuration;
    DataContainer dataContainer;
    CacheLoaderManager clm;
    CacheStore cs;
    Marshaller marshaller;
    TransactionLog transactionLog;
    InvocationContextContainer invocationContextContainer;
    InterceptorChain interceptorChain;
    CommandsFactory commandsFactory;
    TransactionTable txTable;
    private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final Byte DELIMITER = 123;
    boolean transientState;
    boolean persistentState;
    volatile boolean needToUnblockRPC = false;
    volatile Address stateSender;

    @Inject
    public void injectDependencies(RpcManager rpcManager, AdvancedCache cache, Configuration configuration, DataContainer dataContainer, CacheLoaderManager clm, Marshaller marshaller, TransactionLog transactionLog, InterceptorChain interceptorChain, InvocationContextContainer invocationContextContainer, CommandsFactory commandsFactory, TransactionTable txTable) {
        this.rpcManager = rpcManager;
        this.cache = cache;
        this.configuration = configuration;
        this.dataContainer = dataContainer;
        this.clm = clm;
        this.marshaller = marshaller;
        this.transactionLog = transactionLog;
        this.invocationContextContainer = invocationContextContainer;
        this.interceptorChain = interceptorChain;
        this.commandsFactory = commandsFactory;
        this.txTable = txTable;
    }

    @Start(priority=55)
    public void start() throws StateTransferException {
        log.trace((Object)"Data container is {0}", System.identityHashCode(this.dataContainer));
        this.cs = this.clm == null ? null : this.clm.getCacheStore();
        this.transientState = this.configuration.isFetchInMemoryState();
        this.persistentState = this.cs != null && this.clm.isEnabled() && this.clm.isFetchPersistentState() && !this.clm.isShared();
        long startTime = 0L;
        if (log.isDebugEnabled()) {
            log.debug("Initiating state transfer process");
            startTime = System.currentTimeMillis();
        }
        this.rpcManager.retrieveState(this.cache.getName(), this.configuration.getStateRetrievalTimeout());
        if (log.isDebugEnabled()) {
            long duration = System.currentTimeMillis() - startTime;
            log.debug((Object)"State transfer process completed in {0}", Util.prettyPrintTime(duration));
        }
    }

    @Start(priority=1000)
    public void releaseRPCBlock() throws Exception {
        if (this.needToUnblockRPC) {
            if (trace) {
                log.trace("Stopping RPC block");
            }
            this.mimicPartialFlushViaRPC(this.stateSender, false);
        }
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void generateState(OutputStream out) throws StateTransferException {
        ObjectOutput oo = null;
        boolean txLogActivated = false;
        try {
            boolean canProvideState;
            boolean bl = canProvideState = (this.transientState || this.persistentState) && (txLogActivated = this.transactionLog.activate());
            if (log.isDebugEnabled()) {
                log.debug((Object)"Generating state.  Can provide? {0}", canProvideState);
            }
            oo = this.marshaller.startObjectOutput(out);
            this.marshaller.objectToObjectStream(canProvideState, oo);
            if (canProvideState) {
                this.delimit(oo);
                if (this.transientState) {
                    this.generateInMemoryState(oo);
                }
                this.delimit(oo);
                if (this.persistentState) {
                    this.generatePersistentState(oo);
                }
                this.delimit(oo);
                this.generateTransactionLog(oo);
                if (log.isDebugEnabled()) {
                    log.debug("State generated, closing object stream");
                }
            } else if (log.isDebugEnabled()) {
                log.debug("Not providing state!");
            }
            this.marshaller.finishObjectOutput(oo);
            if (!txLogActivated) return;
            this.transactionLog.deactivate();
            return;
        }
        catch (StateTransferException ste) {
            try {
                throw ste;
                catch (Exception e) {
                    throw new StateTransferException(e);
                }
            }
            catch (Throwable throwable) {
                this.marshaller.finishObjectOutput(oo);
                if (!txLogActivated) throw throwable;
                this.transactionLog.deactivate();
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void generateTransactionLog(ObjectOutput oo) throws Exception {
        int maxNonProgressingLogWrites = 100;
        int flushTimeout = 60000;
        DistributedSync distributedSync = this.rpcManager.getTransport().getDistributedSync();
        try {
            if (trace) {
                log.trace((Object)"Transaction log size is {0}", this.transactionLog.size());
            }
            int nonProgress = 0;
            int size = this.transactionLog.size();
            while (size > 0) {
                if (trace) {
                    log.trace("Tx Log remaining entries = " + size);
                }
                this.transactionLog.writeCommitLog(this.marshaller, oo);
                int newSize = this.transactionLog.size();
                if (newSize >= size && ++nonProgress >= maxNonProgressingLogWrites) break;
                size = newSize;
            }
            distributedSync.acquireProcessingLock(true, this.configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS);
            this.delimit(oo);
            oo.flush();
            if (trace) {
                log.trace("Waiting for a distributed sync block");
            }
            distributedSync.blockUntilAcquired(flushTimeout, TimeUnit.MILLISECONDS);
            if (trace) {
                log.trace("Distributed sync block received, proceeding with writing commit log");
            }
            this.transactionLog.writeCommitLog(this.marshaller, oo);
            this.delimit(oo);
            this.transactionLog.writePendingPrepares(this.marshaller, oo);
            this.delimit(oo);
            oo.flush();
        }
        finally {
            distributedSync.releaseProcessingLock();
        }
    }

    private void processCommitLog(ObjectInput oi) throws Exception {
        if (trace) {
            log.trace("Applying commit log");
        }
        Object object = this.marshaller.objectFromObjectStream(oi);
        while (object instanceof TransactionLog.LogEntry) {
            TransactionLog.LogEntry logEntry = (TransactionLog.LogEntry)object;
            InvocationContext ctx = this.invocationContextContainer.createRemoteInvocationContext();
            Object[] mods = logEntry.getModifications();
            if (trace) {
                log.trace((Object)"Mods = {0}", Arrays.toString(mods));
            }
            for (Object mod : mods) {
                this.commandsFactory.initializeReplicableCommand((ReplicableCommand)mod);
                ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STATUS_CHECK);
                this.interceptorChain.invoke(ctx, (VisitableCommand)mod);
            }
            object = this.marshaller.objectFromObjectStream(oi);
        }
        this.assertDelimited(object);
        if (trace) {
            log.trace("Finished applying commit log");
        }
    }

    private void applyTransactionLog(ObjectInput oi) throws Exception {
        if (trace) {
            log.trace("Integrating transaction log");
        }
        this.processCommitLog(oi);
        this.stateSender = this.rpcManager.getCurrentStateTransferSource();
        this.mimicPartialFlushViaRPC(this.stateSender, true);
        this.needToUnblockRPC = true;
        try {
            if (trace) {
                log.trace("Retrieving/Applying post-flush commits");
            }
            this.processCommitLog(oi);
            if (trace) {
                log.trace("Retrieving/Applying pending prepares");
            }
            Object object = this.marshaller.objectFromObjectStream(oi);
            while (object instanceof PrepareCommand) {
                PrepareCommand command = (PrepareCommand)object;
                if (!this.transactionLog.hasPendingPrepare(command)) {
                    if (trace) {
                        log.trace((Object)"Applying pending prepare {0}", command);
                    }
                    this.commandsFactory.initializeReplicableCommand(command);
                    RemoteTxInvocationContext ctx = this.invocationContextContainer.createRemoteTxInvocationContext();
                    RemoteTransaction transaction = this.txTable.createRemoteTransaction(command.getGlobalTransaction(), command.getModifications());
                    ctx.setRemoteTransaction(transaction);
                    ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STATUS_CHECK);
                    this.interceptorChain.invoke(ctx, command);
                } else if (trace) {
                    log.trace((Object)"Prepare {0} not in tx log; not applying", command);
                }
                object = this.marshaller.objectFromObjectStream(oi);
            }
            this.assertDelimited(object);
        }
        catch (Exception e) {
            if (trace) {
                log.trace("Stopping RPC block");
            }
            this.mimicPartialFlushViaRPC(this.stateSender, false);
            this.needToUnblockRPC = false;
            throw e;
        }
    }

    private void mimicPartialFlushViaRPC(Address addressToFlush, boolean block) throws Exception {
        StateTransferControlCommand cmd = this.commandsFactory.buildStateTransferControlCommand(block);
        if (!block) {
            this.rpcManager.getTransport().getDistributedSync().releaseSync();
        }
        this.rpcManager.invokeRemotely(Collections.singletonList(addressToFlush), cmd, ResponseMode.SYNCHRONOUS, this.configuration.getStateRetrievalTimeout(), true);
        if (block) {
            this.rpcManager.getTransport().getDistributedSync().acquireSync();
        }
    }

    public void applyState(InputStream in) throws StateTransferException {
        block12: {
            if (log.isDebugEnabled()) {
                log.debug("Applying state");
            }
            ObjectInput oi = null;
            try {
                oi = this.marshaller.startObjectInput(in);
                boolean canProvideState = (Boolean)this.marshaller.objectFromObjectStream(oi);
                if (canProvideState) {
                    this.assertDelimited(oi);
                    if (this.transientState) {
                        this.applyInMemoryState(oi);
                    }
                    this.assertDelimited(oi);
                    if (this.persistentState) {
                        this.applyPersistentState(oi);
                    }
                    this.assertDelimited(oi);
                    this.applyTransactionLog(oi);
                    if (log.isDebugEnabled()) {
                        log.debug("State applied, closing object stream");
                    }
                    break block12;
                }
                String msg = "Provider cannot provide state!";
                if (log.isDebugEnabled()) {
                    log.debug(msg);
                }
                throw new StateTransferException(msg);
            }
            catch (StateTransferException ste) {
                throw ste;
            }
            catch (Exception e) {
                throw new StateTransferException(e);
            }
            finally {
                this.marshaller.finishObjectInput(oi);
            }
        }
    }

    private void applyInMemoryState(ObjectInput i) throws StateTransferException {
        this.dataContainer.clear();
        try {
            Set set = (Set)this.marshaller.objectFromObjectStream(i);
            for (InternalCacheEntry se : set) {
                this.cache.put(se.getKey(), se.getValue(), se.getLifespan(), TimeUnit.MILLISECONDS, se.getMaxIdle(), TimeUnit.MILLISECONDS, Flag.CACHE_MODE_LOCAL);
            }
        }
        catch (Exception e) {
            this.dataContainer.clear();
            throw new StateTransferException(e);
        }
    }

    private void generateInMemoryState(ObjectOutput oo) throws StateTransferException {
        try {
            HashSet<InternalCacheEntry> entries = new HashSet<InternalCacheEntry>();
            for (InternalCacheEntry e : this.dataContainer) {
                if (e.isExpired()) continue;
                entries.add(e);
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)"Writing {0} StoredEntries to stream", entries.size());
            }
            this.marshaller.objectToObjectStream(entries, oo);
        }
        catch (Exception e) {
            throw new StateTransferException(e);
        }
    }

    private void applyPersistentState(ObjectInput i) throws StateTransferException {
        try {
            this.cs.fromStream(new UnclosableObjectInputStream(i));
        }
        catch (CacheLoaderException cle) {
            throw new StateTransferException(cle);
        }
    }

    private void generatePersistentState(ObjectOutput oo) throws StateTransferException {
        try {
            this.cs.toStream(new UnclosableObjectOutputStream(oo));
        }
        catch (CacheLoaderException cle) {
            throw new StateTransferException(cle);
        }
    }

    private void delimit(ObjectOutput oo) throws IOException {
        this.marshaller.objectToObjectStream(DELIMITER, oo);
    }

    private void assertDelimited(ObjectInput i) throws StateTransferException {
        Object o;
        try {
            o = this.marshaller.objectFromObjectStream(i);
        }
        catch (Exception e) {
            throw new StateTransferException(e);
        }
        this.assertDelimited(o);
    }

    private void assertDelimited(Object o) throws StateTransferException {
        if (o instanceof Exception) {
            throw new StateTransferException((Exception)o);
        }
        if (!DELIMITER.equals(o)) {
            throw new StateTransferException("Expected a delimiter, recieved " + o);
        }
    }
}

