/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AbstractReconnectionHandler;
import com.datastax.driver.core.BusyConnectionException;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.ConnectionException;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Query;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.google.common.base.Objects;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.RegisterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Maintains the single "control" connection the driver uses to register for
 * server events and to query the {@code system} tables for node-list, token
 * and schema information.
 *
 * <p>The current connection is held in an {@link AtomicReference}; when it is
 * replaced the previous one is closed. When no host can be reached, retries
 * are scheduled through an {@link AbstractReconnectionHandler} using the
 * configured {@link ReconnectionPolicy}.
 *
 * <p>The class also acts as a {@link Host.StateListener}, forwarding every
 * host state change to the configured {@link LoadBalancingPolicy}.
 */
class ControlConnection implements Host.StateListener {
    private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);

    /** Upper bound, in milliseconds, on the time {@link #waitForSchemaAgreement} keeps polling. */
    static final long MAX_SCHEMA_AGREEMENT_WAIT_MS = 10000L;

    /** Pause, in milliseconds, between two schema-agreement polls. */
    private static final long SCHEMA_AGREEMENT_POLL_INTERVAL_MS = 200L;

    /** The all-zero IPv4 address (0.0.0.0); a peer advertising it as rpc_address is reached through its "peer" address instead. */
    private static final InetAddress bindAllAddress;

    private static final String SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces";
    private static final String SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies";
    private static final String SELECT_COLUMNS = "SELECT * FROM system.schema_columns";
    private static final String SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address FROM system.peers";
    private static final String SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner FROM system.local WHERE key='local'";
    private static final String SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers";
    private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'";

    static {
        try {
            // Four zero bytes == 0.0.0.0
            bindAllAddress = InetAddress.getByAddress(new byte[4]);
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    /** The connection currently in use, or {@code null} until the first successful connect. */
    private final AtomicReference<Connection> connectionRef = new AtomicReference<Connection>();
    private final Cluster.Manager cluster;
    private final LoadBalancingPolicy balancingPolicy;
    private final ReconnectionPolicy reconnectionPolicy;
    /** Handle shared with the reconnection handler; {@link #onDown} reads it to avoid starting a second reconnection. */
    private final AtomicReference<ScheduledFuture<?>> reconnectionAttempt = new AtomicReference<ScheduledFuture<?>>();
    private volatile boolean isShutdown;

    /**
     * Creates a control connection bound to the given cluster manager, using
     * the reconnection and load-balancing policies from its configuration.
     *
     * @param manager the cluster manager this control connection belongs to
     */
    public ControlConnection(Cluster.Manager manager) {
        this.cluster = manager;
        this.reconnectionPolicy = manager.configuration.getPolicies().getReconnectionPolicy();
        this.balancingPolicy = manager.configuration.getPolicies().getLoadBalancingPolicy();
    }

    /**
     * Establishes the control connection synchronously. Does nothing once
     * {@link #shutdown} has been called.
     *
     * @throws NoHostAvailableException if no host of the query plan could be connected to
     */
    public void connect() {
        if (this.isShutdown) {
            return;
        }
        this.setNewConnection(this.reconnectInternal());
    }

    /**
     * Marks this control connection as shut down and closes the current
     * connection, if any.
     *
     * @param timeout maximum time to wait for the connection to close
     * @param unit    unit of {@code timeout}
     * @return {@code true} if there was no connection, otherwise the result of closing it
     * @throws InterruptedException if interrupted while closing the connection
     */
    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        this.isShutdown = true;
        Connection connection = this.connectionRef.get();
        return connection == null || connection.close(timeout, unit);
    }

    /**
     * Tries to reconnect immediately; if no host is reachable, starts a
     * reconnection handler that keeps retrying on the policy's schedule.
     */
    private void reconnect() {
        if (this.isShutdown) {
            return;
        }
        try {
            this.setNewConnection(this.reconnectInternal());
        }
        catch (NoHostAvailableException e) {
            logger.error("[Control connection] Cannot connect to any host, scheduling retry");
            new AbstractReconnectionHandler(this.cluster.reconnectionExecutor, this.reconnectionPolicy.newSchedule(), this.reconnectionAttempt){

                @Override
                protected Connection tryReconnect() throws ConnectionException {
                    try {
                        return ControlConnection.this.reconnectInternal();
                    }
                    catch (NoHostAvailableException noHost) {
                        // The handler only understands ConnectionException, so translate.
                        throw new ConnectionException(null, noHost.getMessage());
                    }
                }

                @Override
                protected void onReconnection(Connection connection) {
                    ControlConnection.this.setNewConnection(connection);
                }

                @Override
                protected boolean onConnectionException(ConnectionException e, long nextDelayMs) {
                    logger.error("[Control connection] Cannot connect to any host, scheduling retry in {} milliseconds", nextDelayMs);
                    return true;
                }

                @Override
                protected boolean onUnknownException(Exception e, long nextDelayMs) {
                    logger.error(String.format("[Control connection] Unknown error during reconnection, scheduling retry in %d milliseconds", nextDelayMs), e);
                    return true;
                }
            }.start();
        }
    }

    /**
     * Reacts to an error on the current connection. If the connection is
     * defunct and its host is known, the failure is reported to that host's
     * monitor; in every other case a reconnection is attempted directly.
     */
    private void signalError() {
        Connection connection = this.connectionRef.get();
        if (connection != null && connection.isDefunct()) {
            Host host = this.cluster.metadata.getHost(connection.address);
            if (host != null) {
                host.getMonitor().signalConnectionFailure(connection.lastException());
                return;
            }
        }
        this.reconnect();
    }

    /**
     * Installs {@code newConnection} as the current connection and closes the
     * one it replaces, if that one is still open.
     */
    private void setNewConnection(Connection newConnection) {
        logger.debug("[Control connection] Successfully connected to {}", newConnection.address);
        Connection old = this.connectionRef.getAndSet(newConnection);
        if (old != null && !old.isClosed()) {
            ControlConnection.closeQuietly(old);
        }
    }

    /**
     * Closes {@code connection} without waiting, restoring the thread's
     * interrupt flag if the close is interrupted.
     */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close(0L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Walks the load-balancing policy's query plan and returns a fully
     * initialised connection to the first host that accepts one.
     *
     * @return the newly opened connection
     * @throws NoHostAvailableException if every host failed or the thread was
     *         interrupted; carries the per-host error messages collected so far
     */
    private Connection reconnectInternal() {
        Iterator<Host> iter = this.balancingPolicy.newQueryPlan(Query.DEFAULT);
        Map<InetAddress, String> errors = null;
        Host host = null;
        try {
            while (iter.hasNext()) {
                host = iter.next();
                try {
                    return this.tryConnect(host);
                }
                catch (ConnectionException e) {
                    errors = ControlConnection.logError(host, e.getMessage(), errors, iter);
                    host.getMonitor().signalConnectionFailure(e);
                }
                catch (ExecutionException e) {
                    errors = ControlConnection.logError(host, e.getMessage(), errors, iter);
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Record the interruption for the host being tried and for every host not yet tried.
            if (host != null) {
                errors = ControlConnection.logError(host, "Connection thread interrupted", errors, iter);
            }
            while (iter.hasNext()) {
                errors = ControlConnection.logError(iter.next(), "Connection thread interrupted", errors, iter);
            }
        }
        throw new NoHostAvailableException(errors == null ? Collections.<InetAddress, String>emptyMap() : errors);
    }

    /**
     * Records {@code msg} as the error for {@code host}, creating the error
     * map on first use, and logs it at debug level.
     *
     * @return the (possibly newly created) error map
     */
    private static Map<InetAddress, String> logError(Host host, String msg, Map<InetAddress, String> errors, Iterator<Host> iter) {
        if (errors == null) {
            errors = new HashMap<InetAddress, String>();
        }
        errors.put(host.getAddress(), msg);
        if (logger.isDebugEnabled()) {
            if (iter.hasNext()) {
                logger.debug("[Control connection] error on {} connection ({}), trying next host", host, msg);
            } else {
                logger.debug("[Control connection] error on {} connection ({}), no more host to try", host, msg);
            }
        }
        return errors;
    }

    /**
     * Opens a connection to {@code host}, registers it for topology, status
     * and schema change events, then refreshes the node list, token map and
     * schema through it.
     *
     * <p>If any step after opening fails, the new connection is closed before
     * the exception propagates, so a failed attempt does not leak it.
     *
     * @return the initialised connection
     * @throws DriverInternalError if the fresh connection reports itself busy
     */
    private Connection tryConnect(Host host) throws ConnectionException, ExecutionException, InterruptedException {
        Connection connection = this.cluster.connectionFactory.open(host);
        boolean initialized = false;
        try {
            logger.trace("[Control connection] Registering for events");
            List<Event.Type> evs = Arrays.asList(Event.Type.TOPOLOGY_CHANGE, Event.Type.STATUS_CHANGE, Event.Type.SCHEMA_CHANGE);
            connection.write(new RegisterMessage(evs));
            logger.debug("[Control connection] Refreshing node list and token map");
            this.refreshNodeListAndTokenMap(connection);
            logger.debug("[Control connection] Refreshing schema");
            ControlConnection.refreshSchema(connection, null, null, this.cluster);
            initialized = true;
            return connection;
        }
        catch (BusyConnectionException e) {
            throw new DriverInternalError("Newly created connection should not be busy");
        }
        finally {
            if (!initialized) {
                // Nobody else holds a reference to this connection yet; release it.
                ControlConnection.closeQuietly(connection);
            }
        }
    }

    /**
     * Refreshes schema metadata over the current connection. Connection,
     * execution and busy-connection failures are logged and handed to
     * {@link #signalError()} rather than thrown. Does nothing if no
     * connection has been established yet.
     *
     * @param keyspace keyspace to refresh, or {@code null} for no keyspace filter
     * @param table    table to refresh, or {@code null} for no table filter
     * @throws InterruptedException if interrupted while waiting for the query results
     */
    public void refreshSchema(String keyspace, String table) throws InterruptedException {
        Connection c = this.connectionRef.get();
        if (c == null) {
            // Same guard as refreshNodeListAndTokenMap(): nothing to query through yet.
            return;
        }
        logger.debug("[Control connection] Refreshing schema for {}{}", keyspace == null ? "" : keyspace, table == null ? "" : "." + table);
        try {
            ControlConnection.refreshSchema(c, keyspace, table, this.cluster);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refeshing schema ({})", e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            logger.error("[Control connection] Unexpected error while refeshing schema", e);
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
    }

    /**
     * Queries the schema system tables through {@code connection} and passes
     * the results to {@code cluster.metadata.rebuildSchema}.
     *
     * <p>The keyspace query is skipped when {@code table} is non-null. All
     * queries are written before any result is awaited.
     *
     * @param connection connection to query through
     * @param keyspace   keyspace filter, or {@code null} for none
     * @param table      table filter (only applied when {@code keyspace} is non-null), or {@code null}
     * @param cluster    manager whose metadata is rebuilt
     */
    static void refreshSchema(Connection connection, String keyspace, String table, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        // NOTE(review): names are interpolated into the CQL text unescaped; confirm callers only pass server-provided identifiers.
        String whereClause = "";
        if (keyspace != null) {
            whereClause = " WHERE keyspace_name = '" + keyspace + "'";
            if (table != null) {
                whereClause = whereClause + " AND columnfamily_name = '" + table + "'";
            }
        }
        ResultSetFuture ksFuture = table == null ? new ResultSetFuture(null, new QueryMessage(SELECT_KEYSPACES + whereClause, ConsistencyLevel.DEFAULT_CASSANDRA_CL)) : null;
        ResultSetFuture cfFuture = new ResultSetFuture(null, new QueryMessage(SELECT_COLUMN_FAMILIES + whereClause, ConsistencyLevel.DEFAULT_CASSANDRA_CL));
        ResultSetFuture colsFuture = new ResultSetFuture(null, new QueryMessage(SELECT_COLUMNS + whereClause, ConsistencyLevel.DEFAULT_CASSANDRA_CL));
        if (ksFuture != null) {
            connection.write(ksFuture.callback);
        }
        connection.write(cfFuture.callback);
        connection.write(colsFuture.callback);
        cluster.metadata.rebuildSchema(keyspace, table, ksFuture == null ? null : (ResultSet)ksFuture.get(), (ResultSet)cfFuture.get(), (ResultSet)colsFuture.get());
    }

    /**
     * Refreshes the node list and token map over the current connection.
     * Does nothing if no connection has been established yet. Connection,
     * execution and busy-connection failures are logged and handed to
     * {@link #signalError()}; an interruption restores the interrupt flag and
     * skips the refresh.
     */
    public void refreshNodeListAndTokenMap() {
        Connection c = this.connectionRef.get();
        if (c == null) {
            return;
        }
        logger.debug("[Control connection] Refreshing node list and token map");
        try {
            this.refreshNodeListAndTokenMap(c);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refeshing node list and token map ({})", e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            logger.error("[Control connection] Unexpected error while refeshing node list and token map", e);
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node list and token map, skipping it.");
        }
    }

    /**
     * Updates the datacenter/rack of {@code host} if either differs from the
     * given values. The balancing policy is told the host is down before the
     * change and added after it.
     */
    private void updateLocationInfo(Host host, String datacenter, String rack) {
        if (Objects.equal(host.getDatacenter(), datacenter) && Objects.equal(host.getRack(), rack)) {
            return;
        }
        // Presumably lets the policy re-index the host under its new location -- TODO confirm.
        this.balancingPolicy.onDown(host);
        host.setLocationInfo(datacenter, rack);
        this.balancingPolicy.onAdd(host);
    }

    /**
     * Reads {@code system.local} and {@code system.peers} through
     * {@code connection} and reconciles cluster metadata with the result:
     * cluster name, per-host location, newly seen hosts, hosts no longer
     * listed, and (when a partitioner is reported) the token map.
     */
    private void refreshNodeListAndTokenMap(Connection connection) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        ResultSetFuture peersFuture = new ResultSetFuture(null, new QueryMessage(SELECT_PEERS, ConsistencyLevel.DEFAULT_CASSANDRA_CL));
        ResultSetFuture localFuture = new ResultSetFuture(null, new QueryMessage(SELECT_LOCAL, ConsistencyLevel.DEFAULT_CASSANDRA_CL));
        connection.write(peersFuture.callback);
        connection.write(localFuture.callback);

        String partitioner = null;
        HashMap<Host, Collection<String>> tokenMap = new HashMap<Host, Collection<String>>();

        // --- the node we are connected to ---
        Row localRow = ((ResultSet)localFuture.get()).one();
        if (localRow != null) {
            String clusterName = localRow.getString("cluster_name");
            if (clusterName != null) {
                this.cluster.metadata.clusterName = clusterName;
            }
            partitioner = localRow.getString("partitioner");
            Host host = this.cluster.metadata.getHost(connection.address);
            if (host == null) {
                logger.debug("Host in local system table ({}) unknown to us (ok if said host just got removed)", connection.address);
            } else {
                this.updateLocationInfo(host, localRow.getString("data_center"), localRow.getString("rack"));
                Set<String> tokens = localRow.getSet("tokens", String.class);
                if (partitioner != null && !tokens.isEmpty()) {
                    tokenMap.put(host, tokens);
                }
            }
        }

        // --- its peers: collect every row first, then apply ---
        List<InetAddress> foundHosts = new ArrayList<InetAddress>();
        List<String> dcs = new ArrayList<String>();
        List<String> racks = new ArrayList<String>();
        List<Set<String>> allTokens = new ArrayList<Set<String>>();
        for (Row row : (ResultSet)peersFuture.get()) {
            InetAddress addr = row.getInet("rpc_address");
            if (addr == null) {
                addr = row.getInet("peer");
                logger.error("No rpc_address found for host {} in {}'s peers system table. That should not happen but using address {} instead", new Object[]{addr, connection.address, addr});
            } else if (addr.equals(bindAllAddress)) {
                // rpc_address of 0.0.0.0 is not connectable; fall back to the peer address.
                addr = row.getInet("peer");
            }
            foundHosts.add(addr);
            dcs.add(row.getString("data_center"));
            racks.add(row.getString("rack"));
            allTokens.add(row.getSet("tokens", String.class));
        }
        for (int i = 0; i < foundHosts.size(); ++i) {
            Host host = this.cluster.metadata.getHost(foundHosts.get(i));
            if (host == null) {
                host = this.cluster.addHost(foundHosts.get(i), true);
            }
            this.updateLocationInfo(host, dcs.get(i), racks.get(i));
            if (partitioner == null || allTokens.get(i).isEmpty()) {
                continue;
            }
            tokenMap.put(host, allTokens.get(i));
        }

        // --- drop hosts that are neither the connected node nor listed as a peer ---
        Set<InetAddress> foundHostsSet = new HashSet<InetAddress>(foundHosts);
        for (Host host : this.cluster.metadata.allHosts()) {
            if (host.getAddress().equals(connection.address) || foundHostsSet.contains(host.getAddress())) {
                continue;
            }
            this.cluster.removeHost(host);
        }

        if (partitioner != null) {
            this.cluster.metadata.rebuildTokenMap(partitioner, tokenMap);
        }
    }

    /**
     * Polls the schema version of the connected node and of its known, up
     * peers until at most one distinct version is seen or
     * {@link #MAX_SCHEMA_AGREEMENT_WAIT_MS} has elapsed.
     *
     * <p>Peers with a null {@code rpc_address} or {@code schema_version}, and
     * peers that are unknown to {@code metadata} or not up, are ignored.
     *
     * @param connection connection to query through
     * @param metadata   metadata used to resolve peers to known hosts
     * @return {@code true} if agreement was observed, {@code false} on timeout
     */
    static boolean waitForSchemaAgreement(Connection connection, Metadata metadata) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        long start = System.nanoTime();
        long elapsed = 0L;
        while (elapsed < MAX_SCHEMA_AGREEMENT_WAIT_MS) {
            ResultSetFuture peersFuture = new ResultSetFuture(null, new QueryMessage(SELECT_SCHEMA_PEERS, ConsistencyLevel.DEFAULT_CASSANDRA_CL));
            ResultSetFuture localFuture = new ResultSetFuture(null, new QueryMessage(SELECT_SCHEMA_LOCAL, ConsistencyLevel.DEFAULT_CASSANDRA_CL));
            connection.write(peersFuture.callback);
            connection.write(localFuture.callback);

            Set<UUID> versions = new HashSet<UUID>();
            Row localRow = ((ResultSet)localFuture.get()).one();
            if (localRow != null && !localRow.isNull("schema_version")) {
                versions.add(localRow.getUUID("schema_version"));
            }
            for (Row row : (ResultSet)peersFuture.get()) {
                if (row.isNull("rpc_address") || row.isNull("schema_version")) {
                    continue;
                }
                InetAddress rpc = row.getInet("rpc_address");
                if (rpc.equals(bindAllAddress)) {
                    rpc = row.getInet("peer");
                }
                Host peer = metadata.getHost(rpc);
                if (peer == null || !peer.getMonitor().isUp()) {
                    continue;
                }
                versions.add(row.getUUID("schema_version"));
            }
            logger.debug("Checking for schema agreement: versions are {}", versions);
            if (versions.size() <= 1) {
                return true;
            }
            Thread.sleep(SCHEMA_AGREEMENT_POLL_INTERVAL_MS);
            elapsed = Cluster.timeSince(start, TimeUnit.MILLISECONDS);
        }
        return false;
    }

    /** Returns {@code true} if there is a current connection and it is not closed. */
    boolean isOpen() {
        Connection c = this.connectionRef.get();
        return c != null && !c.isClosed();
    }

    /** Forwards the event to the load-balancing policy. */
    @Override
    public void onUp(Host host) {
        this.balancingPolicy.onUp(host);
    }

    /**
     * Forwards the event to the load-balancing policy and, if the control
     * connection is to the host that went down and no reconnection attempt is
     * recorded, submits a reconnection task to the cluster executor.
     */
    @Override
    public void onDown(Host host) {
        this.balancingPolicy.onDown(host);
        Connection current = this.connectionRef.get();
        if (logger.isTraceEnabled()) {
            logger.trace("[Control connection] {} is down, currently connected to {}", host, current == null ? "nobody" : current.address);
        }
        if (current != null && current.address.equals(host.getAddress()) && this.reconnectionAttempt.get() == null) {
            this.cluster.executor.submit(new Runnable(){

                @Override
                public void run() {
                    ControlConnection.this.reconnect();
                }
            });
        }
    }

    /** Forwards the event to the load-balancing policy, then refreshes the node list and token map. */
    @Override
    public void onAdd(Host host) {
        this.balancingPolicy.onAdd(host);
        this.refreshNodeListAndTokenMap();
    }

    /** Forwards the event to the load-balancing policy, then refreshes the node list and token map. */
    @Override
    public void onRemove(Host host) {
        this.balancingPolicy.onRemove(host);
        this.refreshNodeListAndTokenMap();
    }
}

