/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.impl.connection.nio;

import com.hazelcast.client.AuthenticationException;
import com.hazelcast.client.ClientNotAllowedInClusterException;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.HazelcastClientOfflineException;
import com.hazelcast.client.LoadBalancer;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientConnectionStrategyConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.config.ConnectionRetryConfig;
import com.hazelcast.client.impl.clientside.CandidateClusterContext;
import com.hazelcast.client.impl.clientside.ClientLoggingService;
import com.hazelcast.client.impl.clientside.ClusterDiscoveryService;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.clientside.LifecycleServiceImpl;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.connection.Addresses;
import com.hazelcast.client.impl.connection.ClientConnectionManager;
import com.hazelcast.client.impl.connection.nio.ClientConnection;
import com.hazelcast.client.impl.connection.nio.HeartbeatManager;
import com.hazelcast.client.impl.connection.nio.WaitStrategy;
import com.hazelcast.client.impl.management.ManagementCenterService;
import com.hazelcast.client.impl.protocol.AuthenticationStatus;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCodec;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCustomCodec;
import com.hazelcast.client.impl.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.client.impl.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.impl.spi.impl.ClientPartitionServiceImpl;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.ChannelInitializerProvider;
import com.hazelcast.internal.networking.nio.NioNetworking;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.ConnectionListener;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.AddressUtil;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.EmptyStatement;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.RuntimeAvailableProcessors;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.internal.util.executor.LoggingScheduledExecutor;
import com.hazelcast.internal.util.executor.PoolExecutorThreadFactory;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.SocketInterceptor;
import com.hazelcast.security.Credentials;
import com.hazelcast.security.PasswordCredentials;
import com.hazelcast.security.TokenCredentials;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class ClientConnectionManagerImpl
implements ClientConnectionManager {
    // Default IO thread count for a smart-routing client on a machine with more than
    // SMALL_MACHINE_PROCESSOR_COUNT processors (presumably the value applied in initNetworking; confirm).
    private static final int DEFAULT_SMART_CLIENT_THREAD_COUNT = 3;
    // Presumably the core pool size handed to the internal scheduled executor; confirm against createExecutorService.
    private static final int EXECUTOR_CORE_POOL_SIZE = 10;
    private static final int SMALL_MACHINE_PROCESSOR_COUNT = 8;
    // Source of the ids given to newly created ClientConnection instances.
    protected final AtomicInteger connectionIdGen = new AtomicInteger();
    // Flipped to true by start() and back to false by shutdown().
    private final AtomicBoolean isAlive = new AtomicBoolean();
    private final ILogger logger;
    // Connect timeout passed to Channel.connect; a configured value of 0 is stored as Integer.MAX_VALUE.
    private final int connectionTimeoutMillis;
    private final HazelcastClientInstanceImpl client;
    // Cleared by reset(); presumably populated by resolveAddress (not visible here).
    private final ConcurrentMap<Address, InetSocketAddress> inetSocketAddressCache = new ConcurrentHashMap<Address, InetSocketAddress>();
    private final Collection<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
    private final NioNetworking networking;
    private final HeartbeatManager heartbeat;
    // Taken from the heartbeat timeout; used as a millisecond timeout when waiting for the authentication response.
    private final long authenticationTimeout;
    private final String connectionType;
    private final UUID clientUuid = UuidUtil.newUnsecureUUID();
    // Configured outbound ports; rotated under its own monitor in acquireOutboundPort().
    private final LinkedList<Integer> outboundPorts = new LinkedList<Integer>();
    private final Set<String> labels;
    private final int outboundPortCount;
    private final boolean failoverConfigProvided;
    private final ScheduledExecutorService executor;
    private final boolean shuffleMemberList;
    private final WaitStrategy waitStrategy;
    private final ClusterDiscoveryService clusterDiscoveryService;
    private final boolean asyncStart;
    private final ClientConnectionStrategyConfig.ReconnectMode reconnectMode;
    private final LoadBalancer loadBalancer;
    private final boolean isSmartRoutingEnabled;
    private final Runnable connectToAllClusterMembersTask = new ConnectToAllClusterMembersTask();
    // Credentials produced for the most recent authentication request.
    private volatile Credentials currentCredentials;
    // Held while activeConnections and clientState are updated after authentication or connection close.
    private final Object clientStateMutex = new Object();
    // Authenticated connections keyed by the remote member's UUID.
    private final ConcurrentMap<UUID, ClientConnection> activeConnections = new ConcurrentHashMap<UUID, ClientConnection>();
    private volatile UUID clusterId;
    private volatile ClientState clientState = ClientState.INITIAL;
    // True while a connect-to-cluster task is queued or running on the internal executor.
    private volatile boolean connectToClusterTaskSubmitted;

    /**
     * Builds the connection manager from the client's configuration and properties.
     * The cluster connection itself is initiated later, from {@link #start()}.
     *
     * @param client the owning client instance; supplies config, properties, logging and discovery services
     */
    public ClientConnectionManagerImpl(HazelcastClientInstanceImpl client) {
        this.client = client;
        this.loadBalancer = client.getLoadBalancer();
        this.labels = Collections.unmodifiableSet(client.getClientConfig().getLabels());
        this.logger = client.getLoggingService().getLogger(ClientConnectionManager.class);
        this.connectionType = client.getProperties().getBoolean(ManagementCenterService.MC_CLIENT_MODE_PROP) ? "MCJVM" : "JVM";
        this.connectionTimeoutMillis = this.initConnectionTimeoutMillis();
        // Must be assigned before initNetworking(): that method reads this flag to choose the
        // default IO thread counts. Assigning it afterwards made the flag read as false there.
        this.isSmartRoutingEnabled = client.getClientConfig().getNetworkConfig().isSmartRouting();
        this.networking = this.initNetworking();
        this.outboundPorts.addAll(this.getOutboundPorts());
        this.outboundPortCount = this.outboundPorts.size();
        this.heartbeat = new HeartbeatManager(this, client);
        this.authenticationTimeout = this.heartbeat.getHeartbeatTimeout();
        this.failoverConfigProvided = client.getFailoverConfig() != null;
        this.executor = this.createExecutorService();
        this.shuffleMemberList = client.getProperties().getBoolean(ClientProperty.SHUFFLE_MEMBER_LIST);
        this.clusterDiscoveryService = client.getClusterDiscoveryService();
        this.waitStrategy = this.initializeWaitStrategy(client.getClientConfig());
        ClientConnectionStrategyConfig connectionStrategyConfig = client.getClientConfig().getConnectionStrategyConfig();
        this.asyncStart = connectionStrategyConfig.isAsyncStart();
        this.reconnectMode = connectionStrategyConfig.getReconnectMode();
    }

    /**
     * Reads the connection timeout from the client network config.
     *
     * @return the configured timeout, or {@code Integer.MAX_VALUE} when the configured value is 0
     */
    private int initConnectionTimeoutMillis() {
        int configured = client.getClientConfig().getNetworkConfig().getConnectionTimeout();
        if (configured == 0) {
            return Integer.MAX_VALUE;
        }
        return configured;
    }

    /**
     * Creates the internal scheduled executor. Its threads are named with the client name followed
     * by {@code ".internal-"}; rejected tasks are logged at finest level and then reported with a
     * {@link RejectedExecutionException}.
     *
     * @return the newly created executor
     */
    private ScheduledExecutorService createExecutorService() {
        ClassLoader configClassLoader = client.getClientConfig().getClassLoader();
        String threadNamePrefix = client.getName() + ".internal-";
        PoolExecutorThreadFactory threadFactory = new PoolExecutorThreadFactory(threadNamePrefix, configClassLoader);
        return new LoggingScheduledExecutor(logger, EXECUTOR_CORE_POOL_SIZE, threadFactory, (task, pool) -> {
            String message = "Internal executor rejected task: " + task + ", because client is shutting down...";
            logger.finest(message);
            throw new RejectedExecutionException(message);
        });
    }

    /**
     * Combines the outbound ports and outbound port definitions from the client network config
     * through {@code AddressUtil.getOutboundPorts}.
     *
     * @return the collection produced by {@code AddressUtil.getOutboundPorts}
     */
    private Collection<Integer> getOutboundPorts() {
        ClientNetworkConfig config = client.getClientConfig().getNetworkConfig();
        Collection<Integer> ports = config.getOutboundPorts();
        Collection<String> portDefinitions = config.getOutboundPortDefinitions();
        return AddressUtil.getOutboundPorts(ports, portDefinitions);
    }

    /**
     * @return the networking instance created by {@link #initNetworking()} during construction
     */
    public NioNetworking getNetworking() {
        return networking;
    }

    /**
     * Builds the {@link NioNetworking} used for all client channels.
     * <p>
     * The input and output thread counts come from the client properties. A configured value of
     * {@code -1} means "choose a default": {@code DEFAULT_SMART_CLIENT_THREAD_COUNT} when smart routing
     * is enabled and more than {@code SMALL_MACHINE_PROCESSOR_COUNT} processors are available,
     * otherwise 1. This method reads the {@code isSmartRoutingEnabled} field.
     *
     * @return the configured networking instance
     */
    protected NioNetworking initNetworking() {
        HazelcastProperties properties = client.getProperties();

        int inputThreads = properties.getInteger(ClientProperty.IO_INPUT_THREAD_COUNT);
        int outputThreads = properties.getInteger(ClientProperty.IO_OUTPUT_THREAD_COUNT);
        if (inputThreads == -1) {
            inputThreads = isSmartRoutingEnabled && RuntimeAvailableProcessors.get() > SMALL_MACHINE_PROCESSOR_COUNT
                    ? DEFAULT_SMART_CLIENT_THREAD_COUNT : 1;
        }
        if (outputThreads == -1) {
            outputThreads = isSmartRoutingEnabled && RuntimeAvailableProcessors.get() > SMALL_MACHINE_PROCESSOR_COUNT
                    ? DEFAULT_SMART_CLIENT_THREAD_COUNT : 1;
        }

        NioNetworking.Context context = new NioNetworking.Context()
                .loggingService(client.getLoggingService())
                .metricsRegistry(client.getMetricsRegistry())
                .threadNamePrefix(client.getName())
                .errorHandler(new ClientConnectionChannelErrorHandler())
                .inputThreadCount(inputThreads)
                .outputThreadCount(outputThreads)
                .balancerIntervalSeconds(properties.getInteger(ClientProperty.IO_BALANCER_INTERVAL_SECONDS))
                .writeThroughEnabled(properties.getBoolean(ClientProperty.IO_WRITE_THROUGH_ENABLED))
                .concurrencyDetection(client.getConcurrencyDetection());
        return new NioNetworking(context);
    }

    /**
     * Creates the {@link WaitStrategy} from the connection retry settings of the given config.
     *
     * @param clientConfig config whose connection strategy supplies the retry settings
     * @return a wait strategy built from initial backoff, max backoff, multiplier,
     *         cluster connect timeout and jitter
     */
    private WaitStrategy initializeWaitStrategy(ClientConfig clientConfig) {
        ConnectionRetryConfig retry = clientConfig.getConnectionStrategyConfig().getConnectionRetryConfig();
        return new WaitStrategy(
                retry.getInitialBackoffMillis(),
                retry.getMaxBackoffMillis(),
                retry.getMultiplier(),
                retry.getClusterConnectTimeoutMillis(),
                retry.getJitter(),
                logger);
    }

    /**
     * Starts the manager once: networking, heartbeat, then the cluster connection. With smart
     * routing enabled it also schedules the connect-to-all-members task every second. Calls made
     * while the manager is already alive do nothing.
     */
    public synchronized void start() {
        boolean transitionedToAlive = isAlive.compareAndSet(false, true);
        if (transitionedToAlive) {
            startNetworking();
            heartbeat.start();
            connectToCluster();
            if (isSmartRoutingEnabled) {
                executor.scheduleWithFixedDelay(connectToAllClusterMembersTask, 1L, 1L, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Tries to get or open a connection to every member in the current member list. Does nothing
     * unless smart routing is enabled. Failures for individual members are deliberately ignored.
     */
    public void connectToAllClusterMembers() {
        if (isSmartRoutingEnabled) {
            for (Member member : client.getClientClusterService().getMemberList()) {
                try {
                    getOrConnect(member.getAddress());
                } catch (Exception ignored) {
                    // best effort: a failed member does not stop the remaining attempts
                    EmptyStatement.ignore(ignored);
                }
            }
        }
    }

    /**
     * Starts the networking layer by calling {@code restart()} on it.
     */
    protected void startNetworking() {
        networking.restart();
    }

    /**
     * Shuts the manager down once: stops the internal executor, closes every active connection,
     * stops networking, drops the listeners, stops the heartbeat and destroys the current cluster
     * context. Calls made while the manager is not alive do nothing.
     */
    public synchronized void shutdown() {
        boolean transitionedToStopped = isAlive.compareAndSet(true, false);
        if (!transitionedToStopped) {
            return;
        }
        ClientExecutionServiceImpl.shutdownExecutor("cluster", executor, logger);
        for (ClientConnection active : activeConnections.values()) {
            active.close("Hazelcast client is shutting down", null);
        }
        stopNetworking();
        connectionListeners.clear();
        heartbeat.shutdown();
        clusterDiscoveryService.current().destroy();
    }

    /**
     * Stops the networking layer by calling {@code shutdown()} on it.
     */
    protected void stopNetworking() {
        networking.shutdown();
    }

    /**
     * Starts the current cluster context, then connects: on the calling thread for a synchronous
     * start, or through a task on the internal executor when async start is configured.
     */
    private void connectToCluster() {
        clusterDiscoveryService.current().start();
        if (!asyncStart) {
            doConnectToCluster();
            return;
        }
        submitConnectToClusterTask();
    }

    /**
     * Submits a task to the internal executor that connects to the cluster, unless one is already
     * marked as submitted.
     * <p>
     * When the connect attempt returns, the task clears the "submitted" flag under
     * {@code clientStateMutex} and, if there is still no active connection, resubmits itself.
     * Any throwable from the attempt is logged and the client is shut down from a separate thread.
     */
    private void submitConnectToClusterTask() {
        if (this.connectToClusterTaskSubmitted) {
            return;
        }
        this.executor.submit(() -> {
            try {
                this.doConnectToCluster();
                synchronized (this.clientStateMutex) {
                    this.connectToClusterTaskSubmitted = false;
                    if (this.activeConnections.isEmpty()) {
                        // Logged at the level the guard checks; previously this was a warning that
                        // only appeared when fine logging was enabled.
                        if (this.logger.isFineEnabled()) {
                            this.logger.fine("No connection to cluster: " + this.clusterId);
                        }
                        this.submitConnectToClusterTask();
                    }
                }
            }
            catch (Throwable e) {
                this.logger.warning("Could not connect to any cluster, shutting down the client: " + e.getMessage());
                this.shutdownWithExternalThread();
            }
        });
        // Set only after a successful submit, so a rejected submission leaves the flag clear.
        this.connectToClusterTaskSubmitted = true;
    }

    /**
     * Connects to the current candidate cluster and, if that fails, asks the discovery service to
     * try the next cluster.
     *
     * @throws IllegalStateException when no cluster could be connected; the message depends on
     *                               whether the client lifecycle is still running
     */
    private void doConnectToCluster() {
        CandidateClusterContext current = clusterDiscoveryService.current();
        logger.info("Trying to connect to cluster: " + current.getClusterName());

        boolean connected = doConnectToCandidateCluster(current)
                || clusterDiscoveryService.tryNextCluster(this::destroyCurrentClusterConnectionAndTryNextCluster);
        if (connected) {
            return;
        }

        String reason;
        if (client.getLifecycleService().isRunning()) {
            reason = "Unable to connect to any cluster.";
        } else {
            reason = "Client is being shutdown.";
        }
        throw new IllegalStateException(reason);
    }

    /**
     * Switches from one candidate cluster to the next: destroys the current context, notifies the
     * client of the cluster change, starts the next context, updates the cluster name used for
     * logging and tries to connect.
     *
     * @param currentContext the context being abandoned
     * @param nextContext    the context to try next
     * @return {@code true} if a connection to the next cluster was made (after waiting for initial
     *         membership events and firing CLIENT_CHANGED_CLUSTER), {@code false} otherwise
     */
    private Boolean destroyCurrentClusterConnectionAndTryNextCluster(CandidateClusterContext currentContext, CandidateClusterContext nextContext) {
        currentContext.destroy();
        client.onClusterChange();
        nextContext.start();

        String nextClusterName = nextContext.getClusterName();
        ((ClientLoggingService) client.getLoggingService()).updateClusterName(nextClusterName);
        logger.info("Trying to connect to next cluster: " + nextContext.getClusterName());

        if (!doConnectToCandidateCluster(nextContext)) {
            return false;
        }
        client.waitForInitialMembershipEvents();
        fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CHANGED_CLUSTER);
        return true;
    }

    /**
     * Attempts to get or open a connection to the given address during cluster connect.
     *
     * @param address the member address to try
     * @return the connection, or {@code null} when the attempt failed with an exception other
     *         than the two that are rethrown
     * @throws InvalidConfigurationException      rethrown after logging
     * @throws ClientNotAllowedInClusterException rethrown after logging
     */
    private Connection connect(Address address) {
        final String failurePrefix = "Exception during initial connection to " + address + ": ";
        try {
            logger.info("Trying to connect to " + address);
            return getOrConnect(address);
        } catch (InvalidConfigurationException invalidConfig) {
            logger.warning(failurePrefix + invalidConfig);
            throw ExceptionUtil.rethrow(invalidConfig);
        } catch (ClientNotAllowedInClusterException notAllowed) {
            logger.warning(failurePrefix + notAllowed);
            throw notAllowed;
        } catch (Exception other) {
            // any other failure just means "this address did not work"
            logger.warning(failurePrefix + other);
            return null;
        }
    }

    /**
     * Fires the given lifecycle state through the client's lifecycle service.
     *
     * @param state the lifecycle state to publish
     */
    private void fireLifecycleEvent(LifecycleEvent.LifecycleState state) {
        ((LifecycleServiceImpl) client.getLifecycleService()).fireLifecycleEvent(state);
    }

    /**
     * Repeatedly walks the possible member addresses of the given cluster until one connects or
     * the wait strategy stops allowing further rounds.
     *
     * @param context the candidate cluster to connect to
     * @return {@code true} as soon as one address yields a connection; {@code false} when the wait
     *         strategy gives up or the attempt is stopped by a
     *         {@link ClientNotAllowedInClusterException} / {@link InvalidConfigurationException}
     */
    private boolean doConnectToCandidateCluster(CandidateClusterContext context) {
        Set<Address> attempted = new HashSet<Address>();
        try {
            waitStrategy.reset();
            do {
                for (Address candidate : getPossibleMemberAddresses(context.getAddressProvider())) {
                    checkClientActive();
                    attempted.add(candidate);
                    if (connect(candidate) != null) {
                        return true;
                    }
                }
                // an empty address list must still notice that the client became inactive
                checkClientActive();
            } while (waitStrategy.sleep());
        } catch (ClientNotAllowedInClusterException | InvalidConfigurationException stop) {
            logger.warning("Stopped trying on the cluster: " + context.getClusterName() + " reason: " + stop.getMessage());
        }
        logger.info("Unable to connect to any address from the cluster with name: " + context.getClusterName() + ". The following addresses were tried: " + attempted);
        return false;
    }

    /**
     * Verifies that an invocation may be sent right now.
     *
     * @throws HazelcastClientOfflineException when there is no usable connection and the client is
     *                                         either starting asynchronously or reconnecting in ASYNC mode
     * @throws IOException                     when there is no usable connection in every other case
     */
    @Override
    public void checkInvocationAllowed() throws IOException {
        ClientState state = clientState;
        boolean initialized = state == ClientState.INITIALIZED_ON_CLUSTER;
        if (initialized && activeConnections.size() > 0) {
            return;
        }

        if (state == ClientState.INITIAL) {
            if (!asyncStart) {
                throw new IOException("No connection found to cluster since the client is starting.");
            }
            throw new HazelcastClientOfflineException();
        }

        if (reconnectMode == ClientConnectionStrategyConfig.ReconnectMode.ASYNC) {
            throw new HazelcastClientOfflineException();
        }
        throw new IOException("No connection found to cluster.");
    }

    /**
     * Collects the addresses worth trying for a cluster connection: first the addresses of the
     * currently known members, then the primary and secondary addresses loaded from the address
     * provider. Duplicates are dropped while insertion order is kept. When member-list shuffling is
     * enabled, each of the three groups is shuffled in place before being added.
     *
     * @param addressProvider provider asked for additional addresses
     * @return the ordered, de-duplicated candidate addresses; only the member addresses if the
     *         provider fails with an exception other than {@link NullPointerException}
     * @throws NullPointerException if the provider lookup raises one (deliberately not swallowed)
     */
    Collection<Address> getPossibleMemberAddresses(AddressProvider addressProvider) {
        // Typed list instead of the raw List, so the shuffle and the copy below are checked.
        List<Address> memberAddresses = this.client.getClientClusterService().getMemberList().stream().map(Member::getAddress).collect(Collectors.toList());
        if (this.shuffleMemberList) {
            Collections.shuffle(memberAddresses);
        }
        LinkedHashSet<Address> addresses = new LinkedHashSet<Address>(memberAddresses);
        try {
            Addresses result = addressProvider.loadAddresses();
            if (this.shuffleMemberList) {
                Collections.shuffle(result.primary());
                Collections.shuffle(result.secondary());
            }
            addresses.addAll(result.primary());
            addresses.addAll(result.secondary());
        }
        catch (NullPointerException e) {
            // kept separate so the generic handler below does not hide it
            throw e;
        }
        catch (Exception e) {
            this.logger.warning("Exception from AddressProvider: " + this.clusterDiscoveryService, e);
        }
        return addresses;
    }

    /**
     * Shuts the client down from a newly started thread named with the client name followed by
     * {@code ".clientShutdown-"}. Exceptions raised by the shutdown are logged at severe level.
     */
    private void shutdownWithExternalThread() {
        Runnable shutdownTask = () -> {
            try {
                client.getLifecycleService().shutdown();
            } catch (Exception failure) {
                logger.severe("Exception during client shutdown", failure);
            }
        };
        Thread shutdownThread = new Thread(shutdownTask, client.getName() + ".clientShutdown-");
        shutdownThread.start();
    }

    /**
     * @return the {@code values()} view of the active-connection map (not a copy)
     */
    @Override
    public Collection<ClientConnection> getActiveConnections() {
        return activeConnections.values();
    }

    /**
     * @return {@code true} after {@link #start()} has run and until {@link #shutdown()} runs
     */
    @Override
    public boolean isAlive() {
        return isAlive.get();
    }

    /**
     * @return the UUID generated for this client when the manager was created
     */
    @Override
    public UUID getClientUuid() {
        return clientUuid;
    }

    /**
     * Looks up the active connection registered under the given member UUID.
     *
     * @param uuid the member UUID used as key in the active-connection map
     * @return the connection, or {@code null} if none is registered for that UUID
     */
    @Override
    public Connection getConnection(@Nonnull UUID uuid) {
        return activeConnections.get(uuid);
    }

    /**
     * Scans the active connections for one whose end point equals the given address.
     *
     * @param address the member address to look for
     * @return the first matching connection, or {@code null} if there is none
     */
    private ClientConnection getConnection(@Nonnull Address address) {
        return activeConnections.values().stream()
                .filter(candidate -> candidate.getEndPoint().equals(address))
                .findFirst()
                .orElse(null);
    }

    /**
     * Returns the active connection to the given address, opening and authenticating a new one if
     * none exists.
     * <p>
     * Creation happens while holding the monitor of the {@link InetSocketAddress} returned by
     * {@code resolveAddress}; the lookup is repeated inside that block before a new connection is
     * made. (Presumably {@code resolveAddress} hands out one shared instance per address so that
     * concurrent callers serialize here; confirm against its implementation.)
     *
     * @param address the member address to connect to
     * @return an existing or newly authenticated connection
     */
    Connection getOrConnect(@Nonnull Address address) {
        checkClientActive();

        ClientConnection existing = getConnection(address);
        if (existing != null) {
            return existing;
        }

        InetSocketAddress lock = resolveAddress(address);
        synchronized (lock) {
            // re-check: another thread may have connected while we waited for the monitor
            ClientConnection concurrentlyCreated = getConnection(address);
            if (concurrentlyCreated != null) {
                return concurrentlyCreated;
            }
            Address translated = translate(address);
            ClientConnection created = createSocketConnection(translated);
            authenticateOnCluster(created);
            return created;
        }
    }

    /**
     * Notifies every registered connection listener that the given connection was added.
     *
     * @param connection the newly added connection
     */
    private void fireConnectionAddedEvent(ClientConnection connection) {
        connectionListeners.forEach(listener -> listener.connectionAdded(connection));
    }

    /**
     * Notifies every registered connection listener that the given connection was removed.
     *
     * @param connection the removed connection
     */
    private void fireConnectionRemovedEvent(ClientConnection connection) {
        connectionListeners.forEach(listener -> listener.connectionRemoved(connection));
    }

    /**
     * @return {@code true} when no outbound ports are configured
     */
    private boolean useAnyOutboundPort() {
        return outboundPortCount == 0;
    }

    /**
     * Picks the next configured outbound port in round-robin order: the head of the port list is
     * removed and appended to the tail while holding the list's monitor.
     *
     * @return the selected port, or 0 when no outbound ports are configured
     */
    private int acquireOutboundPort() {
        if (useAnyOutboundPort()) {
            return 0;
        }
        synchronized (outboundPorts) {
            Integer next = outboundPorts.removeFirst();
            outboundPorts.addLast(next);
            return next;
        }
    }

    /**
     * Binds the socket to a local port.
     * <p>
     * Without configured outbound ports the socket is bound to port 0. Otherwise up to twice the
     * number of configured ports are tried in rotation; an acquired port of 0 ends the method
     * without binding.
     *
     * @param socket the socket to bind
     * @throws IOException the last bind failure when every attempt failed
     */
    private void bindSocketToPort(Socket socket) throws IOException {
        if (useAnyOutboundPort()) {
            socket.bind(new InetSocketAddress(0));
            return;
        }

        IOException lastFailure = null;
        int maxAttempts = outboundPortCount * 2;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int port = acquireOutboundPort();
            if (port == 0) {
                return;
            }
            try {
                socket.bind(new InetSocketAddress(port));
                return;
            } catch (IOException bindFailure) {
                lastFailure = bindFailure;
                logger.finest("Could not bind port[ " + port + "]: " + bindFailure.getMessage());
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

    /**
     * Opens a socket channel to the given target and wraps it in a {@link ClientConnection}.
     * The connection is not authenticated here.
     *
     * @param target the address to connect to
     * @return the new connection, with its channel already started
     */
    protected ClientConnection createSocketConnection(Address target) {
        CandidateClusterContext currentClusterContext = this.clusterDiscoveryService.current();
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            Socket socket = socketChannel.socket();
            // choose the local port before connecting
            this.bindSocketToPort(socket);
            ChannelInitializerProvider channelInitializer = currentClusterContext.getChannelInitializerProvider();
            Channel channel = this.networking.register(null, channelInitializer, socketChannel, true);
            // remember the target on the channel under the Address.class key
            channel.attributeMap().put(Address.class, target);
            channel.connect(this.resolveAddress(target), this.connectionTimeoutMillis);
            ClientConnection connection = new ClientConnection(this.client, this.connectionIdGen.incrementAndGet(), channel);
            // NOTE(review): blocking mode is switched on right before the optional socket interceptor
            // runs, presumably so the interceptor can do synchronous I/O on the socket; confirm.
            socketChannel.configureBlocking(true);
            SocketInterceptor socketInterceptor = currentClusterContext.getSocketInterceptor();
            if (socketInterceptor != null) {
                socketInterceptor.onConnect(socket);
            }
            channel.start();
            return connection;
        }
        catch (Exception e) {
            // any failure: close the raw socket channel, log at finest and rethrow via ExceptionUtil
            IOUtil.closeResource(socketChannel);
            this.logger.finest(e);
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Translates a member address through the address provider of the current cluster context.
     *
     * @param target the address to translate
     * @return the translated address, never {@code null}
     * @throws NullPointerException if the provider returns {@code null}; like any other failure it
     *                              is logged and passed through {@code ExceptionUtil.rethrow}
     */
    private Address translate(Address target) {
        AddressProvider provider = clusterDiscoveryService.current().getAddressProvider();
        try {
            Address translated = provider.translate(target);
            if (translated != null) {
                return translated;
            }
            throw new NullPointerException("Address Provider " + provider.getClass() + " could not translate address " + target);
        } catch (Exception failure) {
            logger.warning("Failed to translate address " + target + " via address provider " + failure.getMessage());
            throw ExceptionUtil.rethrow(failure);
        }
    }

    /**
     * Handles a closed connection: removes it from the active-connection map and reacts when it
     * was the last one.
     * <p>
     * Connections without an end point are never removed from the map. The removal and its
     * follow-up run under {@code clientStateMutex}. When the map becomes empty, CLIENT_DISCONNECTED
     * is fired if the client was initialized on the cluster, and a cluster reconnection is
     * triggered. Listeners are told about the removal afterwards.
     *
     * @param connection the connection that was closed
     */
    void onConnectionClose(ClientConnection connection) {
        Address endpoint = connection.getEndPoint();
        UUID memberUuid = connection.getRemoteUuid();

        if (endpoint == null) {
            if (logger.isFinestEnabled()) {
                logger.finest("Destroying " + connection + ", but it has end-point set to null -> not removing it from a connection map");
            }
            return;
        }

        synchronized (clientStateMutex) {
            boolean removed = activeConnections.remove(memberUuid, connection);
            if (!removed) {
                if (logger.isFinestEnabled()) {
                    logger.finest("Destroying a connection, but there is no mapping " + endpoint + ":" + memberUuid + " -> " + connection + " in the connection map.");
                }
                return;
            }

            logger.info("Removed connection to endpoint: " + endpoint + ":" + memberUuid + ", connection: " + connection);
            if (activeConnections.isEmpty()) {
                if (clientState == ClientState.INITIALIZED_ON_CLUSTER) {
                    fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_DISCONNECTED);
                }
                triggerClusterReconnection();
            }
            fireConnectionRemovedEvent(connection);
        }
    }

    /**
     * Reacts to the loss of the last connection. With reconnect mode OFF the client is shut down
     * from a separate thread. Otherwise, if the client lifecycle is still running, a
     * connect-to-cluster task is submitted; a rejected submission also leads to a shutdown.
     */
    private void triggerClusterReconnection() {
        if (reconnectMode == ClientConnectionStrategyConfig.ReconnectMode.OFF) {
            logger.info("RECONNECT MODE is off. Shutting down the client.");
            shutdownWithExternalThread();
            return;
        }
        if (!client.getLifecycleService().isRunning()) {
            return;
        }
        try {
            submitConnectToClusterTask();
        } catch (RejectedExecutionException rejected) {
            shutdownWithExternalThread();
        }
    }

    /**
     * Registers a listener that is told when connections are added or removed.
     *
     * @param connectionListener the listener to register
     */
    @Override
    public void addConnectionListener(ConnectionListener connectionListener) {
        connectionListeners.add(connectionListener);
    }

    /**
     * @return the credentials stored in {@code currentCredentials}; {@code null} until the field
     *         has been assigned
     */
    public Credentials getCurrentCredentials() {
        return currentCredentials;
    }

    /**
     * Closes every active connection with a {@link TargetDisconnectedException} as the cause and
     * clears the socket address cache.
     */
    public void reset() {
        for (ClientConnection active : activeConnections.values()) {
            TargetDisconnectedException cause = new TargetDisconnectedException("Closing since client is switching cluster");
            active.close(null, cause);
        }
        inetSocketAddressCache.clear();
    }

    /**
     * Picks a connection for an invocation that has no specific target.
     * <p>
     * With smart routing enabled, the member chosen by the load balancer is preferred if there is
     * an active connection to it. Otherwise the first connection produced by iterating the
     * active-connection map is used.
     *
     * @return a connection, or {@code null} when there are no active connections
     */
    @Override
    public Connection getRandomConnection() {
        if (isSmartRoutingEnabled) {
            Member chosen = loadBalancer.next();
            if (chosen != null) {
                Connection preferred = getConnection(chosen.getUuid());
                if (preferred != null) {
                    return preferred;
                }
            }
        }

        Iterator<ClientConnection> remaining = activeConnections.values().iterator();
        if (remaining.hasNext()) {
            return remaining.next();
        }
        return null;
    }

    /**
     * Sends the authentication request over the given connection and acts on the response.
     * <p>
     * The response is awaited for at most {@code authenticationTimeout} milliseconds. If failover
     * is configured on the client but the cluster reports no failover support, the status is
     * treated as NOT_ALLOWED_IN_CLUSTER. Every failure path closes the connection before throwing.
     *
     * @param connection the freshly opened connection to authenticate
     * @throws AuthenticationException            on failed credentials or an unsupported status
     * @throws ClientNotAllowedInClusterException when the client is not allowed in the cluster
     */
    private void authenticateOnCluster(ClientConnection connection) {
        ClientMessage authRequest = encodeAuthenticationRequest();
        ClientInvocationFuture pending = new ClientInvocation(client, authRequest, null, connection).invokeUrgent();

        ClientAuthenticationCodec.ResponseParameters response;
        try {
            ClientMessage reply = (ClientMessage) pending.get(authenticationTimeout, TimeUnit.MILLISECONDS);
            response = ClientAuthenticationCodec.decodeResponse(reply);
        } catch (Exception failure) {
            connection.close("Failed to authenticate connection", failure);
            throw ExceptionUtil.rethrow(failure);
        }

        AuthenticationStatus status = AuthenticationStatus.getById(response.status);
        if (failoverConfigProvided && !response.failoverSupported) {
            logger.warning("Cluster does not support failover. This feature is available in Hazelcast Enterprise");
            status = AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER;
        }

        switch (status) {
            case AUTHENTICATED: {
                handleSuccessfulAuth(connection, response);
                break;
            }
            case CREDENTIALS_FAILED: {
                AuthenticationException invalidCredentials = new AuthenticationException("Invalid credentials!");
                connection.close("Failed to authenticate connection", invalidCredentials);
                throw invalidCredentials;
            }
            case NOT_ALLOWED_IN_CLUSTER: {
                ClientNotAllowedInClusterException notAllowed = new ClientNotAllowedInClusterException("Client is not allowed in the cluster");
                connection.close("Failed to authenticate connection", notAllowed);
                throw notAllowed;
            }
            default: {
                AuthenticationException unsupported = new AuthenticationException("Authentication status code not supported. status: " + status);
                connection.close("Failed to authenticate connection", unsupported);
                throw unsupported;
            }
        }
    }

    /**
     * Registers an authenticated connection and updates the client state.
     * <p>
     * Everything up to and including the listener notification runs under {@code clientStateMutex}.
     * A connection counts as "initial" when the active-connection map is empty. A cluster change is
     * detected on an initial connection whose cluster id differs from a previously stored one. In
     * that case the client is told about the restart, the state becomes CONNECTED_TO_CLUSTER and
     * initialization on the new cluster is handed to the internal executor. For any other initial
     * connection the state becomes INITIALIZED_ON_CLUSTER and CLIENT_CONNECTED is fired.
     * <p>
     * After the mutex is released, a connection that is no longer alive is passed to
     * {@code onConnectionClose}.
     *
     * @param connection the connection that authenticated successfully
     * @param response   the decoded authentication response
     */
    private void handleSuccessfulAuth(ClientConnection connection, ClientAuthenticationCodec.ResponseParameters response) {
        synchronized (clientStateMutex) {
            checkPartitionCount(response.partitionCount);
            connection.setConnectedServerVersion(response.serverHazelcastVersion);
            connection.setRemoteEndpoint(response.address);
            connection.setRemoteUuid(response.memberUuid);

            UUID newClusterId = response.clusterId;
            if (logger.isFineEnabled()) {
                logger.fine("Checking the cluster: " + newClusterId + ", current cluster: " + clusterId);
            }

            boolean firstConnection = activeConnections.isEmpty();
            boolean clusterChanged = firstConnection && clusterId != null && !newClusterId.equals(clusterId);
            if (clusterChanged) {
                logger.warning("Switching from current cluster: " + clusterId + " to new cluster: " + newClusterId);
                client.onClusterRestart();
            }

            activeConnections.put(response.memberUuid, connection);

            if (firstConnection) {
                clusterId = newClusterId;
                if (!clusterChanged) {
                    clientState = ClientState.INITIALIZED_ON_CLUSTER;
                    fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
                } else {
                    clientState = ClientState.CONNECTED_TO_CLUSTER;
                    executor.execute(() -> initializeClientOnCluster(newClusterId));
                }
            }

            logger.info("Authenticated with server " + response.address + ":" + response.memberUuid + ", server version: " + response.serverHazelcastVersion + ", local address: " + connection.getLocalSocketAddress());
            fireConnectionAddedEvent(connection);
        }

        // the connection may have died while it was being registered; clean it up in that case
        if (!connection.isAlive()) {
            onConnectionClose(connection);
        }
    }

    /**
     * Builds the authentication request for the currently selected candidate cluster.
     *
     * <p>Fresh credentials are obtained from the current cluster context's credentials factory and
     * remembered in {@code currentCredentials}. {@link PasswordCredentials} are encoded with
     * {@link ClientAuthenticationCodec}; everything else is encoded with
     * {@link ClientAuthenticationCustomCodec}, using the raw token for {@link TokenCredentials}
     * and the serialized form of the credentials object otherwise.
     *
     * @return the encoded authentication request message
     */
    private ClientMessage encodeAuthenticationRequest() {
        InternalSerializationService serializationService = this.client.getSerializationService();
        byte version = serializationService.getVersion();
        CandidateClusterContext clusterContext = this.clusterDiscoveryService.current();
        Credentials freshCredentials = clusterContext.getCredentialsFactory().newCredentials();
        String clusterName = clusterContext.getClusterName();
        this.currentCredentials = freshCredentials;

        if (freshCredentials instanceof PasswordCredentials) {
            PasswordCredentials passwordCredentials = (PasswordCredentials) freshCredentials;
            return ClientAuthenticationCodec.encodeRequest(clusterName, passwordCredentials.getName(), passwordCredentials.getPassword(), this.clientUuid, this.connectionType, version, BuildInfoProvider.getBuildInfo().getVersion(), this.client.getName(), this.labels);
        }

        byte[] secret;
        if (freshCredentials instanceof TokenCredentials) {
            secret = ((TokenCredentials) freshCredentials).getToken();
        } else {
            secret = serializationService.toData(freshCredentials).toByteArray();
        }
        return ClientAuthenticationCustomCodec.encodeRequest(clusterName, secret, this.clientUuid, this.connectionType, version, BuildInfoProvider.getBuildInfo().getVersion(), this.client.getName(), this.labels);
    }

    /**
     * Verifies that the client's lifecycle service reports the client as running.
     *
     * @throws HazelcastClientNotActiveException if the lifecycle service is not running
     */
    protected void checkClientActive() {
        boolean running = this.client.getLifecycleService().isRunning();
        if (running) {
            return;
        }
        throw new HazelcastClientNotActiveException();
    }

    /**
     * Hands the partition count reported by a member to the client partition service and
     * rejects the cluster if the service does not accept it.
     *
     * @param newPartitionCount the partition count reported by the member
     * @throws ClientNotAllowedInClusterException if {@code checkAndSetPartitionCount} returns {@code false}
     */
    private void checkPartitionCount(int newPartitionCount) {
        ClientPartitionServiceImpl partitions = (ClientPartitionServiceImpl) this.client.getClientPartitionService();
        boolean accepted = partitions.checkAndSetPartitionCount(newPartitionCount);
        if (accepted) {
            return;
        }
        String message = "Client can not work with this cluster because it has a different partition count. Expected partition count: " + partitions.getPartitionCount() + ", Member partition count: " + newPartitionCount;
        throw new ClientNotAllowedInClusterException(message);
    }

    /**
     * Returns the socket address for {@code target}, going through {@code inetSocketAddressCache}.
     *
     * <p>NOTE(review): presumably the factory lambda only runs when the cache has no entry for
     * {@code target} - confirm against {@code ConcurrencyUtil.getOrPutIfAbsent}.
     *
     * @param target the member address to resolve
     * @return the socket address built from the target's inet address and port
     */
    private InetSocketAddress resolveAddress(Address target) {
        return ConcurrencyUtil.getOrPutIfAbsent(this.inetSocketAddressCache, target, unusedKey -> {
            // The factory argument is not used; the captured method parameter is resolved instead.
            try {
                return new InetSocketAddress(target.getInetAddress(), target.getPort());
            } catch (UnknownHostException unknownHost) {
                // Checked exception cannot escape the lambda, so it is passed through ExceptionUtil.rethrow.
                throw ExceptionUtil.rethrow(unknownHost);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends the client state to the cluster identified by {@code targetClusterId} and, if the
     * client is still attached to that cluster afterwards, moves it to
     * {@link ClientState#INITIALIZED_ON_CLUSTER} and fires the {@code CLIENT_CONNECTED} lifecycle event.
     *
     * <p>{@code clusterId} is compared with {@code targetClusterId} under {@code clientStateMutex}
     * both before and after the state is sent; the send itself runs outside the mutex. If anything
     * throws and the client is still on the target cluster, the method re-submits itself to the
     * executor to retry.
     *
     * <p>Diagnostic messages guarded by {@code isFineEnabled()} are logged at fine level so that the
     * guard and the emitted level agree.
     *
     * @param targetClusterId id of the cluster the client state should be sent to
     */
    private void initializeClientOnCluster(UUID targetClusterId) {
        try {
            synchronized (this.clientStateMutex) {
                if (!targetClusterId.equals(this.clusterId)) {
                    this.logger.warning("Won't send client state to cluster: " + targetClusterId + " Because switched to a new cluster: " + this.clusterId);
                    return;
                }
            }

            // Runs without holding clientStateMutex; the cluster id is re-checked afterwards.
            this.client.sendStateToCluster();

            synchronized (this.clientStateMutex) {
                if (targetClusterId.equals(this.clusterId)) {
                    if (this.logger.isFineEnabled()) {
                        this.logger.fine("Client state is sent to cluster: " + targetClusterId);
                    }
                    this.clientState = ClientState.INITIALIZED_ON_CLUSTER;
                    this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
                } else if (this.logger.isFineEnabled()) {
                    this.logger.fine("Cannot set client state to " + ClientState.INITIALIZED_ON_CLUSTER + " because current cluster id: " + this.clusterId + " is different than expected cluster id: " + targetClusterId);
                }
            }
        } catch (Exception e) {
            String clusterName = this.clusterDiscoveryService.current().getClusterName();
            this.logger.warning("Failure during sending state to the cluster.", e);
            synchronized (this.clientStateMutex) {
                // Only retry while the client is still attached to the cluster the state was meant for.
                if (targetClusterId.equals(this.clusterId)) {
                    if (this.logger.isFineEnabled()) {
                        this.logger.fine("Retrying sending state to the cluster: " + targetClusterId + ", name: " + clusterName);
                    }
                    this.executor.execute(() -> this.initializeClientOnCluster(targetClusterId));
                }
            }
        }
    }

    /**
     * Task that walks the current member list and, for every member the client has no connection
     * to, submits a connection attempt to the executor.
     *
     * <p>{@code connectingAddresses} holds the addresses with an attempt in flight so that the
     * same address is not submitted twice; an address is removed again when its attempt finishes,
     * whether it succeeded or failed.
     */
    private class ConnectToAllClusterMembersTask
    implements Runnable {
        // Typed backing map (instead of a raw ConcurrentHashMap) avoids an unchecked conversion.
        private final Set<Address> connectingAddresses = Collections.newSetFromMap(new ConcurrentHashMap<Address, Boolean>());

        private ConnectToAllClusterMembersTask() {
        }

        /**
         * Submits one connection attempt per member that is neither connected nor already being
         * connected to. Does nothing when the client lifecycle service is not running.
         */
        @Override
        public void run() {
            if (!ClientConnectionManagerImpl.this.client.getLifecycleService().isRunning()) {
                return;
            }
            for (Member member : ClientConnectionManagerImpl.this.client.getClientClusterService().getMemberList()) {
                Address address = member.getAddress();
                // Skip when shutting down, already connected, or an attempt for this address is in flight.
                if (!ClientConnectionManagerImpl.this.client.getLifecycleService().isRunning() || ClientConnectionManagerImpl.this.getConnection(address) != null || !this.connectingAddresses.add(address)) continue;
                ClientConnectionManagerImpl.this.executor.submit(() -> {
                    try {
                        if (!ClientConnectionManagerImpl.this.client.getLifecycleService().isRunning()) {
                            return;
                        }
                        if (ClientConnectionManagerImpl.this.getConnection(member.getUuid()) == null) {
                            ClientConnectionManagerImpl.this.getOrConnect(address);
                        }
                    }
                    catch (Exception e) {
                        // Best effort: a failed attempt is deliberately ignored here.
                        EmptyStatement.ignore(e);
                    }
                    finally {
                        this.connectingAddresses.remove(address);
                    }
                });
            }
        }
    }

    /**
     * Channel error handler that closes the connection attached to the failing channel.
     *
     * <p>Errors reported without a channel are only logged. {@link OutOfMemoryError}s are logged
     * at severe level before the connection is closed.
     */
    private class ClientConnectionChannelErrorHandler implements ChannelErrorHandler {
        private ClientConnectionChannelErrorHandler() {
        }

        /**
         * @param channel the channel the error occurred on, or {@code null} if there is none
         * @param cause   the error that was raised
         */
        @Override
        public void onError(Channel channel, Throwable cause) {
            if (channel == null) {
                ClientConnectionManagerImpl.this.logger.severe(cause);
                return;
            }
            if (cause instanceof OutOfMemoryError) {
                ClientConnectionManagerImpl.this.logger.severe(cause);
            }

            // The connection is looked up in the channel's attribute map under the ClientConnection class key.
            Connection failedConnection = (Connection) channel.attributeMap().get(ClientConnection.class);
            String reason = cause instanceof EOFException
                    ? "Connection closed by the other side"
                    : "Exception in " + failedConnection + ", thread=" + Thread.currentThread().getName();
            failedConnection.close(reason, cause);
        }
    }

    private static enum ClientState {
        INITIAL,
        CONNECTED_TO_CLUSTER,
        INITIALIZED_ON_CLUSTER;

    }
}

