/*
 * Decompiled with CFR 0.152.
 */
package com.tc.net.groups;

import com.tc.async.api.EventHandler;
import com.tc.async.api.Sink;
import com.tc.async.api.Stage;
import com.tc.async.api.StageManager;
import com.tc.config.GroupConfiguration;
import com.tc.config.ServerConfigurationManager;
import com.tc.l2.L2DebugLogging;
import com.tc.l2.ha.WeightGeneratorFactory;
import com.tc.l2.msg.L2StateMessage;
import com.tc.net.CommStackMismatchException;
import com.tc.net.MaxConnectionsExceededException;
import com.tc.net.NodeID;
import com.tc.net.ServerID;
import com.tc.net.core.BufferManagerFactory;
import com.tc.net.core.ClearTextBufferManagerFactory;
import com.tc.net.core.ProductID;
import com.tc.net.core.TCConnectionManager;
import com.tc.net.core.TCConnectionManagerImpl;
import com.tc.net.groups.AbstractGroupMessage;
import com.tc.net.groups.DefaultZapNodeRequestProcessor;
import com.tc.net.groups.DiscoveryStateMachine;
import com.tc.net.groups.GroupEventsListener;
import com.tc.net.groups.GroupException;
import com.tc.net.groups.GroupManager;
import com.tc.net.groups.GroupMessage;
import com.tc.net.groups.GroupMessageListener;
import com.tc.net.groups.GroupResponse;
import com.tc.net.groups.GroupZapNodeMessage;
import com.tc.net.groups.GroupZapNodeMessageFactory;
import com.tc.net.groups.MessageID;
import com.tc.net.groups.Node;
import com.tc.net.groups.RouteGroupMessagesToSink;
import com.tc.net.groups.TCGroupHandshakeMessage;
import com.tc.net.groups.TCGroupMember;
import com.tc.net.groups.TCGroupMemberDiscovery;
import com.tc.net.groups.TCGroupMemberDiscoveryStatic;
import com.tc.net.groups.TCGroupMemberImpl;
import com.tc.net.groups.TCGroupMessageWrapper;
import com.tc.net.groups.ZapNodeRequestProcessor;
import com.tc.net.protocol.NetworkStackHarnessFactory;
import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
import com.tc.net.protocol.tcm.ChannelEvent;
import com.tc.net.protocol.tcm.ChannelEventListener;
import com.tc.net.protocol.tcm.ChannelEventType;
import com.tc.net.protocol.tcm.ChannelManagerEventListener;
import com.tc.net.protocol.tcm.ClientMessageChannel;
import com.tc.net.protocol.tcm.CommunicationsManager;
import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
import com.tc.net.protocol.tcm.MessageChannel;
import com.tc.net.protocol.tcm.MessageMonitor;
import com.tc.net.protocol.tcm.NetworkListener;
import com.tc.net.protocol.tcm.NullMessageMonitor;
import com.tc.net.protocol.tcm.TCAction;
import com.tc.net.protocol.tcm.TCMessageHydrateSink;
import com.tc.net.protocol.tcm.TCMessageRouter;
import com.tc.net.protocol.tcm.TCMessageRouterImpl;
import com.tc.net.protocol.tcm.TCMessageSink;
import com.tc.net.protocol.tcm.TCMessageType;
import com.tc.net.protocol.transport.ConnectionIDFactory;
import com.tc.net.protocol.transport.ConnectionPolicy;
import com.tc.net.protocol.transport.DefaultConnectionIdFactory;
import com.tc.net.protocol.transport.HealthCheckerConfig;
import com.tc.net.protocol.transport.HealthCheckerConfigImpl;
import com.tc.net.protocol.transport.NullConnectionPolicy;
import com.tc.net.protocol.transport.TransportHandshakeErrorHandler;
import com.tc.net.protocol.transport.TransportHandshakeErrorHandlerForGroupComm;
import com.tc.net.utils.L2Utils;
import com.tc.objectserver.core.impl.GuardianContext;
import com.tc.objectserver.handler.ReceiveGroupMessageHandler;
import com.tc.objectserver.handler.TCGroupHandshakeMessageHandler;
import com.tc.objectserver.handler.TCGroupMemberDiscoveryHandler;
import com.tc.objectserver.impl.TopologyListener;
import com.tc.objectserver.impl.TopologyManager;
import com.tc.properties.TCProperties;
import com.tc.properties.TCPropertiesImpl;
import com.tc.spi.Guardian;
import com.tc.util.Assert;
import com.tc.util.TCTimeoutException;
import com.tc.util.UUID;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.configuration.ServerConfiguration;
import org.terracotta.server.ServerEnv;

