/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.client;

import io.rsocket.AbstractRSocket;
import io.rsocket.Availability;
import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.client.LoadBalancerSocketMetrics;
import io.rsocket.client.NoAvailableRSocketException;
import io.rsocket.client.RSocketSupplierPool;
import io.rsocket.client.TimeoutException;
import io.rsocket.client.TransportException;
import io.rsocket.client.filter.RSocketSupplier;
import io.rsocket.stat.Ewma;
import io.rsocket.stat.FrugalQuantile;
import io.rsocket.stat.Median;
import io.rsocket.stat.Quantile;
import io.rsocket.util.Clock;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public abstract class LoadBalancedRSocketMono
extends Mono<RSocket>
implements Availability,
Closeable {
    // ---- Public defaults. The single-argument create(...) passes these same values. ----

    /** Default exponent used for the latency bonus/penalty in {@code algorithmicWeight}. */
    public static final double DEFAULT_EXP_FACTOR = 4.0;
    /** Default quantile tracked as the lower edge of the latency band. */
    public static final double DEFAULT_LOWER_QUANTILE = 0.2;
    /** Default quantile tracked as the upper edge of the latency band. */
    public static final double DEFAULT_HIGHER_QUANTILE = 0.8;
    /** Default lower bound for the average number of pending requests per socket. */
    public static final double DEFAULT_MIN_PENDING = 1.0;
    /** Default upper bound for the average number of pending requests per socket. */
    public static final double DEFAULT_MAX_PENDING = 2.0;
    /** Default minimum target aperture (number of active sockets aimed for). */
    public static final int DEFAULT_MIN_APERTURE = 3;
    /** Default maximum target aperture. */
    public static final int DEFAULT_MAX_APERTURE = 100;
    /** Default cap on the socket refresh period: five minutes, expressed in milliseconds. */
    public static final long DEFAULT_MAX_REFRESH_PERIOD_MS = TimeUnit.MILLISECONDS.convert(5L, TimeUnit.MINUTES);
    private static final Logger logger = LoggerFactory.getLogger(LoadBalancedRSocketMono.class);
    /** Minimum time between two aperture changes (15 seconds in {@code Clock} units). */
    private static final long APERTURE_REFRESH_PERIOD = Clock.unit().convert(15L, TimeUnit.SECONDS);
    // NOTE(review): not referenced by name in this decompiled file; select() loops a literal 5 times,
    // which presumably is this constant inlined by the compiler — confirm against the original source.
    private static final int EFFORT = 5;
    /** Initial value of each WeightedSocket's inter-arrival-time average (1 second in {@code Clock} units). */
    private static final long DEFAULT_INITIAL_INTER_ARRIVAL_TIME = Clock.unit().convert(1L, TimeUnit.SECONDS);
    // NOTE(review): WeightedSocket's three-argument constructor passes the literal 500 as its
    // inactivity factor; presumably that is this constant inlined — confirm.
    private static final int DEFAULT_INTER_ARRIVAL_FACTOR = 500;
    /** Returned by select() when there is no active socket; every interaction on it fails. */
    private static final FailingRSocket FAILING_REACTIVE_SOCKET = new FailingRSocket();
    /** Mono that runs select() for each subscriber; the anonymous subclasses subscribe through it. */
    protected final Mono<RSocket> rSocketMono;
    // Pending-request bounds; their midpoint seeds and resets the `pendings` average.
    private final double minPendings;
    private final double maxPendings;
    // Bounds used by updateAperture to clamp targetAperture.
    private final int minAperture;
    private final int maxAperture;
    /** Upper bound for refreshPeriod, in {@code Clock} units. */
    private final long maxRefreshPeriod;
    /** Exponent applied to the latency bonus/penalty factor in algorithmicWeight. */
    private final double expFactor;
    // Latency quantile estimators; the same instances are handed to every WeightedSocket.
    private final Quantile lowerQuantile;
    private final Quantile higherQuantile;
    /** Sockets currently eligible for selection. */
    private final ArrayList<WeightedSocket> activeSockets;
    /** Moving average of the mean number of pending requests per socket (see refreshAperture). */
    private final Ewma pendings;
    /** Completed by dispose(); its termination also disposes the supplier pool. */
    private final MonoProcessor<Void> onClose = MonoProcessor.create();
    private final RSocketSupplierPool pool;
    // Connection retry settings passed to retryBackoff(...) in the WeightedSocket constructor.
    private final long weightedSocketRetries;
    private final Duration weightedSocketBackOff;
    private final Duration weightedSocketMaxBackOff;
    /** Number of active sockets the balancer currently aims for. */
    private volatile int targetAperture;
    /** Clock reading of the last updateAperture call. */
    private long lastApertureRefresh;
    /** Current interval between periodic "add one socket" refreshes; grows by 1.5x up to maxRefreshPeriod. */
    private long refreshPeriod;
    // Initialised to 0 and only read (in refreshAperture); nothing in this file modifies it.
    private int pendingSockets;
    /** Clock reading of the last periodic refresh performed by refreshSockets. */
    private volatile long lastRefresh;

    private LoadBalancedRSocketMono(Publisher<? extends Collection<RSocketSupplier>> factories, double expFactor, double lowQuantile, double highQuantile, double minPendings, double maxPendings, int minAperture, int maxAperture, long maxRefreshPeriodMs, long weightedSocketRetries, Duration weightedSocketBackOff, Duration weightedSocketMaxBackOff) {
        this.weightedSocketRetries = weightedSocketRetries;
        this.weightedSocketBackOff = weightedSocketBackOff;
        this.weightedSocketMaxBackOff = weightedSocketMaxBackOff;
        this.expFactor = expFactor;
        this.lowerQuantile = new FrugalQuantile(lowQuantile);
        this.higherQuantile = new FrugalQuantile(highQuantile);
        this.activeSockets = new ArrayList();
        this.pendingSockets = 0;
        this.minPendings = minPendings;
        this.maxPendings = maxPendings;
        this.pendings = new Ewma(15L, TimeUnit.SECONDS, (minPendings + maxPendings) / 2.0);
        this.minAperture = minAperture;
        this.maxAperture = maxAperture;
        this.targetAperture = minAperture;
        this.maxRefreshPeriod = Clock.unit().convert(maxRefreshPeriodMs, TimeUnit.MILLISECONDS);
        this.lastApertureRefresh = Clock.now();
        this.refreshPeriod = Clock.unit().convert(15L, TimeUnit.SECONDS);
        this.lastRefresh = Clock.now();
        this.pool = new RSocketSupplierPool(factories);
        this.refreshSockets();
        this.rSocketMono = Mono.fromSupplier(this::select);
        this.onClose.doFinally(signalType -> this.pool.dispose()).subscribe();
    }

    /**
     * Creates a load balancer over {@code factories} using the {@code DEFAULT_*} settings of this
     * class.
     *
     * @param factories publisher of the available socket suppliers
     * @return a new balancer
     */
    public static LoadBalancedRSocketMono create(Publisher<? extends Collection<RSocketSupplier>> factories) {
        return create(
                factories,
                DEFAULT_EXP_FACTOR,
                DEFAULT_LOWER_QUANTILE,
                DEFAULT_HIGHER_QUANTILE,
                DEFAULT_MIN_PENDING,
                DEFAULT_MAX_PENDING,
                DEFAULT_MIN_APERTURE,
                DEFAULT_MAX_APERTURE,
                DEFAULT_MAX_REFRESH_PERIOD_MS);
    }

    /**
     * Creates a load balancer with every tuning parameter, including the connection retry policy,
     * supplied by the caller.
     *
     * @param factories publisher of the available socket suppliers
     * @param expFactor exponent of the latency bonus/penalty used when weighting sockets
     * @param lowQuantile quantile tracked as the lower latency bound
     * @param highQuantile quantile tracked as the higher latency bound
     * @param minPendings lower bound of the pending-request band
     * @param maxPendings upper bound of the pending-request band
     * @param minAperture smallest allowed target aperture
     * @param maxAperture largest allowed target aperture
     * @param maxRefreshPeriodMs cap of the refresh period, in milliseconds
     * @param weightedSocketRetries number of connection retries per socket
     * @param weightedSocketBackOff first back-off between connection retries
     * @param weightedSocketMaxBackOff maximum back-off between connection retries
     * @return a new balancer whose subscribers are served through {@code rSocketMono}
     */
    public static LoadBalancedRSocketMono create(Publisher<? extends Collection<RSocketSupplier>> factories, double expFactor, double lowQuantile, double highQuantile, double minPendings, double maxPendings, int minAperture, int maxAperture, long maxRefreshPeriodMs, long weightedSocketRetries, Duration weightedSocketBackOff, Duration weightedSocketMaxBackOff) {
        return new LoadBalancedRSocketMono(
                factories,
                expFactor,
                lowQuantile,
                highQuantile,
                minPendings,
                maxPendings,
                minAperture,
                maxAperture,
                maxRefreshPeriodMs,
                weightedSocketRetries,
                weightedSocketBackOff,
                weightedSocketMaxBackOff) {

            @Override
            public void subscribe(CoreSubscriber<? super RSocket> s) {
                // Each subscription triggers a fresh select() through rSocketMono.
                rSocketMono.subscribe(s);
            }
        };
    }

    /**
     * Creates a load balancer with caller-supplied balancing parameters and the built-in connection
     * retry policy: 5 retries, 500 ms initial back-off, 5 s maximum back-off.
     *
     * @param factories publisher of the available socket suppliers
     * @param expFactor exponent of the latency bonus/penalty used when weighting sockets
     * @param lowQuantile quantile tracked as the lower latency bound
     * @param highQuantile quantile tracked as the higher latency bound
     * @param minPendings lower bound of the pending-request band
     * @param maxPendings upper bound of the pending-request band
     * @param minAperture smallest allowed target aperture
     * @param maxAperture largest allowed target aperture
     * @param maxRefreshPeriodMs cap of the refresh period, in milliseconds
     * @return a new balancer
     */
    public static LoadBalancedRSocketMono create(Publisher<? extends Collection<RSocketSupplier>> factories, double expFactor, double lowQuantile, double highQuantile, double minPendings, double maxPendings, int minAperture, int maxAperture, long maxRefreshPeriodMs) {
        final long retries = 5L;
        final Duration firstBackOff = Duration.ofMillis(500L);
        final Duration maxBackOff = Duration.ofSeconds(5L);
        // The fully-parameterised overload builds the same kind of anonymous subclass.
        return create(
                factories,
                expFactor,
                lowQuantile,
                highQuantile,
                minPendings,
                maxPendings,
                minAperture,
                maxAperture,
                maxRefreshPeriodMs,
                retries,
                firstBackOff,
                maxBackOff);
    }

    /**
     * Brings the set of active sockets closer to the target aperture and, once per refresh period,
     * adds one extra socket.
     *
     * <p>Below target (and with suppliers left in the pool) the missing sockets are requested; above
     * target a single socket is dropped. Each periodic refresh lengthens the refresh period by a
     * factor of 1.5, capped at {@code maxRefreshPeriod}.
     */
    private synchronized void refreshSockets() {
        this.refreshAperture();

        final int active = this.activeSockets.size();
        // targetAperture is only written while holding this monitor, so one read is enough here.
        final int target = this.targetAperture;
        if (active < target && !this.pool.isPoolEmpty()) {
            final int missing = target - active;
            logger.debug("aperture {} is below target {}, adding {} sockets", active, target, missing);
            this.addSockets(missing);
        } else if (target < this.activeSockets.size()) {
            logger.debug("aperture {} is above target {}, quicking 1 socket", active, target);
            this.quickSlowestRS();
        }

        final long now = Clock.now();
        if (now - this.lastRefresh < this.refreshPeriod) {
            // Periodic refresh is not due yet.
            return;
        }
        final long previousPeriod = this.refreshPeriod;
        this.refreshPeriod = (long) Math.min((double) previousPeriod * 1.5, (double) this.maxRefreshPeriod);
        logger.debug("Bumping refresh period, {}->{}", previousPeriod / 1000L, this.refreshPeriod / 1000L);
        this.lastRefresh = now;
        this.addSockets(1);
    }

    /**
     * Starts up to {@code numberOfNewSocket} new weighted sockets, limited by the number of
     * suppliers in the pool and stopping early as soon as the pool hands out none.
     *
     * @param numberOfNewSocket how many sockets the caller would like to add
     */
    private synchronized void addSockets(int numberOfNewSocket) {
        final int available = this.pool.poolSize();
        int wanted = numberOfNewSocket;
        if (wanted > available) {
            wanted = available;
            logger.debug("addSockets({}) restricted by the number of factories, i.e. addSockets({})", numberOfNewSocket, wanted);
        }
        for (int created = 0; created < wanted; ++created) {
            final Optional<RSocketSupplier> candidate = this.pool.get();
            if (!candidate.isPresent()) {
                break;
            }
            // The instance is not kept here: the WeightedSocket constructor starts the connection
            // and adds the socket to activeSockets itself once it is connected.
            new WeightedSocket(candidate.get(), this.lowerQuantile, this.higherQuantile);
        }
    }

    /**
     * Feeds the current mean number of pending requests per socket into the {@code pendings}
     * average and, at most once per {@code APERTURE_REFRESH_PERIOD}, moves the target aperture by
     * one: down when the average is below {@code minPendings}, up when it is above
     * {@code maxPendings}.
     *
     * <p>The thresholds are the configured {@code minPendings}/{@code maxPendings}, not fixed
     * literals, so values passed to {@code create(...)} take effect. With the default settings
     * (1.0 and 2.0) the behaviour is unchanged.
     *
     * <p>Does nothing while there are no active sockets.
     */
    private synchronized void refreshAperture() {
        final int n = this.activeSockets.size();
        if (n == 0) {
            return;
        }

        double totalPending = 0.0;
        for (WeightedSocket socket : this.activeSockets) {
            totalPending += (double) socket.getPending();
        }
        // Mean over the active sockets plus the pendingSockets counter.
        this.pendings.insert(totalPending / (double) (n + this.pendingSockets));
        final double avgPending = this.pendings.value();

        final long now = Clock.now();
        if (now - this.lastApertureRefresh <= APERTURE_REFRESH_PERIOD) {
            // Rate limit: the aperture was changed too recently.
            return;
        }
        if (avgPending < this.minPendings) {
            this.updateAperture(this.targetAperture - 1, now);
        } else if (this.maxPendings < avgPending) {
            this.updateAperture(this.targetAperture + 1, now);
        }
    }

    /**
     * Sets the target aperture to {@code newValue} clamped to the allowed range, records the time
     * of the change and resets the pending-request average to the midpoint of its band.
     *
     * <p>The upper clamp is the smaller of {@code maxAperture} and the number of sockets that could
     * exist at all (active sockets plus suppliers still in the pool); it is applied last, so it wins
     * over {@code minAperture} when the two conflict.
     *
     * @param newValue requested target aperture
     * @param now clock reading stored as the time of this update
     */
    private void updateAperture(int newValue, long now) {
        final int before = this.targetAperture;

        final int raisedToMinimum = Math.max(this.minAperture, newValue);
        final int ceiling = Math.min(this.maxAperture, this.activeSockets.size() + this.pool.poolSize());
        final int after = Math.min(ceiling, raisedToMinimum);

        this.targetAperture = after;
        this.lastApertureRefresh = now;
        this.pendings.reset((this.minPendings + this.maxPendings) / 2.0);

        if (after != before) {
            // The average has already been reset, so the logged pending value is the reset value.
            logger.debug("Current pending={}, new target={}, previous target={}", this.pendings.value(), after, before);
        }
    }

    /**
     * Disposes the least attractive active socket, unless at most one socket is active.
     *
     * <p>A socket with availability {@code 0.0} is chosen immediately. Otherwise each socket is
     * scored as availability divided by its predicted latency (availability alone when the
     * predicted latency is {@code 0.0}) and the lowest score loses.
     */
    private synchronized void quickSlowestRS() {
        if (this.activeSockets.size() <= 1) {
            return;
        }

        WeightedSocket victim = null;
        double worstScore = Double.MAX_VALUE;
        for (WeightedSocket candidate : this.activeSockets) {
            double score = candidate.availability();
            if (score == 0.0) {
                // Unavailable socket: no need to look any further.
                victim = candidate;
                break;
            }
            if (candidate.getPredictedLatency() != 0.0) {
                score *= 1.0 / candidate.getPredictedLatency();
            }
            if (score < worstScore) {
                worstScore = score;
                victim = candidate;
            }
        }

        if (victim == null) {
            return;
        }
        logger.debug("Disposing slowest WeightedSocket {}", victim);
        victim.dispose();
    }

    /**
     * Returns the arithmetic mean of the availabilities of all active sockets.
     *
     * @return the mean availability, or {@code 0.0} when no socket is active
     */
    public synchronized double availability() {
        if (this.activeSockets.isEmpty()) {
            return 0.0;
        }
        double total = 0.0;
        for (WeightedSocket socket : this.activeSockets) {
            total += socket.availability();
        }
        return total / (double) this.activeSockets.size();
    }

    /**
     * Picks the socket that serves the next subscriber.
     *
     * <p>After refreshing the socket set: with no active socket the always-failing socket is
     * returned, with one socket that socket is returned. Otherwise up to {@code EFFORT} pairs of
     * distinct random sockets are drawn until both members of a pair are available; if the last
     * attempt still fails and the pool has suppliers left, one more socket is requested. Of the
     * last pair drawn, the socket with the higher algorithmic weight wins (the first one on a tie).
     *
     * @return the selected socket, never {@code null}
     */
    private synchronized RSocket select() {
        this.refreshSockets();

        final int size = this.activeSockets.size();
        if (size == 0) {
            return FAILING_REACTIVE_SOCKET;
        }
        if (size == 1) {
            return this.activeSockets.get(0);
        }

        final ThreadLocalRandom random = ThreadLocalRandom.current();
        WeightedSocket first = null;
        WeightedSocket second = null;
        for (int attempt = 1; attempt <= EFFORT; ++attempt) {
            final int firstIndex = random.nextInt(size);
            // Draw from size - 1 slots and skip over firstIndex so the two indices always differ.
            int secondIndex = random.nextInt(size - 1);
            if (secondIndex >= firstIndex) {
                ++secondIndex;
            }
            first = this.activeSockets.get(firstIndex);
            second = this.activeSockets.get(secondIndex);
            if (first.availability() > 0.0 && second.availability() > 0.0) {
                break;
            }
            if (attempt == EFFORT && !this.pool.isPoolEmpty()) {
                this.addSockets(1);
            }
        }

        return this.algorithmicWeight(first) < this.algorithmicWeight(second) ? second : first;
    }

    /**
     * Computes the selection weight of {@code socket}; a higher weight is preferred.
     *
     * <p>The predicted latency is divided by {@code (1 + alpha)^expFactor} when it lies below the
     * lower quantile and multiplied by the same kind of factor when it lies above the higher
     * quantile, where {@code alpha} is the distance from the band edge relative to the band width
     * (at least {@code 1.0}). The weight is {@code availability / (1 + latency * (pending + 1))}.
     *
     * @param socket socket to weigh, may be {@code null}
     * @return the weight, or {@code 0.0} for a {@code null} or unavailable socket
     */
    private double algorithmicWeight(WeightedSocket socket) {
        if (socket == null || socket.availability() == 0.0) {
            return 0.0;
        }

        final int pending = socket.getPending();
        double latency = socket.getPredictedLatency();

        final double low = this.lowerQuantile.estimation();
        // Keep the upper edge strictly above the lower one.
        final double high = Math.max(this.higherQuantile.estimation(), low * 1.001);
        final double bandWidth = Math.max(high - low, 1.0);

        if (latency < low) {
            // Faster than the band: reward.
            latency /= Math.pow(1.0 + (low - latency) / bandWidth, this.expFactor);
        } else if (latency > high) {
            // Slower than the band: penalise.
            latency *= Math.pow(1.0 + (latency - high) / bandWidth, this.expFactor);
        }

        return socket.availability() / (1.0 + latency * (double) (pending + 1));
    }

    /**
     * Describes the balancer: number of active sockets, number of pooled suppliers, average
     * pending requests, target aperture and the current latency band.
     */
    public synchronized String toString() {
        final StringBuilder text = new StringBuilder("LoadBalancer(a:");
        text.append(this.activeSockets.size());
        text.append(", f: ").append(this.pool.poolSize());
        text.append(", avgPendings=").append(this.pendings.value());
        text.append(", targetAperture=").append(this.targetAperture);
        text.append(", band=[").append(this.lowerQuantile.estimation());
        text.append(", ").append(this.higherQuantile.estimation());
        text.append("])");
        return text.toString();
    }

    /**
     * Disposes every active socket, empties the active list and completes {@link #onClose()},
     * which in turn disposes the supplier pool.
     *
     * <p>The sockets are disposed from a snapshot of {@code activeSockets}. A WeightedSocket's
     * close callback removes that socket from {@code activeSockets}; disposing while traversing the
     * live list would therefore modify it mid-iteration, which {@link ArrayList} reports with a
     * {@link java.util.ConcurrentModificationException} and which would leave the remaining
     * sockets undisposed.
     */
    @Override
    public void dispose() {
        synchronized (this) {
            for (WeightedSocket socket : new ArrayList<>(this.activeSockets)) {
                socket.dispose();
            }
            this.activeSockets.clear();
            this.onClose.onComplete();
        }
    }

    /**
     * Reports whether this balancer has been closed.
     *
     * @return the disposed state of the {@code onClose} processor, which {@link #dispose()} completes
     */
    public boolean isDisposed() {
        return this.onClose.isDisposed();
    }

    /**
     * Returns the close signal of this balancer.
     *
     * @return the {@code onClose} processor, which {@link #dispose()} completes
     */
    public Mono<Void> onClose() {
        return this.onClose;
    }

    private class WeightedSocket
    extends AbstractRSocket
    implements LoadBalancerSocketMetrics {
        /**
         * Base latency reported while requests are pending but no latency has been measured yet
         * (numerically 2^51 - 1).
         */
        private static final double STARTUP_PENALTY = 2.251799813685247E15;
        // Quantile estimators received from the enclosing balancer; observe() feeds every RTT into them.
        private final Quantile lowerQuantile;
        private final Quantile higherQuantile;
        /** Multiplier of the mean inter-arrival time after which an idle socket counts as inactive. */
        private final long inactivityFactor;
        /** Receives the underlying RSocket once the connection succeeds; all requests wait on it. */
        private final MonoProcessor<RSocket> rSocketMono = MonoProcessor.create();
        /** Number of in-flight request-response calls (changed by incr/decr). */
        private volatile int pending;
        /** Clock reading of the last incr(), i.e. the last request start. */
        private long stamp;
        /** Clock reading of the last incr() or decr(). */
        private long stamp0;
        /** Running accumulator maintained by incr()/decr() and read by instantaneous(). */
        private long duration;
        /** Estimator of the median round-trip time. */
        private Median median;
        /** Moving average of the time between consecutive request starts. */
        private Ewma interArrivalTime;
        /** Counter incremented and decremented by CountingSubscriber; not read elsewhere in this file. */
        private AtomicLong pendingStreams;
        /** 0.0 until the connection is established, then 1.0. */
        private volatile double availability = 0.0;

        /**
         * Creates the wrapper and immediately starts connecting through {@code factory}.
         *
         * <p>On a successful connection the socket is published on {@code rSocketMono}, marked
         * available and, unless this wrapper has been disposed in the meantime, added to the
         * balancer's active sockets. When this wrapper closes, the supplier is handed back to the
         * pool and the wrapper is removed from the active sockets.
         *
         * @param factory supplier used to connect; returned to the pool when this socket closes
         * @param lowerQuantile shared estimator of the lower latency bound
         * @param higherQuantile shared estimator of the higher latency bound
         * @param inactivityFactor multiplier of the mean inter-arrival time used to detect inactivity
         */
        WeightedSocket(RSocketSupplier factory, Quantile lowerQuantile, Quantile higherQuantile, int inactivityFactor) {
            long now;
            this.lowerQuantile = lowerQuantile;
            this.higherQuantile = higherQuantile;
            this.inactivityFactor = inactivityFactor;
            this.stamp = now = Clock.now();
            this.stamp0 = now;
            this.duration = 0L;
            this.pending = 0;
            this.median = new Median();
            this.interArrivalTime = new Ewma(1L, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
            this.pendingStreams = new AtomicLong();
            logger.debug("Creating WeightedSocket {} from factory {}", (Object)this, (Object)factory);
            // Clean-up when this wrapper closes: give the supplier back, drop out of the active
            // list and let the balancer top its sockets up again.
            // NOTE(review): activeSockets is modified here (and added to further down) from
            // callbacks that do not hold the balancer's monitor, while the balancer's own methods
            // access the list under `synchronized` — confirm this is safe for the calling threads.
            this.onClose().doFinally(s -> {
                LoadBalancedRSocketMono.this.pool.accept(factory);
                LoadBalancedRSocketMono.this.activeSockets.remove(this);
                logger.debug("Removed {} from factory {} from activeSockets", (Object)this, (Object)factory);
                LoadBalancedRSocketMono.this.refreshSockets();
            }).subscribe();
            // Connect with the balancer's retry/back-off settings; an error that gets past the
            // retries disposes this wrapper.
            factory.get().retryBackoff(LoadBalancedRSocketMono.this.weightedSocketRetries, LoadBalancedRSocketMono.this.weightedSocketBackOff, LoadBalancedRSocketMono.this.weightedSocketMaxBackOff).doOnError(throwable -> {
                logger.error("error while connecting {} from factory {}", new Object[]{this, factory, throwable});
                this.dispose();
            }).subscribe(rSocket -> {
                // When the RSocket closes, close this wrapper.
                rSocket.onClose().doFinally(signalType -> {
                    logger.info("RSocket {} from factory {} closed", (Object)this, (Object)factory);
                    this.dispose();
                }).subscribe();
                // When the factory closes, close the RSocket.
                factory.onClose().doFinally(signalType -> {
                    logger.info("Factory {} closed", (Object)factory);
                    rSocket.dispose();
                }).subscribe();
                // When this wrapper closes, close the RSocket.
                this.onClose().doFinally(signalType -> {
                    logger.info("WeightedSocket {} from factory {} closed", (Object)this, (Object)factory);
                    rSocket.dispose();
                }).subscribe();
                // Release the requests waiting on rSocketMono and mark the socket usable.
                this.rSocketMono.onNext(rSocket);
                this.availability = 1.0;
                // The wrapper may already have been disposed while the connection was pending.
                if (!this.isDisposed()) {
                    LoadBalancedRSocketMono.this.activeSockets.add(this);
                    logger.debug("Added WeightedSocket {} from factory {} to activeSockets", (Object)this, (Object)factory);
                }
            });
        }

        /**
         * Creates the wrapper with the default inactivity factor
         * ({@code DEFAULT_INTER_ARRIVAL_FACTOR}, i.e. 500).
         *
         * @param factory supplier used to connect
         * @param lowerQuantile shared estimator of the lower latency bound
         * @param higherQuantile shared estimator of the higher latency bound
         */
        WeightedSocket(RSocketSupplier factory, Quantile lowerQuantile, Quantile higherQuantile) {
            this(factory, lowerQuantile, higherQuantile, DEFAULT_INTER_ARRIVAL_FACTOR);
        }

        /**
         * Forwards a request-response call to the underlying RSocket once {@code rSocketMono} has
         * emitted it. The subscriber is wrapped in a {@code LatencySubscriber}, so the call counts
         * as pending while in flight and its duration is recorded.
         */
        public Mono<Payload> requestResponse(Payload payload) {
            return this.rSocketMono.flatMap(source -> Mono.from(subscriber -> source.requestResponse(payload).subscribe(new LatencySubscriber(subscriber, this))));
        }

        /**
         * Forwards a request-stream call to the underlying RSocket once {@code rSocketMono} has
         * emitted it. The subscriber is wrapped in a {@code CountingSubscriber}, which maintains
         * the {@code pendingStreams} counter and disposes this socket on transport errors.
         */
        public Flux<Payload> requestStream(Payload payload) {
            return this.rSocketMono.flatMapMany(source -> Flux.from(subscriber -> source.requestStream(payload).subscribe(new CountingSubscriber(subscriber, this))));
        }

        /**
         * Forwards a fire-and-forget call to the underlying RSocket once {@code rSocketMono} has
         * emitted it, wrapping the subscriber in a {@code CountingSubscriber}.
         */
        public Mono<Void> fireAndForget(Payload payload) {
            return this.rSocketMono.flatMap(source -> Mono.from(subscriber -> source.fireAndForget(payload).subscribe(new CountingSubscriber(subscriber, this))));
        }

        /**
         * Forwards a metadata-push call to the underlying RSocket once {@code rSocketMono} has
         * emitted it, wrapping the subscriber in a {@code CountingSubscriber}.
         */
        public Mono<Void> metadataPush(Payload payload) {
            return this.rSocketMono.flatMap(source -> Mono.from(subscriber -> source.metadataPush(payload).subscribe(new CountingSubscriber(subscriber, this))));
        }

        /**
         * Forwards a request-channel call to the underlying RSocket once {@code rSocketMono} has
         * emitted it, wrapping the subscriber in a {@code CountingSubscriber}.
         */
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return this.rSocketMono.flatMapMany(source -> Flux.from(subscriber -> source.requestChannel(payloads).subscribe(new CountingSubscriber(subscriber, this))));
        }

        /**
         * Estimates the latency of the next request on this socket.
         *
         * <ul>
         *   <li>No measurement yet: {@code 0.0} when idle, otherwise {@code STARTUP_PENALTY} plus
         *       the number of pending requests.
         *   <li>Idle for longer than {@code inactivityFactor} times the mean inter-arrival time: a
         *       {@code 0.0} sample is inserted into the median and the updated median is returned.
         *   <li>Otherwise: the median, unless the instantaneous figure exceeds
         *       {@code median * pending}, in which case that figure divided by the pending count
         *       is returned.
         * </ul>
         *
         * @return the predicted latency
         */
        synchronized double getPredictedLatency() {
            final long now = Clock.now();
            final long elapsed = Math.max(now - this.stamp, 1L);
            final double prediction = this.median.estimation();

            if (prediction == 0.0) {
                // Nothing measured so far.
                if (this.pending == 0) {
                    return 0.0;
                }
                return STARTUP_PENALTY + (double) this.pending;
            }

            if (this.pending == 0 && (double) elapsed > (double) this.inactivityFactor * this.interArrivalTime.value()) {
                // Long idle period: pull the estimate down with a zero sample.
                this.median.insert(0.0);
                return this.median.estimation();
            }

            final double predicted = prediction * (double) this.pending;
            final double instant = (double) this.instantaneous(now);
            if (predicted < instant) {
                // In-flight requests are already taking longer than predicted.
                return instant / (double) this.pending;
            }
            return prediction;
        }

        /** Returns the number of in-flight request-response calls tracked by incr()/decr(). */
        int getPending() {
            return this.pending;
        }

        /**
         * Returns the {@code duration} accumulator extended to {@code now}: the stored value plus
         * the time since the last incr()/decr() multiplied by the number of pending requests.
         *
         * @param now current clock reading
         */
        private synchronized long instantaneous(long now) {
            return this.duration + (now - this.stamp0) * (long)this.pending;
        }

        /**
         * Registers the start of a request: records the inter-arrival time, adds the time since the
         * last change (weighted by the requests pending so far) to {@code duration}, increments
         * {@code pending} and moves both time stamps to now.
         *
         * @return the clock reading taken as the start of the request
         */
        private synchronized long incr() {
            final long now = Clock.now();
            final long sinceLastStart = now - this.stamp;
            final long sinceLastChange = Math.max(0L, now - this.stamp0);

            this.interArrivalTime.insert(sinceLastStart);
            // Uses the pending count from before this request is counted.
            this.duration += sinceLastChange * (long) this.pending;
            this.pending++;

            this.stamp = now;
            this.stamp0 = now;
            return now;
        }

        /**
         * Registers the end of a request: adds the time since the last change (weighted by the
         * pending count) to {@code duration}, subtracts the finished request's own age, decrements
         * {@code pending} and moves {@code stamp0} to now.
         *
         * @param timestamp clock reading returned by the matching {@code incr()}
         * @return the clock reading taken as the end of the request
         */
        private synchronized long decr(long timestamp) {
            final long now = Clock.now();
            final long sinceLastChange = Math.max(0L, now - this.stamp0);
            final long requestAge = now - timestamp;

            // Uses the pending count that still includes the finishing request.
            this.duration += sinceLastChange * (long) this.pending - requestAge;
            this.pending--;

            this.stamp0 = now;
            return now;
        }

        /**
         * Records one measured round-trip time in this socket's median and in both quantile
         * estimators received at construction.
         *
         * @param rtt measured round-trip time, as a difference of {@code Clock} readings
         */
        private synchronized void observe(double rtt) {
            this.median.insert(rtt);
            this.lowerQuantile.insert(rtt);
            this.higherQuantile.insert(rtt);
        }

        /** Returns {@code 0.0} until the connection has been established and {@code 1.0} afterwards. */
        public double availability() {
            return this.availability;
        }

        /**
         * Describes the socket's statistics: median, latency quantiles, mean inter-arrival time,
         * accumulated duration per pending request, pending count and availability.
         */
        public String toString() {
            final StringBuilder text = new StringBuilder("WeightedSocket(median=");
            text.append(this.median.estimation());
            text.append(" quantile-low=").append(this.lowerQuantile.estimation());
            text.append(" quantile-high=").append(this.higherQuantile.estimation());
            text.append(" inter-arrival=").append(this.interArrivalTime.value());
            text.append(" duration/pending=")
                .append(this.pending == 0 ? 0.0 : (double) this.duration / (double) this.pending);
            text.append(" pending=").append(this.pending);
            text.append(" availability= ").append(this.availability());
            text.append(")->");
            return text.toString();
        }

        /** Returns the current estimate of the median round-trip time. */
        @Override
        public double medianLatency() {
            return this.median.estimation();
        }

        /** Returns the current estimate of the lower latency quantile. */
        @Override
        public double lowerQuantileLatency() {
            return this.lowerQuantile.estimation();
        }

        /** Returns the current estimate of the higher latency quantile. */
        @Override
        public double higherQuantileLatency() {
            return this.higherQuantile.estimation();
        }

        /** Returns the moving average of the time between consecutive request starts. */
        @Override
        public double interArrivalTime() {
            return this.interArrivalTime.value();
        }

        /** Returns the number of in-flight request-response calls. */
        @Override
        public int pending() {
            return this.pending;
        }

        /**
         * Returns {@code stamp0}, the clock reading of the last request start or end.
         *
         * <p>NOTE(review): the value comes straight from {@code Clock.now()}; whether that reading
         * is in milliseconds, as the method name suggests, is not visible here — confirm.
         */
        @Override
        public long lastTimeUsedMillis() {
            return this.stamp0;
        }

        private class CountingSubscriber<U>
        implements Subscriber<U> {
            private final Subscriber<U> child;
            private final WeightedSocket socket;

            CountingSubscriber(Subscriber<U> child, WeightedSocket socket) {
                this.child = child;
                this.socket = socket;
            }

            public void onSubscribe(Subscription s) {
                this.socket.pendingStreams.incrementAndGet();
                this.child.onSubscribe(s);
            }

            public void onNext(U u) {
                this.child.onNext(u);
            }

            public void onError(Throwable t) {
                this.socket.pendingStreams.decrementAndGet();
                this.child.onError(t);
                if (t instanceof TransportException || t instanceof ClosedChannelException) {
                    logger.debug("Disposing {} from activeSockets because of error {}", (Object)this.socket, (Object)t);
                    this.socket.dispose();
                }
            }

            public void onComplete() {
                this.socket.pendingStreams.decrementAndGet();
                this.child.onComplete();
            }
        }

        private class LatencySubscriber<U>
        implements Subscriber<U> {
            private final Subscriber<U> child;
            private final WeightedSocket socket;
            private final AtomicBoolean done;
            private long start;

            LatencySubscriber(Subscriber<U> child, WeightedSocket socket) {
                this.child = child;
                this.socket = socket;
                this.done = new AtomicBoolean(false);
            }

            public void onSubscribe(final Subscription s) {
                this.start = WeightedSocket.this.incr();
                this.child.onSubscribe(new Subscription(){

                    public void request(long n) {
                        s.request(n);
                    }

                    public void cancel() {
                        if (LatencySubscriber.this.done.compareAndSet(false, true)) {
                            s.cancel();
                            WeightedSocket.this.decr(LatencySubscriber.this.start);
                        }
                    }
                });
            }

            public void onNext(U u) {
                this.child.onNext(u);
            }

            public void onError(Throwable t) {
                if (this.done.compareAndSet(false, true)) {
                    this.child.onError(t);
                    long now = WeightedSocket.this.decr(this.start);
                    if (t instanceof TransportException || t instanceof ClosedChannelException) {
                        this.socket.dispose();
                    } else if (t instanceof TimeoutException) {
                        WeightedSocket.this.observe(now - this.start);
                    }
                }
            }

            public void onComplete() {
                if (this.done.compareAndSet(false, true)) {
                    long now = WeightedSocket.this.decr(this.start);
                    WeightedSocket.this.observe(now - this.start);
                    this.child.onComplete();
                }
            }
        }
    }

    private static class FailingRSocket
    implements RSocket {
        private static final Mono<Void> errorVoid = Mono.error((Throwable)NoAvailableRSocketException.INSTANCE);
        private static final Mono<Payload> errorPayload = Mono.error((Throwable)NoAvailableRSocketException.INSTANCE);

        private FailingRSocket() {
        }

        public Mono<Void> fireAndForget(Payload payload) {
            return errorVoid;
        }

        public Mono<Payload> requestResponse(Payload payload) {
            return errorPayload;
        }

        public Flux<Payload> requestStream(Payload payload) {
            return errorPayload.flux();
        }

        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return errorPayload.flux();
        }

        public Mono<Void> metadataPush(Payload payload) {
            return errorVoid;
        }

        public double availability() {
            return 0.0;
        }

        public void dispose() {
        }

        public boolean isDisposed() {
            return true;
        }

        public Mono<Void> onClose() {
            return Mono.empty();
        }
    }
}

