package net.spy.memcached;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.spy.memcached.CouchbaseNode;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.couch.AsyncConnectionManager;
import net.spy.memcached.protocol.couch.HttpOperation;
import net.spy.memcached.vbucket.Reconfigurable;
import net.spy.memcached.vbucket.config.Bucket;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.nio.util.DirectByteBufferAllocator;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

/* loaded from: input_file:net/spy/memcached/CouchbaseConnection.class */
public final class CouchbaseConnection extends SpyThread implements Reconfigurable {
    private static final int NUM_CONNS = 1;
    private final CouchbaseConnectionFactory connFactory;
    private List<CouchbaseNode> nodes;
    private int nextNode;
    static final /* synthetic */ boolean $assertionsDisabled;
    protected volatile boolean reconfiguring = false;
    protected volatile boolean running = true;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue();
    private volatile boolean shutDown = false;
    private final ConcurrentLinkedQueue<CouchbaseNode> nodesToShutdown = new ConcurrentLinkedQueue<>();

    public CouchbaseConnection(CouchbaseConnectionFactory couchbaseConnectionFactory, List<InetSocketAddress> list, Collection<ConnectionObserver> collection) throws IOException {
        this.connFactory = couchbaseConnectionFactory;
        this.connObservers.addAll(collection);
        this.nodes = createConnections(list);
        this.nextNode = 0;
        start();
    }

    private List<CouchbaseNode> createConnections(List<InetSocketAddress> list) throws IOException {
        LinkedList linkedList = new LinkedList();
        for (InetSocketAddress inetSocketAddress : list) {
            SyncBasicHttpParams syncBasicHttpParams = new SyncBasicHttpParams();
            syncBasicHttpParams.setIntParameter("http.socket.timeout", 5000).setIntParameter("http.connection.timeout", 5000).setIntParameter("http.socket.buffer-size", 8192).setBooleanParameter("http.connection.stalecheck", false).setBooleanParameter("http.tcp.nodelay", true).setParameter("http.useragent", "Spymemcached Client/1.1");
            AsyncNHttpClientHandler asyncNHttpClientHandler = new AsyncNHttpClientHandler(new ImmutableHttpProcessor(new HttpRequestInterceptor[]{new RequestContent(), new RequestTargetHost(), new RequestConnControl(), new RequestUserAgent(), new RequestExpectContinue()}), new CouchbaseNode.MyHttpRequestExecutionHandler(), new DefaultConnectionReuseStrategy(), new DirectByteBufferAllocator(), syncBasicHttpParams);
            asyncNHttpClientHandler.setEventListener(new CouchbaseNode.EventLogger());
            AsyncConnectionManager asyncConnectionManager = new AsyncConnectionManager(new HttpHost(inetSocketAddress.getHostName(), inetSocketAddress.getPort()), 1, asyncNHttpClientHandler, syncBasicHttpParams);
            getLogger().info("Added %s to connect queue", inetSocketAddress);
            CouchbaseNode createCouchDBNode = this.connFactory.createCouchDBNode(inetSocketAddress, asyncConnectionManager);
            createCouchDBNode.init();
            linkedList.add(createCouchDBNode);
        }
        return linkedList;
    }

    public void addOp(HttpOperation httpOperation) {
        this.nodes.get(getNextNode()).addOp(httpOperation);
    }

    public void handleIO() {
        Iterator<CouchbaseNode> it = this.nodes.iterator();
        while (it.hasNext()) {
            it.next().doWrites();
        }
        Iterator<CouchbaseNode> it2 = this.nodesToShutdown.iterator();
        while (it2.hasNext()) {
            CouchbaseNode next = it2.next();
            this.nodesToShutdown.remove(next);
            Collection<HttpOperation> destroyWriteQueue = next.destroyWriteQueue();
            try {
                next.shutdown();
            } catch (IOException e) {
                getLogger().error("Error shutting down connection to " + next.getSocketAddress());
            }
            redistributeOperations(destroyWriteQueue);
        }
    }

    private void redistributeOperations(Collection<HttpOperation> collection) {
        int i = 0;
        Iterator<HttpOperation> it = collection.iterator();
        while (it.hasNext()) {
            addOp(it.next());
            i++;
        }
        if (!$assertionsDisabled && i <= 0) {
            throw new AssertionError("Didn't add any new operations when redistributing");
        }
    }

    private int getNextNode() {
        int i = this.nextNode + 1;
        this.nextNode = i;
        int size = i % this.nodes.size();
        this.nextNode = size;
        return size;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkState() {
        if (this.shutDown) {
            throw new IllegalStateException("Shutting down");
        }
        if (!$assertionsDisabled && !isAlive()) {
            throw new AssertionError("IO Thread is not running.");
        }
    }

    public boolean shutdown() throws IOException {
        if (this.shutDown) {
            getLogger().info("Suppressing duplicate attempt to shut down");
            return false;
        }
        this.shutDown = true;
        this.running = false;
        for (CouchbaseNode couchbaseNode : this.nodes) {
            if (couchbaseNode != null) {
                couchbaseNode.shutdown();
                if (couchbaseNode.hasWriteOps()) {
                    getLogger().warn("Shutting down with ops waiting to be written");
                }
            }
        }
        return true;
    }

    @Override // net.spy.memcached.vbucket.Reconfigurable
    public void reconfigure(Bucket bucket) {
        this.reconfiguring = true;
        try {
            try {
                List<String> servers = bucket.getConfig().getServers();
                HashSet hashSet = new HashSet();
                ArrayList arrayList = new ArrayList();
                for (String str : servers) {
                    int lastIndexOf = str.lastIndexOf(58);
                    if (lastIndexOf < 1) {
                        throw new IllegalArgumentException("Invalid server ``" + str + "'' in vbucket's server list");
                    }
                    InetSocketAddress inetSocketAddress = new InetSocketAddress(str.substring(0, lastIndexOf), Integer.parseInt("5984"));
                    hashSet.add(inetSocketAddress);
                    arrayList.add(inetSocketAddress);
                }
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                ArrayList arrayList4 = new ArrayList();
                for (CouchbaseNode couchbaseNode : this.nodes) {
                    if (hashSet.contains(couchbaseNode.getSocketAddress())) {
                        arrayList3.add(couchbaseNode);
                        arrayList4.add(couchbaseNode.getSocketAddress());
                    } else {
                        arrayList2.add(couchbaseNode);
                    }
                }
                arrayList.removeAll(arrayList4);
                List<CouchbaseNode> createConnections = createConnections(arrayList);
                ArrayList arrayList5 = new ArrayList();
                arrayList5.addAll(arrayList3);
                arrayList5.addAll(createConnections);
                this.nodes = arrayList5;
                this.nodesToShutdown.addAll(arrayList2);
                this.reconfiguring = false;
            } catch (IOException e) {
                getLogger().error("Connection reconfiguration failed", e);
                this.reconfiguring = false;
            }
        } catch (Throwable th) {
            this.reconfiguring = false;
            throw th;
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.running) {
            if (!this.reconfiguring) {
                try {
                    handleIO();
                } catch (Exception e) {
                    logRunException(e);
                }
            }
        }
        getLogger().info("Shut down memcached client");
    }

    private void logRunException(Exception exc) {
        if (this.shutDown) {
            getLogger().debug("Exception occurred during shutdown", exc);
        } else {
            getLogger().warn("Problem handling memcached IO", exc);
        }
    }

    static {
        $assertionsDisabled = !CouchbaseConnection.class.desiredAssertionStatus();
    }
}
