/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.connection.nio;

import com.hazelcast.client.AuthenticationException;
import com.hazelcast.client.ClientExtension;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.HazelcastClientOfflineException;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.config.SocketOptions;
import com.hazelcast.client.connection.AddressProvider;
import com.hazelcast.client.connection.AddressTranslator;
import com.hazelcast.client.connection.ClientConnectionManager;
import com.hazelcast.client.connection.ClientConnectionStrategy;
import com.hazelcast.client.connection.nio.AuthenticationFuture;
import com.hazelcast.client.connection.nio.ClientChannelInitializer;
import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.connection.nio.DefaultClientConnectionStrategy;
import com.hazelcast.client.connection.nio.HeartbeatManager;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.LifecycleServiceImpl;
import com.hazelcast.client.impl.client.ClientPrincipal;
import com.hazelcast.client.impl.protocol.AuthenticationStatus;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCodec;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCustomCodec;
import com.hazelcast.client.spi.ClientContext;
import com.hazelcast.client.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.spi.impl.ConnectionHeartbeatListener;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.config.SSLConfig;
import com.hazelcast.config.SocketInterceptorConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.Member;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.ChannelFactory;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.networking.nio.NioEventLoopGroup;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ClassLoaderUtil;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.nio.SocketInterceptor;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.security.Credentials;
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.AddressUtil;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.executor.SingleExecutorThreadFactory;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class ClientConnectionManagerImpl
implements ClientConnectionManager,
ConnectionHeartbeatListener {
    /** Default input/output thread count used when SSL is enabled and no explicit count is configured. */
    private static final int DEFAULT_SSL_THREAD_COUNT = 3;
    /** Default connection attempt limit for a synchronous start when the configured limit is negative. */
    private static final int DEFAULT_CONNECTION_ATTEMPT_LIMIT_SYNC = 2;
    /** Default connection attempt limit for an asynchronous start when the configured limit is negative. */
    private static final int DEFAULT_CONNECTION_ATTEMPT_LIMIT_ASYNC = 20;
    /** Generates the ids assigned to newly created {@link ClientConnection} instances. */
    protected final AtomicInteger connectionIdGen = new AtomicInteger();
    /** Set to {@code true} by {@code start} and back to {@code false} by {@code shutdown}. */
    protected volatile boolean alive;
    private final ILogger logger;
    /** Socket connect timeout; a configured value of 0 is stored as {@link Integer#MAX_VALUE}. */
    private final int connectionTimeoutMillis;
    private final HazelcastClientInstanceImpl client;
    /** Optional interceptor invoked on every new socket; {@code null} when not configured or disabled. */
    private final SocketInterceptor socketInterceptor;
    private final SocketOptions socketOptions;
    private final ChannelFactory channelFactory;
    private final ClientExecutionServiceImpl executionService;
    private final AddressTranslator addressTranslator;
    /** Authenticated connections keyed by their endpoint address. */
    private final ConcurrentMap<Address, ClientConnection> activeConnections = new ConcurrentHashMap<Address, ClientConnection>();
    /** Connection attempts that have been triggered but not yet completed, keyed by target address. */
    private final ConcurrentMap<Address, AuthenticationFuture> connectionsInProgress = new ConcurrentHashMap<Address, AuthenticationFuture>();
    private final Collection<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
    private final boolean allowInvokeWhenDisconnected;
    private final Credentials credentials;
    private final NioEventLoopGroup eventLoopGroup;
    /** Address of the current owner connection, or {@code null} when there is none. */
    private volatile Address ownerConnectionAddress;
    /** Owner address that was in effect before the most recent change of {@link #ownerConnectionAddress}. */
    private volatile Address previousOwnerConnectionAddress;
    private final HeartbeatManager heartbeat;
    /** Authentication timeout in milliseconds; taken from the heartbeat timeout. */
    private final long authenticationTimeout;
    private volatile ClientPrincipal principal;
    private final ClientConnectionStrategy connectionStrategy;
    /** Single-threaded executor that runs cluster connect and disconnect work. */
    private final ExecutorService clusterConnectionExecutor;
    private final int connectionAttemptPeriod;
    private final int connectionAttemptLimit;
    private final boolean shuffleMemberList;
    private final Collection<AddressProvider> addressProviders;
    /** Configured outbound ports, rotated on each acquisition; access is synchronized on the list itself. */
    private final LinkedList<Integer> outboundPorts = new LinkedList<Integer>();
    /** Number of configured outbound ports; 0 means any local port may be used. */
    private final int outboundPortCount;

    /**
     * Creates the connection manager from the client's configuration and properties.
     * The manager is not started here; call {@code start} to activate it.
     *
     * @param client            the owning client instance
     * @param addressTranslator translator stored for later address translation
     * @param addressProviders  sources of candidate member addresses
     * @throws IllegalStateException if SSL is enabled on a non-enterprise build
     */
    public ClientConnectionManagerImpl(HazelcastClientInstanceImpl client, AddressTranslator addressTranslator, Collection<AddressProvider> addressProviders) {
        this.allowInvokeWhenDisconnected = client.getProperties().getBoolean(ClientProperty.ALLOW_INVOCATIONS_WHEN_DISCONNECTED);
        this.client = client;
        this.addressTranslator = addressTranslator;
        this.logger = client.getLoggingService().getLogger(ClientConnectionManager.class);

        ClientNetworkConfig networkConfig = client.getClientConfig().getNetworkConfig();

        // A configured timeout of 0 means "no timeout".
        int configuredTimeout = networkConfig.getConnectionTimeout();
        if (configuredTimeout == 0) {
            this.connectionTimeoutMillis = Integer.MAX_VALUE;
        } else {
            this.connectionTimeoutMillis = configuredTimeout;
        }

        this.executionService = (ClientExecutionServiceImpl) client.getClientExecutionService();
        // socketOptions must be set before initEventLoopGroup, which reads the buffer size.
        this.socketOptions = networkConfig.getSocketOptions();
        this.eventLoopGroup = initEventLoopGroup(client);
        this.channelFactory = client.getClientExtension().createSocketChannelWrapperFactory();
        this.socketInterceptor = initSocketInterceptor(networkConfig.getSocketInterceptorConfig());
        this.credentials = client.getCredentials();
        this.connectionStrategy = initializeStrategy(client);
        this.clusterConnectionExecutor = createSingleThreadExecutorService(client);
        this.shuffleMemberList = client.getProperties().getBoolean(ClientProperty.SHUFFLE_MEMBER_LIST);
        this.addressProviders = addressProviders;
        this.connectionAttemptPeriod = networkConfig.getConnectionAttemptPeriod();

        // Negative limit -> default depending on start mode; 0 -> unlimited.
        int configuredLimit = networkConfig.getConnectionAttemptLimit();
        boolean asyncStart = client.getClientConfig().getConnectionStrategyConfig().isAsyncStart();
        if (configuredLimit < 0) {
            if (asyncStart) {
                this.connectionAttemptLimit = DEFAULT_CONNECTION_ATTEMPT_LIMIT_ASYNC;
            } else {
                this.connectionAttemptLimit = DEFAULT_CONNECTION_ATTEMPT_LIMIT_SYNC;
            }
        } else if (configuredLimit == 0) {
            this.connectionAttemptLimit = Integer.MAX_VALUE;
        } else {
            this.connectionAttemptLimit = configuredLimit;
        }

        this.outboundPorts.addAll(getOutboundPorts(networkConfig));
        this.outboundPortCount = this.outboundPorts.size();
        this.heartbeat = new HeartbeatManager(this, client);
        this.authenticationTimeout = this.heartbeat.getHeartbeatTimeout();
        checkSslAllowed();
    }

    /**
     * Rejects an SSL-enabled configuration on a build that is not the enterprise edition.
     *
     * @throws IllegalStateException if SSL is enabled and the build is not enterprise
     */
    private void checkSslAllowed() {
        SSLConfig sslConfig = client.getClientConfig().getNetworkConfig().getSSLConfig();
        boolean sslRequested = sslConfig != null && sslConfig.isEnabled();
        if (!sslRequested) {
            return;
        }
        if (BuildInfoProvider.getBuildInfo().isEnterprise()) {
            return;
        }
        throw new IllegalStateException("SSL/TLS requires Hazelcast Enterprise Edition");
    }

    /**
     * Resolves the outbound ports from the explicit port list and the port definitions
     * in the given network configuration.
     *
     * @param networkConfig the client network configuration
     * @return the ports computed by {@code AddressUtil.getOutboundPorts}
     */
    private Collection<Integer> getOutboundPorts(ClientNetworkConfig networkConfig) {
        return AddressUtil.getOutboundPorts(networkConfig.getOutboundPorts(), networkConfig.getOutboundPortDefinitions());
    }

    /**
     * Picks the connection strategy: a custom class named by the
     * {@code hazelcast.client.connection.strategy.classname} property when present,
     * otherwise {@link DefaultClientConnectionStrategy}.
     *
     * @param client the client whose properties and class loader are consulted
     * @return the strategy instance
     */
    private ClientConnectionStrategy initializeStrategy(HazelcastClientInstanceImpl client) {
        String strategyClassName = client.getProperties().get("hazelcast.client.connection.strategy.classname");
        if (strategyClassName == null) {
            return new DefaultClientConnectionStrategy();
        }
        try {
            ClassLoader loader = client.getClientConfig().getClassLoader();
            return (ClientConnectionStrategy) ClassLoaderUtil.newInstance(loader, strategyClassName);
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * @return the event loop group created for this manager
     */
    public NioEventLoopGroup getEventLoopGroup() {
        return eventLoopGroup;
    }

    /**
     * Builds the NIO event loop group from the client's properties.
     * When a thread count property is -1, the count defaults to 3 with SSL enabled and 1 otherwise.
     * Called from the constructor after {@code socketOptions} is assigned.
     *
     * @param client the client supplying properties, logging, metrics and name
     * @return the configured, not yet started, event loop group
     */
    protected NioEventLoopGroup initEventLoopGroup(HazelcastClientInstanceImpl client) {
        HazelcastProperties properties = client.getProperties();
        boolean directBuffer = properties.getBoolean(GroupProperty.SOCKET_CLIENT_BUFFER_DIRECT);

        SSLConfig sslConfig = client.getClientConfig().getNetworkConfig().getSSLConfig();
        boolean sslEnabled = sslConfig != null && sslConfig.isEnabled();
        int defaultThreadCount = sslEnabled ? DEFAULT_SSL_THREAD_COUNT : 1;

        int configuredInputThreads = properties.getInteger(ClientProperty.IO_INPUT_THREAD_COUNT);
        int configuredOutputThreads = properties.getInteger(ClientProperty.IO_OUTPUT_THREAD_COUNT);
        int inputThreads = configuredInputThreads == -1 ? defaultThreadCount : configuredInputThreads;
        int outputThreads = configuredOutputThreads == -1 ? defaultThreadCount : configuredOutputThreads;

        NioEventLoopGroup.Context context = new NioEventLoopGroup.Context()
                .loggingService(client.getLoggingService())
                .metricsRegistry(client.getMetricsRegistry())
                .threadNamePrefix(client.getName())
                .errorHandler(new ClientConnectionChannelErrorHandler())
                .inputThreadCount(inputThreads)
                .outputThreadCount(outputThreads)
                .balancerIntervalSeconds(properties.getInteger(ClientProperty.IO_BALANCER_INTERVAL_SECONDS))
                .channelInitializer(new ClientChannelInitializer(getBufferSize(), directBuffer));
        return new NioEventLoopGroup(context);
    }

    /**
     * Creates the socket interceptor through the client extension when the given
     * configuration is present and enabled.
     *
     * @param sic the socket interceptor configuration, may be {@code null}
     * @return the interceptor, or {@code null} when none is configured or it is disabled
     */
    private SocketInterceptor initSocketInterceptor(SocketInterceptorConfig sic) {
        if (sic == null || !sic.isEnabled()) {
            return null;
        }
        return client.getClientExtension().createSocketInterceptor();
    }

    /**
     * @return the values view of the active-connection map (not a copy)
     */
    @Override
    public Collection<ClientConnection> getActiveConnections() {
        return activeConnections.values();
    }

    /**
     * @return {@code true} after {@code start} and before {@code shutdown}
     */
    @Override
    public boolean isAlive() {
        return alive;
    }

    /**
     * Activates the manager: starts the event loop group and the heartbeat, registers this
     * manager as a heartbeat listener, then initialises and starts the connection strategy.
     * Does nothing if the manager is already alive.
     *
     * @param clientContext context passed to the connection strategy's {@code init}
     */
    public synchronized void start(ClientContext clientContext) {
        if (alive) {
            // Already started.
            return;
        }
        alive = true;

        startEventLoopGroup();
        heartbeat.start();
        addConnectionHeartbeatListener(this);

        connectionStrategy.init(clientContext);
        connectionStrategy.start();
    }

    /**
     * Starts the event loop group.
     */
    protected void startEventLoopGroup() {
        eventLoopGroup.start();
    }

    /**
     * Deactivates the manager: closes every active connection, shuts down the cluster
     * executor and the event loop group, clears the connection listeners, and stops the
     * heartbeat and the connection strategy. Does nothing if the manager is not alive.
     */
    public synchronized void shutdown() {
        if (!alive) {
            // Never started or already shut down.
            return;
        }
        alive = false;

        for (ClientConnection activeConnection : activeConnections.values()) {
            activeConnection.close("Hazelcast client is shutting down", null);
        }

        ClientExecutionServiceImpl.shutdownExecutor("cluster", clusterConnectionExecutor, logger);
        stopEventLoopGroup();
        connectionListeners.clear();
        heartbeat.shutdown();
        connectionStrategy.shutdown();
    }

    /**
     * @return the currently stored client principal, or {@code null} if none has been set
     */
    @Override
    public ClientPrincipal getPrincipal() {
        return principal;
    }

    /**
     * Stores the client principal returned by {@link #getPrincipal()}.
     *
     * @param principal the principal to store
     */
    private void setPrincipal(ClientPrincipal principal) {
        this.principal = principal;
    }

    /**
     * Shuts down the event loop group.
     */
    protected void stopEventLoopGroup() {
        eventLoopGroup.shutdown();
    }

    /**
     * Looks up the active connection for the given address.
     *
     * @param target the endpoint address, may be {@code null}
     * @return the active connection, or {@code null} if the address is {@code null} or unknown
     */
    @Override
    public Connection getActiveConnection(Address target) {
        return target == null ? null : activeConnections.get(target);
    }

    /**
     * Returns a connection to the given address, connecting as a non-owner if necessary.
     *
     * @param address the target address
     * @return the connection
     * @throws IOException declared by the interface
     */
    @Override
    public Connection getOrConnect(Address address) throws IOException {
        final boolean asOwner = false;
        return getOrConnect(address, asOwner);
    }

    /**
     * Returns the existing connection to the target, or triggers a connection attempt
     * in the background and returns {@code null} without waiting for it.
     *
     * @param target            the target address
     * @param acquiresResources passed to the permission check in {@code getConnection}
     * @return the existing connection, or {@code null} if a connect was triggered instead
     * @throws IOException if the permission check fails
     */
    @Override
    public Connection getOrTriggerConnect(Address target, boolean acquiresResources) throws IOException {
        Connection existing = getConnection(target, false, acquiresResources);
        if (existing == null) {
            triggerConnect(target, false);
        }
        return existing;
    }

    /**
     * Returns the active connection to the target if it is usable for the requested role.
     *
     * @param target            the target address
     * @param asOwner           when {@code true}, only a connection authenticated as owner qualifies
     * @param acquiresResources passed to {@code checkAllowed}
     * @return the usable connection, or {@code null} if there is none
     * @throws IOException           if {@code checkAllowed} rejects the request
     * @throws IllegalStateException if {@code target} is {@code null}
     */
    private Connection getConnection(Address target, boolean asOwner, boolean acquiresResources) throws IOException {
        checkAllowed(target, asOwner, acquiresResources);
        if (target == null) {
            throw new IllegalStateException("Address can not be null");
        }
        ClientConnection candidate = activeConnections.get(target);
        boolean usable = candidate != null && (!asOwner || candidate.isAuthenticatedAsOwner());
        return usable ? candidate : null;
    }

    /**
     * Verifies that a connection to the target may be used right now.
     * Owner requests only need the manager to be alive. Non-owner requests also consult the
     * connection strategy and need an owner connection, unless invocations while disconnected
     * are allowed and the caller does not acquire resources.
     *
     * @param target            the target address
     * @param asOwner           whether the connection is requested as owner
     * @param acquiresResources whether the caller acquires resources
     * @throws HazelcastClientNotActiveException if the manager is not alive
     * @throws HazelcastClientOfflineException   rethrown from the strategy when offline use is not tolerated
     * @throws IOException                       if no owner connection exists and that is not tolerated
     */
    private void checkAllowed(Address target, boolean asOwner, boolean acquiresResources) throws IOException {
        if (!alive) {
            throw new HazelcastClientNotActiveException("ConnectionManager is not active!");
        }
        if (asOwner) {
            return;
        }

        boolean tolerateDisconnected = allowInvokeWhenDisconnected && !acquiresResources;
        try {
            connectionStrategy.beforeGetConnection(target);
        } catch (HazelcastClientOfflineException offline) {
            if (tolerateDisconnected) {
                return;
            }
            throw offline;
        }

        if (getOwnerConnection() == null && !tolerateDisconnected) {
            throw new IOException("Owner connection is not available!");
        }
    }

    /**
     * @return the address of the current owner connection, or {@code null} if there is none
     */
    @Override
    public Address getOwnerConnectionAddress() {
        return ownerConnectionAddress;
    }

    /**
     * Replaces the owner connection address, remembering the replaced value as the
     * previous owner address.
     *
     * @param ownerConnectionAddress the new owner address, may be {@code null}
     */
    private void setOwnerConnectionAddress(Address ownerConnectionAddress) {
        Address replaced = this.ownerConnectionAddress;
        this.previousOwnerConnectionAddress = replaced;
        this.ownerConnectionAddress = ownerConnectionAddress;
    }

    /**
     * Returns a usable connection to the address, triggering a connect and waiting for it
     * when none exists. For owner requests the loop repeats until the obtained connection
     * is authenticated as owner.
     *
     * @param address the target address
     * @param asOwner whether the connection must be authenticated as owner
     * @return the connection
     */
    private Connection getOrConnect(Address address, boolean asOwner) {
        try {
            while (true) {
                ClientConnection existing = (ClientConnection) getConnection(address, asOwner, true);
                if (existing != null) {
                    return existing;
                }
                AuthenticationFuture pending = triggerConnect(address, asOwner);
                ClientConnection established = (ClientConnection) pending.get();
                if (!asOwner || established.isAuthenticatedAsOwner()) {
                    return established;
                }
                // Connected, but not as owner: try again.
            }
        } catch (Throwable t) {
            throw ExceptionUtil.rethrow(t);
        }
    }

    /**
     * Starts a connection attempt to the target unless one is already in progress.
     * For non-owner attempts the connection strategy is consulted first.
     *
     * @param target  the target address
     * @param asOwner whether to authenticate as owner
     * @return the future of the attempt now in progress (new or already pending)
     */
    private AuthenticationFuture triggerConnect(Address target, boolean asOwner) {
        if (!asOwner) {
            connectionStrategy.beforeOpenConnection(target);
        }
        AuthenticationFuture created = new AuthenticationFuture();
        AuthenticationFuture alreadyPending = connectionsInProgress.putIfAbsent(target, created);
        if (alreadyPending != null) {
            return alreadyPending;
        }
        // This call won the race for the target, so it schedules the connect task.
        executionService.execute(new InitConnectionTask(target, asOwner, created));
        return created;
    }

    /**
     * @return the active connection to the owner address, or {@code null} when there is
     *         no owner address or no active connection to it
     */
    @Override
    public ClientConnection getOwnerConnection() {
        if (ownerConnectionAddress == null) {
            return null;
        }
        return (ClientConnection) getActiveConnection(ownerConnectionAddress);
    }

    /**
     * Tries to connect to the address as owner and, on success, notifies the client,
     * fires {@code CLIENT_CONNECTED} and informs the connection strategy.
     * Any exception is logged; a connection obtained before the failure is closed.
     *
     * @param address the member address to connect to
     * @return the owner connection, or {@code null} if the attempt failed
     */
    private Connection connectAsOwner(Address address) {
        Connection ownerConnection = null;
        try {
            logger.info("Trying to connect to " + address + " as owner member");
            ownerConnection = getOrConnect(address, true);
            client.onClusterConnect(ownerConnection);
            fireConnectionEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
            connectionStrategy.onConnectToCluster();
            return ownerConnection;
        } catch (Exception e) {
            logger.warning("Exception during initial connection to " + address + ", exception " + e);
            if (ownerConnection != null) {
                ownerConnection.close("Could not connect to " + address + " as owner", e);
            }
            return null;
        }
    }

    /**
     * Notifies all connection listeners, then the connection strategy, of a new connection.
     *
     * @param connection the connection that was added
     */
    private void fireConnectionAddedEvent(ClientConnection connection) {
        for (ConnectionListener listener : connectionListeners) {
            listener.connectionAdded(connection);
        }
        connectionStrategy.onConnect(connection);
    }

    /**
     * Handles a removed connection: schedules the cluster-disconnect handling when the
     * connection was authenticated as owner, then notifies the listeners and the strategy.
     *
     * @param connection the connection that was removed
     */
    private void fireConnectionRemovedEvent(ClientConnection connection) {
        boolean wasOwner = connection.isAuthenticatedAsOwner();
        if (wasOwner) {
            disconnectFromCluster(connection);
        }
        for (ConnectionListener connectionListener : connectionListeners) {
            connectionListener.connectionRemoved(connection);
        }
        connectionStrategy.onDisconnect(connection);
    }

    /**
     * Submits a task to the cluster executor that, if the connection's endpoint is still the
     * current owner address, clears the owner address, informs the connection strategy and
     * fires {@code CLIENT_DISCONNECTED} while the client lifecycle is running.
     *
     * @param connection the owner connection that was removed
     */
    private void disconnectFromCluster(final ClientConnection connection) {
        Runnable disconnectTask = new Runnable() {

            @Override
            public void run() {
                Address endpoint = connection.getEndPoint();
                boolean isCurrentOwner = endpoint != null && endpoint.equals(ownerConnectionAddress);
                if (!isCurrentOwner) {
                    // A different owner (or none) is in place; nothing to undo.
                    return;
                }
                setOwnerConnectionAddress(null);
                connectionStrategy.onDisconnectFromCluster();
                if (client.getLifecycleService().isRunning()) {
                    fireConnectionEvent(LifecycleEvent.LifecycleState.CLIENT_DISCONNECTED);
                }
            }
        };
        clusterConnectionExecutor.execute(disconnectTask);
    }

    /**
     * Fires the given lifecycle state through the client's lifecycle service.
     *
     * @param state the lifecycle state to publish
     */
    private void fireConnectionEvent(LifecycleEvent.LifecycleState state) {
        ((LifecycleServiceImpl) client.getLifecycleService()).fireLifecycleEvent(state);
    }

    /**
     * @return {@code true} when no outbound ports are configured
     */
    private boolean useAnyOutboundPort() {
        return outboundPortCount == 0;
    }

    /**
     * Returns the next configured outbound port, rotating it to the end of the list.
     *
     * @return the next port, or 0 when no outbound ports are configured
     */
    private int acquireOutboundPort() {
        if (useAnyOutboundPort()) {
            return 0;
        }
        synchronized (outboundPorts) {
            Integer nextPort = outboundPorts.removeFirst();
            outboundPorts.addLast(nextPort);
            return nextPort;
        }
    }

    /**
     * Binds the socket to a local port. Without configured outbound ports the socket is
     * bound to port 0; otherwise configured ports are tried in rotation, up to twice the
     * number of configured ports.
     *
     * @param socket the socket to bind
     * @throws IOException the last bind failure when every attempt failed, or the failure
     *                     of binding to port 0
     */
    private void bindSocketToPort(Socket socket) throws IOException {
        if (useAnyOutboundPort()) {
            socket.bind(new InetSocketAddress(0));
            return;
        }

        IOException lastFailure = null;
        int maxAttempts = outboundPortCount * 2;
        int attemptsMade = 0;
        while (attemptsMade < maxAttempts) {
            attemptsMade++;
            int port = acquireOutboundPort();
            if (port == 0) {
                return;
            }
            InetSocketAddress localAddress = new InetSocketAddress(port);
            try {
                socket.bind(localAddress);
                return;
            } catch (IOException e) {
                lastFailure = e;
                logger.finest("Could not bind port[ " + port + "]: " + e.getMessage());
            }
        }
        throw lastFailure;
    }

    /**
     * Opens a socket channel to the address, applies the configured socket options, binds and
     * connects it, wraps it in a {@link ClientConnection} and registers the channel with the
     * event loop group.
     *
     * <p>On any failure the socket channel is closed before the failure is rethrown. A failure
     * of that close is only logged, so the caller always sees the original cause.
     *
     * @param address the remote address to connect to
     * @return the new, not yet authenticated, connection
     * @throws IOException if opening, configuring, binding or connecting the socket fails
     */
    protected ClientConnection createSocketConnection(Address address) throws IOException {
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            Socket socket = socketChannel.socket();
            socket.setKeepAlive(this.socketOptions.isKeepAlive());
            socket.setTcpNoDelay(this.socketOptions.isTcpNoDelay());
            socket.setReuseAddress(this.socketOptions.isReuseAddress());
            if (this.socketOptions.getLingerSeconds() > 0) {
                socket.setSoLinger(true, this.socketOptions.getLingerSeconds());
            }
            int bufferSize = this.getBufferSize();
            socket.setSendBufferSize(bufferSize);
            socket.setReceiveBufferSize(bufferSize);
            InetSocketAddress inetSocketAddress = address.getInetSocketAddress();
            this.bindSocketToPort(socket);
            socket.connect(inetSocketAddress, this.connectionTimeoutMillis);
            HazelcastProperties properties = this.client.getProperties();
            boolean directBuffer = properties.getBoolean(GroupProperty.SOCKET_CLIENT_BUFFER_DIRECT);
            Channel channel = this.channelFactory.create(socketChannel, true, directBuffer);
            ClientConnection clientConnection = new ClientConnection(this.client, this.connectionIdGen.incrementAndGet(), channel);
            socketChannel.configureBlocking(true);
            if (this.socketInterceptor != null) {
                this.socketInterceptor.onConnect(socket);
            }
            socket.setSoTimeout(0);
            this.eventLoopGroup.register(channel);
            return clientConnection;
        }
        catch (Exception e) {
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                }
                catch (IOException closeFailure) {
                    // A failing close must not replace the exception that caused the cleanup.
                    this.logger.finest("Could not close socket channel after failed connection to " + address, closeFailure);
                }
            }
            throw ExceptionUtil.rethrow(e, IOException.class);
        }
    }

    /**
     * Converts the configured buffer size (multiplied by 1024) to bytes.
     *
     * @return the configured size in bytes, or 131072 when the result is not positive
     */
    private int getBufferSize() {
        int requestedBytes = socketOptions.getBufferSize() * 1024;
        if (requestedBytes <= 0) {
            return 131072;
        }
        return requestedBytes;
    }

    /**
     * Removes a closed connection from the active connections.
     *
     * @param connection the connection that was closed
     */
    void onClose(Connection connection) {
        removeFromActiveConnections(connection);
    }

    /**
     * Removes the connection from the active-connection map if it is still the entry mapped
     * to its endpoint, and fires the connection-removed event in that case.
     * Connections without an endpoint, or no longer mapped, are only logged at finest level.
     *
     * @param connection the connection to remove
     */
    private void removeFromActiveConnections(Connection connection) {
        Address endpoint = connection.getEndPoint();
        if (endpoint == null) {
            if (logger.isFinestEnabled()) {
                logger.finest("Destroying " + connection + ", but it has end-point set to null -> not removing it from a connection map");
            }
            return;
        }

        boolean removed = activeConnections.remove(endpoint, connection);
        if (!removed) {
            if (logger.isFinestEnabled()) {
                logger.finest("Destroying a connection, but there is no mapping " + endpoint + " -> " + connection + " in the connection map.");
            }
            return;
        }

        logger.info("Removed connection to endpoint: " + endpoint + ", connection: " + connection);
        fireConnectionRemovedEvent((ClientConnection) connection);
    }

    /**
     * Registers a listener for connection added and removed events.
     *
     * @param connectionListener the listener to register
     */
    public void addConnectionListener(ConnectionListener connectionListener) {
        connectionListeners.add(connectionListener);
    }

    /**
     * Registers a heartbeat listener with the heartbeat manager.
     *
     * @param connectionHeartbeatListener the listener to register
     */
    @Override
    public void addConnectionHeartbeatListener(ConnectionHeartbeatListener connectionHeartbeatListener) {
        heartbeat.addConnectionHeartbeatListener(connectionHeartbeatListener);
    }

    /**
     * Sends the authentication request over the connection as an urgent invocation,
     * schedules a timeout task for it, and attaches the callback that handles the result.
     *
     * @param target     the address the connection attempt was made for
     * @param connection the connection to authenticate
     * @param asOwner    whether to authenticate as owner
     * @param future     the future handed to the authentication callback
     */
    private void authenticate(Address target, ClientConnection connection, boolean asOwner, AuthenticationFuture future) {
        ClientMessage request = encodeAuthenticationRequest(asOwner, client.getSerializationService(), getPrincipal());
        ClientInvocationFuture responseFuture = new ClientInvocation(client, request, null, connection).invokeUrgent();

        // Scheduled before the callback is attached so the callback receives the timeout handle.
        ScheduledFuture<?> timeoutHandle = executionService.schedule(
                new TimeoutAuthenticationTask(responseFuture), authenticationTimeout, TimeUnit.MILLISECONDS);
        responseFuture.andThen(new AuthCallback(connection, asOwner, target, future, timeoutHandle));
    }

    /**
     * Encodes the authentication request. Credentials whose class is exactly
     * {@link UsernamePasswordCredentials} use the username/password codec; any other
     * credentials are serialized and sent with the custom codec.
     *
     * @param asOwner   whether to authenticate as owner
     * @param ss        serialization service supplying the version and used to serialize custom credentials
     * @param principal the current principal, or {@code null} to send null uuids
     * @return the encoded request message
     */
    private ClientMessage encodeAuthenticationRequest(boolean asOwner, SerializationService ss, ClientPrincipal principal) {
        byte serializationVersion = ((InternalSerializationService) ss).getVersion();
        String uuid = principal == null ? null : principal.getUuid();
        String ownerUuid = principal == null ? null : principal.getOwnerUuid();

        boolean plainUsernamePassword = credentials.getClass().equals(UsernamePasswordCredentials.class);
        if (plainUsernamePassword) {
            UsernamePasswordCredentials userPass = (UsernamePasswordCredentials) credentials;
            return ClientAuthenticationCodec.encodeRequest(userPass.getUsername(), userPass.getPassword(), uuid, ownerUuid,
                    asOwner, "JVM", serializationVersion, BuildInfoProvider.getBuildInfo().getVersion());
        }

        Data serializedCredentials = ss.toData(credentials);
        return ClientAuthenticationCustomCodec.encodeRequest(serializedCredentials, uuid, ownerUuid,
                asOwner, "JVM", serializationVersion, BuildInfoProvider.getBuildInfo().getVersion());
    }

    /**
     * Records a successfully authenticated connection as active, fires the connection-added
     * event when it is new for its endpoint, and clears the in-progress entry for the target.
     * If the connection is no longer alive by then, it is removed again.
     *
     * @param target     the address the connection attempt was made for
     * @param connection the authenticated connection
     */
    private void onAuthenticated(Address target, ClientConnection connection) {
        ClientConnection previous = activeConnections.put(connection.getEndPoint(), connection);
        boolean finestEnabled = logger.isFinestEnabled();
        if (previous != null) {
            if (finestEnabled) {
                logger.finest("Re-authentication succeeded for " + connection);
            }
            assert connection.equals(previous);
        } else {
            if (finestEnabled) {
                logger.finest("Authentication succeeded for " + connection + " and there was no old connection to this end-point");
            }
            fireConnectionAddedEvent(connection);
        }

        connectionsInProgress.remove(target);
        logger.info("Authenticated with server " + connection.getEndPoint() + ", server version:" + connection.getConnectedServerVersionString() + " Local address: " + connection.getLocalSocketAddress());

        // The connection may have been closed while authentication was completing.
        if (!connection.isAlive()) {
            removeFromActiveConnections(connection);
        }
    }

    /**
     * Closes a connection whose authentication failed and clears the in-progress entry
     * for the target.
     *
     * @param target     the address the connection attempt was made for
     * @param connection the connection that failed to authenticate
     * @param cause      the failure, logged at finest level and passed to {@code close}
     */
    private void onAuthenticationFailed(Address target, ClientConnection connection, Throwable cause) {
        boolean logDetails = logger.isFinestEnabled();
        if (logDetails) {
            logger.finest("Authentication of " + connection + " failed.", cause);
        }
        connection.close(null, cause);
        connectionsInProgress.remove(target);
    }

    /**
     * Forwards a resumed heartbeat to the connection strategy.
     *
     * @param connection the connection whose heartbeat resumed; must be a {@link ClientConnection}
     */
    @Override
    public void heartbeatResumed(Connection connection) {
        ClientConnection clientConnection = (ClientConnection) connection;
        connectionStrategy.onHeartbeatResumed(clientConnection);
    }

    /**
     * Forwards a stopped heartbeat to the connection strategy.
     *
     * @param connection the connection whose heartbeat stopped; must be a {@link ClientConnection}
     */
    @Override
    public void heartbeatStopped(Connection connection) {
        ClientConnection clientConnection = (ClientConnection) connection;
        connectionStrategy.onHeartbeatStopped(clientConnection);
    }

    /**
     * Connects to the cluster and blocks until the attempt has finished.
     * Any failure is rethrown through {@code ExceptionUtil.rethrow}.
     */
    @Override
    public void connectToCluster() {
        try {
            Future<Void> pendingConnect = connectToClusterAsync();
            pendingConnect.get();
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Tries to establish an owner connection, going through all candidate member addresses
     * on each attempt, for at most {@code connectionAttemptLimit} attempts. Between attempts
     * it waits out the remainder of the connection attempt period.
     *
     * @throws IllegalStateException if the client stops running, or if no address could be
     *                               connected to (also after the wait is interrupted)
     */
    private void connectToClusterInternal() {
        Set<Address> triedAddresses = new HashSet<Address>();
        int attempt = 0;
        while (attempt < connectionAttemptLimit) {
            attempt++;
            long nextTry = Clock.currentTimeMillis() + connectionAttemptPeriod;

            for (Address candidate : getPossibleMemberAddresses()) {
                if (!client.getLifecycleService().isRunning()) {
                    throw new IllegalStateException("Giving up on retrying to connect to cluster since client is shutdown.");
                }
                triedAddresses.add(candidate);
                if (connectAsOwner(candidate) != null) {
                    return;
                }
            }

            // Checked again here because the loop above does not run when there are no candidates.
            if (!client.getLifecycleService().isRunning()) {
                throw new IllegalStateException("Client is being shutdown.");
            }

            if (attempt >= connectionAttemptLimit) {
                logger.warning(String.format("Unable to get alive cluster connection, attempt %d of %d.", attempt, connectionAttemptLimit));
                continue;
            }

            long remainingTime = nextTry - Clock.currentTimeMillis();
            logger.warning(String.format("Unable to get alive cluster connection, try in %d ms later, attempt %d of %d.", Math.max(0L, remainingTime), attempt, connectionAttemptLimit));
            if (remainingTime > 0L) {
                try {
                    Thread.sleep(remainingTime);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw new IllegalStateException("Unable to connect to any address! The following addresses were tried: " + triedAddresses);
    }

    /**
     * Submits the cluster connect to the cluster executor. If connecting fails, the failure
     * is logged, the client is shut down from a separate thread, and the failure is rethrown
     * so that it surfaces through the returned future.
     *
     * @return a future that completes when the connect attempt has finished
     */
    @Override
    public Future<Void> connectToClusterAsync() {
        Callable<Void> connectTask = new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                try {
                    connectToClusterInternal();
                    return null;
                } catch (Exception e) {
                    logger.warning("Could not connect to cluster, shutting down the client. " + e.getMessage());
                    Runnable shutdownTask = new Runnable() {

                        @Override
                        public void run() {
                            try {
                                client.getLifecycleService().shutdown();
                            } catch (Exception exception) {
                                logger.severe("Exception during client shutdown ", exception);
                            }
                        }
                    };
                    // Shutdown runs on its own thread, not on the cluster executor thread.
                    new Thread(shutdownTask, client.getName() + ".clientShutdown-").start();
                    throw ExceptionUtil.rethrow(e);
                }
            }
        };
        return clusterConnectionExecutor.submit(connectTask);
    }

    /**
     * Collects the candidate addresses for a cluster connect, in the order they should be tried:
     * the known cluster members first, then the addresses loaded from the address providers.
     * When {@code shuffleMemberList} is set, each of the two groups is shuffled on its own.
     * The previous owner address, if any, is moved to the end of the result.
     *
     * <p>An exception from an address provider is logged and that provider is skipped, except
     * for {@link NullPointerException}, which is propagated.
     *
     * @return the ordered, duplicate-free candidate addresses
     */
    Collection<Address> getPossibleMemberAddresses() {
        Set<Address> addresses = new LinkedHashSet<Address>();
        Collection<Member> memberList = this.client.getClientClusterService().getMemberList();
        for (Member member : memberList) {
            addresses.add(member.getAddress());
        }
        if (this.shuffleMemberList) {
            // shuffle returns a new set and leaves its argument untouched, so the result must be kept.
            addresses = ClientConnectionManagerImpl.shuffle(addresses);
        }
        Set<Address> providerAddresses = new LinkedHashSet<Address>();
        for (AddressProvider addressProvider : this.addressProviders) {
            try {
                providerAddresses.addAll(addressProvider.loadAddresses());
            }
            catch (NullPointerException e) {
                throw e;
            }
            catch (Exception e) {
                this.logger.warning("Exception from AddressProvider: " + addressProvider, e);
            }
        }
        if (this.shuffleMemberList) {
            providerAddresses = ClientConnectionManagerImpl.shuffle(providerAddresses);
        }
        addresses.addAll(providerAddresses);
        // Read the volatile field once so the null check, remove and add all see the same value.
        Address previousOwner = this.previousOwnerConnectionAddress;
        if (previousOwner != null) {
            addresses.remove(previousOwner);
            addresses.add(previousOwner);
        }
        return addresses;
    }

    /**
     * Returns a new insertion-ordered set holding the elements of {@code set} in random order.
     *
     * <p>The argument itself is not modified; callers must use the returned set.
     *
     * @param set the elements to shuffle
     * @return a new {@link LinkedHashSet} with the same elements in shuffled order
     */
    private static <T> Set<T> shuffle(Set<T> set) {
        ArrayList<T> randomized = new ArrayList<T>(set);
        Collections.shuffle(randomized);
        // Copying into a LinkedHashSet keeps the shuffled order as the iteration order.
        return new LinkedHashSet<T>(randomized);
    }

    /**
     * Creates a single-threaded executor whose thread factory is built from the client's
     * configured class loader and the thread name prefix {@code <clientName>.cluster-}.
     *
     * @param client the client instance supplying the class loader and the name
     * @return a new single-thread {@link ExecutorService}
     */
    private ExecutorService createSingleThreadExecutorService(HazelcastClientInstanceImpl client) {
        ClassLoader configuredLoader = client.getClientConfig().getClassLoader();
        String threadNamePrefix = client.getName() + ".cluster-";
        return Executors.newSingleThreadExecutor((ThreadFactory)new SingleExecutorThreadFactory(configuredLoader, threadNamePrefix));
    }

    /**
     * Callback attached to an authentication invocation for a single connection.
     *
     * <p>On a response it cancels the timeout task, decodes the message and, depending on the
     * decoded status, either marks the connection as authenticated or reports a failure. Every
     * failure path goes through {@link #onFailure(Throwable)}, which notifies the enclosing
     * manager and fails the {@link AuthenticationFuture}.
     */
    private class AuthCallback implements ExecutionCallback<ClientMessage> {
        private final ClientConnection connection;
        private final boolean asOwner;
        private final Address target;
        private final AuthenticationFuture future;
        private final ScheduledFuture timeoutTaskFuture;

        AuthCallback(ClientConnection connection, boolean asOwner, Address target, AuthenticationFuture future, ScheduledFuture timeoutTaskFuture) {
            this.target = target;
            this.connection = connection;
            this.asOwner = asOwner;
            this.future = future;
            this.timeoutTaskFuture = timeoutTaskFuture;
        }

        /**
         * Handles the authentication response.
         *
         * @param response the raw response message of the authentication request
         */
        public void onResponse(ClientMessage response) {
            this.timeoutTaskFuture.cancel(true);
            ClientAuthenticationCodec.ResponseParameters decoded;
            try {
                decoded = ClientAuthenticationCodec.decodeResponse(response);
            }
            catch (HazelcastException decodeFailure) {
                this.onFailure(decodeFailure);
                return;
            }
            AuthenticationStatus status = AuthenticationStatus.getById((int)decoded.status);
            switch (status) {
                case AUTHENTICATED: {
                    this.completeAuthenticated(decoded);
                    return;
                }
                case CREDENTIALS_FAILED: {
                    this.onFailure(new AuthenticationException("Invalid credentials! Principal: " + ClientConnectionManagerImpl.this.principal));
                    return;
                }
                default: {
                    this.onFailure(new AuthenticationException("Authentication status code not supported. status: " + status));
                }
            }
        }

        /**
         * Applies a successful authentication result to the connection and completes the future.
         * When authenticating as owner, the manager's principal and owner connection address
         * are updated as well.
         */
        private void completeAuthenticated(ClientAuthenticationCodec.ResponseParameters decoded) {
            this.connection.setConnectedServerVersion(decoded.serverHazelcastVersion);
            this.connection.setRemoteEndpoint(decoded.address);
            if (this.asOwner) {
                this.connection.setIsAuthenticatedAsOwner();
                ClientPrincipal ownerPrincipal = new ClientPrincipal(decoded.uuid, decoded.ownerUuid);
                ClientConnectionManagerImpl.this.setPrincipal(ownerPrincipal);
                ClientConnectionManagerImpl.this.setOwnerConnectionAddress(this.connection.getEndPoint());
                ClientConnectionManagerImpl.this.logger.info("Setting " + this.connection + " as owner with principal " + ownerPrincipal);
            }
            ClientConnectionManagerImpl.this.onAuthenticated(this.target, this.connection);
            this.future.onSuccess(this.connection);
        }

        /**
         * Cancels the timeout task, notifies the manager of the failed authentication and
         * fails the authentication future.
         *
         * @param t the cause of the failure
         */
        public void onFailure(Throwable t) {
            this.timeoutTaskFuture.cancel(true);
            ClientConnectionManagerImpl.this.onAuthenticationFailed(this.target, this.connection, t);
            this.future.onFailure(t);
        }
    }

    /**
     * Channel error handler that closes the {@link ClientConnection} stored in the failing
     * channel's attribute map.
     */
    private class ClientConnectionChannelErrorHandler implements ChannelErrorHandler {
        private ClientConnectionChannelErrorHandler() {
        }

        /**
         * Logs the error when no channel is given; otherwise closes the channel's connection.
         * An {@link OutOfMemoryError} is additionally logged at severe level before closing.
         *
         * @param channel the channel on which the error occurred, may be {@code null}
         * @param cause   the error that was raised
         */
        public void onError(Channel channel, Throwable cause) {
            if (channel == null) {
                ClientConnectionManagerImpl.this.logger.severe(cause);
                return;
            }
            if (cause instanceof OutOfMemoryError) {
                ClientConnectionManagerImpl.this.logger.severe(cause);
            }
            ClientConnection connection = (ClientConnection)channel.attributeMap().get(ClientConnection.class);
            // An EOF gets a fixed reason; anything else names the connection and the current thread.
            String reason = cause instanceof EOFException
                    ? "Connection closed by the other side"
                    : "Exception in " + connection + ", thread=" + Thread.currentThread().getName();
            connection.close(reason, cause);
        }
    }

    /**
     * Task that obtains a connection to {@code target} and starts authentication on it.
     *
     * <p>Any failure is reported to the {@link AuthenticationFuture} and the target is removed
     * from the manager's {@code connectionsInProgress}.
     */
    private class InitConnectionTask implements Runnable {
        private final Address target;
        private final boolean asOwner;
        private final AuthenticationFuture future;

        InitConnectionTask(Address target, boolean asOwner, AuthenticationFuture future) {
            this.future = future;
            this.asOwner = asOwner;
            this.target = target;
        }

        @Override
        public void run() {
            ClientConnection connection;
            try {
                connection = this.getConnection(this.target);
            }
            catch (Exception connectFailure) {
                ClientConnectionManagerImpl.this.logger.finest(connectFailure);
                this.future.onFailure(connectFailure);
                ClientConnectionManagerImpl.this.connectionsInProgress.remove(this.target);
                return;
            }
            try {
                ClientConnectionManagerImpl.this.authenticate(this.target, connection, this.asOwner, this.future);
            }
            catch (Exception authFailure) {
                // Fail the future first, then close the connection that could not be authenticated.
                this.future.onFailure(authFailure);
                connection.close("Failed to authenticate connection", authFailure);
                ClientConnectionManagerImpl.this.connectionsInProgress.remove(this.target);
            }
        }

        /**
         * Returns the active connection registered for {@code target}, or creates a new socket
         * connection to the translated address when none is registered.
         *
         * @param target the address to connect to
         * @return the existing or newly created connection
         * @throws IOException          declared for failures while creating the socket connection
         * @throws NullPointerException if the address translator yields {@code null} for {@code target}
         */
        private ClientConnection getConnection(Address target) throws IOException {
            ClientConnection existing = (ClientConnection)ClientConnectionManagerImpl.this.activeConnections.get(target);
            if (existing == null) {
                Address translated = ClientConnectionManagerImpl.this.addressTranslator.translate(target);
                if (translated == null) {
                    throw new NullPointerException("Address Translator " + ClientConnectionManagerImpl.this.addressTranslator.getClass() + " could not translate address " + target);
                }
                return ClientConnectionManagerImpl.this.createSocketConnection(translated);
            }
            return existing;
        }
    }

    /**
     * Task that completes an authentication invocation future with a {@link TimeoutException}
     * if the future is not yet done when the task runs.
     */
    private class TimeoutAuthenticationTask implements Runnable {
        private final ClientInvocationFuture future;

        TimeoutAuthenticationTask(ClientInvocationFuture future) {
            this.future = future;
        }

        @Override
        public void run() {
            // Nothing to do when the invocation has already completed.
            if (!this.future.isDone()) {
                this.future.complete(new TimeoutException("Authentication response did not come back in " + ClientConnectionManagerImpl.this.authenticationTimeout + " millis"));
            }
        }
    }
}

