/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.client;

import io.rsocket.Closeable;
import io.rsocket.client.filter.RSocketSupplier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class RSocketSupplierPool
implements Supplier<Optional<RSocketSupplier>>,
Consumer<RSocketSupplier>,
Closeable {
    private static final Logger logger = LoggerFactory.getLogger(RSocketSupplierPool.class);
    private static final int EFFORT = 5;
    private final ArrayList<RSocketSupplier> factoryPool;
    private final ArrayList<RSocketSupplier> leasedSuppliers;
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    public RSocketSupplierPool(Publisher<? extends Collection<RSocketSupplier>> publisher) {
        this.factoryPool = new ArrayList();
        this.leasedSuppliers = new ArrayList();
        Disposable disposable = Flux.from(publisher).doOnNext(this::handleNewFactories).onErrorResume(t -> {
            logger.error("error streaming RSocketSuppliers", t);
            return Mono.delay((Duration)Duration.ofSeconds(10L)).then(Mono.error((Throwable)t));
        }).subscribe();
        this.onClose.doFinally(s -> disposable.dispose()).subscribe();
    }

    private synchronized void handleNewFactories(Collection<RSocketSupplier> newFactories) {
        HashSet<RSocketSupplier> current = new HashSet<RSocketSupplier>(this.factoryPool.size() + this.leasedSuppliers.size());
        current.addAll(this.factoryPool);
        current.addAll(this.leasedSuppliers);
        HashSet removed = new HashSet(current);
        removed.removeAll(newFactories);
        HashSet<RSocketSupplier> added = new HashSet<RSocketSupplier>(newFactories);
        added.removeAll(current);
        boolean changed = false;
        Iterator<RSocketSupplier> it0 = this.leasedSuppliers.iterator();
        while (it0.hasNext()) {
            RSocketSupplier supplier = it0.next();
            if (!removed.contains(supplier)) continue;
            it0.remove();
            try {
                changed = true;
                supplier.dispose();
            }
            catch (Exception e) {
                logger.warn("Exception while closing a RSocket", (Throwable)e);
            }
        }
        Iterator<RSocketSupplier> it1 = this.factoryPool.iterator();
        while (it1.hasNext()) {
            RSocketSupplier supplier = it1.next();
            if (!removed.contains(supplier)) continue;
            it1.remove();
            try {
                changed = true;
                supplier.dispose();
            }
            catch (Exception e) {
                logger.warn("Exception while closing a RSocket", (Throwable)e);
            }
        }
        this.factoryPool.addAll(added);
        if (changed && logger.isDebugEnabled()) {
            StringBuilder msgBuilder = new StringBuilder();
            msgBuilder.append("\nUpdated active factories (size: ").append(this.factoryPool.size()).append(")\n");
            for (RSocketSupplier f : this.factoryPool) {
                msgBuilder.append(" + ").append(f).append('\n');
            }
            msgBuilder.append("Active sockets:\n");
            for (RSocketSupplier socket : this.leasedSuppliers) {
                msgBuilder.append(" + ").append(socket).append('\n');
            }
            logger.debug(msgBuilder.toString());
        }
    }

    @Override
    public synchronized void accept(RSocketSupplier rSocketSupplier) {
        this.leasedSuppliers.remove(rSocketSupplier);
        if (!rSocketSupplier.isDisposed()) {
            this.factoryPool.add(rSocketSupplier);
        }
    }

    @Override
    public synchronized Optional<RSocketSupplier> get() {
        Optional<RSocketSupplier> optional = Optional.empty();
        int poolSize = this.factoryPool.size();
        if (poolSize == 1) {
            RSocketSupplier rSocketSupplier = this.factoryPool.get(0);
            if (rSocketSupplier.availability() > 0.0) {
                this.factoryPool.remove(0);
                this.leasedSuppliers.add(rSocketSupplier);
                optional = Optional.of(rSocketSupplier);
            }
        } else if (poolSize > 1) {
            ThreadLocalRandom rng = ThreadLocalRandom.current();
            int size = this.factoryPool.size();
            RSocketSupplier factory0 = null;
            RSocketSupplier factory1 = null;
            int i0 = 0;
            int i1 = 0;
            for (int i = 0; i < 5; ++i) {
                i0 = ((Random)rng).nextInt(size);
                i1 = ((Random)rng).nextInt(size - 1);
                if (i1 >= i0) {
                    ++i1;
                }
                factory0 = this.factoryPool.get(i0);
                factory1 = this.factoryPool.get(i1);
                if (factory0.availability() > 0.0 && factory1.availability() > 0.0) break;
            }
            if (factory0.availability() > factory1.availability()) {
                this.factoryPool.remove(i0);
                this.leasedSuppliers.add(factory0);
                optional = Optional.of(factory0);
            } else {
                this.factoryPool.remove(i1);
                this.leasedSuppliers.add(factory1);
                optional = Optional.of(factory1);
            }
        }
        return optional;
    }

    public Mono<Void> onClose() {
        return this.onClose;
    }

    public void dispose() {
        if (!this.onClose.isDisposed()) {
            this.onClose.onComplete();
            this.close(this.factoryPool);
            this.close(this.leasedSuppliers);
        }
    }

    private void close(Collection<RSocketSupplier> suppliers) {
        for (RSocketSupplier supplier : suppliers) {
            try {
                supplier.dispose();
            }
            catch (Throwable throwable) {}
        }
    }

    public synchronized int poolSize() {
        return this.factoryPool.size();
    }

    public synchronized boolean isPoolEmpty() {
        return this.factoryPool.isEmpty();
    }
}

