/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.impl.connection.nio;

import com.hazelcast.client.AuthenticationException;
import com.hazelcast.client.ClientNotAllowedInClusterException;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.HazelcastClientOfflineException;
import com.hazelcast.client.LoadBalancer;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientConnectionStrategyConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.config.ConnectionRetryConfig;
import com.hazelcast.client.impl.clientside.CandidateClusterContext;
import com.hazelcast.client.impl.clientside.ClientDiscoveryService;
import com.hazelcast.client.impl.clientside.ClientLoggingService;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.clientside.LifecycleServiceImpl;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.connection.Addresses;
import com.hazelcast.client.impl.connection.ClientConnectionManager;
import com.hazelcast.client.impl.connection.nio.ClientConnection;
import com.hazelcast.client.impl.connection.nio.HeartbeatManager;
import com.hazelcast.client.impl.connection.nio.WaitStrategy;
import com.hazelcast.client.impl.management.ManagementCenterService;
import com.hazelcast.client.impl.protocol.AuthenticationStatus;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCodec;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCustomCodec;
import com.hazelcast.client.impl.protocol.codec.ClientIsFailoverSupportedCodec;
import com.hazelcast.client.impl.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.client.impl.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.ChannelInitializerProvider;
import com.hazelcast.internal.networking.nio.NioNetworking;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.ConnectionListener;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.internal.util.AddressUtil;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.EmptyStatement;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.internal.util.executor.LoggingScheduledExecutor;
import com.hazelcast.internal.util.executor.PoolExecutorThreadFactory;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.SocketInterceptor;
import com.hazelcast.security.Credentials;
import com.hazelcast.security.PasswordCredentials;
import com.hazelcast.security.TokenCredentials;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;

