/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.resources;

import io.netty.channel.Channel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.internal.PlatformDependent;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.ReactorNetty;
import reactor.netty.internal.shaded.reactor.pool.AllocationStrategy;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.Pool;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import reactor.netty.internal.shaded.reactor.pool.introspection.SamplingAllocationStrategy;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.DelegatingConnectionPoolMetrics;
import reactor.netty.resources.MicrometerPooledConnectionProviderMeterRegistrar;
import reactor.netty.transport.TransportConfig;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

/**
 * Base {@link ConnectionProvider} that keeps one {@link InstrumentedPool} per
 * {@link PoolKey} (remote address plus channel hash) and serves connection
 * requests from those pools.
 *
 * <p>Subclasses supply the actual pool ({@link #createPool}) and the subscriber
 * that bridges a pooled reference to the caller's sink
 * ({@link #createDisposableAcquire}).
 *
 * @param <T> the pooled connection type
 */
public abstract class PooledConnectionProvider<T extends Connection>
implements ConnectionProvider {
    /** Pool settings used for every remote address unless {@link #poolFactory(SocketAddress)} is overridden. */
    final PoolFactory<T> defaultPoolFactory;
    /** Live pools, keyed by remote address and channel hash. */
    final ConcurrentMap<PoolKey, InstrumentedPool<T>> channelPools = PlatformDependent.newConcurrentHashMap();
    /**
     * Metrics objects registered for pools, weakly keyed by {@link PoolKey}.
     * {@link WeakHashMap} is not thread-safe; every access must synchronize on the map itself.
     * NOTE(review): presumably kept so the metrics stay strongly reachable while the key is alive - confirm.
     */
    private final Map<PoolKey, ConnectionPoolMetrics> poolMetrics = new WeakHashMap<>();
    /** Provider name, used in log messages and passed to the meter registrar. */
    final String name;
    static final Logger log = Loggers.getLogger(PooledConnectionProvider.class);

    /**
     * Creates a provider from the given builder.
     *
     * @param builder source of the provider name and the default pool settings
     */
    protected PooledConnectionProvider(ConnectionProvider.Builder builder) {
        this.name = builder.name;
        this.defaultPoolFactory = new PoolFactory<>(builder);
    }

    /**
     * Returns a {@link Mono} that acquires a connection from the pool matching the
     * remote address and {@code config.channelHash()}, creating (and optionally
     * registering metrics for) that pool when it does not exist yet.
     *
     * <p>Although {@code remote} and {@code resolverGroup} are declared
     * {@code @Nullable}, this implementation rejects {@code null} for both.
     *
     * @param config             the transport configuration
     * @param connectionObserver the observer handed to {@link #createDisposableAcquire}
     * @param remote             supplier of the remote address; must not return {@code null}
     * @param resolverGroup      the resolver group handed to {@link #createPool}
     * @return a {@link Mono} emitting the acquired connection
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public final Mono<? extends Connection> acquire(TransportConfig config, ConnectionObserver connectionObserver, @Nullable Supplier<? extends SocketAddress> remote, @Nullable AddressResolverGroup<?> resolverGroup) {
        Objects.requireNonNull(config, "config");
        Objects.requireNonNull(connectionObserver, "connectionObserver");
        Objects.requireNonNull(remote, "remoteAddress");
        Objects.requireNonNull(resolverGroup, "resolverGroup");
        return Mono.<Connection>create(sink -> {
            SocketAddress remoteAddress = Objects.requireNonNull(remote.get(), "Remote Address supplier returned null");
            PoolKey holder = new PoolKey(remoteAddress, config.channelHash());
            PoolFactory<T> poolFactory = this.poolFactory(remoteAddress);
            InstrumentedPool<T> pool = this.channelPools.computeIfAbsent(holder, poolKey -> {
                if (log.isDebugEnabled()) {
                    log.debug("Creating a new [{}] client pool [{}] for [{}]", this.name, poolFactory, remoteAddress);
                }
                InstrumentedPool<T> newPool = this.createPool(config, poolFactory, remoteAddress, resolverGroup);
                if (poolFactory.metricsEnabled || config.metricsRecorder() != null) {
                    // No registrar configured: fall back to the Micrometer-based default.
                    ConnectionProvider.MeterRegistrar registrar = poolFactory.registrar != null ? poolFactory.registrar.get() : MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE;
                    DelegatingConnectionPoolMetrics metrics = new DelegatingConnectionPoolMetrics(newPool.metrics());
                    // Mapping functions for different keys may run concurrently,
                    // and WeakHashMap does not tolerate concurrent mutation.
                    synchronized (this.poolMetrics) {
                        this.poolMetrics.put(poolKey, metrics);
                    }
                    registrar.registerMetrics(this.name, poolKey.hashCode() + "", remoteAddress, metrics);
                }
                return newPool;
            });
            pool.acquire(Duration.ofMillis(poolFactory.pendingAcquireTimeout)).subscribe(this.createDisposableAcquire(config, connectionObserver, poolFactory.pendingAcquireTimeout, pool, sink));
        });
    }

    /**
     * Returns a {@link Mono} that, when subscribed, removes every pool currently
     * registered and completes once all of them have been disposed.
     *
     * <p>Each pool is removed individually with {@code remove(key, value)}, so a pool
     * that is registered concurrently stays in the map instead of being dropped
     * without disposal.
     *
     * @return a {@link Mono} completing when the removed pools are disposed, or an
     *         empty {@link Mono} when there was nothing to dispose
     */
    @Override
    public final Mono<Void> disposeLater() {
        return Mono.defer(() -> {
            List<Mono<Void>> pending = this.channelPools.entrySet().stream()
                    // Only dispose what this call actually removed from the map.
                    .filter(entry -> this.channelPools.remove(entry.getKey(), entry.getValue()))
                    .map(entry -> entry.getValue().disposeLater())
                    .collect(Collectors.toList());
            if (pending.isEmpty()) {
                return Mono.empty();
            }
            return Mono.when(pending);
        });
    }

    /**
     * Disposes every pool whose remote address matches {@code address} according to
     * {@link #compareAddresses(SocketAddress, SocketAddress)}.
     *
     * @param address the remote address whose pools should be disposed
     */
    @Override
    public final void disposeWhen(SocketAddress address) {
        for (Map.Entry<PoolKey, InstrumentedPool<T>> entry : this.channelPools.entrySet()) {
            PoolKey key = entry.getKey();
            InstrumentedPool<T> pool = entry.getValue();
            if (!this.compareAddresses(key.holder, address)) {
                continue;
            }
            // remove(key, value) ensures only the caller that wins the removal disposes the pool.
            if (this.channelPools.remove(key, pool)) {
                if (log.isDebugEnabled()) {
                    log.debug("ConnectionProvider[name={}]: Disposing pool for [{}]", this.name, key.fqdn);
                }
                pool.dispose();
            }
        }
    }

    /**
     * @return {@code true} when no pool is registered or every registered pool
     *         reports itself as disposed
     */
    @Override
    public final boolean isDisposed() {
        return this.channelPools.isEmpty() || this.channelPools.values().stream().allMatch(Disposable::isDisposed);
    }

    /**
     * @return the {@code maxConnections} value of the default pool factory
     */
    @Override
    public int maxConnections() {
        return this.defaultPoolFactory.maxConnections;
    }

    /**
     * Creates the subscriber that receives the pooled reference produced by
     * {@code pool.acquire(...)} in {@link #acquire}.
     *
     * @param config                the transport configuration passed to {@link #acquire}
     * @param connectionObserver    the observer passed to {@link #acquire}
     * @param pendingAcquireTimeout the pool factory's pending-acquire timeout, in milliseconds
     * @param pool                  the pool the connection is acquired from
     * @param sink                  the sink of the {@link Mono} returned by {@link #acquire}
     * @return the subscriber to attach to the pool's acquire {@link Mono}
     */
    protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(TransportConfig config, ConnectionObserver connectionObserver, long pendingAcquireTimeout, InstrumentedPool<T> pool, MonoSink<Connection> sink);

    /**
     * Creates the pool for a remote address that has no pool yet.
     *
     * @param config        the transport configuration passed to {@link #acquire}
     * @param poolFactory   the pool settings selected by {@link #poolFactory(SocketAddress)}
     * @param remoteAddress the remote address the pool connects to
     * @param resolverGroup the resolver group passed to {@link #acquire}
     * @return the new pool
     */
    protected abstract InstrumentedPool<T> createPool(TransportConfig config, PoolFactory<T> poolFactory, SocketAddress remoteAddress, AddressResolverGroup<?> resolverGroup);

    /**
     * Selects the pool settings for a remote address. This implementation always
     * returns the default factory; subclasses may override it.
     *
     * @param remoteAddress the remote address a pool is needed for
     * @return the pool factory to use
     */
    protected PoolFactory<T> poolFactory(SocketAddress remoteAddress) {
        return this.defaultPoolFactory;
    }

    /**
     * Decides whether a pool's address matches a target address.
     *
     * <p>Addresses match when they are equal, or when both are
     * {@link InetSocketAddress}es with the same port and either the target is the
     * wildcard ("any local") address or both have the same host string.
     *
     * @param origin the address a pool was created for; must not be {@code null}
     * @param target the address to compare against
     * @return {@code true} when the addresses match
     */
    final boolean compareAddresses(SocketAddress origin, SocketAddress target) {
        if (origin.equals(target)) {
            return true;
        }
        if (!(origin instanceof InetSocketAddress) || !(target instanceof InetSocketAddress)) {
            return false;
        }
        InetSocketAddress isaOrigin = (InetSocketAddress)origin;
        InetSocketAddress isaTarget = (InetSocketAddress)target;
        if (isaOrigin.getPort() != isaTarget.getPort()) {
            return false;
        }
        // getAddress() is null for an unresolved target, hence the null check.
        InetAddress iaTarget = isaTarget.getAddress();
        return (iaTarget != null && iaTarget.isAnyLocalAddress())
                || Objects.equals(isaOrigin.getHostString(), isaTarget.getHostString());
    }

    /**
     * Logs the pool's acquired, idle and pending-acquire counts at debug level.
     *
     * @param channel the channel used to format the log line
     * @param pool    the pool whose metrics are logged
     * @param msg     the message prefix
     */
    protected static void logPoolState(Channel channel, InstrumentedPool<? extends Connection> pool, String msg) {
        PooledConnectionProvider.logPoolState(channel, pool, msg, null);
    }

    /**
     * Logs the pool's acquired, idle and pending-acquire counts at debug level,
     * optionally together with a throwable.
     *
     * @param channel the channel used to format the log line
     * @param pool    the pool whose metrics are logged
     * @param msg     the message prefix
     * @param t       an optional throwable passed as the trailing log argument
     */
    protected static void logPoolState(Channel channel, InstrumentedPool<? extends Connection> pool, String msg, @Nullable Throwable t) {
        // Skip reading the metrics when the line would be discarded anyway.
        if (!log.isDebugEnabled()) {
            return;
        }
        InstrumentedPool.PoolMetrics metrics = pool.metrics();
        log.debug(ReactorNetty.format(channel, "{}, now: {} active connections, {} inactive connections and {} pending acquire requests."), msg, metrics.acquiredSize(), metrics.idleSize(), metrics.pendingAcquireSize(), t == null ? "" : t);
    }

    /**
     * Key identifying a pool: the remote address, its string form and the
     * pipeline (channel) hash.
     */
    static final class PoolKey {
        /** String form of {@link #holder}, as returned by {@code holder.toString()}. */
        final String fqdn;
        /** The remote address this key was created for. */
        final SocketAddress holder;
        /** The channel hash supplied by the transport configuration. */
        final int pipelineKey;

        PoolKey(SocketAddress holder, int pipelineKey) {
            this.fqdn = holder.toString();
            this.holder = holder;
            this.pipelineKey = pipelineKey;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            PoolKey other = (PoolKey)o;
            return this.pipelineKey == other.pipelineKey
                    && Objects.equals(this.fqdn, other.fqdn)
                    && Objects.equals(this.holder, other.holder);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.fqdn, this.holder, this.pipelineKey);
        }
    }

    /**
     * Immutable snapshot of the pool settings taken from a
     * {@link ConnectionProvider.ConnectionPoolSpec}, able to build pools from them.
     *
     * @param <T> the pooled connection type
     */
    protected static final class PoolFactory<T extends Connection> {
        /** Sampling rate read from {@code reactor.netty.pool.getPermitsSamplingRate}; 0 when unset or invalid. */
        static final double DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE;
        /** Sampling rate read from {@code reactor.netty.pool.returnPermitsSamplingRate}; 0 when unset or invalid. */
        static final double DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE;
        final Duration evictionInterval;
        final String leasingStrategy;
        final int maxConnections;
        /** Maximum idle time in milliseconds, or -1 when not configured. */
        final long maxIdleTime;
        /** Maximum life time in milliseconds, or -1 when not configured. */
        final long maxLifeTime;
        final boolean metricsEnabled;
        final int pendingAcquireMaxCount;
        /** Pending-acquire timeout in milliseconds. */
        final long pendingAcquireTimeout;
        final Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;

        PoolFactory(ConnectionProvider.ConnectionPoolSpec<?> conf) {
            this.evictionInterval = conf.evictionInterval;
            this.leasingStrategy = conf.leasingStrategy;
            this.maxConnections = conf.maxConnections;
            this.maxIdleTime = conf.maxIdleTime != null ? conf.maxIdleTime.toMillis() : -1L;
            this.maxLifeTime = conf.maxLifeTime != null ? conf.maxLifeTime.toMillis() : -1L;
            this.metricsEnabled = conf.metricsEnabled;
            // -2 is replaced by twice the connection limit.
            // NOTE(review): presumably -2 is the "not specified" sentinel - confirm against ConnectionPoolSpec.
            // NOTE(review): the product wraps to a negative int when maxConnections exceeds
            // Integer.MAX_VALUE / 2 - confirm how the pool builder treats a negative limit.
            this.pendingAcquireMaxCount = conf.pendingAcquireMaxCount == -2 ? 2 * conf.maxConnections : conf.pendingAcquireMaxCount;
            this.pendingAcquireTimeout = conf.pendingAcquireTimeout.toMillis();
            this.registrar = conf.registrar;
        }

        /**
         * Builds a pool from these settings.
         *
         * <p>A resource is evicted when {@code evictionPredicate} accepts it, or when its
         * idle time or life time has reached the configured maximum (a maximum of -1
         * disables that check). The pool size is bounded by
         * {@code [0, maxConnections]}, using a sampling allocation strategy only when
         * both sampling rates lie in {@code (0, 1]}. A {@code "fifo"} leasing strategy
         * selects LRU reuse order; anything else selects MRU.
         *
         * @param allocator          publisher that creates new resources
         * @param allocationStrategy NOTE(review): not consulted by this method - confirm this is intentional
         * @param destroyHandler     invoked to release a resource
         * @param evictionPredicate  additional eviction condition
         * @return the new pool
         */
        public InstrumentedPool<T> newPool(Publisher<T> allocator, @Nullable AllocationStrategy allocationStrategy, Function<T, Publisher<Void>> destroyHandler, BiPredicate<T, PooledRefMetadata> evictionPredicate) {
            BiPredicate<T, PooledRefMetadata> expired = (poolable, meta) ->
                    (this.maxIdleTime != -1L && meta.idleTime() >= this.maxIdleTime)
                            || (this.maxLifeTime != -1L && meta.lifeTime() >= this.maxLifeTime);
            PoolBuilder<T, PoolConfig<T>> poolBuilder = PoolBuilder.from(allocator)
                    .destroyHandler(destroyHandler)
                    .evictionPredicate(evictionPredicate.or(expired))
                    .maxPendingAcquire(this.pendingAcquireMaxCount)
                    .evictInBackground(this.evictionInterval);

            boolean samplingEnabled = DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE > 0.0 && DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE <= 1.0
                    && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE > 0.0 && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE <= 1.0;
            if (samplingEnabled) {
                poolBuilder = poolBuilder.allocationStrategy(SamplingAllocationStrategy.sizeBetweenWithSampling(0, this.maxConnections, DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE, DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
            } else {
                poolBuilder = poolBuilder.sizeBetween(0, this.maxConnections);
            }

            if ("fifo".equals(this.leasingStrategy)) {
                return poolBuilder.idleResourceReuseLruOrder().buildPool();
            }
            return poolBuilder.idleResourceReuseMruOrder().buildPool();
        }

        @Override
        public String toString() {
            return "PoolFactory{evictionInterval=" + this.evictionInterval + ", leasingStrategy=" + this.leasingStrategy + ", maxConnections=" + this.maxConnections + ", maxIdleTime=" + this.maxIdleTime + ", maxLifeTime=" + this.maxLifeTime + ", metricsEnabled=" + this.metricsEnabled + ", pendingAcquireMaxCount=" + this.pendingAcquireMaxCount + ", pendingAcquireTimeout=" + this.pendingAcquireTimeout + '}';
        }

        /**
         * Reads a sampling rate from the given system property.
         *
         * <p>A missing property yields 0. A value that is not a number, is NaN, or lies
         * outside {@code [0, 1]} is reported with a warning and replaced by 0, so a bad
         * setting disables sampling instead of failing class initialization.
         *
         * @param property the system property name
         * @return the configured rate, or 0 when it is absent or invalid
         */
        private static double readSamplingRate(String property) {
            String raw = System.getProperty(property, "0");
            double rate;
            try {
                rate = Double.parseDouble(raw);
            } catch (NumberFormatException e) {
                // Treated like any other out-of-range value below.
                rate = Double.NaN;
            }
            if (rate >= 0.0 && rate <= 1.0) {
                return rate;
            }
            log.warn("Invalid configuration [" + property + "=" + raw + "], the value must be between 0d and 1d (percentage). SamplingAllocationStrategy is not enabled.");
            return 0.0;
        }

        static {
            DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE = readSamplingRate("reactor.netty.pool.getPermitsSamplingRate");
            DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE = readSamplingRate("reactor.netty.pool.returnPermitsSamplingRate");
        }
    }
}

