/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.remoting.rpc;

import java.text.NumberFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.config.Configuration;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.remoting.ReplicationQueue;
import org.infinispan.remoting.RpcException;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferException;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.rhq.helpers.pluginAnnotations.agent.DataType;
import org.rhq.helpers.pluginAnnotations.agent.DisplayType;
import org.rhq.helpers.pluginAnnotations.agent.MeasurementType;
import org.rhq.helpers.pluginAnnotations.agent.Metric;
import org.rhq.helpers.pluginAnnotations.agent.Operation;
import org.rhq.helpers.pluginAnnotations.agent.Parameter;
import org.rhq.helpers.pluginAnnotations.agent.Units;

/**
 * {@link RpcManager} implementation that forwards remote invocations to the injected
 * {@link Transport}, optionally diverts asynchronous calls to a {@link ReplicationQueue},
 * and keeps simple replication statistics (success count, failure count, cumulative time)
 * that are exposed through the management annotations on this class.
 */
@MBean(objectName="RpcManager", description="Manages all remote calls to remote cache instances in the cluster.")
public class RpcManagerImpl implements RpcManager {
    private static final Log log = LogFactory.getLog(RpcManagerImpl.class);
    /** Trace flag, sampled once when the class is initialised. */
    private static final boolean trace = log.isTraceEnabled();

    /** Transport used for every remote call and for state retrieval. */
    private Transport t;

    /** Number of remote invocations that completed without throwing. */
    private final AtomicLong replicationCount = new AtomicLong(0L);
    /** Number of remote invocations that threw. */
    private final AtomicLong replicationFailures = new AtomicLong(0L);
    /** Cumulative wall-clock time (ms) spent in transport invocations, successful or not. */
    private final AtomicLong totalReplicationTime = new AtomicLong(0L);

    @ManagedAttribute(description="Enables or disables the gathering of statistics by this component", writable=true)
    boolean statisticsEnabled = false;

    /** Member currently being asked for state; {@code null} when no retrieval attempt is in flight. */
    private volatile Address currentStateTransferSource;
    private boolean stateTransferEnabled;
    private Configuration configuration;
    private ReplicationQueue replicationQueue;
    private ExecutorService asyncExecutor;
    private CommandsFactory cf;

    /**
     * Receives the collaborators of this component.
     *
     * @param t                transport used for remote calls
     * @param configuration    cache configuration (timeouts, retry settings, statistics flag)
     * @param replicationQueue queue used for asynchronous calls when it is enabled; may be {@code null}
     * @param cf               factory used to wrap commands that are not {@link CacheRpcCommand}s
     * @param e                executor on which "in future" invocations are run
     */
    @Inject
    public void injectDependencies(Transport t, Configuration configuration, ReplicationQueue replicationQueue, CommandsFactory cf, @ComponentName(value="org.infinispan.executors.transport") ExecutorService e) {
        this.t = t;
        this.configuration = configuration;
        this.replicationQueue = replicationQueue;
        this.asyncExecutor = e;
        this.cf = cf;
    }

    /** Caches the state-transfer and statistics flags from the configuration. */
    @Start(priority=9)
    private void start() {
        this.stateTransferEnabled = this.configuration.isStateTransferEnabled();
        this.statisticsEnabled = this.configuration.isExposeJmxStatistics();
    }

    /**
     * @param sync whether the call is synchronous
     * @return {@code true} only for asynchronous calls when a replication queue is present and enabled
     */
    private boolean useReplicationQueue(boolean sync) {
        return !sync && this.replicationQueue != null && this.replicationQueue.isEnabled();
    }