public class TCGroupManagerImpl
implements GroupManager<AbstractGroupMessage>,
ChannelManagerEventListener,
TopologyListener {
    private static final Logger logger = LoggerFactory.getLogger(TCGroupManagerImpl.class);
    public static final String HANDSHAKE_STATE_MACHINE_TAG = "TcGroupCommHandshake";
    private final int serverCount;
    private final String version;
    private final ServerID thisNodeID;
    private final int groupPort;
    private final ConnectionPolicy connectionPolicy;
    private final CopyOnWriteArrayList<GroupEventsListener> groupListeners = new CopyOnWriteArrayList();
    private final Map<String, GroupMessageListener<? extends GroupMessage>> messageListeners = new ConcurrentHashMap<String, GroupMessageListener<? extends GroupMessage>>();
    private final Map<MessageID, GroupResponseImpl> pendingRequests = new ConcurrentHashMap<MessageID, GroupResponseImpl>();
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final ConcurrentHashMap<ServerID, TCGroupMember> members = new ConcurrentHashMap();
    private final Timer handshakeTimer = new Timer(ServerEnv.getServer().getIdentifier() + " - TC Group Manager Handshake timer", true);
    private final Set<NodeID> zappedSet = Collections.synchronizedSet(new HashSet());
    private final StageManager stageManager;
    private final AtomicBoolean alreadyJoined = new AtomicBoolean(false);
    private final WeightGeneratorFactory weightGeneratorFactory;
    private final BufferManagerFactory bufferManagerFactory;
    private final TopologyManager topologyManager;
    private CommunicationsManager communicationsManager;
    private TCConnectionManager connectionManager;
    private NetworkListener groupListener;
    private TCGroupMemberDiscovery discover;
    private ZapNodeRequestProcessor zapNodeRequestProcessor = new DefaultZapNodeRequestProcessor(logger);
    private Stage<TCGroupMessageWrapper> receiveGroupMessageStage;
    private Stage<TCGroupHandshakeMessage> handshakeMessageStage;
    private Stage<DiscoveryStateMachine> discoveryStage;

    public TCGroupManagerImpl(ServerConfigurationManager configSetupManager, StageManager stageManager, ServerID thisNodeID, Node thisNode, WeightGeneratorFactory weightGenerator, BufferManagerFactory bufferManagerFactory, TopologyManager topologyManager) {
        this(configSetupManager, (ConnectionPolicy)new NullConnectionPolicy(), stageManager, thisNodeID, thisNode, weightGenerator, bufferManagerFactory, topologyManager);
    }

    public TCGroupManagerImpl(ServerConfigurationManager configSetupManager, ConnectionPolicy connectionPolicy, StageManager stageManager, ServerID thisNodeID, Node thisNode, WeightGeneratorFactory weightGenerator, BufferManagerFactory bufferManagerFactory, TopologyManager topologyManager) {
        this.connectionPolicy = connectionPolicy;
        this.stageManager = stageManager;
        this.thisNodeID = thisNodeID;
        this.bufferManagerFactory = bufferManagerFactory;
        this.topologyManager = topologyManager;
        this.version = configSetupManager.getProductInfo().version();
        ServerConfiguration l2DSOConfig = configSetupManager.getServerConfiguration();
        this.serverCount = configSetupManager.allCurrentlyKnownServers().length;
        this.groupPort = l2DSOConfig.getGroupPort().getPort();
        this.weightGeneratorFactory = weightGenerator;
        int groupConnectPort = TCPropertiesImpl.getProperties().getInt("l2.nha.tcgroupcomm.l2proxytoport", this.groupPort);
        InetSocketAddress socketAddress = new InetSocketAddress(l2DSOConfig.getGroupPort().getHostString(), groupConnectPort);
        this.init(socketAddress);
        Assert.assertNotNull((Object)thisNodeID);
        this.setDiscover(new TCGroupMemberDiscoveryStatic(this, thisNode));
        this.topologyManager.addListener(this);
    }

    protected final String getVersion() {
        return this.version;
    }

    @Override
    public boolean isNodeConnected(NodeID sid) {
        TCGroupMember m = this.members.get((ServerID)sid);
        return m != null && m.getChannel().isOpen();
    }

    public TCGroupManagerImpl(ConnectionPolicy connectionPolicy, String hostname, int port, int groupPort, StageManager stageManager, WeightGeneratorFactory weightGenerator, TopologyManager topologyManager) {
        this.connectionPolicy = connectionPolicy;
        this.stageManager = stageManager;
        this.bufferManagerFactory = new ClearTextBufferManagerFactory();
        this.topologyManager = topologyManager;
        this.groupPort = groupPort;
        this.version = this.getVersion();
        this.weightGeneratorFactory = weightGenerator;
        this.serverCount = 0;
        this.thisNodeID = new ServerID(new Node(hostname, port).getServerNodeName(), UUID.getUUID().toString().getBytes());
        this.init(new InetSocketAddress("0.0.0.0", groupPort));
    }

    private void init(InetSocketAddress socketAddress) {
        TCProperties tcProperties = TCPropertiesImpl.getProperties();
        this.createTCGroupManagerStages();
        NetworkStackHarnessFactory networkStackHarnessFactory = this.getNetworkStackHarnessFactory();
        TCMessageRouterImpl messageRouter = new TCMessageRouterImpl();
        this.initMessageRouter((TCMessageRouter)messageRouter);
        HashMap<TCMessageType, Class<? extends TCAction>> messageTypeClassMapping = new HashMap<TCMessageType, Class<? extends TCAction>>();
        this.initMessageTypeClassMapping(messageTypeClassMapping);
        HealthCheckerConfigImpl hcconfig = new HealthCheckerConfigImpl(tcProperties.getPropertiesFor("l2.healthcheck.l2"), ServerEnv.getServer().getIdentifier() + " - TCGroupManager");
        this.connectionManager = new TCConnectionManagerImpl(ServerEnv.getServer().getIdentifier() + " - " + "L2_L2", this.serverCount <= 1 ? 0 : this.serverCount, this.bufferManagerFactory);
        this.communicationsManager = new CommunicationsManagerImpl((MessageMonitor)new NullMessageMonitor(), (TCMessageRouter)messageRouter, networkStackHarnessFactory, this.connectionManager, this.connectionPolicy, (HealthCheckerConfig)hcconfig, this.thisNodeID, (TransportHandshakeErrorHandler)new TransportHandshakeErrorHandlerForGroupComm(), messageTypeClassMapping, Collections.emptyMap(), this.bufferManagerFactory);
        this.groupListener = this.communicationsManager.createListener(socketAddress, c -> true, (ConnectionIDFactory)new DefaultConnectionIdFactory(), t -> true);
        this.groupListener.getChannelManager().addEventListener((ChannelManagerEventListener)this);
        this.registerForMessages(GroupZapNodeMessage.class, new ZapNodeRequestRouter());
    }

    private NetworkStackHarnessFactory getNetworkStackHarnessFactory() {
        return new PlainNetworkStackHarnessFactory();
    }

    private void createTCGroupManagerStages() {
        int maxStageSize = 5000;
        this.receiveGroupMessageStage = this.stageManager.createStage("receive_group_message_stage", TCGroupMessageWrapper.class, (EventHandler)new ReceiveGroupMessageHandler(this), 1, maxStageSize);
        this.handshakeMessageStage = this.stageManager.createStage("group_handshake_message_stage", TCGroupHandshakeMessage.class, (EventHandler)new TCGroupHandshakeMessageHandler(this), 1, maxStageSize);
        this.discoveryStage = this.stageManager.createStage("group_discovery_stage", DiscoveryStateMachine.class, (EventHandler)new TCGroupMemberDiscoveryHandler(this), 1, maxStageSize, false, false);
    }

    private Map<TCMessageType, Class<? extends TCAction>> initMessageTypeClassMapping(Map<TCMessageType, Class<? extends TCAction>> messageTypeClassMapping) {
        messageTypeClassMapping.put(TCMessageType.GROUP_HANDSHAKE_MESSAGE, TCGroupHandshakeMessage.class);
        messageTypeClassMapping.put(TCMessageType.GROUP_WRAPPER_MESSAGE, TCGroupMessageWrapper.class);
        return messageTypeClassMapping;
    }

    private void initMessageRouter(TCMessageRouter messageRouter) {
        messageRouter.routeMessageType(TCMessageType.GROUP_WRAPPER_MESSAGE, (TCMessageSink)new TCMessageHydrateSink(this.receiveGroupMessageStage.getSink()));
        messageRouter.routeMessageType(TCMessageType.GROUP_HANDSHAKE_MESSAGE, (TCMessageSink)new TCMessageHydrateSink(this.handshakeMessageStage.getSink()));
    }

    protected Sink<DiscoveryStateMachine> getDiscoveryHandlerSink() {
        return this.discoveryStage.getSink();
    }

    private void handshake(MessageChannel channel) {
        this.getOrCreateHandshakeStateMachine(channel);
    }

    public void receivedHandshake(TCGroupHandshakeMessage msg) {
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("Received group handshake message from " + msg.getChannel());
        }
        MessageChannel channel = msg.getChannel();
        Assert.assertNotNull((Object)channel);
        TCGroupHandshakeStateMachine stateMachine = this.getOrCreateHandshakeStateMachine(channel);
        stateMachine.execute(msg);
    }

    @Override
    public ServerID getLocalNodeID() {
        return this.getNodeID();
    }

    private ServerID getNodeID() {
        return this.thisNodeID;
    }

    private void membersClear() {
        this.members.clear();
    }

    private boolean membersAdd(TCGroupMember member) {
        ServerID nodeID = member.getPeerNodeID();
        TCGroupMember old = this.members.putIfAbsent(nodeID, member);
        return old == null;
    }

    private void membersRemove(TCGroupMember member) {
        ServerID nodeID = member.getPeerNodeID();
        this.members.remove(nodeID);
    }

    private void removeIfMemberReconnecting(ServerID newNodeID) {
        this.members.entrySet().stream().filter(e -> ((ServerID)e.getKey()).getName().equals(newNodeID.getName())).findFirst().ifPresent(e -> {
            MessageChannel channel;
            TCGroupMember oldMember = (TCGroupMember)e.getValue();
            if (oldMember.getPeerNodeID() != newNodeID && !(channel = oldMember.getChannel()).isConnected()) {
                this.closeMember(oldMember);
                logger.warn("Removed old member " + oldMember + " for " + newNodeID);
            }
        });
    }

    @Override
    public void shutdown() {
        try {
            this.stop(1000L);
        }
        catch (TCTimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public void stop(long timeout) throws TCTimeoutException {
        this.isStopped.set(true);
        this.discover.stop(timeout);
        for (ServerID sid : this.members.keySet()) {
            this.closeMember(sid);
        }
        this.groupListener.stop(timeout);
        this.communicationsManager.shutdown();
        this.connectionManager.shutdown();
        this.handshakeTimer.cancel();
        for (TCGroupMember m : this.members.values()) {
            this.notifyAnyPendingRequests(m);
        }
        this.membersClear();
    }

    @Override
    public TCConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    @Override
    public boolean isStopped() {
        return this.isStopped.get();
    }

    @Override
    public void registerForGroupEvents(GroupEventsListener listener) {
        this.groupListeners.add(listener);
    }

    private void fireNodeEvent(TCGroupMember member, boolean joined) {
        ServerID newNode = member.getPeerNodeID();
        member.setReady(joined);
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("fireNodeEvent: joined = " + joined + ", node = " + newNode + ", channel: " + member.getChannel());
        }
        for (GroupEventsListener listener : this.groupListeners) {
            if (joined) {
                listener.nodeJoined((NodeID)newNode);
                continue;
            }
            listener.nodeLeft((NodeID)newNode);
        }
    }

    private boolean tryAddMember(TCGroupMember member) {
        if (!GuardianContext.validate(Guardian.Op.CONNECT_SERVER, "add:" + member.getPeerNodeID(), member.getChannel())) {
            return false;
        }
        if (this.isStopped.get()) {
            return false;
        }
        boolean added = this.membersAdd(member);
        if (added) {
            member.setTCGroupManager(this);
            return true;
        }
        return false;
    }

    @Override
    public NodeID join(GroupConfiguration groupConfiguration) throws GroupException {
        if (!this.alreadyJoined.compareAndSet(false, true)) {
            throw new GroupException("Already Joined");
        }
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("Starting discover... thisNode: " + groupConfiguration.getCurrentNode() + ", otherNodes: " + groupConfiguration.getNodes());
        }
        this.discover.setupNodes(groupConfiguration.getCurrentNode(), groupConfiguration.getNodes());
        this.discover.start();
        try {
            this.groupListener.start(new HashSet());
        }
        catch (IOException e) {
            throw new GroupException(e);
        }
        return this.getNodeID();
    }

    @Override
    public void closeMember(ServerID serverID) {
        TCGroupMember member = this.getMember((NodeID)serverID);
        if (member != null) {
            logger.info("Closing down member for " + serverID + " - " + member);
            this.closeMember(member);
        } else {
            logger.warn("Closing down member for " + serverID + " - member doesn't exist.");
        }
    }

    private void closeMember(TCGroupMember member) {
        Assert.assertNotNull((Object)member);
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("Closing member: " + member);
        }
        if (this.isStopped.get()) {
            this.shutdownMember(member);
            return;
        }
        member.setTCGroupManager(null);
        TCGroupMember m = this.members.get(member.getPeerNodeID());
        if (m != null && m.getChannel() == member.getChannel()) {
            this.membersRemove(member);
            if (member.isJoinedEventFired()) {
                this.fireNodeEvent(member, false);
            }
            this.zappedSet.remove(member.getPeerNodeID());
            member.setJoinedEventFired(false);
            this.notifyAnyPendingRequests(member);
        }
        this.shutdownMember(member);
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo(this.getNodeID() + " removed " + member);
        }
    }

    private void shutdownMember(TCGroupMember member) {
        member.setReady(false);
        member.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyAnyPendingRequests(TCGroupMember member) {
        Map<MessageID, GroupResponseImpl> map = this.pendingRequests;
        synchronized (map) {
            for (GroupResponseImpl response : this.pendingRequests.values()) {
                response.notifyMemberDead(member);
            }
        }
    }

    @Override
    public void sendAll(AbstractGroupMessage msg) {
        this.sendAll(msg, (Set<? extends NodeID>)this.members.keySet());
    }

    @Override
    public void sendAll(AbstractGroupMessage msg, Set<? extends NodeID> nodeIDs) {
        boolean debug = msg instanceof L2StateMessage;
        for (TCGroupMember m : this.members.values()) {
            if (!nodeIDs.contains(m.getPeerNodeID())) {
                if (!debug || !TCGroupManagerImpl.isDebugLogging()) continue;
                TCGroupManagerImpl.debugInfo("Not sending msg to " + m.getPeerNodeID() + ", " + msg + ", channel: " + m.getChannel());
                continue;
            }
            if (m.isReady()) {
                if (debug && TCGroupManagerImpl.isDebugLogging()) {
                    TCGroupManagerImpl.debugInfo("Sending msg to " + m.getPeerNodeID() + ", " + msg + ", channel: " + m.getChannel());
                }
                m.sendIgnoreNotReady(msg);
                continue;
            }
            logger.warn("Ignored sending msg to a not ready member=" + m + ", msg=" + msg);
        }
    }

    @Override
    public void sendTo(NodeID node, AbstractGroupMessage msg) throws GroupException {
        Runnable sentCallback = null;
        this.internalSendTo(node, msg, sentCallback);
    }

    @Override
    public void sendTo(Set<String> nodes, AbstractGroupMessage msg) {
        this.sendAll(msg, this.members.keySet().stream().filter(id -> nodes.contains(id.getName())).collect(Collectors.toSet()));
    }

    @Override
    public void sendToWithSentCallback(NodeID node, AbstractGroupMessage msg, Runnable sentCallback) throws GroupException {
        this.internalSendTo(node, msg, sentCallback);
    }

    private void internalSendTo(NodeID node, AbstractGroupMessage msg, Runnable sentCallback) throws GroupException {
        TCGroupMember member = this.getMember(node);
        if (member != null && member.isReady()) {
            if (msg instanceof L2StateMessage && TCGroupManagerImpl.isDebugLogging()) {
                TCGroupManagerImpl.debugInfo("Sending msg to " + node + ", msg: " + msg + ", channel: " + member.getChannel());
            }
        } else {
            if (member != null) {
                this.closeMember(member);
            }
            throw new GroupException("Send to " + (member == null ? "non-exist" : "not ready") + " member of " + node);
        }
        member.send(msg, sentCallback);
    }

    @Override
    public AbstractGroupMessage sendToAndWaitForResponse(NodeID nodeID, AbstractGroupMessage msg) throws GroupException {
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("Sending to " + nodeID + " and Waiting for Response : " + msg.getMessageID());
        }
        GroupResponseImpl groupResponse = new GroupResponseImpl();
        MessageID msgID = msg.getMessageID();
        TCGroupMember m = this.getMember(nodeID);
        if (m == null || !m.isReady()) {
            String errorMsg = "Node " + nodeID + " not present in the group. Ignoring Message : " + msg;
            logger.error(errorMsg);
            if (m != null) {
                this.closeMember(m);
            }
            throw new GroupException(errorMsg);
        }
        GroupResponse old = this.pendingRequests.put(msgID, groupResponse);
        Assert.assertNull((Object)old);
        groupResponse.sendTo(m, msg);
        this.pendingRequests.remove(msgID);
        return groupResponse.getResponse(nodeID);
    }

    @Override
    public GroupResponse<AbstractGroupMessage> sendToAndWaitForResponse(Set<String> nodes, AbstractGroupMessage msg) throws GroupException {
        return this.sendAllAndWaitForResponse(msg, this.members.keySet().stream().filter(id -> nodes.contains(id.getName())).collect(Collectors.toSet()));
    }

    @Override
    public GroupResponse<AbstractGroupMessage> sendAllAndWaitForResponse(AbstractGroupMessage msg) throws GroupException {
        return this.sendAllAndWaitForResponse(msg, (Set<? extends NodeID>)this.members.keySet());
    }

    @Override
    public GroupResponse<AbstractGroupMessage> sendAllAndWaitForResponse(AbstractGroupMessage msg, Set<? extends NodeID> nodeIDs) throws GroupException {
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("Sending to " + nodeIDs + " and Waiting for Response : " + msg.getMessageID());
        }
        GroupResponseImpl groupResponse = new GroupResponseImpl();
        MessageID msgID = msg.getMessageID();
        GroupResponse old = this.pendingRequests.put(msgID, groupResponse);
        Assert.assertNull((Object)old);
        groupResponse.sendAll(msg, nodeIDs);
        this.pendingRequests.remove(msgID);
        if (TCGroupManagerImpl.isDebugLogging()) {
            TCGroupManagerImpl.debugInfo("Complete from " + nodeIDs + " : " + msg.getMessageID());
        }
        return groupResponse;
    }

    private void openChannel(InetSocketAddress serverAddress, ChannelEventListener listener) throws TCTimeoutException, MaxConnectionsExceededException, IOException, CommStackMismatchException {
        if (this.isStopped.get()) {
            return;
        }
        this.communicationsManager.addClassMapping(TCMessageType.GROUP_WRAPPER_MESSAGE, TCGroupMessageWrapper.class);
        this.communicationsManager.addClassMapping(TCMessageType.GROUP_HANDSHAKE_MESSAGE, TCGroupHandshakeMessage.class);
        ProductID product = ProductID.DISCOVERY;
        ClientMessageChannel channel = this.communicationsManager.createClientChannel(product, 2000);
        channel.addListener(listener);
        channel.open(serverAddress);
        this.handshake((MessageChannel)channel);
    }

    public void openChannel(String hostname, int port, ChannelEventListener listener) throws TCTimeoutException, MaxConnectionsExceededException, IOException, CommStackMismatchException {
        this.openChannel(InetSocketAddress.createUnresolved(hostname, port), listener);
    }

    public void channelCreated(MessageChannel aChannel) {
        if (this.isStopped.get()) {
            aChannel.close();
            return;
        }
        this.handshake(aChannel);
    }

    public void channelRemoved(MessageChannel channel) {
        TCGroupHandshakeStateMachine stateMachine = this.getHandshakeStateMachine(channel);
        if (stateMachine != null) {
            stateMachine.disconnected();
        }
    }

    private TCGroupMember getMember(MessageChannel channel) {
        ServerID sid;
        TCGroupHandshakeStateMachine stateMachine = this.getHandshakeStateMachine(channel);
        if (stateMachine != null && (sid = stateMachine.getPeerNodeID()) != null) {
            return this.getMember((NodeID)sid);
        }
        return this.members.values().stream().filter(m -> m.getChannel() == channel).findFirst().orElse(null);
    }

    private TCGroupMember getMember(NodeID nodeID) {
        return this.members.get((ServerID)nodeID);
    }

    public Collection<TCGroupMember> getMembers() {
        return Collections.unmodifiableCollection(this.members.values());
    }

    public final void setDiscover(TCGroupMemberDiscovery discover) {
        this.discover = discover;
    }

    public TCGroupMemberDiscovery getDiscover() {
        return this.discover;
    }

    private Timer getHandshakeTimer() {
        return this.handshakeTimer;
    }

    int size() {
        return this.members.size();
    }

    public void messageReceived(AbstractGroupMessage message, MessageChannel channel) {
        if (this.isStopped()) {
            channel.close();
            return;
        }
        TCGroupMember m = this.getMember(channel);
        if (channel.isClosed()) {
            logger.warn(this.getNodeID() + " recd msg " + message.getMessageID() + " From closed " + channel + " Msg : " + message);
            return;
        }
        while (m == null) {
            TCGroupHandshakeStateMachine stateMachine = this.getHandshakeStateMachine(channel);
            String errInfo = "Received message for non-exist member from " + channel.getRemoteAddress() + " to " + channel.getLocalAddress() + "; " + stateMachine + "; msg: " + message;
            if (stateMachine != null) {
                if (stateMachine.isFailureState()) {
                    logger.warn(errInfo);
                    return;
                }
                m = this.getMember(channel);
                continue;
            }
            if (this.isStopped()) {
                return;
            }
            throw new RuntimeException(errInfo);
        }
        ServerID from = m.getPeerNodeID();
        MessageID requestID = message.inResponseTo();
        message.setMessageOrginator(from);
        if (requestID.isNull() || !this.notifyPendingRequests(requestID, message, from)) {
            this.fireMessageReceivedEvent(from, (GroupMessage)message);
        }
    }

    private boolean notifyPendingRequests(MessageID requestID, AbstractGroupMessage gmsg, ServerID nodeID) {
        GroupResponseImpl response = this.pendingRequests.get(requestID);
        if (response != null) {
            response.addResponseFrom(nodeID, gmsg);
            return true;
        }
        return false;
    }

    private static void validateExternalizableClass(Class<? extends GroupMessage> msgClass) {
        String name = msgClass.getName();
        try {
            Constructor<? extends GroupMessage> cons = msgClass.getDeclaredConstructor(new Class[0]);
            if ((cons.getModifiers() & 1) == 0) {
                throw new AssertionError((Object)(name + " : public no arg constructor not found"));
            }
        }
        catch (NoSuchMethodException ex) {
            throw new AssertionError((Object)(name + " : public no arg constructor not found"));
        }
    }

    @Override
    public <N extends AbstractGroupMessage> void registerForMessages(Class<? extends N> msgClass, GroupMessageListener<N> listener) {
        TCGroupManagerImpl.validateExternalizableClass(msgClass);
        GroupMessageListener<N> prev = this.messageListeners.put(msgClass.getName(), listener);
        if (prev != null) {
            logger.warn("Previous listener removed : " + prev);
        }
    }

    @Override
    public <N extends AbstractGroupMessage> void routeMessages(Class<? extends N> msgClass, Sink<N> sink) {
        this.registerForMessages(msgClass, new RouteGroupMessagesToSink<N>(msgClass.getName(), sink));
    }

    private void fireMessageReceivedEvent(ServerID from, GroupMessage msg) {
        GroupMessageListener<? extends GroupMessage> listener = this.messageListeners.get(msg.getClass().getName());
        if (listener == null) {
            String errorMsg = "No Route for " + msg + " from " + from;
            errorMsg = errorMsg + " " + msg.getClass().getName() + " " + this.messageListeners.keySet();
            logger.error(errorMsg);
            throw new AssertionError((Object)errorMsg);
        }
        listener.messageReceived((NodeID)from, (GroupMessage)msg);
    }

    @Override
    public void setZapNodeRequestProcessor(ZapNodeRequestProcessor processor) {
        this.zapNodeRequestProcessor = processor;
    }

    @Override
    public void zapNode(NodeID nodeID, int type, String reason) {
        this.zappedSet.add(nodeID);
        TCGroupMember m = this.getMember(nodeID);
        if (m == null) {
            logger.warn("Ignoring Zap node request since Member is null");
        } else if (!this.zapNodeRequestProcessor.acceptOutgoingZapNodeRequest(nodeID, type, reason)) {
            logger.warn("Ignoring Zap node request since " + this.zapNodeRequestProcessor + " asked us to : " + nodeID + " type = " + type + " reason = " + reason);
        } else {
            long[] weights = this.zapNodeRequestProcessor.getCurrentNodeWeights();
            logger.warn("Zapping node : " + nodeID + " type = " + type + " reason = " + reason + " my weight = " + Arrays.toString(weights));
            AbstractGroupMessage msg = GroupZapNodeMessageFactory.createGroupZapNodeMessage(type, reason, weights);
            try {
                this.sendTo(nodeID, msg);
            }
            catch (GroupException e) {
                logger.error("Error sending ZapNode Request to " + nodeID + " msg = " + msg);
            }
        }
    }

    private boolean isZappedNode(NodeID nodeID) {
        return this.zappedSet.contains(nodeID);
    }

    public Map<String, ?> getStateMap() {
        LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
        map.put("className", this.getClass().getName());
        map.put("communications", this.communicationsManager.getStateMap());
        LinkedHashMap<String, TCGroupMember> memberReport = new LinkedHashMap<String, TCGroupMember>();
        map.put("members", memberReport);
        for (Map.Entry<ServerID, TCGroupMember> entry : this.members.entrySet()) {
            memberReport.put(entry.getKey().toString(), entry.getValue());
        }
        ArrayList zapped = new ArrayList(this.zappedSet.size());
        map.put("zapped", zapped);
        this.zappedSet.forEach(node -> zapped.add(node));
        return map;
    }

    @Override
    public void nodeAdded(String host, int port, int group) {
        this.discover.addNode(new Node(host, port, group));
    }

    @Override
    public void nodeRemoved(String host, int port, int group) {
        this.discover.removeNode(new Node(host, port, group));
    }

    private synchronized TCGroupHandshakeStateMachine getOrCreateHandshakeStateMachine(MessageChannel channel) {
        TCGroupHandshakeStateMachine stateMachine = (TCGroupHandshakeStateMachine)channel.getAttachment(HANDSHAKE_STATE_MACHINE_TAG);
        if (stateMachine == null) {
            if (TCGroupManagerImpl.isDebugLogging()) {
                TCGroupManagerImpl.debugInfo("Creating handshake state machine for channel: " + channel);
            }
            stateMachine = new TCGroupHandshakeStateMachine(this, channel, this.getNodeID(), this.weightGeneratorFactory, this.version);
            channel.addAttachment(HANDSHAKE_STATE_MACHINE_TAG, (Object)stateMachine, false);
            channel.addListener((ChannelEventListener)new HandshakeChannelEventListener(stateMachine));
            if (channel.isOpen()) {
                stateMachine.start();
            } else {
                stateMachine.disconnected();
            }
        }
        Assert.assertNotNull((Object)stateMachine);
        return stateMachine;
    }

    private synchronized TCGroupHandshakeStateMachine getHandshakeStateMachine(MessageChannel channel) {
        TCGroupHandshakeStateMachine stateMachine = (TCGroupHandshakeStateMachine)channel.getAttachment(HANDSHAKE_STATE_MACHINE_TAG);
        return stateMachine;
    }

    void addZappedNode(NodeID nodeID) {
        this.zappedSet.add(nodeID);
    }

    @Override
    public boolean isServerConnected(String nodeName) {
        return this.discover.isServerConnected(nodeName);
    }

    @Override
    public int getBufferCount() {
        return this.connectionManager.getBufferCount();
    }

    private static void debugInfo(String message) {
        L2DebugLogging.log(logger, L2DebugLogging.LogLevel.INFO, message, null);
    }

    private static boolean isDebugLogging() {
        return L2DebugLogging.isDebugLogging();
    }

    private static void debugWarn(String message) {
        L2DebugLogging.log(logger, L2DebugLogging.LogLevel.WARN, message, null);
    }

    private static class TCGroupHandshakeStateMachine {
        private final HandshakeState STATE_NEW = new HandshakeState("NEW");
        private final HandshakeState STATE_NODEID = new NodeIDState();
        private final HandshakeState STATE_TRY_ADD_MEMBER = new TryAddMemberState();
        private final HandshakeState STATE_ACK_OK = new AckOkState();
        private final HandshakeState STATE_SUCCESS = new SuccessState();
        private final HandshakeState STATE_FAILURE = new FailureState();
        private static final long HANDSHAKE_TIMEOUT = TCPropertiesImpl.getProperties().getLong("l2.nha.tcgroupcomm.handshake.timeout");
        private final TCGroupManagerImpl manager;
        private final MessageChannel channel;
        private final ServerID localNodeID;
        private final WeightGeneratorFactory weightGeneratorFactory;
        private final String version;
        private HandshakeMonitor current;
        private ServerID peerNodeID;
        private TimerTask timerTask;
        private TCGroupMember member;

        public TCGroupHandshakeStateMachine(TCGroupManagerImpl manager, MessageChannel channel, ServerID localNodeID, WeightGeneratorFactory weightGeneratorFactory, String version) {
            this.manager = manager;
            this.channel = channel;
            this.localNodeID = localNodeID;
            this.weightGeneratorFactory = weightGeneratorFactory;
            this.version = version;
            this.current = this.STATE_NEW.createMonitor();
            this.current.complete();
        }

        public final void start() {
            this.switchToState(this.initialState());
        }

        public synchronized boolean isFailureState() {
            return this.current.getState() == this.STATE_FAILURE;
        }

        public void execute(TCGroupHandshakeMessage msg) {
            if (TCGroupManagerImpl.isDebugLogging()) {
                TCGroupManagerImpl.debugInfo("[TCGroupHandshakeStateMachine]: Executing state machine, currentState=" + this.current + ", msg: " + msg + ", channel: " + this.channel);
            }
            this.getCurrentState().execute(msg);
        }

        private synchronized HandshakeState getCurrentState() {
            return this.current.getState();
        }

        protected HandshakeState initialState() {
            return this.STATE_NODEID;
        }

        private String stateInfo(HandshakeState state) {
            String info = " switching to state: " + state + " channel: " + this.channel;
            if (this.member != null) {
                return this.member.toString() + info;
            }
            if (this.peerNodeID == null) {
                return this.localNodeID.toString() + info;
            }
            return this.peerNodeID.toString() + " -> " + this.localNodeID.toString() + info;
        }

        public String toString() {
            return "TCGroupHandshakeStateMachine: " + this.stateInfo(this.current.getState());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void switchToState(HandshakeState state) {
            Assert.assertNotNull((Object)state);
            if (TCGroupManagerImpl.isDebugLogging()) {
                TCGroupManagerImpl.debugInfo("[TCGroupHandshakeStateMachine]: Attempting to switch state (" + this.current + "->" + state + "): " + this.stateInfo(state));
            }
            HandshakeMonitor previous = null;
            HandshakeMonitor next = state.createMonitor();
            TCGroupHandshakeStateMachine tCGroupHandshakeStateMachine = this;
            synchronized (tCGroupHandshakeStateMachine) {
                previous = this.current;
                if (this.current.getState() == this.STATE_FAILURE) {
                    if (TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugWarn("Ignored switching to " + state + " as current is " + this.current + ", " + this.stateInfo(state));
                    }
                    return;
                }
                this.current = next;
            }
            if (TCGroupManagerImpl.isDebugLogging()) {
                TCGroupManagerImpl.debugInfo("[TCGroupHandshakeStateMachine]: Entering state: " + state + ", for channel: " + this.channel);
            }
            previous.waitForCompletion();
            next.complete();
        }

        MessageChannel getChannel() {
            return this.channel;
        }

        ServerID getPeerNodeID() {
            return this.peerNodeID;
        }

        private synchronized void setTimerTask(long timeout) {
            TimerTask task;
            this.timerTask = task = new TimerTask(){

                @Override
                public void run() {
                    this.handshakeTimeout();
                }
            };
            Timer timer = this.manager.getHandshakeTimer();
            timer.purge();
            timer.schedule(task, timeout);
        }

        private synchronized void cancelTimerTask() {
            if (this.timerTask != null) {
                this.timerTask.cancel();
                this.timerTask = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void handshakeTimeout() {
            this.cancelTimerTask();
            TCGroupHandshakeStateMachine tCGroupHandshakeStateMachine = this;
            synchronized (tCGroupHandshakeStateMachine) {
                if (this.current.getState() == this.STATE_SUCCESS) {
                    if (TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugInfo("Handshake successed. Ignore timeout " + this.stateInfo(this.current.getState()));
                    }
                    return;
                }
                logger.warn("Group member handshake timeout. " + this.stateInfo(this.current.getState()));
            }
            this.switchToState(this.STATE_FAILURE);
            this.channel.close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void disconnected() {
            TCGroupHandshakeStateMachine tCGroupHandshakeStateMachine = this;
            synchronized (tCGroupHandshakeStateMachine) {
                if (TCGroupManagerImpl.isDebugLogging()) {
                    TCGroupManagerImpl.debugWarn("[TCGroupHandshakeStateMachine]: Group member handshake disconnected. " + this.stateInfo(this.current.getState()) + ", for channel: " + this.channel);
                }
            }
            this.switchToState(this.STATE_FAILURE);
        }

        private class FailureState
        extends HandshakeState {
            public FailureState() {
                super("Failure");
            }

            @Override
            public void enter() {
                TCGroupHandshakeStateMachine.this.cancelTimerTask();
                if (TCGroupHandshakeStateMachine.this.member != null) {
                    TCGroupHandshakeStateMachine.this.member.abortMemberAdding();
                    TCGroupHandshakeStateMachine.this.manager.closeMember(TCGroupHandshakeStateMachine.this.member);
                } else {
                    TCGroupHandshakeStateMachine.this.channel.close();
                }
            }
        }

        private class SuccessState
        extends HandshakeState {
            public SuccessState() {
                super("Success");
            }

            @Override
            public void enter() {
                TCGroupHandshakeStateMachine.this.cancelTimerTask();
                TCGroupHandshakeStateMachine.this.manager.fireNodeEvent(TCGroupHandshakeStateMachine.this.member, true);
                TCGroupHandshakeStateMachine.this.member.setJoinedEventFired(true);
                if (TCGroupHandshakeStateMachine.this.manager.isZappedNode((NodeID)TCGroupHandshakeStateMachine.this.member.getPeerNodeID())) {
                    logger.info("Aborting previously zapped node " + TCGroupHandshakeStateMachine.this.member);
                    TCGroupHandshakeStateMachine.this.manager.zapNode((NodeID)TCGroupHandshakeStateMachine.this.member.getPeerNodeID(), 1, "Aborting the zapped node");
                }
            }
        }

        private class AckOkState
        extends HandshakeState {
            public AckOkState() {
                super("Ack-Ok");
            }

            @Override
            public void enter() {
                TCGroupHandshakeStateMachine.this.member.setReady(true);
                TCGroupHandshakeStateMachine.this.member.notifyMemberAdded();
                this.ackOk();
            }

            @Override
            public void execute(TCGroupHandshakeMessage msg) {
                if (msg.isAckMessage()) {
                    TCGroupHandshakeStateMachine.this.switchToState(TCGroupHandshakeStateMachine.this.STATE_SUCCESS);
                } else {
                    TCGroupHandshakeStateMachine.this.switchToState(TCGroupHandshakeStateMachine.this.STATE_FAILURE);
                }
            }

            private void ackOk() {
                TCGroupHandshakeMessage msg = (TCGroupHandshakeMessage)TCGroupHandshakeStateMachine.this.channel.createMessage(TCMessageType.GROUP_HANDSHAKE_MESSAGE);
                if (TCGroupManagerImpl.isDebugLogging()) {
                    TCGroupManagerImpl.debugInfo("Send ack message to " + TCGroupHandshakeStateMachine.this.member);
                }
                msg.initializeAck();
                msg.send();
            }
        }

        private class TryAddMemberState
        extends HandshakeState {
            public TryAddMemberState() {
                super("Try-Add-Member");
            }

            @Override
            public void enter() {
                this.createMember();
                if (TCGroupHandshakeStateMachine.this.member.isHighPriorityNode()) {
                    if (TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugInfo("Try-Add-Member: Adding high priority member: " + TCGroupHandshakeStateMachine.this.member);
                    }
                    TCGroupHandshakeStateMachine.this.member.memberAddingInProcess();
                    boolean isAdded = TCGroupHandshakeStateMachine.this.manager.tryAddMember(TCGroupHandshakeStateMachine.this.member);
                    if (!isAdded) {
                        TCGroupHandshakeStateMachine.this.member.abortMemberAdding();
                    }
                    this.signalToJoin(isAdded);
                } else if (TCGroupManagerImpl.isDebugLogging()) {
                    TCGroupManagerImpl.debugInfo("Try-Add-Member ignoring member as not high priority: " + TCGroupHandshakeStateMachine.this.member);
                }
            }

            @Override
            public void execute(TCGroupHandshakeMessage msg) {
                boolean isOkToJoin = msg.isOkMessage();
                if (!TCGroupHandshakeStateMachine.this.member.isHighPriorityNode()) {
                    if (TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugInfo("Try-Add-Member: Adding not-high priority member: " + TCGroupHandshakeStateMachine.this.member);
                    }
                    if (isOkToJoin) {
                        isOkToJoin = TCGroupHandshakeStateMachine.this.manager.tryAddMember(TCGroupHandshakeStateMachine.this.member);
                        if (isOkToJoin) {
                            TCGroupHandshakeStateMachine.this.member.memberAddingInProcess();
                        } else {
                            logger.warn("Unexpected bad handshake, abort connection.");
                        }
                    }
                    this.signalToJoin(isOkToJoin);
                } else if (TCGroupManagerImpl.isDebugLogging()) {
                    TCGroupManagerImpl.debugInfo("Try-Add-Member not adding member as its highPriority: " + TCGroupHandshakeStateMachine.this.member);
                }
                if (isOkToJoin) {
                    TCGroupHandshakeStateMachine.this.switchToState(TCGroupHandshakeStateMachine.this.STATE_ACK_OK);
                } else {
                    TCGroupHandshakeStateMachine.this.switchToState(TCGroupHandshakeStateMachine.this.STATE_FAILURE);
                }
            }

            private void createMember() {
                Assert.assertNotNull((Object)TCGroupHandshakeStateMachine.this.localNodeID);
                Assert.assertNotNull((Object)TCGroupHandshakeStateMachine.this.peerNodeID);
                TCGroupHandshakeStateMachine.this.member = new TCGroupMemberImpl(TCGroupHandshakeStateMachine.this.localNodeID, TCGroupHandshakeStateMachine.this.peerNodeID, TCGroupHandshakeStateMachine.this.channel);
            }

            private void signalToJoin(boolean ok) {
                Assert.assertNotNull((Object)TCGroupHandshakeStateMachine.this.member);
                TCGroupHandshakeMessage msg = (TCGroupHandshakeMessage)TCGroupHandshakeStateMachine.this.channel.createMessage(TCMessageType.GROUP_HANDSHAKE_MESSAGE);
                if (ok) {
                    if (TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugInfo("Send ok message to " + TCGroupHandshakeStateMachine.this.member);
                    }
                    msg.initializeOk();
                } else {
                    if (TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugInfo("Send deny message to " + TCGroupHandshakeStateMachine.this.member);
                    }
                    msg.initializeDeny();
                }
                msg.send();
            }
        }

        private class NodeIDState
        extends HandshakeState {
            public NodeIDState() {
                super("Read-Peer-NodeID");
            }

            @Override
            public void enter() {
                TCGroupHandshakeStateMachine.this.setTimerTask(HANDSHAKE_TIMEOUT);
                this.writeNodeIDMessage();
            }

            @Override
            public void execute(TCGroupHandshakeMessage msg) {
                this.setPeerNodeID(msg);
                if (!TCGroupHandshakeStateMachine.this.manager.getDiscover().isValidClusterNode((NodeID)TCGroupHandshakeStateMachine.this.peerNodeID)) {
                    logger.warn("Drop connection from non-member node " + TCGroupHandshakeStateMachine.this.peerNodeID);
                    TCGroupHandshakeStateMachine.this.switchToState(TCGroupHandshakeStateMachine.this.STATE_FAILURE);
                    return;
                }
                TCGroupHandshakeStateMachine.this.manager.removeIfMemberReconnecting(TCGroupHandshakeStateMachine.this.peerNodeID);
                TCGroupHandshakeStateMachine.this.switchToState(TCGroupHandshakeStateMachine.this.STATE_TRY_ADD_MEMBER);
            }

            void setPeerNodeID(TCGroupHandshakeMessage msg) {
                TCGroupHandshakeStateMachine.this.peerNodeID = msg.getNodeID();
            }

            void writeNodeIDMessage() {
                TCGroupHandshakeMessage msg = (TCGroupHandshakeMessage)TCGroupHandshakeStateMachine.this.channel.createMessage(TCMessageType.GROUP_HANDSHAKE_MESSAGE);
                msg.initializeNodeID(TCGroupHandshakeStateMachine.this.localNodeID, TCGroupHandshakeStateMachine.this.version, TCGroupHandshakeStateMachine.this.weightGeneratorFactory.generateWeightSequence());
                if (TCGroupManagerImpl.isDebugLogging()) {
                    TCGroupManagerImpl.debugInfo("Sending group nodeID message to " + TCGroupHandshakeStateMachine.this.channel);
                }
                msg.send();
            }

            boolean checkWeights(TCGroupHandshakeMessage msg) {
                long[] myWeights = TCGroupHandshakeStateMachine.this.weightGeneratorFactory.generateWeightSequence();
                for (int i = 0; i < myWeights.length; ++i) {
                    if (myWeights[i] > msg.getWeights()[i]) {
                        return true;
                    }
                    if (msg.getWeights()[i] <= myWeights[i]) continue;
                    return false;
                }
                return false;
            }
        }

        private static interface HandshakeMonitor {
            public HandshakeState getState();

            public void waitForCompletion();

            public void complete();
        }

        private class HandshakeState {
            private final String name;

            public HandshakeState(String name) {
                this.name = name;
            }

            public void enter() {
            }

            public void execute(TCGroupHandshakeMessage handshakeMessage) {
            }

            public String toString() {
                return this.name;
            }

            public HandshakeMonitor createMonitor() {
                return new HandshakeMonitor(){
                    boolean completed = false;
                    Thread owner = null;

                    @Override
                    public HandshakeState getState() {
                        return HandshakeState.this;
                    }

                    @Override
                    public synchronized void waitForCompletion() {
                        while (Thread.currentThread() != this.owner && !this.completed) {
                            try {
                                this.wait();
                            }
                            catch (InterruptedException ee) {
                                L2Utils.handleInterrupted(logger, ee);
                            }
                        }
                    }

                    private void start() {
                        Assert.assertNull((Object)this.owner);
                        this.owner = Thread.currentThread();
                        this.getState().enter();
                    }

                    @Override
                    public synchronized void complete() {
                        this.start();
                        this.signalComplete();
                    }

                    private synchronized void signalComplete() {
                        this.completed = true;
                        this.notifyAll();
                    }
                };
            }
        }
    }

    private static class HandshakeChannelEventListener
    implements ChannelEventListener {
        private final TCGroupHandshakeStateMachine stateMachine;

        HandshakeChannelEventListener(TCGroupHandshakeStateMachine stateMachine) {
            this.stateMachine = stateMachine;
        }

        public void notifyChannelEvent(ChannelEvent event) {
            if (event.getChannel() == this.stateMachine.getChannel() && (event.getType() == ChannelEventType.TRANSPORT_DISCONNECTED_EVENT || event.getType() == ChannelEventType.CHANNEL_CLOSED_EVENT)) {
                this.stateMachine.disconnected();
            }
        }
    }

    private final class ZapNodeRequestRouter
    implements GroupMessageListener<GroupZapNodeMessage> {
        private ZapNodeRequestRouter() {
        }

        @Override
        public void messageReceived(NodeID fromNode, GroupZapNodeMessage zapMsg) {
            TCGroupManagerImpl.this.zapNodeRequestProcessor.incomingZapNodeRequest((NodeID)zapMsg.messageFrom(), zapMsg.getZapNodeType(), zapMsg.getReason(), zapMsg.getWeights());
        }
    }

    private class GroupResponseImpl
    implements GroupResponse<AbstractGroupMessage> {
        private final Set<ServerID> waitFor = new HashSet<ServerID>();
        private final List<AbstractGroupMessage> responses = new ArrayList<AbstractGroupMessage>();

        GroupResponseImpl() {
        }

        @Override
        public synchronized List<AbstractGroupMessage> getResponses() {
            Assert.assertTrue((boolean)this.waitFor.isEmpty());
            return this.responses;
        }

        @Override
        public synchronized AbstractGroupMessage getResponse(NodeID nodeID) {
            Assert.assertTrue((boolean)this.waitFor.isEmpty());
            for (AbstractGroupMessage msg : this.responses) {
                if (!nodeID.equals(msg.messageFrom())) continue;
                return msg;
            }
            logger.warn("Missing response message from " + nodeID);
            return null;
        }

        public synchronized void sendTo(TCGroupMember member, AbstractGroupMessage msg) throws GroupException {
            if (!member.isReady()) {
                TCGroupManagerImpl.this.closeMember(member);
                throw new GroupException("Send to a not ready member " + member);
            }
            Assert.assertNotNull((Object)member.getPeerNodeID());
            this.waitFor.add(member.getPeerNodeID());
            Runnable sentCallback = null;
            member.send(msg, sentCallback);
            this.waitForResponses(TCGroupManagerImpl.this.getNodeID());
        }

        public synchronized void sendAll(AbstractGroupMessage msg, Set<? extends NodeID> nodeIDs) throws GroupException {
            boolean debug = msg instanceof L2StateMessage;
            for (TCGroupMember m : TCGroupManagerImpl.this.getMembers()) {
                if (!nodeIDs.contains(m.getPeerNodeID())) {
                    if (!debug || !TCGroupManagerImpl.isDebugLogging()) continue;
                    TCGroupManagerImpl.debugInfo("Not sending msg to " + m.getPeerNodeID() + ", msg: " + msg + ", channel: " + m.getChannel());
                    continue;
                }
                if (m.isReady()) {
                    Assert.assertNotNull((Object)m.getPeerNodeID());
                    this.waitFor.add(m.getPeerNodeID());
                    if (debug && TCGroupManagerImpl.isDebugLogging()) {
                        TCGroupManagerImpl.debugInfo("Sending msg to " + m.getPeerNodeID() + ", msg: " + msg + ", channel: " + m.getChannel());
                    }
                    m.sendIgnoreNotReady(msg);
                    continue;
                }
                logger.warn("SendAllAndWait to a not ready member " + m);
            }
            this.waitForResponses(TCGroupManagerImpl.this.getNodeID());
        }

        public synchronized void addResponseFrom(ServerID nodeID, AbstractGroupMessage gmsg) {
            if (!this.waitFor.remove(nodeID)) {
                String message = "Recd response from a member not in list : " + nodeID + " : waiting For : " + this.waitFor + " msg : " + gmsg;
                logger.error(message);
                throw new AssertionError((Object)message);
            }
            if (gmsg instanceof L2StateMessage && TCGroupManagerImpl.isDebugLogging()) {
                TCGroupManagerImpl.debugInfo("Received msg from: " + nodeID + ", msg: " + gmsg);
            }
            this.responses.add(gmsg);
            this.notifyAll();
        }

        public synchronized void notifyMemberDead(TCGroupMember member) {
            logger.warn("Remove dead member from waitFor response list, dead member: " + member.getPeerNodeID());
            this.waitFor.remove(member.getPeerNodeID());
            this.notifyAll();
        }

        private void waitForResponses(ServerID sender) throws GroupException {
            long start = System.currentTimeMillis();
            while (!this.waitFor.isEmpty() && !TCGroupManagerImpl.this.isStopped()) {
                try {
                    this.wait(5000L);
                    long end = System.currentTimeMillis();
                    if (this.waitFor.isEmpty() || end - start <= 5000L) continue;
                    logger.warn(sender + " Still waiting for response from " + this.waitFor + ". Waited for " + (end - start) + " ms");
                }
                catch (InterruptedException e) {
                    throw new GroupException(e);
                }
            }
            if (TCGroupManagerImpl.this.isStopped()) {
                this.waitFor.clear();
            }
        }
    }
}

