/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.remoting.transport.jgroups;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.CacheException;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.config.parsing.XmlConfigHelper;
import org.infinispan.marshall.Marshaller;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.remoting.InboundInvocationHandler;
import org.infinispan.remoting.ReplicationException;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.DistributedSync;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.infinispan.remoting.transport.jgroups.JGroupsDistSync;
import org.infinispan.remoting.transport.jgroups.JGroupsResponseFilterAdapter;
import org.infinispan.remoting.transport.jgroups.MarshallerAdapter;
import org.infinispan.remoting.transport.jgroups.StateTransferMonitor;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.StateTransferException;
import org.infinispan.util.FileLookup;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.ExtendedMembershipListener;
import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * {@link Transport} implementation that talks to the cluster through a JGroups {@link Channel}
 * and a {@link CommandAwareRpcDispatcher}. The class also implements the JGroups
 * {@link ExtendedMembershipListener} and {@link ExtendedMessageListener} callbacks
 * (view changes, block/unblock, and per-cache streaming state transfer).
 */
public class JGroupsTransport
implements Transport,
ExtendedMembershipListener,
ExtendedMessageListener {
    /** Transport property key whose value is handed directly to the {@code JChannel} constructor as a string. */
    public static final String CONFIGURATION_STRING = "configurationString";
    /** Transport property key whose value is parsed with {@code XmlConfigHelper.stringToElement} before building the channel. */
    public static final String CONFIGURATION_XML = "configurationXml";
    /** Transport property key whose value is resolved with {@code FileLookup.lookupFileLocation} before building the channel. */
    public static final String CONFIGURATION_FILE = "configurationFile";
    /** Configuration file looked up when none of the property-based mechanisms produced a channel. */
    private static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "flush-udp.xml";
    // Created by buildChannel() during start(); cleared by stop().
    Channel channel;
    // Local address, lazily derived from the channel by getAddress().
    Address address;
    // Current member list; replaced in viewAccepted() and reset by stop().
    volatile List<Address> members = Collections.emptyList();
    // Recomputed on every accepted view: true when the first member equals the local address.
    volatile boolean coordinator = false;
    // Monitor held while a view is applied; getCoordinator() waits on it until members are known.
    final Object membersListLock = new Object();
    // Created in initChannelAndRPCDispatcher(); stopped and cleared by stop().
    CommandAwareRpcDispatcher dispatcher;
    static final Log log = LogFactory.getLog(JGroupsTransport.class);
    // Captured once at class initialisation; later changes to the log level are not reflected here.
    static final boolean trace = log.isTraceEnabled();
    // Collaborators supplied through initialize().
    GlobalConfiguration c;
    // Transport properties, read from the global configuration in start().
    Properties props;
    InboundInvocationHandler inboundInvocationHandler;
    Marshaller marshaller;
    ExecutorService asyncExecutor;
    CacheManagerNotifier notifier;
    // Keyed by cache name; an entry is registered by retrieveState() and looked up by setState(String, InputStream).
    final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
    // Returned by getDistributedSync(); also passed to the dispatcher and used around remote invocations.
    private final JGroupsDistSync flushTracker = new JGroupsDistSync();
    // Read from the global configuration in start(); used with TimeUnit.MILLISECONDS in invokeRemotely().
    long distributedSyncTimeout;

    /**
     * Stores the collaborators this transport needs; nothing is started here.
     *
     * @param c                        global configuration consulted later by {@link #start()}
     * @param marshaller               marshaller wrapped for the RPC dispatcher
     * @param asyncExecutor            executor handed to the RPC dispatcher
     * @param inboundInvocationHandler handler used for state generation/application and passed to the dispatcher
     * @param notifier                 notified of view changes (may be {@code null}; checked before use)
     */
    @Override
    public void initialize(GlobalConfiguration c, Marshaller marshaller, ExecutorService asyncExecutor, InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
        // Plain field assignments; the order is irrelevant.
        this.notifier = notifier;
        this.inboundInvocationHandler = inboundInvocationHandler;
        this.asyncExecutor = asyncExecutor;
        this.marshaller = marshaller;
        this.c = c;
    }

    /**
     * Reads the transport settings from the global configuration, builds the channel and
     * dispatcher, and connects the channel to the configured cluster name.
     *
     * @throws CacheException if the channel cannot be connected
     */
    @Override
    public void start() {
        props = c.getTransportProperties();
        distributedSyncTimeout = c.getDistributedSyncTimeout();

        log.info("Starting JGroups Channel");
        initChannelAndRPCDispatcher();

        try {
            channel.connect(c.getClusterName());
        } catch (ChannelException connectFailure) {
            throw new CacheException("Unable to start JGroups Channel", connectFailure);
        }

        log.info((Object)"Cache local address is {0}", getAddress());

        // Only a warning: the transport still starts without FLUSH support.
        final boolean flushAvailable = channel.flushSupported();
        if (!flushAvailable) {
            log.warn("FLUSH is not present in your JGroups stack!  FLUSH is needed to ensure messages are not dropped while new nodes join the cluster.  Will proceed, but inconsistencies may arise!");
        }
    }

    /**
     * @return the id of the channel's current view, narrowed to {@code int}
     */
    @Override
    public int getViewId() {
        final View currentView = channel.getView();
        final long viewId = currentView.getVid().getId();
        return (int) viewId;
    }

    /**
     * Disconnects and closes the channel if it is open, stops the RPC dispatcher, and resets
     * the cluster state cached by this transport.
     * <p>
     * Problems while closing the channel are logged and otherwise ignored. After this call
     * {@link #getMembers()} returns an empty list (never {@code null}), and the cached local
     * address is discarded so that a later {@link #start()} derives it from the new channel
     * instead of reusing the address of the closed one.
     */
    @Override
    public void stop() {
        try {
            if (channel != null && channel.isOpen()) {
                log.info("Disconnecting and closing JGroups Channel");
                channel.disconnect();
                channel.close();
            }
        } catch (Exception toLog) {
            log.error((Object)"Problem closing channel; setting it to null", toLog);
        }
        channel = null;

        try {
            if (dispatcher != null) {
                log.info("Stopping the RpcDispatcher");
                dispatcher.stop();
            }
        } finally {
            // Reset state even if the dispatcher fails to stop, so the transport is never
            // left half shut down.
            dispatcher = null;
            members = Collections.emptyList();
            coordinator = false;
            address = null;
        }
    }

    /**
     * Builds the channel, applies the channel options this transport relies on, and creates
     * the RPC dispatcher with a marshaller adapter for both requests and responses.
     *
     * @throws CacheException if the channel cannot be created
     */
    private void initChannelAndRPCDispatcher() throws CacheException {
        buildChannel();

        // NOTE(review): the numeric option ids look like inlined org.jgroups.Channel constants
        // (presumably LOCAL, AUTO_RECONNECT, AUTO_GETSTATE and BLOCK) - confirm against the
        // JGroups version in use.
        channel.setOpt(3, Boolean.FALSE);
        channel.setOpt(5, Boolean.TRUE);
        channel.setOpt(6, Boolean.FALSE);
        channel.setOpt(0, Boolean.TRUE);

        dispatcher = new CommandAwareRpcDispatcher(channel, this, asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);

        // The same adapter instance serves both directions.
        final MarshallerAdapter sharedAdapter = new MarshallerAdapter(marshaller);
        dispatcher.setRequestMarshaller((RpcDispatcher.Marshaller)sharedAdapter);
        dispatcher.setResponseMarshaller((RpcDispatcher.Marshaller)sharedAdapter);
    }

    /**
     * Creates the JGroups channel from the transport properties.
     * <p>
     * The three configuration mechanisms are checked independently, in the order file, XML,
     * string; when several keys are present the last one processed determines the channel.
     * If no channel has been set afterwards, the default configuration file is used.
     *
     * @throws CacheException if any attempted mechanism fails to create a channel
     */
    private void buildChannel() {
        if (props != null) {
            if (props.containsKey(CONFIGURATION_FILE)) {
                final String fileName = props.getProperty(CONFIGURATION_FILE);
                try {
                    channel = new JChannel(new FileLookup().lookupFileLocation(fileName));
                } catch (Exception failure) {
                    log.error("Error while trying to create a channel using config files: " + fileName);
                    throw new CacheException(failure);
                }
            }

            if (props.containsKey(CONFIGURATION_XML)) {
                final String xml = props.getProperty(CONFIGURATION_XML);
                try {
                    channel = new JChannel(XmlConfigHelper.stringToElement(xml));
                } catch (Exception failure) {
                    log.error("Error while trying to create a channel using config XML: " + xml);
                    throw new CacheException(failure);
                }
            }

            if (props.containsKey(CONFIGURATION_STRING)) {
                final String stackSpec = props.getProperty(CONFIGURATION_STRING);
                try {
                    channel = new JChannel(stackSpec);
                } catch (Exception failure) {
                    log.error("Error while trying to create a channel using config string: " + stackSpec);
                    throw new CacheException(failure);
                }
            }
        }

        if (channel != null) {
            return;
        }

        // Nothing usable in the properties: fall back to the bundled default stack.
        log.info((Object)"Unable to use any JGroups configuration mechanisms provided in properties {0}.  Using default JGroups configuration!", props);
        try {
            channel = new JChannel(new FileLookup().lookupFileLocation(DEFAULT_JGROUPS_CONFIGURATION_FILE));
        } catch (ChannelException failure) {
            throw new CacheException("Unable to start JGroups channel", failure);
        }
    }

    /**
     * @return the coordinator flag computed when the most recent view was accepted
     */
    @Override
    public boolean isCoordinator() {
        return coordinator;
    }

    /**
     * Returns the first member of the current view, waiting until a non-empty member list
     * has been installed by {@link #viewAccepted(View)}.
     * <p>
     * If the waiting thread is interrupted, the wait is abandoned, the thread's interrupt
     * status is restored so callers can observe it, and whatever is currently known is returned.
     *
     * @return the first member of the view, or {@code null} if there is no channel or no
     *         members are known when the wait ends
     */
    @Override
    public Address getCoordinator() {
        if (channel == null) {
            return null;
        }
        synchronized (membersListLock) {
            // 'members' is volatile and may be reset by stop() without holding this lock, so
            // work on a snapshot rather than re-reading the field between check and use.
            List<Address> snapshot = members;
            while (snapshot == null || snapshot.isEmpty()) {
                log.debug("Waiting on view being accepted");
                try {
                    membersListLock.wait();
                } catch (InterruptedException e) {
                    log.error((Object)"getCoordinator(): Interrupted while waiting for members to be set", e);
                    // Do not swallow the interruption: re-assert it for the caller.
                    Thread.currentThread().interrupt();
                    snapshot = members;
                    break;
                }
                snapshot = members;
            }
            return snapshot == null || snapshot.isEmpty() ? null : snapshot.get(0);
        }
    }

    /**
     * @return the member list as last recorded by this transport (the field itself, not a copy)
     */
    @Override
    public List<Address> getMembers() {
        return members;
    }

    /**
     * Requests the state of the named cache from the given member and waits for the
     * registered {@link StateTransferMonitor} to be signalled.
     *
     * @param cacheName name of the cache whose state is requested; also the state id sent to JGroups
     * @param address   member to fetch the state from
     * @param timeout   timeout forwarded to {@code JChannel.getState}
     * @return {@code true} if the monitor reports no set-state exception; {@code false} if any
     *         exception other than a {@link StateTransferException} occurred
     * @throws StateTransferException if a transfer for this cache is already registered, or if
     *                                one is raised while retrieving the state
     */
    @Override
    public boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException {
        // Only remove the map entry if this call was the one that registered it.
        boolean registered = false;
        try {
            final StateTransferMonitor monitor = new StateTransferMonitor();
            final StateTransferMonitor existing = stateTransfersInProgress.putIfAbsent(cacheName, monitor);
            if (existing != null) {
                throw new StateTransferException("There already appears to be a state transfer in progress for the cache named " + cacheName);
            }
            registered = true;

            final JChannel jChannel = (JChannel) channel;
            jChannel.getState(toJGroupsAddress(address), cacheName, timeout, false);
            monitor.waitForState();
            return monitor.getSetStateException() == null;
        } catch (StateTransferException ste) {
            throw ste;
        } catch (Exception e) {
            if (log.isInfoEnabled()) {
                log.info((Object)("Unable to retrieve state from member " + address), e);
            }
            return false;
        } finally {
            if (registered) {
                stateTransfersInProgress.remove(cacheName);
            }
        }
    }

    /**
     * @return the distributed sync instance owned by this transport
     */
    @Override
    public DistributedSync getDistributedSync() {
        return flushTracker;
    }

    /**
     * Checks whether the channel's protocol stack contains {@code STREAMING_STATE_TRANSFER}.
     *
     * @return {@code true} if the protocol is present; {@code false} if it is missing or the
     *         channel or its protocol stack is not available
     */
    @Override
    public boolean isSupportStateTransfer() {
        final ProtocolStack stack = channel == null ? null : channel.getProtocolStack();
        if (stack == null) {
            log.warn("Channel not set up properly!");
            return false;
        }
        if (stack.findProtocol(STREAMING_STATE_TRANSFER.class) == null) {
            log.error("Channel does not contain STREAMING_STATE_TRANSFER.  Cannot support state transfers!");
            return false;
        }
        return true;
    }

    /**
     * Returns the local address, wrapping the channel's address on first use and caching it.
     *
     * @return the cached address, or {@code null} if none is cached and there is no channel
     */
    @Override
    public Address getAddress() {
        // Nothing to compute when a value is already cached or no channel exists to ask.
        if (address != null || channel == null) {
            return address;
        }
        address = new JGroupsAddress(channel.getAddress());
        return address;
    }

    /**
     * Sends a command to the given recipients through the RPC dispatcher and collects the
     * responses.
     * <p>
     * A non-exclusive processing lock is held on the distributed sync for the duration of the
     * call. Everything done after the lock is acquired, including waiting for an in-progress
     * block to be released, runs inside the {@code try/finally} so the lock is always released,
     * even if that wait fails.
     *
     * @param recipients       target members; {@code null} is passed through to the dispatcher,
     *                         an empty list short-circuits with an empty result
     * @param rpcCommand       command to invoke remotely
     * @param mode             response mode, translated to the JGroups request mode
     * @param timeout          timeout forwarded to the dispatcher
     * @param usePriorityQueue forwarded to the dispatcher
     * @param responseFilter   optional filter; when present, members that did not respond are
     *                         skipped instead of causing a timeout
     * @param supportReplay    forwarded to the dispatcher
     * @return the non-null responses received; an empty list for an empty recipient list,
     *         asynchronous modes, or when the dispatcher returns no response list
     * @throws SuspectException if a responder was suspected
     * @throws TimeoutException if a member did not respond and no filter was given, or if no
     *                          member produced a valid response
     * @throws Exception        the exception carried by an {@link ExceptionResponse}, unless it
     *                          is a {@link ReplicationException} (those are returned in the list)
     */
    @Override
    public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay) throws Exception {
        if (recipients != null && recipients.isEmpty()) {
            log.trace("Destination list is empty: no need to send message");
            return Collections.emptyList();
        }
        log.trace((Object)"dests={0}, command={1}, mode={2}, timeout={3}", new Object[]{recipients, rpcCommand, mode, timeout});

        flushTracker.acquireProcessingLock(false, distributedSyncTimeout, TimeUnit.MILLISECONDS);
        try {
            // Must be inside the try: if this wait fails, the processing lock acquired above
            // would otherwise never be released.
            flushTracker.blockUntilReleased(distributedSyncTimeout, TimeUnit.MILLISECONDS);

            boolean asyncMarshalling = mode == ResponseMode.ASYNCHRONOUS;
            RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode), timeout, false, usePriorityQueue, toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling);

            if (mode.isAsynchronous()) {
                return Collections.emptyList();
            }
            if (trace) {
                log.trace((Object)"Cache [{0}]: responses for command {1}:\n{2}", getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
            }
            if (rsps == null) {
                return Collections.emptyList();
            }

            List<Response> retval = new ArrayList<Response>(rsps.size());
            boolean noValidResponses = true;
            for (Rsp rsp : rsps.values()) {
                if (rsp.wasSuspected()) {
                    throw new SuspectException("Suspected member: " + rsp.getSender());
                }
                if (!rsp.wasReceived()) {
                    // With a filter in place a missing response is tolerated.
                    if (responseFilter != null) {
                        continue;
                    }
                    throw new TimeoutException("Replication timeout for " + rsp.getSender());
                }

                noValidResponses = false;
                if (rsp.getValue() == null) {
                    continue;
                }
                Response value = (Response) rsp.getValue();
                if (value instanceof ExceptionResponse) {
                    Exception e = ((ExceptionResponse) value).getException();
                    if (!(e instanceof ReplicationException)) {
                        if (trace) {
                            log.trace((Object)"Recieved exception '{0}' from {1}", e, rsp.getSender());
                        }
                        throw e;
                    }
                }
                retval.add(value);
            }

            if (noValidResponses) {
                throw new TimeoutException("Timed out waiting for valid responses!");
            }
            return retval;
        } finally {
            flushTracker.releaseProcessingLock();
        }
    }

    /**
     * Translates a {@link ResponseMode} into the integer request mode expected by the dispatcher.
     * <p>
     * NOTE(review): the literals look like inlined JGroups request-mode constants (presumably
     * from {@code GroupRequest}) - confirm against the JGroups version in use.
     *
     * @param mode response mode to translate
     * @return 6 for both asynchronous modes, 2 for synchronous, 3 for wait-for-valid-response
     * @throws CacheException if the mode is not one of the handled values
     */
    private int toJGroupsMode(ResponseMode mode) {
        switch (mode) {
            case SYNCHRONOUS:
                return 2;
            case WAIT_FOR_VALID_RESPONSE:
                return 3;
            case ASYNCHRONOUS:
            case ASYNCHRONOUS_WITH_SYNC_MARSHALLING:
                return 6;
            default:
                throw new CacheException("Unknown response mode " + mode);
        }
    }

    /**
     * Wraps a response filter in its JGroups adapter.
     *
     * @param responseFilter filter to adapt, may be {@code null}
     * @return the adapter, or {@code null} when no filter was supplied
     */
    private RspFilter toJGroupsFilter(ResponseFilter responseFilter) {
        if (responseFilter == null) {
            return null;
        }
        return new JGroupsResponseFilterAdapter(responseFilter);
    }

    /**
     * Installs a new cluster view: replaces the member list (when the view carries one),
     * recomputes the coordinator flag, notifies the cache-manager notifier, and wakes any
     * threads waiting on the members lock.
     *
     * @param newView the view delivered by JGroups
     */
    public void viewAccepted(View newView) {
        final List<org.jgroups.Address> incoming = newView.getMembers();
        if (log.isInfoEnabled()) {
            log.info((Object)"Received new cluster view: {0}", newView);
        }
        synchronized (membersListLock) {
            // Listeners are only told about views that actually replaced the member list.
            final boolean membershipReplaced = incoming != null;
            if (membershipReplaced) {
                members = fromJGroupsAddressList(incoming);
            }

            coordinator = members != null && members.size() != 0 && members.get(0).equals(getAddress());

            if (membershipReplaced && notifier != null) {
                notifier.notifyViewChange(members, getAddress(), (int) newView.getVid().getId());
            }
            membersListLock.notifyAll();
        }
    }

    /**
     * Membership callback for a suspected member; intentionally a no-op in this transport.
     *
     * @param suspected_mbr the member reported as suspected (unused)
     */
    public void suspect(org.jgroups.Address suspected_mbr) {
    }

    /**
     * Block callback: signals the distributed sync that a join is in progress.
     */
    public void block() {
        flushTracker.signalJoinInProgress();
    }

    /**
     * Unblock callback: signals the distributed sync that the join has completed.
     */
    public void unblock() {
        flushTracker.signalJoinCompleted();
    }

    /**
     * Message callback; intentionally a no-op in this transport.
     *
     * @param msg the received message (unused)
     */
    public void receive(Message msg) {
    }

    /**
     * Whole-system state retrieval is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public byte[] getState() {
        final String reason = "Retrieving state for the entire cache system is not supported!";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Whole-system state application is not supported.
     *
     * @param state ignored
     * @throws UnsupportedOperationException always
     */
    public void setState(byte[] state) {
        final String reason = "Setting state for the entire cache system is not supported!";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Byte-array based partial state retrieval is not supported; only the stream-based
     * variant is.
     *
     * @param state_id ignored
     * @throws UnsupportedOperationException always
     */
    public byte[] getState(String state_id) {
        final String reason = "Non-stream-based state retrieval is not supported!  Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Byte-array based partial state application is not supported; only the stream-based
     * variant is.
     *
     * @param state_id ignored
     * @param state    ignored
     * @throws UnsupportedOperationException always
     */
    public void setState(String state_id, byte[] state) {
        final String reason = "Non-stream-based state retrieval is not supported!  Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Stream-based retrieval of the whole system's state is not supported.
     *
     * @param ostream ignored
     * @throws UnsupportedOperationException always
     */
    public void getState(OutputStream ostream) {
        final String reason = "Retrieving state for the entire cache system is not supported!";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Generates the state of the named cache into the supplied stream by delegating to the
     * inbound invocation handler. A {@link StateTransferException} is logged rather than
     * propagated, and the stream is always flushed and closed.
     *
     * @param cacheName name of the cache whose state was requested
     * @param ostream   stream to write the state to; closed by this method
     */
    public void getState(String cacheName, OutputStream ostream) {
        if (trace) {
            log.trace((Object)"Received request to generate state for cache named '{0}'.  Attempting to generate state.", cacheName);
        }
        try {
            inboundInvocationHandler.generateState(cacheName, ostream);
        } catch (StateTransferException generationFailure) {
            log.error((Object)"Caught while responding to state transfer request", generationFailure);
        } finally {
            Util.flushAndCloseStream(ostream);
        }
    }

    /**
     * Stream-based application of the whole system's state is not supported.
     *
     * @param istream ignored
     * @throws UnsupportedOperationException always
     */
    public void setState(InputStream istream) {
        final String reason = "Setting state for the entire cache system is not supported!";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Applies the received state for the named cache by delegating to the inbound invocation
     * handler, then signals the outcome to the monitor registered for that cache, if any.
     * <p>
     * No monitor may be registered for the cache (the lookup can return {@code null}); in that
     * case the state is still applied and the notification is skipped, instead of failing with
     * a {@link NullPointerException} that would also mask the original error. Failures are
     * logged before the monitor is notified, and the stream is always closed.
     *
     * @param cacheName name of the cache the state belongs to
     * @param istream   stream to read the state from; closed by this method
     */
    public void setState(String cacheName, InputStream istream) {
        StateTransferMonitor mon = null;
        try {
            if (trace) {
                log.trace((Object)"Received state for cache named '{0}'.  Attempting to apply state.", cacheName);
            }
            mon = stateTransfersInProgress.get(cacheName);
            inboundInvocationHandler.applyState(cacheName, istream);
            if (mon != null) {
                mon.notifyStateReceiptSucceeded();
            }
        } catch (Exception e) {
            // Log first so the root cause is recorded even if notifying the monitor fails.
            log.error((Object)"Caught while requesting or applying state", e);
            if (mon != null) {
                mon.notifyStateReceiptFailed(e instanceof StateTransferException ? (StateTransferException) e : new StateTransferException(e));
            }
        } finally {
            Util.closeStream(istream);
        }
    }

    /**
     * Converts a list of transport addresses into a vector of the JGroups addresses they wrap.
     * Every element is cast to {@link JGroupsAddress}.
     *
     * @param list addresses to convert, may be {@code null}
     * @return {@code null} for a {@code null} input, otherwise a new vector in the same order
     */
    private Vector<org.jgroups.Address> toJGroupsAddressVector(List<Address> list) {
        if (list == null) {
            return null;
        }
        if (list.isEmpty()) {
            return new Vector<org.jgroups.Address>();
        }
        final Vector<org.jgroups.Address> unwrapped = new Vector<org.jgroups.Address>(list.size());
        for (Address wrapper : list) {
            unwrapped.add(((JGroupsAddress) wrapper).address);
        }
        return unwrapped;
    }

    /**
     * Extracts the JGroups address wrapped by a transport address.
     *
     * @param a address to unwrap; cast to {@link JGroupsAddress}
     * @return the wrapped JGroups address
     */
    private org.jgroups.Address toJGroupsAddress(Address a) {
        final JGroupsAddress wrapper = (JGroupsAddress) a;
        return wrapper.address;
    }

    /**
     * Wraps each JGroups address in a {@link JGroupsAddress}.
     *
     * @param list JGroups addresses, may be {@code null}
     * @return the shared empty list for a {@code null} or empty input, otherwise a new list
     *         in the same order
     */
    private List<Address> fromJGroupsAddressList(List<org.jgroups.Address> list) {
        if (list == null) {
            return Collections.emptyList();
        }
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        final ArrayList<Address> wrapped = new ArrayList<Address>(list.size());
        for (org.jgroups.Address raw : list) {
            wrapped.add(new JGroupsAddress(raw));
        }
        return wrapped;
    }
}