    /**
     * Invokes a command on remote members through the transport and records statistics.
     *
     * @param recipients       target addresses, passed to the transport as-is
     * @param rpcCommand       command to invoke
     * @param mode             response mode passed to the transport
     * @param timeout          timeout passed to the transport
     * @param usePriorityQueue passed to the transport
     * @param responseFilter   passed to the transport; may be {@code null}
     * @return the transport's responses, or an empty map when the cluster has fewer than two members
     * @throws CacheException if the transport throws one (rethrown unchanged), or wrapping any
     *                        other {@link Throwable} thrown by the transport
     */
    @Override
    public final Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) {
        List<Address> members = this.t.getMembers();
        if (members.size() < 2) {
            if (log.isDebugEnabled()) {
                log.debug("We're the only member in the cluster; Don't invoke remotely.");
            }
            return Collections.emptyMap();
        }
        // Snapshot the flag once: if statistics were switched on while the call is in flight,
        // startTime would still be 0 and the elapsed time added below would be the full epoch value.
        final boolean statsEnabled = this.statisticsEnabled;
        final long startTime = statsEnabled ? System.currentTimeMillis() : 0L;
        try {
            Map<Address, Response> result = this.t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter, this.stateTransferEnabled);
            if (statsEnabled) {
                this.replicationCount.incrementAndGet();
            }
            return result;
        }
        catch (CacheException e) {
            if (log.isTraceEnabled()) {
                log.trace((Object)"replication exception: ", e);
            }
            if (statsEnabled) {
                this.replicationFailures.incrementAndGet();
            }
            throw e;
        }
        catch (Throwable th) {
            log.error((Object)"unexpected error while replicating", th);
            if (statsEnabled) {
                this.replicationFailures.incrementAndGet();
            }
            throw new CacheException(th);
        }
        finally {
            if (statsEnabled) {
                this.totalReplicationTime.getAndAdd(System.currentTimeMillis() - startTime);
            }
        }
    }

    /** Same as the six-argument overload with a {@code null} response filter. */
    @Override
    public final Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) {
        return this.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
    }

    /** Same as the six-argument overload without the priority queue and with a {@code null} response filter. */
    @Override
    public final Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
        return this.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
    }

    /**
     * Tries to retrieve state for the named cache from any other cluster member, making up to
     * the configured number of passes over the member list with a growing back-off between passes.
     * Returns silently when this node is the only member.
     *
     * <p>If the thread is interrupted while backing off, the interrupt flag is restored and no
     * further passes are attempted.
     *
     * <p>NOTE(review): the wait is multiplied by the increase factor <em>before</em> the first
     * sleep, so the configured initial wait is never used unscaled - confirm this is intended.
     *
     * @param cacheName name of the cache whose state is requested
     * @param timeout   timeout passed to the transport for each attempt
     * @throws StateTransferException if the transport does not support state transfer, or if no
     *                                member supplied state within the allowed attempts
     */
    @Override
    public void retrieveState(String cacheName, long timeout) throws StateTransferException {
        if (!this.t.isSupportStateTransfer()) {
            throw new StateTransferException("Transport does not, or is not configured to, support state transfer.  Please disable fetching state on startup, or reconfigure your transport.");
        }
        long initialWaitTime = this.configuration.getStateRetrievalInitialRetryWaitTime();
        int waitTimeIncreaseFactor = this.configuration.getStateRetrievalRetryWaitTimeIncreaseFactor();
        int numRetries = this.configuration.getStateRetrievalNumRetries();
        List<Address> members = this.t.getMembers();
        if (members.size() < 2) {
            if (log.isDebugEnabled()) {
                log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
            }
            return;
        }

        boolean success = false;
        try {
            long wait = initialWaitTime;
            retries:
            for (int i = 0; i < numRetries; ++i) {
                for (Address member : members) {
                    if (member.equals(this.t.getAddress())) {
                        continue; // never ask ourselves for state
                    }
                    try {
                        if (log.isInfoEnabled()) {
                            log.info((Object)"Trying to fetch state from %s", member);
                        }
                        this.currentStateTransferSource = member;
                        if (this.t.retrieveState(cacheName, member, timeout)) {
                            if (log.isInfoEnabled()) {
                                log.info((Object)"Successfully retrieved and applied state from %s", member);
                            }
                            success = true;
                            break retries;
                        }
                    }
                    catch (StateTransferException e) {
                        // A failing member is not fatal; move on to the next one.
                        if (log.isDebugEnabled()) {
                            log.debug((Object)("Error while fetching state from member " + member), e);
                        }
                    }
                    finally {
                        this.currentStateTransferSource = null;
                    }
                }

                // No member supplied state on this pass. Backing off is only useful when
                // another pass will follow.
                if (i + 1 >= numRetries) {
                    break;
                }
                if (log.isWarnEnabled()) {
                    log.warn("Could not find available peer for state, backing off and retrying");
                }
                try {
                    Thread.sleep(wait *= (long)waitTimeIncreaseFactor);
                }
                catch (InterruptedException e) {
                    // Restore the flag for callers and give up instead of retrying while interrupted.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        finally {
            this.currentStateTransferSource = null;
        }

        if (!success) {
            throw new StateTransferException("Unable to fetch state on startup");
        }
    }

    /** Broadcasts without the priority queue. */
    @Override
    public final void broadcastRpcCommand(ReplicableCommand rpc, boolean sync) throws RpcException {
        this.broadcastRpcCommand(rpc, sync, false);
    }

    /**
     * Sends the command to all members ({@code null} recipient list), or adds it to the
     * replication queue when the call is asynchronous and the queue is enabled.
     */
    @Override
    public final void broadcastRpcCommand(ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws RpcException {
        if (this.useReplicationQueue(sync)) {
            this.replicationQueue.add(rpc);
        } else {
            this.invokeRemotely(null, rpc, sync, usePriorityQueue);
        }
    }

    /** Broadcasts in the background without the priority queue. */
    @Override
    public final void broadcastRpcCommandInFuture(ReplicableCommand rpc, NotifyingNotifiableFuture<Object> l) {
        this.broadcastRpcCommandInFuture(rpc, false, l);
    }

    /** Broadcasts in the background by invoking with a {@code null} recipient list. */
    @Override
    public final void broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> l) {
        this.invokeRemotelyInFuture(null, rpc, usePriorityQueue, l);
    }

    /** Invokes without the priority queue, discarding the responses. */
    @Override
    public final void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync) throws RpcException {
        this.invokeRemotely(recipients, rpc, sync, false);
    }

    /** Invokes using the configured synchronous replication timeout. */
    @Override
    public final Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws RpcException {
        return this.invokeRemotely(recipients, rpc, sync, usePriorityQueue, this.configuration.getSyncReplTimeout());
    }

    /**
     * Invokes a command remotely, or queues it when the call is asynchronous and the
     * replication queue is enabled. Commands that are not {@link CacheRpcCommand}s are
     * wrapped through the {@link CommandsFactory} before being sent.
     *
     * @return the responses from the transport, or {@code null} when the command was queued
     * @throws RpcException for synchronous calls when any response value is a {@link Throwable}
     */
    public final Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws RpcException {
        if (trace) {
            log.trace((Object)"%s broadcasting call %s to recipient list %s", this.t.getAddress(), rpc, recipients);
        }
        if (this.useReplicationQueue(sync)) {
            this.replicationQueue.add(rpc);
            return null;
        }
        if (!(rpc instanceof CacheRpcCommand)) {
            rpc = this.cf.buildSingleRpcCommand(rpc);
        }
        Map<Address, Response> rsps = this.invokeRemotely(recipients, rpc, this.getResponseMode(sync), timeout, usePriorityQueue);
        if (trace) {
            log.trace((Object)"Response(s) to %s is %s", rpc, rsps);
        }
        if (sync) {
            this.checkResponses(rsps);
        }
        return rsps;
    }

    /** Invokes in the background without the priority queue. */
    @Override
    public final void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> l) {
        this.invokeRemotelyInFuture(recipients, rpc, false, l);
    }

    /** Invokes in the background using the configured synchronous replication timeout. */
    @Override
    public final void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> l) {
        this.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, l, this.configuration.getSyncReplTimeout());
    }

    /**
     * Submits a synchronous remote invocation to the async executor and hands the resulting
     * future to {@code l}. The task calls {@code l.notifyDone()} when the invocation finishes,
     * whether it succeeded or threw.
     */
    @Override
    public final void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l, final long timeout) {
        if (trace) {
            log.trace((Object)"%s invoking in future call %s to recipient list %s", this.t.getAddress(), rpc, recipients);
        }
        // Released after setNetworkFuture() below, so the task cannot call notifyDone()
        // before the network future has been attached to l.
        final CountDownLatch futureSet = new CountDownLatch(1);
        Callable<Object> c = new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                Map<Address, Response> result = null;
                try {
                    result = RpcManagerImpl.this.invokeRemotely((Collection<Address>)recipients, rpc, true, usePriorityQueue, timeout);
                }
                finally {
                    try {
                        futureSet.await();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    finally {
                        l.notifyDone();
                    }
                }
                return result;
            }
        };
        l.setNetworkFuture(this.asyncExecutor.submit(c));
        futureSet.countDown();
    }

    @Override
    public Transport getTransport() {
        return this.t;
    }

    /** @return the member currently being asked for state, or {@code null} if none */
    @Override
    public Address getCurrentStateTransferSource() {
        return this.currentStateTransferSource;
    }

    /** Maps the sync flag to a response mode; the async mode is derived from the configuration. */
    private ResponseMode getResponseMode(boolean sync) {
        return sync ? ResponseMode.SYNCHRONOUS : ResponseMode.getAsyncResponseMode(this.configuration);
    }

    /**
     * Throws an {@link RpcException} wrapping the first response value that is a {@link Throwable}.
     *
     * @param rsps responses to inspect; {@code null} is ignored
     */
    private void checkResponses(Map<Address, Response> rsps) {
        if (rsps == null) {
            return;
        }
        for (Map.Entry<Address, Response> rsp : rsps.entrySet()) {
            if (!(rsp.getValue() instanceof Throwable)) {
                continue;
            }
            Throwable throwable = (Throwable)((Object)rsp.getValue());
            if (trace) {
                log.trace((Object)"Received Throwable from remote node %s", throwable, rsp.getKey());
            }
            throw new RpcException(throwable);
        }
    }

    @ManagedOperation(description="Resets statistics gathered by this component")
    @Operation(displayName="Reset statistics")
    public void resetStatistics() {
        this.replicationCount.set(0L);
        this.replicationFailures.set(0L);
        this.totalReplicationTime.set(0L);
    }

    /** @return the success counter, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="Number of successful replications")
    @Metric(displayName="Number of successful replications", measurementType=MeasurementType.TRENDSUP, displayType=DisplayType.SUMMARY)
    public long getReplicationCount() {
        if (!this.isStatisticsEnabled()) {
            return -1L;
        }
        return this.replicationCount.get();
    }

    /** @return the failure counter, or {@code -1} when statistics are disabled */
    @ManagedAttribute(description="Number of failed replications")
    @Metric(displayName="Number of failed replications", measurementType=MeasurementType.TRENDSUP, displayType=DisplayType.SUMMARY)
    public long getReplicationFailures() {
        if (!this.isStatisticsEnabled()) {
            return -1L;
        }
        return this.replicationFailures.get();
    }

    @Metric(displayName="Statistics enabled", dataType=DataType.TRAIT)
    public boolean isStatisticsEnabled() {
        return this.statisticsEnabled;
    }

    @Operation(displayName="Enable/disable statistics")
    public void setStatisticsEnabled(@Parameter(name="enabled", description="Whether statistics should be enabled or disabled (true/false)") boolean statisticsEnabled) {
        this.statisticsEnabled = statisticsEnabled;
    }

    /**
     * @return the success ratio formatted as a percentage, or {@code "N/A"} when statistics are
     *         disabled or no successful replication has been recorded
     */
    @ManagedAttribute(description="Successful replications as a ratio of total replications")
    public String getSuccessRatio() {
        if (this.replicationCount.get() == 0L || !this.statisticsEnabled) {
            return "N/A";
        }
        double ratio = this.calculateSuccessRatio() * 100.0;
        return NumberFormat.getInstance().format(ratio) + "%";
    }

    /**
     * @return successes divided by total attempts, or {@code 0.0} when statistics are disabled
     *         or no successful replication has been recorded
     */
    @ManagedAttribute(description="Successful replications as a ratio of total replications in numeric double format")
    @Metric(displayName="Successful replication ratio", units=Units.PERCENTAGE, displayType=DisplayType.SUMMARY)
    public double getSuccessRatioFloatingPoint() {
        if (this.replicationCount.get() == 0L || !this.statisticsEnabled) {
            return 0.0;
        }
        return this.calculateSuccessRatio();
    }

    /**
     * Computes successes / (successes + failures). Each counter is read once so the numerator
     * and denominator stay consistent, and an empty total (for example after a concurrent
     * reset) yields {@code 0.0} rather than NaN.
     */
    private double calculateSuccessRatio() {
        long successes = this.replicationCount.get();
        double totalCount = successes + this.replicationFailures.get();
        if (totalCount == 0.0) {
            return 0.0;
        }
        return (double)successes / totalCount;
    }

    /**
     * @return cumulative transport time divided by the success counter, or {@code 0} when no
     *         successful replication has been recorded
     */
    @ManagedAttribute(description="The average time spent in the transport layer, in milliseconds")
    @Metric(displayName="Average time spent in the transport layer", units=Units.MILLISECONDS, displayType=DisplayType.SUMMARY)
    public long getAverageReplicationTime() {
        // Read the counter once: a concurrent resetStatistics() between a zero check and a
        // second read would otherwise cause a division by zero.
        long count = this.replicationCount.get();
        if (count == 0L) {
            return 0L;
        }
        return this.totalReplicationTime.get() / count;
    }

    public void setTransport(Transport t) {
        this.t = t;
    }

    /** @return the transport's address, or {@code null} when no transport is set */
    @Override
    public Address getAddress() {
        return this.t != null ? this.t.getAddress() : null;
    }
}

