/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AbstractReconnectionHandler;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ClusterNameMismatchException;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.DefaultResultSetFuture;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ProtocolEvent;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Requests;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SchemaElement;
import com.datastax.driver.core.SchemaParser;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.SystemProperties;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.VersionNumber;
import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException;
import com.datastax.driver.core.utils.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ControlConnection
implements Connection.Owner {
    private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);

    /**
     * When {@code true}, a {@code system.peers} row must also carry non-null {@code host_id},
     * {@code data_center}, {@code rack} and {@code tokens} to be considered valid. Read from the
     * {@code com.datastax.driver.EXTENDED_PEER_CHECK} system property, defaulting to {@code true}.
     */
    private static final boolean EXTENDED_PEER_CHECK = SystemProperties.getBoolean("com.datastax.driver.EXTENDED_PEER_CHECK", true);

    /** The all-zero IPv4 address (0.0.0.0); assigned once in the static initializer. */
    private static final InetAddress bindAllAddress;

    // CQL statements issued over the control connection.
    private static final String SELECT_PEERS = "SELECT * FROM system.peers";
    private static final String SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'";
    private static final String SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers";
    private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'";

    /** The connection currently used as control connection, or {@code null} when there is none. */
    @VisibleForTesting
    final AtomicReference<Connection> connectionRef = new AtomicReference<Connection>();

    private final Cluster.Manager cluster;

    /** Handle on the reconnection attempt handed to the reconnection handler, if any. */
    private final AtomicReference<ListenableFuture<?>> reconnectionAttempt = new AtomicReference<ListenableFuture<?>>();

    /** Set by {@link #closeAsync()}; once {@code true}, connect and reconnect requests return early. */
    private volatile boolean isShutdown;

    /**
     * Creates a control connection bound to the given cluster manager. Only the reference is
     * stored; no connection is opened here.
     *
     * @param manager the cluster manager this control connection reads from and updates
     */
    public ControlConnection(Cluster.Manager manager) {
        this.cluster = manager;
    }

    /**
     * Opens the initial control connection, trying the hosts currently known to the cluster
     * metadata in random order. Does nothing once this control connection has been shut down.
     *
     * @throws UnsupportedProtocolVersionException propagated from the initial connection attempt
     */
    void connect() throws UnsupportedProtocolVersionException {
        if (isShutdown) {
            return;
        }
        List<Host> candidates = new ArrayList<Host>(cluster.metadata.allHosts());
        Collections.shuffle(candidates);
        Connection established = reconnectInternal(candidates.iterator(), true);
        setNewConnection(established);
    }

    /**
     * Marks this control connection as shut down, cancels the recorded reconnection attempt (if
     * any) and force-closes the current connection.
     *
     * @return the close future of the current connection, or an immediate future when there is no
     *     connection
     */
    CloseFuture closeAsync() {
        // Raise the flag first so that reconnection code paths observing it bail out.
        isShutdown = true;

        ListenableFuture<?> pendingReconnection = reconnectionAttempt.get();
        if (pendingReconnection != null) {
            pendingReconnection.cancel(false);
        }

        Connection current = connectionRef.get();
        if (current == null) {
            return CloseFuture.immediateFuture();
        }
        return current.closeAsync().force();
    }

    /**
     * Looks up the host matching the address of the current control connection.
     *
     * @return the host found in the cluster metadata for the current connection's address, or
     *     {@code null} when there is no current connection
     */
    Host connectedHost() {
        Connection active = connectionRef.get();
        if (active == null) {
            return null;
        }
        return cluster.metadata.getHost(active.address);
    }

    /** Requests a background reconnection with no initial delay. */
    void triggerReconnect() {
        backgroundReconnect(0L);
    }

    /**
     * Starts a reconnection handler for the control connection, unless this control connection
     * is shut down or a previously recorded reconnection attempt has not completed yet.
     *
     * @param initialDelayMs delay, in milliseconds, passed to the reconnection handler
     */
    private void backgroundReconnect(long initialDelayMs) {
        if (this.isShutdown) {
            return;
        }
        // Do not start a second handler while an earlier attempt is still pending.
        ListenableFuture<?> reconnection = this.reconnectionAttempt.get();
        if (reconnection != null && !reconnection.isDone()) {
            return;
        }
        new AbstractReconnectionHandler("Control connection", this.cluster.reconnectionExecutor, this.cluster.reconnectionPolicy().newSchedule(), this.reconnectionAttempt, initialDelayMs){

            @Override
            protected Connection tryReconnect() throws ConnectionException {
                if (ControlConnection.this.isShutdown) {
                    throw new ConnectionException(null, "Control connection was shut down");
                }
                try {
                    return ControlConnection.this.reconnectInternal(ControlConnection.this.queryPlan(), false);
                }
                catch (NoHostAvailableException e) {
                    // NOTE(review): only the message is carried over; the per-host error map is not.
                    throw new ConnectionException(null, e.getMessage());
                }
                catch (UnsupportedProtocolVersionException e) {
                    // reconnectInternal only rethrows this exception for the initial connection,
                    // and we pass isInitialConnection = false above. Keep the exception as the
                    // cause so that an unexpected occurrence can be diagnosed.
                    throw new AssertionError(e);
                }
            }

            @Override
            protected void onReconnection(Connection connection) {
                // A shutdown may have happened while reconnecting: discard the new connection.
                if (ControlConnection.this.isShutdown) {
                    connection.closeAsync().force();
                    return;
                }
                ControlConnection.this.setNewConnection(connection);
            }

            @Override
            protected boolean onConnectionException(ConnectionException e, long nextDelayMs) {
                if (ControlConnection.this.isShutdown) {
                    return false;
                }
                logger.error("[Control connection] Cannot connect to any host, scheduling retry in {} milliseconds", (Object)nextDelayMs);
                return true;
            }

            @Override
            protected boolean onUnknownException(Exception e, long nextDelayMs) {
                if (ControlConnection.this.isShutdown) {
                    return false;
                }
                logger.error(String.format("[Control connection] Unknown error during reconnection, scheduling retry in %d milliseconds", nextDelayMs), (Throwable)e);
                return true;
            }
        }.start();
    }

    /**
     * Asks the load balancing policy for a query plan, using a {@code null} keyspace and
     * {@code Statement.DEFAULT}.
     *
     * @return the hosts to try, in the order chosen by the policy
     */
    private Iterator<Host> queryPlan() {
        return cluster.loadBalancingPolicy().newQueryPlan(null, Statement.DEFAULT);
    }

    /**
     * Reacts to an error on the control connection: force-closes the current connection, if any,
     * and requests an immediate background reconnection.
     */
    private void signalError() {
        Connection failed = connectionRef.get();
        if (failed != null) {
            failed.closeAsync().force();
        }
        backgroundReconnect(0L);
    }

    /**
     * Installs {@code newConnection} as the current control connection, taking ownership of it
     * and force-closing the connection it replaces when that one is still open.
     *
     * @param newConnection the freshly established connection; must not be {@code null}
     */
    private void setNewConnection(Connection newConnection) {
        Host.statesLogger.debug("[Control connection] established to {}", newConnection.address);
        newConnection.setOwner(this);
        Connection previous = connectionRef.getAndSet(newConnection);
        if (previous == null || previous.isClosed()) {
            return;
        }
        previous.closeAsync().force();
    }

    /**
     * Tries the given hosts in order and returns the first control connection that could be
     * fully set up by {@code tryConnect}.
     *
     * @param iter the hosts to try, in order
     * @param isInitialConnection whether this is the initial connection; forwarded to
     *     {@code tryConnect} and used to decide how failures are handled
     * @return the first successfully established connection
     * @throws UnsupportedProtocolVersionException only when {@code isInitialConnection} is true
     * @throws NoHostAvailableException when no host yields a connection; carries the errors
     *     recorded per host address
     */
    private Connection reconnectInternal(Iterator<Host> iter, boolean isInitialConnection) throws UnsupportedProtocolVersionException {
        // Allocated lazily by logError on the first failure.
        Map<InetSocketAddress, Throwable> errors = null;
        Host host = null;
        try {
            while (iter.hasNext()) {
                host = iter.next();
                // Skip hosts whose conviction policy does not allow a reconnection right now.
                if (!host.convictionPolicy.canReconnectNow()) continue;
                try {
                    return this.tryConnect(host, isInitialConnection);
                }
                catch (ConnectionException e) {
                    errors = ControlConnection.logError(host, e, errors, iter);
                    // Only during the initial connection is the host marked down right away.
                    if (!isInitialConnection) continue;
                    host.setDown();
                }
                catch (ExecutionException e) {
                    // Record the underlying cause rather than the wrapper.
                    errors = ControlConnection.logError(host, e.getCause(), errors, iter);
                }
                catch (UnsupportedProtocolVersionException e) {
                    // Fatal for the initial connection; otherwise this host is just skipped.
                    if (isInitialConnection) {
                        throw e;
                    }
                    logger.debug("Ignoring host {}: {}", (Object)host, (Object)e.getMessage());
                    errors = ControlConnection.logError(host, e, errors, iter);
                }
                catch (ClusterNameMismatchException e) {
                    logger.debug("Ignoring host {}: {}", (Object)host, (Object)e.getMessage());
                    errors = ControlConnection.logError(host, e, errors, iter);
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag, then record the interruption for the current host and
            // for every host that was not tried.
            Thread.currentThread().interrupt();
            errors = ControlConnection.logError(host, new DriverException("Connection thread interrupted"), errors, iter);
            while (iter.hasNext()) {
                errors = ControlConnection.logError(iter.next(), new DriverException("Connection thread interrupted"), errors, iter);
            }
        }
        throw new NoHostAvailableException(errors == null ? Collections.emptyMap() : errors);
    }

    /**
     * Records a connection failure for {@code host} and logs it at debug level.
     *
     * @param host the host the failure relates to
     * @param exception the failure to record
     * @param errors the errors collected so far, or {@code null} if none yet
     * @param iter the remaining hosts; only used to pick the log message
     * @return the map holding the recorded errors ({@code errors} itself when it was non-null,
     *     otherwise a new map)
     */
    private static Map<InetSocketAddress, Throwable> logError(Host host, Throwable exception, Map<InetSocketAddress, Throwable> errors, Iterator<Host> iter) {
        Map<InetSocketAddress, Throwable> collected = errors != null ? errors : new HashMap<InetSocketAddress, Throwable>();
        collected.put(host.getSocketAddress(), exception);
        if (logger.isDebugEnabled()) {
            String template = iter.hasNext()
                    ? "[Control connection] error on %s connection, trying next host"
                    : "[Control connection] error on %s connection, no more host to try";
            logger.debug(String.format(template, host), exception);
        }
        return collected;
    }

    /**
     * Opens a connection to {@code host} and prepares it for use as control connection: registers
     * for topology, status and schema events, then refreshes the node list/token map and the
     * schema. The connection is force-closed if any of these steps fails.
     *
     * @param host the host to connect to
     * @param isInitialConnection forwarded to the node list refresh
     * @return the ready-to-use connection
     * @throws DriverInternalError if the new connection reports itself as busy
     */
    private Connection tryConnect(Host host, boolean isInitialConnection) throws ConnectionException, ExecutionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
        Connection conn = cluster.connectionFactory.open(host);
        // Fall back to the newest supported protocol version when none has been set.
        if (cluster.connectionFactory.protocolVersion == null) {
            cluster.connectionFactory.protocolVersion = ProtocolVersion.NEWEST_SUPPORTED;
        }
        try {
            logger.trace("[Control connection] Registering for events");
            List<ProtocolEvent.Type> eventTypes = Arrays.asList(
                    ProtocolEvent.Type.TOPOLOGY_CHANGE,
                    ProtocolEvent.Type.STATUS_CHANGE,
                    ProtocolEvent.Type.SCHEMA_CHANGE);
            conn.write(new Requests.Register(eventTypes));

            // The node list is refreshed before the schema.
            refreshNodeListAndTokenMap(conn, cluster, isInitialConnection, true);

            logger.debug("[Control connection] Refreshing schema");
            refreshSchema(conn, null, null, null, null, cluster);
            return conn;
        }
        catch (BusyConnectionException e) {
            conn.closeAsync().force();
            throw new DriverInternalError("Newly created connection should not be busy");
        }
        catch (InterruptedException e) {
            conn.closeAsync().force();
            throw e;
        }
        catch (ConnectionException e) {
            conn.closeAsync().force();
            throw e;
        }
        catch (ExecutionException e) {
            conn.closeAsync().force();
            throw e;
        }
        catch (RuntimeException e) {
            conn.closeAsync().force();
            throw e;
        }
    }

    /**
     * Refreshes schema metadata over the current control connection. Does nothing when there is
     * no open connection. Connection, execution and busy-connection errors are logged and trigger
     * a reconnection instead of being propagated.
     *
     * @param targetType the kind of element to refresh, or {@code null} to refresh everything
     * @param targetKeyspace the keyspace of the element to refresh
     * @param targetName the name of the element to refresh
     * @param signature the signature of the element to refresh; passed through unchanged
     * @throws InterruptedException if the thread is interrupted while refreshing
     */
    public void refreshSchema(SchemaElement targetType, String targetKeyspace, String targetName, List<String> signature) throws InterruptedException {
        // Build the description only when it will be logged, and do not append a
        // ".null (null)" suffix when everything is being refreshed.
        if (logger.isDebugEnabled()) {
            String target;
            if (targetType == null) {
                target = "everything";
            } else if (targetType == SchemaElement.KEYSPACE) {
                target = targetKeyspace;
            } else {
                target = targetKeyspace + "." + targetName + " (" + targetType + ")";
            }
            logger.debug("[Control connection] Refreshing schema for {}", target);
        }
        try {
            Connection c = this.connectionRef.get();
            if (c == null || c.isClosed()) {
                return;
            }
            ControlConnection.refreshSchema(c, targetType, targetKeyspace, targetName, signature, this.cluster);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing schema ({})", e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            // Errors are expected while shutting down; only report them otherwise.
            if (!this.isShutdown) {
                logger.error("[Control connection] Unexpected error while refreshing schema", e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
    }

    /**
     * Refreshes schema metadata using the given connection. The schema parser is selected from
     * the Cassandra version of the host the connection points to; when that version is unknown,
     * the minimum Cassandra version of the protocol version in use is assumed and a warning is
     * logged.
     *
     * @param connection the connection to query
     * @param targetType the kind of element to refresh, or {@code null} for everything
     * @param targetKeyspace the keyspace of the element to refresh
     * @param targetName the name of the element to refresh
     * @param targetSignature the signature of the element to refresh; passed through unchanged
     * @param cluster the cluster manager whose metadata is updated
     */
    static void refreshSchema(Connection connection, SchemaElement targetType, String targetKeyspace, String targetName, List<String> targetSignature, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        Host connected = cluster.metadata.getHost(connection.address);
        VersionNumber version = connected == null ? null : connected.getCassandraVersion();
        if (version == null) {
            version = cluster.protocolVersion().minCassandraVersion();
            logger.warn("Cannot find Cassandra version for host {} to parse the schema, using {} based on protocol version in use. If parsing the schema fails, this could be the cause", connection.address, version);
        }
        SchemaParser parser = SchemaParser.forVersion(version);
        parser.refresh(cluster.getCluster(), targetType, targetKeyspace, targetName, targetSignature, connection, version);
    }

    /**
     * Refreshes the node list and token map over the current control connection. Does nothing
     * when there is no open connection. Connection, execution and busy-connection errors trigger
     * a reconnection; an interruption restores the interrupt flag and skips the refresh.
     */
    void refreshNodeListAndTokenMap() {
        Connection active = connectionRef.get();
        if (active == null || active.isClosed()) {
            return;
        }
        try {
            refreshNodeListAndTokenMap(active, cluster, false, true);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing node list and token map ({})", e.getMessage());
            signalError();
        }
        catch (ExecutionException e) {
            // Stay quiet about failures that happen during shutdown.
            if (!isShutdown) {
                logger.error("[Control connection] Unexpected error while refreshing node list and token map", e);
            }
            signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node list and token map, skipping it.");
        }
    }

    /**
     * Derives the address to use for the peer described by a {@code system.peers} row.
     *
     * @param peersRow a row of {@code system.peers}
     * @param connectedHost the address of the host the control connection is connected to
     * @param cluster the cluster manager used to translate the address
     * @return the translated address, or {@code null} when the row has no {@code peer} value, has
     *     no {@code rpc_address} value, or describes the connected host itself
     */
    private static InetSocketAddress rpcAddressForPeerHost(Row peersRow, InetSocketAddress connectedHost, Cluster.Manager cluster) {
        InetAddress peerAddress = peersRow.getInet("peer");
        InetAddress rpc = peersRow.getInet("rpc_address");
        if (peerAddress == null) {
            return null;
        }

        InetAddress connectedAddress = connectedHost.getAddress();
        boolean describesConnectedHost = peerAddress.equals(connectedAddress)
                || (rpc != null && rpc.equals(connectedAddress));
        if (describesConnectedHost) {
            logger.debug("System.peers on node {} has a line for itself. This is not normal but is a known problem of some DSE version. Ignoring the entry.", connectedHost);
            return null;
        }
        if (rpc == null) {
            return null;
        }

        InetAddress contactAddress = rpc;
        if (rpc.equals(bindAllAddress)) {
            // 0.0.0.0 cannot be used to reach the node; use the "peer" value instead.
            logger.warn("Found host with 0.0.0.0 as rpc_address, using broadcast_address ({}) to contact it instead. If this is incorrect you should avoid the use of 0.0.0.0 server side.", peerAddress);
            contactAddress = peerAddress;
        }
        return cluster.translateAddress(contactAddress);
    }

    /**
     * Fetches the system table row describing {@code host} through connection {@code c}.
     *
     * <p>A targeted query is tried first: {@code system.local} when {@code host} is the connected
     * host, otherwise {@code system.peers} filtered on the host's broadcast address when it is
     * known. If that yields nothing (or is not possible), all of {@code system.peers} is scanned
     * for a row whose derived address equals the host's socket address.
     *
     * @param host the host to look up
     * @param c the connection to query
     * @return the matching row, or {@code null} if none was found
     */
    private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        boolean isConnectedHost = c.address.equals(host.getSocketAddress());
        if (isConnectedHost || host.getBroadcastAddress() != null) {
            String cql = isConnectedHost
                    ? SELECT_LOCAL
                    : SELECT_PEERS + " WHERE peer='" + host.getBroadcastAddress().getHostAddress() + '\'';
            DefaultResultSetFuture targeted = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(cql));
            c.write(targeted);
            Row found = ((ResultSet)targeted.get()).one();
            if (found != null) {
                return found;
            }
            logger.debug("Could not find peer with broadcast address {}, falling back to a full system.peers scan to fetch info for {} (this can happen if the broadcast address changed)", host.getBroadcastAddress(), host);
        }

        DefaultResultSetFuture fullScan = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
        c.write(fullScan);
        for (Row candidate : (ResultSet)fullScan.get()) {
            InetSocketAddress candidateAddress = rpcAddressForPeerHost(candidate, c.address, cluster);
            if (candidateAddress != null && candidateAddress.equals(host.getSocketAddress())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Refreshes the information held about {@code host} from the system tables.
     *
     * @param host the host to refresh
     * @return {@code false} when no row was found for the host on a connection that is not
     *     defunct, or when the row found is an invalid peer; {@code true} in all other cases,
     *     including when there is no open connection or when the refresh failed with an error
     */
    boolean refreshNodeInfo(Host host) {
        Connection active = connectionRef.get();
        if (active == null || active.isClosed()) {
            return true;
        }
        logger.debug("[Control connection] Refreshing node info on {}", host);
        try {
            Row info = fetchNodeInfo(host, active);
            if (info == null) {
                if (active.isDefunct()) {
                    logger.debug("Control connection is down, could not refresh node info");
                    return true;
                }
                logger.warn("No row found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), active.address, host.getAddress());
                return false;
            }
            // Rows about other nodes come from system.peers and must pass the peer validity check.
            boolean isConnectedHost = active.address.equals(host.getSocketAddress());
            if (!isConnectedHost && !isValidPeer(info, true)) {
                return false;
            }
            updateInfo(host, info, cluster, false);
            return true;
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing node info ({})", e.getMessage());
            signalError();
        }
        catch (ExecutionException e) {
            if (!isShutdown) {
                logger.debug("[Control connection] Unexpected error while refreshing node info", e);
            }
            signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node info, skipping it.");
        }
        catch (Exception e) {
            logger.debug("[Control connection] Unexpected error while refreshing node info", e);
            signalError();
        }
        return true;
    }

    /**
     * Copies the node information found in a {@code system.local} or {@code system.peers} row
     * onto {@code host}. Columns that are not part of the row's definitions are skipped or
     * replaced by a default, as detailed inline.
     *
     * @param host the host to update
     * @param row the row describing the host
     * @param cluster the cluster manager, used when the location changes
     * @param isInitialConnection forwarded to the location update
     */
    private static void updateInfo(Host host, Row row, Cluster.Manager cluster, boolean isInitialConnection) {
        boolean hasLocation = !row.isNull("data_center") || !row.isNull("rack");
        if (hasLocation) {
            updateLocationInfo(host, row.getString("data_center"), row.getString("rack"), isInitialConnection, cluster);
        }

        host.setVersion(row.getString("release_version"));

        // The broadcast address is read from "peer" when present, else from "broadcast_address".
        InetAddress broadcastAddress;
        if (row.getColumnDefinitions().contains("peer")) {
            broadcastAddress = row.getInet("peer");
        } else if (row.getColumnDefinitions().contains("broadcast_address")) {
            broadcastAddress = row.getInet("broadcast_address");
        } else {
            broadcastAddress = null;
        }
        host.setBroadcastAddress(broadcastAddress);

        InetAddress listenAddress = null;
        if (row.getColumnDefinitions().contains("listen_address")) {
            listenAddress = row.getInet("listen_address");
        }
        host.setListenAddress(listenAddress);

        // "workloads" (a set) takes precedence over the single-valued "workload" column.
        Set<String> workloads;
        if (row.getColumnDefinitions().contains("workloads")) {
            workloads = row.getSet("workloads", String.class);
        } else if (row.getColumnDefinitions().contains("workload") && row.getString("workload") != null) {
            workloads = Collections.singleton(row.getString("workload"));
        } else {
            workloads = Collections.<String>emptySet();
        }
        host.setDseWorkloads(workloads);

        if (row.getColumnDefinitions().contains("graph")) {
            host.setDseGraphEnabled(row.getBool("graph"));
        }
        if (row.getColumnDefinitions().contains("dse_version")) {
            host.setDseVersion(row.getString("dse_version"));
        }

        host.setHostId(row.getUUID("host_id"));
        host.setSchemaVersion(row.getUUID("schema_version"));
    }

    /**
     * Updates the datacenter and rack of {@code host} when either differs from the current value.
     * Outside of the initial connection, the load balancing policy is told the host went down
     * before the change and that it was added after the change.
     *
     * @param host the host to update
     * @param datacenter the new datacenter, possibly {@code null}
     * @param rack the new rack, possibly {@code null}
     * @param isInitialConnection when {@code true}, the load balancing policy is not notified
     * @param cluster the cluster manager giving access to the load balancing policy
     */
    private static void updateLocationInfo(Host host, String datacenter, String rack, boolean isInitialConnection, Cluster.Manager cluster) {
        boolean sameDatacenter = MoreObjects.equal(host.getDatacenter(), datacenter);
        if (sameDatacenter && MoreObjects.equal(host.getRack(), rack)) {
            return;
        }
        if (isInitialConnection) {
            host.setLocationInfo(datacenter, rack);
            return;
        }
        cluster.loadBalancingPolicy().onDown(host);
        host.setLocationInfo(datacenter, rack);
        cluster.loadBalancingPolicy().onAdd(host);
    }

    /**
     * Reloads the list of nodes and the token map from {@code system.local} and
     * {@code system.peers} using the given connection, and updates the cluster metadata
     * accordingly: known hosts are updated, unknown ones are added, and hosts that are neither
     * the connected host nor listed in {@code system.peers} are removed.
     *
     * @param connection the connection to query
     * @param cluster the cluster manager whose metadata is updated
     * @param isInitialConnection when {@code true}, newly found hosts are not signalled through
     *     {@code triggerOnAdd}; the flag is also forwarded to location updates and host removals
     * @param logInvalidPeers whether invalid {@code system.peers} rows are logged
     */
    private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logInvalidPeers) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        logger.debug("[Control connection] Refreshing node list and token map");
        boolean metadataEnabled = cluster.configuration.getQueryOptions().isMetadataEnabled();

        // Send both queries before waiting on either result.
        DefaultResultSetFuture localFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL));
        DefaultResultSetFuture peersFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
        connection.write(localFuture);
        connection.write(peersFuture);

        Token.Factory factory = null;
        HashMap<Host, Set<Token>> tokenMap = new HashMap<Host, Set<Token>>();

        // Step 1: the connected host, described by system.local.
        Row localRow = ((ResultSet)localFuture.get()).one();
        if (localRow != null) {
            String clusterName = localRow.getString("cluster_name");
            if (clusterName != null) {
                cluster.metadata.clusterName = clusterName;
            }
            String partitioner = localRow.getString("partitioner");
            if (partitioner != null) {
                cluster.metadata.partitioner = partitioner;
                factory = Token.getFactory(partitioner);
            }
            Host localHost = cluster.metadata.getHost(connection.address);
            if (localHost == null) {
                logger.debug("Host in local system table ({}) unknown to us (ok if said host just got removed)", connection.address);
            } else {
                updateInfo(localHost, localRow, cluster, isInitialConnection);
                if (metadataEnabled && factory != null) {
                    Set<String> tokensStr = localRow.getSet("tokens", String.class);
                    if (!tokensStr.isEmpty()) {
                        tokenMap.put(localHost, toTokens(factory, tokensStr));
                    }
                }
            }
        }

        // Step 2: read every valid peer row into parallel lists (index i describes peer i).
        List<InetSocketAddress> foundHosts = new ArrayList<InetSocketAddress>();
        List<String> dcs = new ArrayList<String>();
        List<String> racks = new ArrayList<String>();
        List<String> cassandraVersions = new ArrayList<String>();
        List<InetAddress> broadcastAddresses = new ArrayList<InetAddress>();
        List<InetAddress> listenAddresses = new ArrayList<InetAddress>();
        // Only filled when metadataEnabled && factory != null, and only read under the same condition.
        List<Set<Token>> allTokens = new ArrayList<Set<Token>>();
        List<String> dseVersions = new ArrayList<String>();
        List<Boolean> dseGraphEnabled = new ArrayList<Boolean>();
        List<Set<String>> dseWorkloads = new ArrayList<Set<String>>();
        List<UUID> hostIds = new ArrayList<UUID>();
        List<UUID> schemaVersions = new ArrayList<UUID>();
        for (Row row : (ResultSet)peersFuture.get()) {
            if (!isValidPeer(row, logInvalidPeers)) {
                continue;
            }
            InetSocketAddress rpcAddress = rpcAddressForPeerHost(row, connection.address, cluster);
            if (rpcAddress == null) {
                continue;
            }
            foundHosts.add(rpcAddress);
            dcs.add(row.getString("data_center"));
            racks.add(row.getString("rack"));
            cassandraVersions.add(row.getString("release_version"));
            broadcastAddresses.add(row.getInet("peer"));
            if (metadataEnabled && factory != null) {
                Set<String> tokensStr = row.getSet("tokens", String.class);
                Set<Token> tokens = null;
                if (!tokensStr.isEmpty()) {
                    tokens = toTokens(factory, tokensStr);
                }
                allTokens.add(tokens);
            }
            InetAddress listenAddress = row.getColumnDefinitions().contains("listen_address") ? row.getInet("listen_address") : null;
            listenAddresses.add(listenAddress);
            if (row.getColumnDefinitions().contains("workloads")) {
                dseWorkloads.add(row.getSet("workloads", String.class));
            } else if (row.getColumnDefinitions().contains("workload") && row.getString("workload") != null) {
                dseWorkloads.add(Collections.singleton(row.getString("workload")));
            } else {
                dseWorkloads.add(null);
            }
            Boolean isDseGraph = row.getColumnDefinitions().contains("graph") ? Boolean.valueOf(row.getBool("graph")) : null;
            dseGraphEnabled.add(isDseGraph);
            String dseVersion = row.getColumnDefinitions().contains("dse_version") ? row.getString("dse_version") : null;
            dseVersions.add(dseVersion);
            hostIds.add(row.getUUID("host_id"));
            schemaVersions.add(row.getUUID("schema_version"));
        }

        // Step 3: apply what was read; null values leave the host's current value untouched.
        for (int i = 0; i < foundHosts.size(); ++i) {
            Host host = cluster.metadata.getHost(foundHosts.get(i));
            boolean isNew = false;
            if (host == null) {
                // Unknown node: register it now but signal the addition only after it is populated.
                Host newHost = cluster.metadata.newHost(foundHosts.get(i));
                Host previous = cluster.metadata.addIfAbsent(newHost);
                if (previous == null) {
                    host = newHost;
                    isNew = true;
                } else {
                    // Another host object was registered for this address in the meantime; use it.
                    host = previous;
                }
            }
            if (dcs.get(i) != null || racks.get(i) != null) {
                updateLocationInfo(host, dcs.get(i), racks.get(i), isInitialConnection, cluster);
            }
            if (cassandraVersions.get(i) != null) {
                host.setVersion(cassandraVersions.get(i));
            }
            if (broadcastAddresses.get(i) != null) {
                host.setBroadcastAddress(broadcastAddresses.get(i));
            }
            if (listenAddresses.get(i) != null) {
                host.setListenAddress(listenAddresses.get(i));
            }
            if (dseVersions.get(i) != null) {
                host.setDseVersion(dseVersions.get(i));
            }
            if (dseWorkloads.get(i) != null) {
                host.setDseWorkloads(dseWorkloads.get(i));
            }
            if (dseGraphEnabled.get(i) != null) {
                host.setDseGraphEnabled(dseGraphEnabled.get(i));
            }
            if (hostIds.get(i) != null) {
                host.setHostId(hostIds.get(i));
            }
            if (schemaVersions.get(i) != null) {
                host.setSchemaVersion(schemaVersions.get(i));
            }
            if (metadataEnabled && factory != null && allTokens.get(i) != null) {
                tokenMap.put(host, allTokens.get(i));
            }
            if (isNew && !isInitialConnection) {
                cluster.triggerOnAdd(host);
            }
        }

        // Step 4: drop hosts that are neither the connected host nor present in system.peers.
        Set<InetSocketAddress> foundHostsSet = new HashSet<InetSocketAddress>(foundHosts);
        for (Host host : cluster.metadata.allHosts()) {
            boolean isConnectedHost = host.getSocketAddress().equals(connection.address);
            if (!isConnectedHost && !foundHostsSet.contains(host.getSocketAddress())) {
                cluster.removeHost(host, isInitialConnection);
            }
        }

        if (metadataEnabled && factory != null && !tokenMap.isEmpty()) {
            cluster.metadata.rebuildTokenMap(factory, tokenMap);
        }
    }

    /**
     * Parses token strings into tokens, keeping the iteration order of the input set.
     *
     * @param factory the factory used to parse each string
     * @param tokensStr the string form of the tokens
     * @return a new insertion-ordered set holding the parsed tokens
     */
    private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr) {
        Set<Token> parsed = new LinkedHashSet<Token>(tokensStr.size());
        for (String raw : tokensStr) {
            Token token = factory.fromString(raw);
            parsed.add(token);
        }
        return parsed;
    }

    /**
     * Checks that a {@code system.peers} row carries the columns required to use the peer.
     * {@code rpc_address} is always required; when {@code EXTENDED_PEER_CHECK} is enabled,
     * {@code host_id}, {@code data_center}, {@code rack} and {@code tokens} are required too.
     *
     * @param peerRow the row to check
     * @param logIfInvalid whether to log a warning when the row is invalid
     * @return {@code true} if every required column is present and non-null
     */
    private static boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
        boolean isValid = hasNonNullColumn(peerRow, "rpc_address");
        if (EXTENDED_PEER_CHECK) {
            isValid &= hasNonNullColumn(peerRow, "host_id")
                    && hasNonNullColumn(peerRow, "data_center")
                    && hasNonNullColumn(peerRow, "rack")
                    && hasNonNullColumn(peerRow, "tokens");
        }
        if (!isValid && logIfInvalid) {
            logger.warn("Found invalid row in system.peers: {}. This is likely a gossip or snitch issue, this host will be ignored.", formatInvalidPeer(peerRow));
        }
        return isValid;
    }

    /** Returns whether {@code columnName} is defined in {@code row} and holds a non-null value. */
    private static boolean hasNonNullColumn(Row row, String columnName) {
        return row.getColumnDefinitions().contains(columnName) && !row.isNull(columnName);
    }

    /**
     * Builds a short description of an invalid {@code system.peers} row, listing its {@code peer}
     * value followed by each checked column that is missing or null.
     *
     * @param peerRow the invalid row
     * @return the description, enclosed in square brackets
     */
    private static String formatInvalidPeer(Row peerRow) {
        StringBuilder description = new StringBuilder("[peer=").append(peerRow.getInet("peer"));
        formatMissingOrNullColumn(peerRow, "rpc_address", description);
        if (EXTENDED_PEER_CHECK) {
            for (String column : new String[] {"host_id", "data_center", "rack", "tokens"}) {
                formatMissingOrNullColumn(peerRow, column, description);
            }
        }
        return description.append("]").toString();
    }

    /**
     * Appends a note to {@code sb} when {@code columnName} is absent from the row's definitions
     * or holds a null value; appends nothing otherwise.
     *
     * @param peerRow the row to inspect
     * @param columnName the column to report on
     * @param sb the builder receiving the note
     */
    private static void formatMissingOrNullColumn(Row peerRow, String columnName, StringBuilder sb) {
        boolean isDefined = peerRow.getColumnDefinitions().contains(columnName);
        if (!isDefined) {
            sb.append(", missing ").append(columnName);
            return;
        }
        if (peerRow.isNull(columnName)) {
            sb.append(", ").append(columnName).append("=null");
        }
    }

    /**
     * Polls for schema agreement every 200 milliseconds until it is reached or the configured
     * maximum wait has elapsed.
     *
     * @param connection the connection used to query schema versions
     * @param cluster the cluster manager providing the configuration and metadata
     * @return {@code true} if agreement was observed, {@code false} if the maximum wait elapsed
     *     first (including when the configured maximum is zero or negative)
     * @throws InterruptedException if interrupted while querying or sleeping
     */
    static boolean waitForSchemaAgreement(Connection connection, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        long start = System.nanoTime();
        long elapsed = 0L;
        int maxSchemaAgreementWaitSeconds = cluster.configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds();
        // Convert in long arithmetic: an int multiplication by 1000 overflows for large settings.
        long maxWaitMs = TimeUnit.SECONDS.toMillis(maxSchemaAgreementWaitSeconds);
        while (elapsed < maxWaitMs) {
            if (ControlConnection.checkSchemaAgreement(connection, cluster)) {
                return true;
            }
            Thread.sleep(200L);
            elapsed = Cluster.timeSince(start, TimeUnit.MILLISECONDS);
        }
        return false;
    }

    /**
     * Queries the schema version of the connected host and of its peers, and checks whether they
     * all agree. Peer rows are ignored when no address can be derived from them, when their
     * schema version is null, or when the corresponding host is unknown or not up.
     *
     * @param connection the connection to query
     * @param cluster the cluster manager providing metadata and address translation
     * @return {@code true} if at most one distinct schema version was collected
     */
    private static boolean checkSchemaAgreement(Connection connection, Cluster.Manager cluster) throws InterruptedException, ExecutionException {
        DefaultResultSetFuture peersFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_SCHEMA_PEERS));
        DefaultResultSetFuture localFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_SCHEMA_LOCAL));
        connection.write(peersFuture);
        connection.write(localFuture);

        Set<UUID> versions = new HashSet<UUID>();

        Row localRow = ((ResultSet)localFuture.get()).one();
        if (localRow != null && !localRow.isNull("schema_version")) {
            versions.add(localRow.getUUID("schema_version"));
        }

        for (Row peerRow : (ResultSet)peersFuture.get()) {
            InetSocketAddress peerAddress = rpcAddressForPeerHost(peerRow, connection.address, cluster);
            if (peerAddress == null || peerRow.isNull("schema_version")) {
                continue;
            }
            Host peer = cluster.metadata.getHost(peerAddress);
            if (peer != null && peer.isUp()) {
                versions.add(peerRow.getUUID("schema_version"));
            }
        }

        logger.debug("Checking for schema agreement: versions are {}", versions);
        return versions.size() <= 1;
    }

    /**
     * Checks schema agreement over the current control connection.
     *
     * @return {@code false} when there is no open connection; otherwise the result of the schema
     *     agreement check on that connection
     */
    boolean checkSchemaAgreement() throws ConnectionException, BusyConnectionException, InterruptedException, ExecutionException {
        Connection active = connectionRef.get();
        if (active == null || active.isClosed()) {
            return false;
        }
        return checkSchemaAgreement(active, cluster);
    }

    /**
     * Tells whether a control connection is currently set and not closed.
     *
     * @return {@code true} if there is a current connection and it is not closed
     */
    boolean isOpen() {
        Connection active = connectionRef.get();
        if (active == null) {
            return false;
        }
        return !active.isClosed();
    }

    /**
     * Host-up notification. No action is taken by the control connection.
     *
     * @param host the host reported as up (unused)
     */
    public void onUp(Host host) {
    }

    /**
     * Host-added notification. No action is taken by the control connection.
     *
     * @param host the host reported as added (unused)
     */
    public void onAdd(Host host) {
    }

    /**
     * Host-down notification; triggers a reconnection if {@code host} is the one the control
     * connection is connected to.
     *
     * @param host the host reported as down
     */
    public void onDown(Host host) {
        onHostGone(host);
    }

    /**
     * Host-removed notification; triggers a reconnection if {@code host} is the one the control
     * connection is connected to.
     *
     * @param host the host reported as removed
     */
    public void onRemove(Host host) {
        onHostGone(host);
    }

    /**
     * Handles a host going down or being removed. When the current control connection points to
     * that host, the connection is force-closed (if still open) and an immediate background
     * reconnection is requested; otherwise nothing happens.
     *
     * @param host the host that went down or was removed
     */
    private void onHostGone(Host host) {
        Connection active = connectionRef.get();
        if (active == null || !active.address.equals(host.getSocketAddress())) {
            return;
        }
        logger.debug("[Control connection] {} is down/removed and it was the control host, triggering reconnect", active.address);
        if (!active.isClosed()) {
            active.closeAsync().force();
        }
        backgroundReconnect(0L);
    }

    /**
     * Requests an immediate background reconnection when the defunct connection is the current
     * control connection (compared by identity); other connections are ignored.
     *
     * @param connection the connection reported as defunct
     */
    @Override
    public void onConnectionDefunct(Connection connection) {
        if (connectionRef.get() != connection) {
            return;
        }
        backgroundReconnect(0L);
    }

    // Initializes bindAllAddress with the address built from four zero bytes (0.0.0.0).
    static {
        InetAddress allZeros;
        try {
            allZeros = InetAddress.getByAddress(new byte[4]);
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        bindAllAddress = allZeros;
    }
}

