/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.dax.cluster;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import software.amazon.dax.Configuration;
import software.amazon.dax.DaxAsyncClient;
import software.amazon.dax.InternalConfiguration;
import software.amazon.dax.cluster.Backend;
import software.amazon.dax.cluster.Source;
import software.amazon.dax.com.amazon.dax.bits.disco.ServiceEndpoint;
import software.amazon.dax.com.amazon.dax.client.HostPort;
import software.amazon.dax.com.amazon.dax.client.cluster.RandomRouter;
import software.amazon.dax.com.amazon.dax.client.cluster.Router;
import software.amazon.dax.com.amazon.dax.client.cluster.ThreadAffinityRouter;
import software.amazon.dax.exceptions.ClientCreationException;
import software.amazon.dax.exceptions.DaxServiceException;
import software.amazon.dax.utils.IpFilter;

public class Cluster
implements Closeable {
    private static final Log LOG = LogFactory.getLog(Cluster.class);
    private static final int MIN_CLUSTER_SIZE_USING_THREAD_AFFINITY = 8;
    private static final long THREAD_KEEP_ALIVE_MS = 10000L;
    private final HostPort seed;
    private final Map<InetSocketAddress, Backend> backends = new ConcurrentHashMap<InetSocketAddress, Backend>();
    private Set<ServiceEndpoint> cfg;
    private final Source source;
    private volatile Router<DaxAsyncClient> routes;
    private volatile boolean closed;
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> refreshJob;
    private final Configuration configuration;
    final Configuration endpointClientConfiguration;
    private final InternalConfiguration internalConfiguration;
    private final long clusterUpdateIntervalMillis;

    public Cluster(Configuration configuration) {
        this(configuration, null);
    }

    Cluster(Configuration configuration, Source source) {
        this.configuration = configuration;
        this.endpointClientConfiguration = configuration.toBuilder().requestTimeoutMillis(configuration.endpointRefreshTimeoutMillis()).connectTimeoutMillis(configuration.endpointRefreshTimeoutMillis()).build();
        this.internalConfiguration = new InternalConfiguration().dns(configuration.host());
        this.seed = HostPort.parse(configuration.url());
        this.source = source == null ? Source.autoconf(this, this.seed) : source;
        this.source.setIpDiscovery(configuration.ipDiscovery());
        this.clusterUpdateIntervalMillis = configuration.clusterUpdateIntervalMillis();
        this.scheduler = Executors.newScheduledThreadPool(0, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("DaxCluster-" + t.getId());
                return t;
            }
        });
        if (this.scheduler instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor)((Object)this.scheduler);
            tpe.setKeepAliveTime(10000L, TimeUnit.MILLISECONDS);
            tpe.allowCoreThreadTimeOut(true);
        }
        if (this.scheduler instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)this.scheduler;
            stpe.setRemoveOnCancelPolicy(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startup(int minimumHealthy) throws IOException {
        Cluster cluster = this;
        synchronized (cluster) {
            if (this.closed) {
                throw new IllegalStateException("closed");
            }
            long refreshIntervalMs = Math.max(TimeUnit.SECONDS.toMillis(1L), Cluster.jitter(this.clusterUpdateIntervalMillis));
            this.refreshJob = this.scheduler.scheduleWithFixedDelay(new Runnable(){
                private Throwable lastError;

                @Override
                public void run() {
                    try {
                        if (!Cluster.this.closed) {
                            Cluster.this.refresh();
                            this.lastError = null;
                        }
                    }
                    catch (Throwable t) {
                        if (this.lastError != null) {
                            if (this.lastError != t) {
                                t.addSuppressed(this.lastError);
                            }
                            if (LOG.isDebugEnabled()) {
                                LOG.debug((Object)("caught exception during cluster refresh: " + t), t);
                            }
                            this.lastError = null;
                        }
                        this.lastError = t;
                    }
                }
            }, refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
            try {
                this.refresh();
            }
            catch (DaxServiceException e) {
                int[] errCodes = e.getCodeSeq();
                if (errCodes != null && errCodes.length >= 3 && errCodes[1] == 23 && errCodes[2] == 31) {
                    LOG.warn((Object)("Auth exception while starting up cluster client:" + (Object)((Object)e)), (Throwable)((Object)e));
                }
                throw e;
            }
        }
        if (minimumHealthy <= 0) {
            return;
        }
        this.waitForRoutes(minimumHealthy);
        LOG.info((Object)("connected to cluster endpoints: " + this.backends.keySet()));
    }

    synchronized void waitForRoutes(int minimum) throws IOException {
        long timeoutMs = this.configuration.endpointRefreshTimeoutMillis();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        Router<DaxAsyncClient> routes;
        while ((routes = this.routes) == null || routes.size() < minimum) {
            timeoutMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (timeoutMs <= 0L) {
                throw new IOException("Not enough endpoints connected");
            }
            try {
                this.wait(timeoutMs);
            }
            catch (InterruptedException ie) {
                throw new InterruptedIOException();
            }
        }
        return;
    }

    public DaxAsyncClient client(DaxAsyncClient prev) throws IOException {
        Router<DaxAsyncClient> routes = this.routes;
        if (routes == null) {
            throw new IOException("No endpoints available");
        }
        return routes.nextAny(prev);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Cluster cluster = this;
        synchronized (cluster) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.refreshJob != null) {
                this.refreshJob.cancel(false);
            }
        }
        for (Map.Entry entry : this.backends.entrySet()) {
            ((Backend)entry.getValue()).close();
        }
        this.backends.clear();
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
                this.scheduler.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.scheduler.shutdownNow();
        }
        this.routes = null;
    }

    private static long jitter(long value) {
        long shift = 2L;
        return value < 4L ? value : ThreadLocalRandom.current().nextLong(value - (value >> 2), value + (value >> 2));
    }

    DaxAsyncClient newClient(InetAddress addr, int port, Configuration config) throws IOException {
        return new DaxAsyncClient((Configuration)config.copy(c -> c.url(HostPort.url(config.ssl(), addr, port))), this.internalConfiguration);
    }

    private Backend newBackend(InetAddress addr, int port) throws ClientCreationException {
        return new Backend(addr, port, this.configuration, this.internalConfiguration, this::rebuildRoutes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void refresh() throws IOException {
        Source src;
        Cluster cluster = this;
        synchronized (cluster) {
            src = this.source;
            if (this.closed || src == null) {
                return;
            }
        }
        src.refresh();
    }

    public synchronized void update(Set<ServiceEndpoint> cfg) throws IOException {
        if (cfg != null && cfg.size() > 0) {
            this.cfg = cfg;
            this.updateEndpoints();
        }
    }

    private void updateEndpoints() throws IOException {
        Set<ServiceEndpoint> se = this.cfg;
        Map<InetSocketAddress, Backend> backends = this.backends;
        boolean rebuild = false;
        Set<InetSocketAddress> newBackends = this.expand(se);
        for (InetSocketAddress inetSocketAddress : newBackends) {
            Backend be = backends.get(inetSocketAddress);
            if (be != null) continue;
            rebuild = true;
            backends.put(inetSocketAddress, this.newBackend(inetSocketAddress.getAddress(), inetSocketAddress.getPort()));
        }
        Iterator<Map.Entry<InetSocketAddress, Backend>> it = backends.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<InetSocketAddress, Backend> entry = it.next();
            if (newBackends.contains(entry.getKey())) continue;
            Backend old = entry.getValue();
            it.remove();
            old.close();
            rebuild = true;
        }
        if (rebuild) {
            this.rebuildRoutes();
        }
    }

    private Set<InetSocketAddress> expand(Set<ServiceEndpoint> se) throws IOException {
        HashSet<InetSocketAddress> addrs = new HashSet<InetSocketAddress>();
        for (ServiceEndpoint ep : se) {
            addrs.add(Cluster.toAddr(ep));
        }
        return new HashSet<InetSocketAddress>(IpFilter.getInstance().filterAndSelectInetSocketAddress(addrs, this.configuration.ipDiscovery()));
    }

    Backend backendFor(InetSocketAddress addr) {
        return this.backends.get(addr);
    }

    static InetSocketAddress toAddr(ServiceEndpoint ep) throws IOException {
        InetSocketAddress addr = ep.address() != null && ep.address().length > 0 ? new InetSocketAddress(InetAddress.getByAddress(ep.address()), ep.port()) : new InetSocketAddress(ep.hostname(), ep.port());
        return addr;
    }

    private synchronized void rebuildRoutes() {
        int sz = this.backends.size();
        if (sz == 0) {
            this.routes = null;
            this.notify();
            return;
        }
        DaxAsyncClient[] cs = (DaxAsyncClient[])this.backends.values().stream().map(Backend::getClient).toArray(DaxAsyncClient[]::new);
        this.routes = cs.length >= 8 ? new ThreadAffinityRouter<DaxAsyncClient>(cs, sz) : new RandomRouter<DaxAsyncClient>(cs, sz);
        this.notify();
    }
}