public class ClientConnectionManagerImpl
implements ClientConnectionManager {
    // NOTE(review): neither constant is referenced in the visible code; the literals 3 and 10
    // appear inline in initNetworking() and createExecutorService() — presumably inlined by the
    // compiler/decompiler. Confirm before removing.
    private static final int DEFAULT_SMART_CLIENT_THREAD_COUNT = 3;
    private static final int EXECUTOR_CORE_POOL_SIZE = 10;
    /** Source of the ids handed to newly created {@link ClientConnection}s. */
    protected final AtomicInteger connectionIdGen = new AtomicInteger();
    /** Flipped to true by {@code start()} and back to false by {@code shutdown()}. */
    private final AtomicBoolean isAlive = new AtomicBoolean();
    private final ILogger logger;
    /** Connect timeout in milliseconds; a configured value of 0 is stored as {@code Integer.MAX_VALUE}. */
    private final int connectionTimeoutMillis;
    private final HazelcastClientInstanceImpl client;
    // Cleared by reset(); not otherwise touched in the visible part of the file.
    private final ConcurrentMap<Address, InetSocketAddress> inetSocketAddressCache = new ConcurrentHashMap<Address, InetSocketAddress>();
    /** Authenticated connections keyed by the resolved socket address of the member. */
    private final ConcurrentMap<InetSocketAddress, ClientConnection> activeConnections = new ConcurrentHashMap<InetSocketAddress, ClientConnection>();
    private final Collection<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
    private final NioNetworking networking;
    private final HeartbeatManager heartbeat;
    /** Taken from the heartbeat timeout; used (in milliseconds) as the wait limit for the authentication response. */
    private final long authenticationTimeout;
    /** Either "MCJVM" or "JVM", chosen in the constructor from the management-center client mode property. */
    private final String connectionType;
    private final UUID clientUuid = UuidUtil.newUnsecureUUID();
    /** Configured outbound ports; rotated (head moved to tail) under its own monitor by acquireOutboundPort(). */
    private final LinkedList<Integer> outboundPorts = new LinkedList<Integer>();
    private final Set<String> labels;
    /** Size of {@link #outboundPorts} at construction time; 0 means "bind to any port". */
    private final int outboundPortCount;
    private final boolean failoverConfigProvided;
    private final ScheduledExecutorService clusterConnectionExecutor;
    private final boolean shuffleMemberList;
    private final WaitStrategy waitStrategy;
    private final ClientDiscoveryService discoveryService;
    /** Incremented by establishConnectedState(), decremented by removeFromActiveConnections(). */
    private final AtomicInteger connectionCount = new AtomicInteger();
    private final boolean asyncStart;
    private final ClientConnectionStrategyConfig.ReconnectMode reconnectMode;
    private final LoadBalancer loadBalancer;
    private final boolean isSmartRoutingEnabled;
    /** Credentials used for the most recent authentication request. */
    private volatile Credentials currentCredentials;
    /** Partition count learned from the first authentication; -1 until then. */
    private volatile int partitionCount = -1;
    /** Id of the cluster last authenticated against; null before the first authentication. */
    private volatile UUID clusterId;
    private volatile LifecycleEvent.LifecycleState state = LifecycleEvent.LifecycleState.STARTING;

    /**
     * Builds the connection manager from the client's configuration and properties.
     * Nothing is started here; see {@code start()}.
     *
     * @param client the owning client instance, used as the source of configuration,
     *               properties, logging and the discovery service
     */
    public ClientConnectionManagerImpl(HazelcastClientInstanceImpl client) {
        this.client = client;
        this.loadBalancer = client.getLoadBalancer();
        this.labels = Collections.unmodifiableSet(client.getClientConfig().getLabels());
        this.logger = client.getLoggingService().getLogger(ClientConnectionManager.class);
        this.connectionType = client.getProperties().getBoolean(ManagementCenterService.MC_CLIENT_MODE_PROP) ? "MCJVM" : "JVM";
        this.connectionTimeoutMillis = this.initConnectionTimeoutMillis();
        // Must be assigned before initNetworking(): that method picks the default IO thread
        // counts from this flag, and an unassigned final boolean would still read as false,
        // silently giving smart clients the non-smart thread count.
        this.isSmartRoutingEnabled = client.getClientConfig().getNetworkConfig().isSmartRouting();
        this.networking = this.initNetworking();
        this.outboundPorts.addAll(this.getOutboundPorts());
        this.outboundPortCount = this.outboundPorts.size();
        this.heartbeat = new HeartbeatManager(this, client);
        this.authenticationTimeout = this.heartbeat.getHeartbeatTimeout();
        this.failoverConfigProvided = client.getFailoverConfig() != null;
        // createExecutorService() and initializeWaitStrategy() both use this.logger, assigned above.
        this.clusterConnectionExecutor = this.createExecutorService();
        this.shuffleMemberList = client.getProperties().getBoolean(ClientProperty.SHUFFLE_MEMBER_LIST);
        this.discoveryService = client.getClientDiscoveryService();
        this.waitStrategy = this.initializeWaitStrategy(client.getClientConfig());
        ClientConnectionStrategyConfig connectionStrategyConfig = client.getClientConfig().getConnectionStrategyConfig();
        this.asyncStart = connectionStrategyConfig.isAsyncStart();
        this.reconnectMode = connectionStrategyConfig.getReconnectMode();
    }

    /**
     * Reads the configured connection timeout from the client network config.
     *
     * @return the configured timeout, or {@code Integer.MAX_VALUE} when the configured value is 0
     */
    private int initConnectionTimeoutMillis() {
        int configured = this.client.getClientConfig().getNetworkConfig().getConnectionTimeout();
        if (configured == 0) {
            // 0 is mapped to the largest representable timeout.
            return Integer.MAX_VALUE;
        }
        return configured;
    }

    /**
     * Creates the scheduled executor used for cluster-connection work.
     * Threads are named {@code <client name>.internal-...} and use the class loader from
     * the client config. A rejected task is logged at finest level and reported to the
     * submitter as a {@link RejectedExecutionException}.
     *
     * @return a new executor with a core pool size of 10
     */
    private ScheduledExecutorService createExecutorService() {
        ClassLoader configClassLoader = this.client.getClientConfig().getClassLoader();
        String threadNamePrefix = this.client.getName() + ".internal-";
        PoolExecutorThreadFactory threadFactory = new PoolExecutorThreadFactory(threadNamePrefix, configClassLoader);
        return new LoggingScheduledExecutor(this.logger, 10, threadFactory, (task, executor) -> {
            String rejection = "Internal executor rejected task: " + task + ", because client is shutting down...";
            this.logger.finest(rejection);
            throw new RejectedExecutionException(rejection);
        });
    }

    /**
     * Combines the explicitly configured outbound ports and the outbound port definitions
     * from the network config by delegating to {@code AddressUtil.getOutboundPorts}.
     *
     * @return the outbound ports as returned by {@code AddressUtil.getOutboundPorts}
     */
    private Collection<Integer> getOutboundPorts() {
        ClientNetworkConfig config = this.client.getClientConfig().getNetworkConfig();
        return AddressUtil.getOutboundPorts(config.getOutboundPorts(), config.getOutboundPortDefinitions());
    }

    /**
     * @return the networking instance created by {@code initNetworking()} during construction
     */
    public NioNetworking getNetworking() {
        return this.networking;
    }

    /**
     * Creates the {@link NioNetworking} instance for this client.
     * <p>
     * The input/output thread counts come from the client properties. A configured value of
     * -1 selects a default: 3 when smart routing is enabled, otherwise 1.
     * <p>
     * This method is invoked from the constructor and reads {@code isSmartRoutingEnabled},
     * so that field must already hold its configured value when this runs.
     *
     * @return a new, not yet started networking instance
     */
    protected NioNetworking initNetworking() {
        HazelcastProperties props = this.client.getProperties();
        int requestedInputThreads = props.getInteger(ClientProperty.IO_INPUT_THREAD_COUNT);
        int requestedOutputThreads = props.getInteger(ClientProperty.IO_OUTPUT_THREAD_COUNT);
        // Default applied when the corresponding property is left at -1.
        int fallbackThreads = this.isSmartRoutingEnabled ? 3 : 1;
        int inputThreads = requestedInputThreads == -1 ? fallbackThreads : requestedInputThreads;
        int outputThreads = requestedOutputThreads == -1 ? fallbackThreads : requestedOutputThreads;
        return new NioNetworking(
                new NioNetworking.Context()
                        .loggingService(this.client.getLoggingService())
                        .metricsRegistry(this.client.getMetricsRegistry())
                        .threadNamePrefix(this.client.getName())
                        .errorHandler(new ClientConnectionChannelErrorHandler())
                        .inputThreadCount(inputThreads)
                        .outputThreadCount(outputThreads)
                        .balancerIntervalSeconds(props.getInteger(ClientProperty.IO_BALANCER_INTERVAL_SECONDS))
                        .writeThroughEnabled(props.getBoolean(ClientProperty.IO_WRITE_THROUGH_ENABLED))
                        .concurrencyDetection(this.client.getConcurrencyDetection()));
    }

    /**
     * Builds the retry/backoff strategy from the connection retry settings of the given config.
     *
     * @param clientConfig the client configuration to read the retry settings from
     * @return a wait strategy configured with initial backoff, max backoff, multiplier,
     *         cluster connect timeout and jitter
     */
    private WaitStrategy initializeWaitStrategy(ClientConfig clientConfig) {
        ConnectionRetryConfig retry = clientConfig.getConnectionStrategyConfig().getConnectionRetryConfig();
        return new WaitStrategy(
                retry.getInitialBackoffMillis(),
                retry.getMaxBackoffMillis(),
                retry.getMultiplier(),
                retry.getClusterConnectTimeoutMillis(),
                retry.getJitter(),
                this.logger);
    }

    /**
     * Starts networking and the heartbeat, then connects to the cluster. With smart routing
     * enabled it also schedules {@link #tryOpenConnectionToAllMembers()} to run every second.
     * Calling this while the manager is already alive does nothing.
     */
    public synchronized void start() {
        boolean transitionedToAlive = this.isAlive.compareAndSet(false, true);
        if (transitionedToAlive) {
            this.startNetworking();
            this.heartbeat.start();
            this.connectToCluster();
            if (this.isSmartRoutingEnabled) {
                this.clusterConnectionExecutor.scheduleWithFixedDelay(this::tryOpenConnectionToAllMembers, 1L, 1L, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Best-effort attempt to open a connection to every known cluster member that does not
     * have one yet. Does nothing when smart routing is disabled and stops early once the
     * client is no longer running. Failures to connect to an individual member are ignored.
     */
    public void tryOpenConnectionToAllMembers() {
        if (!this.isSmartRoutingEnabled) {
            return;
        }
        for (Member candidate : this.client.getClientClusterService().getMemberList()) {
            if (!this.client.getLifecycleService().isRunning()) {
                return;
            }
            Address memberAddress = candidate.getAddress();
            // The lookup stays outside the try block, so its exceptions still propagate.
            boolean alreadyConnected = this.getConnection(memberAddress) != null;
            if (!alreadyConnected) {
                try {
                    this.getOrConnect(memberAddress);
                }
                catch (Exception ignored) {
                    // Deliberately best effort: this method is scheduled repeatedly by start().
                    EmptyStatement.ignore(ignored);
                }
            }
        }
    }

    /**
     * Starts the underlying networking by calling {@code restart()} on it.
     */
    protected void startNetworking() {
        this.networking.restart();
    }

    /**
     * Shuts the manager down: stops the cluster executor, closes every active connection,
     * stops networking, drops all connection listeners, stops the heartbeat and destroys the
     * current cluster context. Calling this while the manager is not alive does nothing.
     */
    public synchronized void shutdown() {
        boolean wasAlive = this.isAlive.compareAndSet(true, false);
        if (!wasAlive) {
            return;
        }
        ClientExecutionServiceImpl.shutdownExecutor("cluster", this.clusterConnectionExecutor, this.logger);
        for (Connection open : this.activeConnections.values()) {
            open.close("Hazelcast client is shutting down", null);
        }
        this.stopNetworking();
        this.connectionListeners.clear();
        this.heartbeat.shutdown();
        this.discoveryService.current().destroy();
    }

    /**
     * Stops the underlying networking by calling {@code shutdown()} on it.
     */
    protected void stopNetworking() {
        this.networking.shutdown();
    }

    /**
     * Starts the current candidate cluster context and then connects to the cluster, either
     * on the cluster executor (async start) or on the calling thread.
     */
    private void connectToCluster() {
        this.discoveryService.current().start();
        if (!this.asyncStart) {
            this.connectToClusterSync();
            return;
        }
        this.connectToClusterAsync();
    }

    /**
     * Submits {@code connectToClusterSync()} to the cluster executor. If connecting fails
     * for any reason, the failure is logged and the client is shut down from a separate thread.
     *
     * @throws java.util.concurrent.RejectedExecutionException if the executor rejects the task
     *         (see the rejection handler installed by {@code createExecutorService()})
     */
    private void connectToClusterAsync() {
        // A block lambda without a return value is only compatible with submit(Runnable).
        this.clusterConnectionExecutor.submit(() -> {
            try {
                this.connectToClusterSync();
            }
            catch (Throwable e) {
                // Pass the throwable as well so the stack trace is not lost; the message text is unchanged.
                this.logger.warning("Could not connect to any cluster, shutting down the client: " + e.getMessage(), e);
                this.shutdownWithExternalThread();
            }
        });
    }

    /**
     * Connects to the current candidate cluster and, if that fails, walks through the
     * remaining candidates offered by the discovery service until one accepts the client.
     * Before each further attempt the previous context is destroyed, the next one is started
     * and the logging service is told the new cluster name.
     *
     * @throws IllegalStateException if no candidate cluster could be connected to, or the
     *         client stopped running while trying
     */
    private void connectToClusterSync() {
        CandidateClusterContext currentClusterContext = this.discoveryService.current();
        // The logged value is the cluster name; the wording matches the other cluster-name log lines.
        this.logger.info("Trying to connect to cluster with name: " + currentClusterContext.getClusterName());
        if (this.connectToCandidate(currentClusterContext)) {
            return;
        }
        this.discoveryService.resetSearch();
        while (this.discoveryService.hasNext() && this.client.getLifecycleService().isRunning()) {
            this.discoveryService.current().destroy();
            CandidateClusterContext candidateClusterContext = this.discoveryService.next();
            candidateClusterContext.start();
            ((ClientLoggingService)this.client.getLoggingService()).updateClusterName(candidateClusterContext.getClusterName());
            this.logger.info("Trying to connect to next cluster with name: " + candidateClusterContext.getClusterName());
            if (this.connectToCandidate(candidateClusterContext)) {
                return;
            }
        }
        if (!this.client.getLifecycleService().isRunning()) {
            throw new IllegalStateException("Client is being shutdown.");
        }
        throw new IllegalStateException("Unable to connect to any cluster.");
    }

    /**
     * Tries to obtain a connection to the given address during the initial cluster connect.
     * Every failure is logged as a warning.
     *
     * @param address the member address to connect to
     * @return the connection, or {@code null} if the attempt failed with an exception other
     *         than the two rethrown below
     * @throws InvalidConfigurationException rethrown so the caller can give up on this cluster
     * @throws ClientNotAllowedInClusterException rethrown so the caller can give up on this cluster
     */
    private Connection connect(Address address) {
        try {
            this.logger.info("Trying to connect to " + address);
            return this.getOrConnect(address);
        }
        catch (InvalidConfigurationException e) {
            this.logger.warning("Exception during initial connection to " + address + ": " + e);
            throw ExceptionUtil.rethrow(e);
        }
        catch (ClientNotAllowedInClusterException e) {
            this.logger.warning("Exception during initial connection to " + address + ": " + e);
            throw e;
        }
        catch (Exception e) {
            // Any other failure only rules out this address; the caller moves on to the next one.
            this.logger.warning("Exception during initial connection to " + address + ": " + e);
            return null;
        }
    }

    /**
     * Publishes the given lifecycle state through the client's lifecycle service.
     *
     * @param state the state to announce
     */
    private void fireLifecycleEvent(LifecycleEvent.LifecycleState state) {
        ((LifecycleServiceImpl)this.client.getLifecycleService()).fireLifecycleEvent(state);
    }

    /**
     * Repeatedly tries all possible member addresses of the given candidate cluster until a
     * connection is established or the wait strategy says to stop.
     *
     * @param context the candidate cluster to connect to
     * @return the result of {@code checkFailoverSupport} for the first established connection;
     *         {@code false} if no address could be connected to, or the cluster was given up
     *         on because of a configuration or "not allowed" error
     */
    private boolean connectToCandidate(CandidateClusterContext context) {
        Set<Address> attempted = new HashSet<Address>();
        try {
            this.waitStrategy.reset();
            do {
                // The address list is re-read on every round.
                for (Address candidate : this.getPossibleMemberAddresses(context.getAddressProvider())) {
                    this.checkClientActive();
                    attempted.add(candidate);
                    Connection established = this.connect(candidate);
                    if (established != null) {
                        return this.checkFailoverSupport(established);
                    }
                }
                this.checkClientActive();
            } while (this.waitStrategy.sleep());
        }
        catch (ClientNotAllowedInClusterException | InvalidConfigurationException e) {
            this.logger.warning("Give up trying on this cluster with name: " + context.getClusterName() + " reason: " + e.getMessage());
        }
        this.logger.info("Unable to connect to any address from the cluster with name: " + context.getClusterName() + ". The following addresses were tried: " + attempted);
        return false;
    }

    /**
     * Verifies that invocations may currently be sent, based on the connection state.
     *
     * @throws HazelcastClientOfflineException if the client is not connected and is either
     *         still starting with async start, or disconnected with ASYNC reconnect mode
     * @throws IOException if the client is not connected in any other situation
     */
    @Override
    public void checkInvocationAllowed() throws IOException {
        // Take one snapshot of the volatile field so all checks see the same state.
        LifecycleEvent.LifecycleState snapshot = this.state;
        if (snapshot == LifecycleEvent.LifecycleState.CLIENT_CONNECTED) {
            return;
        }
        boolean stillStarting = snapshot == LifecycleEvent.LifecycleState.STARTING;
        boolean reportOffline = stillStarting
                ? this.asyncStart
                : this.reconnectMode == ClientConnectionStrategyConfig.ReconnectMode.ASYNC;
        if (reportOffline) {
            throw new HazelcastClientOfflineException();
        }
        if (stillStarting) {
            throw new IOException("No connection found to cluster since the client is starting.");
        }
        throw new IOException("No connection found to cluster.");
    }

    /**
     * Collects the addresses that may belong to cluster members: first the addresses of the
     * members currently known to the cluster service, then the primary and secondary
     * addresses loaded from the given provider. Duplicates are dropped and insertion order is
     * kept. When member-list shuffling is enabled, the known member addresses and the
     * provider's primary and secondary lists are each shuffled (the provider's lists are
     * shuffled in place).
     *
     * @param addressProvider the provider to load additional addresses from
     * @return the ordered, de-duplicated candidate addresses
     * @throws NullPointerException if loading from the provider raises one; every other
     *         exception from the provider is logged and ignored
     */
    Collection<Address> getPossibleMemberAddresses(AddressProvider addressProvider) {
        Set<Address> addresses = new LinkedHashSet<Address>();
        for (Member member : this.client.getClientClusterService().getMemberList()) {
            addresses.add(member.getAddress());
        }
        if (this.shuffleMemberList) {
            // shuffle() returns a fresh LinkedHashSet, so no cast is needed.
            addresses = ClientConnectionManagerImpl.shuffle(addresses);
        }
        Set<Address> providedAddresses = new LinkedHashSet<Address>();
        try {
            Addresses result = addressProvider.loadAddresses();
            if (this.shuffleMemberList) {
                Collections.shuffle(result.primary());
                Collections.shuffle(result.secondary());
            }
            providedAddresses.addAll(result.primary());
            providedAddresses.addAll(result.secondary());
        }
        catch (NullPointerException e) {
            // Kept separate so an NPE is not swallowed by the generic handler below.
            throw e;
        }
        catch (Exception e) {
            this.logger.warning("Exception from AddressProvider: " + this.discoveryService, e);
        }
        addresses.addAll(providedAddresses);
        return addresses;
    }

    /**
     * Returns a new insertion-ordered set holding the elements of {@code set} in random order.
     * The argument itself is not modified.
     *
     * @param set the elements to shuffle
     * @param <T> the element type
     * @return a new {@link LinkedHashSet} with the same elements in shuffled order
     */
    private static <T> Set<T> shuffle(Set<T> set) {
        ArrayList<T> randomized = new ArrayList<T>(set);
        Collections.shuffle(randomized);
        LinkedHashSet<T> ordered = new LinkedHashSet<T>(randomized);
        return ordered;
    }

    /**
     * Reacts to the loss of the last connection. With reconnect mode OFF the client is shut
     * down; otherwise, if the client is still running, an asynchronous reconnect is
     * submitted. If the executor rejects the reconnect task, the client is shut down as well.
     */
    private void triggerReconnectToCluster() {
        if (ClientConnectionStrategyConfig.ReconnectMode.OFF == this.reconnectMode) {
            this.logger.info("RECONNECT MODE is off. Shutting down the client");
            this.shutdownWithExternalThread();
            return;
        }
        if (!this.client.getLifecycleService().isRunning()) {
            return;
        }
        try {
            this.connectToClusterAsync();
        }
        catch (RejectedExecutionException rejected) {
            this.shutdownWithExternalThread();
        }
    }

    /**
     * Shuts the client down from a newly started thread named
     * {@code <client name>.clientShutdown-}. Exceptions thrown by the shutdown are logged
     * at severe level.
     */
    private void shutdownWithExternalThread() {
        Runnable shutdownTask = () -> {
            try {
                this.client.getLifecycleService().shutdown();
            }
            catch (Exception failure) {
                this.logger.severe("Exception during client shutdown ", failure);
            }
        };
        String threadName = this.client.getName() + ".clientShutdown-";
        new Thread(shutdownTask, threadName).start();
    }

    /**
     * @return the values view of the active-connection map; this is the map's own view,
     *         not a copy
     */
    @Override
    public Collection<ClientConnection> getActiveConnections() {
        return this.activeConnections.values();
    }

    /**
     * @return {@code true} between a successful {@code start()} and the following {@code shutdown()}
     */
    @Override
    public boolean isAlive() {
        return this.isAlive.get();
    }

    /**
     * @return the UUID generated for this connection manager at construction time; it is
     *         sent in every authentication request
     */
    @Override
    public UUID getClientUuid() {
        return this.clientUuid;
    }

    /**
     * Looks up the active connection for the given member address.
     *
     * @param target the member address
     * @return the active connection, or {@code null} if there is none
     * @throws HazelcastClientNotActiveException if the client is not running
     */
    @Override
    public Connection getConnection(@Nonnull Address target) {
        this.checkClientActive();
        InetSocketAddress key = this.resolveAddress(target);
        return this.activeConnections.get(key);
    }

    /**
     * Returns the active connection for the given address, creating and authenticating a new
     * one if none exists. Creation is serialized on the resolved socket address object, and
     * the map is re-checked after acquiring that monitor.
     * NOTE(review): this only serializes attempts for the same member if
     * {@code resolveAddress} returns the same instance for equal addresses — confirm.
     *
     * @param address the member address
     * @return an existing or newly established connection
     * @throws HazelcastClientNotActiveException if the client is not running
     */
    Connection getOrConnect(@Nonnull Address address) {
        this.checkClientActive();
        InetSocketAddress resolved = this.resolveAddress(address);
        ClientConnection existing = this.activeConnections.get(resolved);
        if (existing != null) {
            return existing;
        }
        synchronized (resolved) {
            // Re-check: another thread may have connected while we waited for the monitor.
            ClientConnection raced = this.activeConnections.get(resolved);
            if (raced != null) {
                return raced;
            }
            Address translated = this.translate(address);
            ClientConnection fresh = this.createSocketConnection(translated);
            this.initializeToCluster(fresh);
            return fresh;
        }
    }

    /**
     * Notifies every registered listener that the given connection was added.
     *
     * @param connection the newly added connection
     */
    private void fireConnectionAddedEvent(ClientConnection connection) {
        this.connectionListeners.forEach(listener -> listener.connectionAdded(connection));
    }

    /**
     * Notifies every registered listener that the given connection was removed.
     *
     * @param connection the removed connection
     */
    private void fireConnectionRemovedEvent(ClientConnection connection) {
        this.connectionListeners.forEach(registered -> registered.connectionRemoved(connection));
    }

    /**
     * @return {@code true} when no outbound ports were configured, i.e. the outbound port
     *         list was empty at construction time
     */
    private boolean useAnyOutboundPort() {
        return this.outboundPortCount == 0;
    }

    /**
     * Picks the next outbound port in round-robin fashion: the head of the port list is
     * returned and moved to the tail, under the list's monitor.
     *
     * @return the next configured port, or 0 when no outbound ports were configured
     */
    private int acquireOutboundPort() {
        if (this.useAnyOutboundPort()) {
            return 0;
        }
        synchronized (this.outboundPorts) {
            Integer next = this.outboundPorts.removeFirst();
            this.outboundPorts.addLast(next);
            return next;
        }
    }

    /**
     * Binds the socket to a local port. Without configured outbound ports the socket is
     * bound to port 0. Otherwise configured ports are tried in rotation, for at most twice
     * the number of configured ports, until a bind succeeds. If the rotation yields port 0,
     * the method returns without binding.
     *
     * @param socket the socket to bind
     * @throws IOException the failure of the last attempt if every attempt failed, or the
     *         failure of the port-0 bind
     */
    private void bindSocketToPort(Socket socket) throws IOException {
        if (this.useAnyOutboundPort()) {
            socket.bind(new InetSocketAddress(0));
            return;
        }
        int maxAttempts = this.outboundPortCount * 2;
        IOException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            int port = this.acquireOutboundPort();
            if (port == 0) {
                return;
            }
            try {
                socket.bind(new InetSocketAddress(port));
                return;
            }
            catch (IOException e) {
                // Remember the failure and move on to the next configured port.
                lastFailure = e;
                this.logger.finest("Could not bind port[ " + port + "]: " + e.getMessage());
            }
        }
        throw lastFailure;
    }

    /**
     * Opens a socket channel to the given address and wraps it in a {@link ClientConnection}.
     * The connection returned here has not been authenticated yet.
     *
     * @param target the (already translated) address to connect to
     * @return the new connection, with its channel started
     * @throws RuntimeException whatever {@code ExceptionUtil.rethrow} produces for a failure
     *         in any step; the socket channel is closed first
     */
    protected ClientConnection createSocketConnection(Address target) {
        CandidateClusterContext currentClusterContext = this.discoveryService.current();
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            Socket socket = socketChannel.socket();
            // Bind the local side first (configured outbound port or any port).
            this.bindSocketToPort(socket);
            ChannelInitializerProvider channelInitializer = currentClusterContext.getChannelInitializerProvider();
            // NOTE(review): meaning of the null and true arguments is defined by NioNetworking.register — confirm.
            Channel channel = this.networking.register(null, channelInitializer, socketChannel, true);
            channel.connect(this.resolveAddress(target), this.connectionTimeoutMillis);
            ClientConnection connection = new ClientConnection(this.client, this.connectionIdGen.incrementAndGet(), channel);
            // The channel is switched to blocking mode before the interceptor is handed the raw socket.
            socketChannel.configureBlocking(true);
            SocketInterceptor socketInterceptor = currentClusterContext.getSocketInterceptor();
            if (socketInterceptor != null) {
                socketInterceptor.onConnect(socket);
            }
            channel.start();
            return connection;
        }
        catch (Exception e) {
            // Release the channel on any failure, then propagate.
            IOUtil.closeResource(socketChannel);
            this.logger.finest(e);
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Translates a member address through the current cluster's address provider.
     *
     * @param target the address to translate
     * @return the translated address, never {@code null}
     * @throws RuntimeException whatever {@code ExceptionUtil.rethrow} produces when the
     *         provider fails or returns {@code null}; the failure is logged as a warning first
     */
    private Address translate(Address target) {
        AddressProvider provider = this.discoveryService.current().getAddressProvider();
        try {
            Address translated = provider.translate(target);
            if (translated != null) {
                return translated;
            }
            // Thrown inside the try so it is logged and rethrown by the handler below.
            throw new NullPointerException("Address Provider " + provider.getClass() + " could not translate address " + target);
        }
        catch (Exception e) {
            this.logger.warning("Failed to translate address " + target + " via address provider " + e.getMessage());
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Callback for a closed connection; removes it from the active connections.
     *
     * @param connection the closed connection; must be a {@link ClientConnection}
     */
    void onClose(Connection connection) {
        this.removeFromActiveConnections((ClientConnection)connection);
    }

    /**
     * Removes the connection from the active-connection map if it is still the one mapped to
     * its endpoint. When the last connection goes away, the state becomes
     * {@code CLIENT_DISCONNECTED}, the lifecycle event is fired and a reconnect is triggered.
     * Connection listeners are notified after any successful removal.
     *
     * @param connection the connection to remove
     */
    private void removeFromActiveConnections(ClientConnection connection) {
        Address endpoint = connection.getEndPoint();
        if (endpoint == null) {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Destroying " + connection + ", but it has end-point set to null -> not removing it from a connection map");
            }
            return;
        }
        // Conditional remove: only succeeds if this exact connection is still the mapped one.
        boolean removed = this.activeConnections.remove(this.resolveAddress(endpoint), connection);
        if (!removed) {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Destroying a connection, but there is no mapping " + endpoint + " -> " + connection + " in the connection map.");
            }
            return;
        }
        this.logger.info("Removed connection to endpoint: " + endpoint + ", connection: " + connection);
        boolean lastConnectionGone = this.connectionCount.decrementAndGet() == 0;
        if (lastConnectionGone) {
            this.state = LifecycleEvent.LifecycleState.CLIENT_DISCONNECTED;
            this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_DISCONNECTED);
            this.triggerReconnectToCluster();
        }
        this.fireConnectionRemovedEvent(connection);
    }

    /**
     * Registers a listener that is notified when connections are added or removed.
     * All listeners are dropped by {@code shutdown()}.
     *
     * @param connectionListener the listener to register
     */
    @Override
    public void addConnectionListener(ConnectionListener connectionListener) {
        this.connectionListeners.add(connectionListener);
    }

    /**
     * @return the credentials created for the most recent authentication request, or
     *         {@code null} if no request has been encoded yet
     */
    public Credentials getCurrentCredentials() {
        return this.currentCredentials;
    }

    /**
     * Closes every active connection with a {@link TargetDisconnectedException} (a new
     * exception instance per connection) and clears the socket address cache.
     */
    public void reset() {
        this.activeConnections.values().forEach(open ->
                open.close(null, new TargetDisconnectedException("Closing since client is switching cluster")));
        this.inetSocketAddressCache.clear();
    }

    /**
     * Picks a connection to send to. With smart routing the load balancer chooses a member
     * and that member's connection is used if one exists. Otherwise, or if that yields
     * nothing, the first connection from the active-connection map's iteration is returned.
     *
     * @return a connection, or {@code null} if there is no active connection
     * @throws HazelcastClientNotActiveException possibly, via {@code getConnection}, when the
     *         client is not running
     */
    @Override
    public Connection getRandomConnection() {
        if (this.isSmartRoutingEnabled) {
            Member chosen = this.loadBalancer.next();
            if (chosen != null) {
                Connection preferred = this.getConnection(chosen.getAddress());
                if (preferred != null) {
                    return preferred;
                }
            }
        }
        Iterator<ClientConnection> remaining = this.activeConnections.values().iterator();
        return remaining.hasNext() ? remaining.next() : null;
    }

    /**
     * Authenticates a freshly created connection against the cluster and, on success,
     * registers it as an active connection.
     * <p>
     * On failure the connection is closed and an exception is thrown: a failed or timed-out
     * authentication invocation, rejected credentials, "not allowed in cluster", or an
     * unsupported status code.
     *
     * @param connection the new, not yet authenticated connection
     * @throws AuthenticationException if the credentials were rejected or the status is unknown
     * @throws ClientNotAllowedInClusterException if the cluster refuses the client, or the
     *         partition count differs from the one seen earlier
     */
    private void initializeToCluster(ClientConnection connection) {
        ClientMessage response;
        ClientMessage clientMessage = this.encodeAuthenticationRequest();
        ClientInvocation clientInvocation = new ClientInvocation(this.client, clientMessage, null, connection);
        ClientInvocationFuture invocationFuture = clientInvocation.invokeUrgent();
        try {
            // Wait for the response no longer than the authentication timeout.
            response = (ClientMessage)invocationFuture.get(this.authenticationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            connection.close("Failed to authenticate connection", e);
            throw ExceptionUtil.rethrow(e);
        }
        ClientAuthenticationCodec.ResponseParameters result = ClientAuthenticationCodec.decodeResponse(response);
        AuthenticationStatus authenticationStatus = AuthenticationStatus.getById(result.status);
        switch (authenticationStatus) {
            case AUTHENTICATED: {
                this.checkPartitionCount(result.partitionCount);
                connection.setConnectedServerVersion(result.serverHazelcastVersion);
                connection.setRemoteEndpoint(result.address);
                UUID newClusterId = result.clusterId;
                if (this.logger.isFineEnabled()) {
                    this.logger.fine("Checking the cluster id.Old: " + this.clusterId + ", new: " + newClusterId);
                }
                // A cluster change is only detected when a previous cluster id is known.
                boolean changedCluster = this.clusterId != null && !newClusterId.equals(this.clusterId);
                this.clusterId = newClusterId;
                if (changedCluster) {
                    this.client.clear();
                }
                // The map key is the address reported by the server in the response.
                this.activeConnections.put(this.resolveAddress(result.address), connection);
                this.fireConnectionAddedEvent(connection);
                this.logger.info("Authenticated with server " + result.address + ", server version:" + connection.getConnectedServerVersion() + " Local address: " + connection.getLocalSocketAddress());
                this.establishConnectedState(newClusterId, changedCluster);
                // If the connection died while being registered, undo the registration.
                if (connection.isAlive()) break;
                this.removeFromActiveConnections(connection);
                break;
            }
            case CREDENTIALS_FAILED: {
                AuthenticationException authException = new AuthenticationException("Invalid credentials!");
                connection.close("Failed to authenticate connection", authException);
                throw authException;
            }
            case NOT_ALLOWED_IN_CLUSTER: {
                ClientNotAllowedInClusterException notAllowedException = new ClientNotAllowedInClusterException("Client is not allowed in the cluster");
                connection.close("Failed to authenticate connection", notAllowedException);
                throw notAllowedException;
            }
            default: {
                AuthenticationException exception = new AuthenticationException("Authentication status code not supported. status: " + (Object)((Object)authenticationStatus));
                connection.close("Failed to authenticate connection", exception);
                throw exception;
            }
        }
    }

    /**
     * Counts a newly authenticated connection. For the first connection after having none:
     * if the cluster changed, sending the client state to the new cluster is handed to the
     * cluster executor; otherwise the state becomes {@code CLIENT_CONNECTED} and the
     * lifecycle event is fired right away.
     *
     * @param newClusterId   id of the cluster just authenticated against
     * @param changedCluster whether that id differs from the previously known cluster id
     */
    private void establishConnectedState(UUID newClusterId, boolean changedCluster) {
        boolean firstConnection = this.connectionCount.incrementAndGet() == 1;
        if (!firstConnection) {
            return;
        }
        if (changedCluster) {
            this.clusterConnectionExecutor.execute(() -> this.sendStatesToCluster(newClusterId));
            return;
        }
        this.state = LifecycleEvent.LifecycleState.CLIENT_CONNECTED;
        this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
    }

    /**
     * Builds the authentication request for the currently selected candidate cluster.
     *
     * <p>Fresh credentials are obtained from the current cluster context's
     * credentials factory and remembered in {@code currentCredentials}.
     * {@link PasswordCredentials} are sent with the regular authentication codec
     * (name and password); every other credentials type is sent with the custom
     * authentication codec, carrying either the raw token wrapped in a
     * {@link HeapData} (for {@link TokenCredentials}) or the serialized
     * credentials object.
     *
     * @return the encoded authentication request message
     */
    private ClientMessage encodeAuthenticationRequest() {
        InternalSerializationService serializationService = this.client.getSerializationService();
        byte serializationVersion = serializationService.getVersion();
        CandidateClusterContext clusterContext = this.discoveryService.current();
        Credentials credentials = clusterContext.getCredentialsFactory().newCredentials();
        String clusterName = clusterContext.getClusterName();
        this.currentCredentials = credentials;

        if (!(credentials instanceof PasswordCredentials)) {
            // NOTE(review): kept as Object to match the existing declaration; confirm the codec's parameter type.
            Object payload;
            if (credentials instanceof TokenCredentials) {
                payload = new HeapData(((TokenCredentials)credentials).getToken());
            } else {
                payload = serializationService.toData(credentials);
            }
            return ClientAuthenticationCustomCodec.encodeRequest(clusterName, payload, this.clientUuid, this.connectionType, serializationVersion, BuildInfoProvider.getBuildInfo().getVersion(), this.client.getName(), this.labels);
        }

        PasswordCredentials passwordCredentials = (PasswordCredentials)credentials;
        return ClientAuthenticationCodec.encodeRequest(clusterName, passwordCredentials.getName(), passwordCredentials.getPassword(), this.clientUuid, this.connectionType, serializationVersion, BuildInfoProvider.getBuildInfo().getVersion(), this.client.getName(), this.labels);
    }

    /**
     * Verifies that the client's lifecycle service reports the client as running.
     *
     * @throws HazelcastClientNotActiveException if the lifecycle service is not running
     */
    protected void checkClientActive() {
        boolean running = this.client.getLifecycleService().isRunning();
        if (running) {
            return;
        }
        throw new HazelcastClientNotActiveException();
    }

    /**
     * Records the partition count on first use and rejects any later value that differs.
     *
     * <p>A stored value of {@code -1} is treated as "not set yet"; in that case
     * the given count is stored. Afterwards the given count must equal the
     * stored one.
     *
     * @param newPartitionCount partition count reported by the member
     * @throws ClientNotAllowedInClusterException if a count is already stored and differs from
     *                                            {@code newPartitionCount}
     */
    private void checkPartitionCount(int newPartitionCount) {
        if (this.partitionCount == -1) {
            // First value seen: remember it as the expected count.
            this.partitionCount = newPartitionCount;
            return;
        }
        if (this.partitionCount == newPartitionCount) {
            return;
        }
        throw new ClientNotAllowedInClusterException("Client can not work with this cluster  because it has a different partition count. Partition count client expects :" + this.partitionCount + ", Member partition count:" + newPartitionCount);
    }

    /**
     * Asks the cluster behind the given connection whether it supports client failover.
     *
     * <p>When no failover configuration was provided the check is skipped and
     * {@code true} is returned. Otherwise a failover-support request is invoked
     * on the connection and the call blocks for the response.
     *
     * <p>If the wait is interrupted, the thread's interrupt status is restored
     * before returning so that callers can still observe the interruption.
     *
     * @param connection the connection on which the query is sent
     * @return {@code true} if failover is not configured or the cluster reports support;
     *         {@code false} if the cluster reports no support or the query failed
     */
    private boolean checkFailoverSupport(Connection connection) {
        if (!this.failoverConfigProvided) {
            return true;
        }
        ClientMessage isFailoverSupportedMessage = ClientIsFailoverSupportedCodec.encodeRequest();
        ClientInvocationFuture future = new ClientInvocation(this.client, isFailoverSupportedMessage, null, connection).invoke();
        try {
            boolean isAllowed = ClientIsFailoverSupportedCodec.decodeResponse((ClientMessage)future.get()).response;
            if (!isAllowed) {
                this.logger.warning("Cluster does not support failover. This feature is available in Hazelcast Enterprise");
            }
            return isAllowed;
        }
        catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the flag; set it again instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            this.logger.warning("Cluster did not answer to failover support query. ", e);
            return false;
        }
    }

    /**
     * Returns the socket address for the given member address, using
     * {@code inetSocketAddressCache} through {@code ConcurrencyUtil.getOrPutIfAbsent}.
     *
     * <p>The constructor function builds the socket address from the target's
     * inet address and port; an {@link UnknownHostException} raised while doing
     * so is rethrown via {@code ExceptionUtil.rethrow}.
     *
     * @param target the member address to resolve
     * @return the socket address associated with {@code target}
     */
    private InetSocketAddress resolveAddress(Address target) {
        return ConcurrencyUtil.getOrPutIfAbsent(
                this.inetSocketAddressCache,
                target,
                unusedKey -> {
                    // The captured target is used directly; the lambda argument is not read.
                    try {
                        return new InetSocketAddress(target.getInetAddress(), target.getPort());
                    }
                    catch (UnknownHostException unknownHost) {
                        throw ExceptionUtil.rethrow(unknownHost);
                    }
                });
    }

    /**
     * Sends the client state to the cluster and then marks the client as connected.
     *
     * <p>Nothing is done unless {@code targetClusterId} equals the current
     * {@code clusterId}. On success the state becomes {@code CLIENT_CONNECTED},
     * the matching lifecycle event is fired, and, when a failover configuration
     * was provided, {@code CLIENT_CHANGED_CLUSTER} is fired as well.
     *
     * <p>If any exception is thrown, it is logged, and the method resubmits
     * itself to the cluster connection executor as long as the target cluster
     * id still equals the current one.
     *
     * @param targetClusterId id of the cluster the state is meant for
     */
    private void sendStatesToCluster(UUID targetClusterId) {
        try {
            if (!targetClusterId.equals(this.clusterId)) {
                return;
            }
            this.client.sendStateToCluster();
            this.state = LifecycleEvent.LifecycleState.CLIENT_CONNECTED;
            this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
            if (this.failoverConfigProvided) {
                this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CHANGED_CLUSTER);
            }
        }
        catch (Exception failure) {
            String clusterName = this.discoveryService.current().getClusterName();
            this.logger.warning("Got exception when trying to send state to cluster. ", failure);
            // Retry only while the target is still the current cluster.
            if (targetClusterId.equals(this.clusterId)) {
                this.logger.warning("Retrying sending state to cluster with uuid" + targetClusterId + ", name " + clusterName);
                this.clusterConnectionExecutor.execute(() -> this.sendStatesToCluster(targetClusterId));
            }
        }
    }

    /**
     * Channel error handler that closes the connection stored in the failing
     * channel's attribute map.
     */
    private class ClientConnectionChannelErrorHandler
    implements ChannelErrorHandler {
        private ClientConnectionChannelErrorHandler() {
        }

        /**
         * Handles an error reported for a channel.
         *
         * <p>Without a channel the cause is only logged at severe level. With a
         * channel, an {@link OutOfMemoryError} is logged at severe level, and in
         * every case the connection looked up under {@code ClientConnection.class}
         * in the channel's attribute map is closed with the cause. An
         * {@link EOFException} uses a fixed "closed by the other side" reason;
         * any other cause uses a reason naming the connection and current thread.
         *
         * @param channel the channel the error occurred on, or {@code null}
         * @param cause   the error that was raised
         */
        @Override
        public void onError(Channel channel, Throwable cause) {
            if (channel == null) {
                ClientConnectionManagerImpl.this.logger.severe(cause);
                return;
            }
            if (cause instanceof OutOfMemoryError) {
                ClientConnectionManagerImpl.this.logger.severe(cause);
            }
            Connection connection = (Connection)channel.attributeMap().get(ClientConnection.class);
            String reason = cause instanceof EOFException
                    ? "Connection closed by the other side"
                    : "Exception in " + connection + ", thread=" + Thread.currentThread().getName();
            connection.close(reason, cause);
        }
    }
}

