/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.resources;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.ReactorNetty;
import reactor.ipc.netty.resources.PoolResources;
import reactor.util.Logger;
import reactor.util.Loggers;

final class DefaultPoolResources
implements PoolResources {
    /** Logger shared by this class and its nested {@link Pool}. */
    static final Logger log = Loggers.getLogger(DefaultPoolResources.class);

    /**
     * Channel attribute set the first time a channel is acquired, so the
     * close listener is registered at most once per channel.
     */
    static final AttributeKey<Boolean> CLOSE_HANDLER_ADDED = AttributeKey.valueOf("closeHandlerAdded");

    /** Pools created so far, keyed by the remote address they connect to. */
    final ConcurrentMap<SocketAddressHolder, Pool> channelPools;

    /** Factory used to build the underlying {@link ChannelPool} of each new {@link Pool}. */
    final PoolFactory provider;

    /** Name of these resources; used in debug logs and in {@link #toString()}. */
    final String name;

    /**
     * Creates an empty set of pool resources.
     *
     * @param name     name reported in logs and in {@code toString()}
     * @param provider factory that builds the underlying channel pool for each remote address
     */
    DefaultPoolResources(String name, PoolFactory provider) {
        this.channelPools = PlatformDependent.newConcurrentHashMap();
        this.provider = provider;
        this.name = name;
    }

    /**
     * Returns the pool registered for {@code remote}, creating and registering
     * a new one when none exists yet.
     *
     * @param remote          remote address the pooled channels connect to
     * @param bootstrap       supplies the bootstrap for a new pool; queried once per creation attempt
     * @param onChannelCreate callback handed to the new pool, invoked for each channel it creates
     * @param group           event loop group handed to the new pool
     * @return the pool registered for {@code remote}
     */
    @Override
    public ChannelPool selectOrCreate(SocketAddress remote, Supplier<? extends Bootstrap> bootstrap, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) {
        final SocketAddressHolder key = new SocketAddressHolder(remote);
        for (;;) {
            Pool existing = this.channelPools.get(key);
            if (existing != null) {
                return existing;
            }
            if (log.isDebugEnabled()) {
                log.debug("New {} client pool for {}", this.name, remote);
            }
            Pool candidate = new Pool(bootstrap.get().remoteAddress(remote), this.provider, onChannelCreate, group);
            if (this.channelPools.putIfAbsent(key, candidate) == null) {
                return candidate;
            }
            // Another thread registered a pool for this key first: discard ours
            // and loop to pick up the registered one.
            candidate.close();
        }
    }

    /**
     * Describes these resources by name and pool provider.
     *
     * @return a string of the form {@code DefaultPoolResources {name=..., provider=...}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DefaultPoolResources {name=");
        text.append(this.name);
        text.append(", provider=");
        text.append(this.provider);
        text.append('}');
        return text.toString();
    }

    /**
     * Disposes these resources by subscribing to {@link #disposeLater()}
     * without waiting for or observing its outcome.
     */
    @Override
    public void dispose() {
        Mono<Void> teardown = this.disposeLater();
        teardown.subscribe();
    }

    /**
     * Builds a {@link Mono} that, when run, unregisters every pool and closes it.
     * Nothing is closed until the returned {@code Mono} is subscribed to.
     *
     * @return a {@code Mono} wrapping the close-all action
     */
    @Override
    public Mono<Void> disposeLater() {
        Runnable closeAll = () -> this.channelPools.keySet().forEach(address -> {
            // remove() hands back the pool only to the caller that actually
            // unregistered it; a null means the entry was already removed.
            Pool removed = this.channelPools.remove(address);
            if (removed != null) {
                removed.close();
            }
        });
        return Mono.fromRunnable(closeAll);
    }

    /**
     * Reports whether these resources hold no live pool.
     *
     * @return {@code true} when no pool is registered, or when every registered
     *         pool has its closed flag set
     */
    public boolean isDisposed() {
        if (this.channelPools.isEmpty()) {
            return true;
        }
        for (Pool candidate : this.channelPools.values()) {
            // A Pool is an AtomicBoolean that is flipped to true by Pool.close().
            if (!candidate.get()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Map key wrapping a {@link SocketAddress}. Two holders are equal when both
     * the wrapped addresses and their recorded string forms are equal.
     */
    static final class SocketAddressHolder {
        /** The wrapped address. */
        final SocketAddress holder;
        /**
         * {@code toString()} of the address when it is an {@link InetSocketAddress},
         * otherwise {@code null}.
         * NOTE(review): presumably kept so that addresses sharing an IP but differing
         * in host name map to different pools; confirm against callers.
         */
        final String fqdn;

        /**
         * @param holder the address to wrap
         */
        SocketAddressHolder(SocketAddress holder) {
            this.holder = holder;
            if (holder instanceof InetSocketAddress) {
                this.fqdn = holder.toString();
            } else {
                this.fqdn = null;
            }
        }

        /**
         * @return {@code true} when {@code o} is a holder with an equal address and an equal {@code fqdn}
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            // The class is final, so an instanceof test matches exactly this class
            // (and rejects null).
            if (!(o instanceof SocketAddressHolder)) {
                return false;
            }
            SocketAddressHolder other = (SocketAddressHolder) o;
            if (!this.holder.equals(other.holder)) {
                return false;
            }
            if (this.fqdn == null) {
                return other.fqdn == null;
            }
            return this.fqdn.equals(other.fqdn);
        }

        /**
         * @return a hash combining the address hash and the {@code fqdn} hash (0 when absent)
         */
        @Override
        public int hashCode() {
            int fqdnHash = this.fqdn == null ? 0 : this.fqdn.hashCode();
            return 31 * this.holder.hashCode() + fqdnHash;
        }
    }

    static final class Pool
    extends AtomicBoolean
    implements ChannelPoolHandler,
    ChannelPool,
    ChannelHealthChecker,
    GenericFutureListener<Future<Channel>> {
        final ChannelPool pool;
        final Consumer<? super Channel> onChannelCreate;
        final EventLoopGroup defaultGroup;
        final AtomicInteger activeConnections = new AtomicInteger();
        final AtomicInteger inactiveConnections = new AtomicInteger();
        final Future<Boolean> HEALTHY;
        final Future<Boolean> UNHEALTHY;

        Pool(Bootstrap bootstrap, PoolFactory provider, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) {
            this.pool = provider.newPool(bootstrap, this, this);
            this.onChannelCreate = onChannelCreate;
            this.defaultGroup = group;
            this.HEALTHY = group.next().newSucceededFuture((Object)true);
            this.UNHEALTHY = group.next().newSucceededFuture((Object)false);
        }

        public Future<Boolean> isHealthy(Channel channel) {
            return channel.isActive() ? this.HEALTHY : this.UNHEALTHY;
        }

        public Future<Channel> acquire() {
            return this.acquire((Promise<Channel>)this.defaultGroup.next().newPromise());
        }

        public Future<Channel> acquire(Promise<Channel> promise) {
            return this.pool.acquire(promise).addListener((GenericFutureListener)this);
        }

        public void operationComplete(Future<Channel> future) throws Exception {
            if (future.isSuccess()) {
                Channel c = (Channel)future.get();
                this.activeConnections.incrementAndGet();
                this.inactiveConnections.decrementAndGet();
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(c, "Channel acquired, now {} active connections and {} inactive connections"), new Object[]{this.activeConnections, this.inactiveConnections});
                }
                if (c.attr(CLOSE_HANDLER_ADDED).setIfAbsent((Object)true) == null) {
                    if (log.isDebugEnabled()) {
                        log.debug(ReactorNetty.format(c, "Registering close event to pool release"));
                    }
                    c.closeFuture().addListener(ff -> {
                        this.pool.release(c);
                        this.inactiveConnections.decrementAndGet();
                        if (log.isDebugEnabled()) {
                            log.debug(ReactorNetty.format(c, "Channel closed, now {} active connections and {} inactive connections"), new Object[]{this.activeConnections, this.inactiveConnections});
                        }
                    });
                }
            } else {
                this.inactiveConnections.decrementAndGet();
                if (log.isDebugEnabled()) {
                    log.debug("Cannot acquire channel", future.cause());
                }
            }
        }

        public Future<Void> release(Channel channel) {
            return this.pool.release(channel);
        }

        public Future<Void> release(Channel channel, Promise<Void> promise) {
            return this.pool.release(channel, promise);
        }

        public void close() {
            if (this.compareAndSet(false, true)) {
                this.pool.close();
            }
        }

        public void channelReleased(Channel ch) throws Exception {
            this.activeConnections.decrementAndGet();
            this.inactiveConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(ch, "Channel released, now {} active connections and {} inactive connections"), new Object[]{this.activeConnections, this.inactiveConnections});
            }
        }

        public void channelAcquired(Channel ch) throws Exception {
        }

        public void channelCreated(Channel ch) throws Exception {
            this.inactiveConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(ch, "Created new pooled channel, now {} active connections and {} inactive connections"), new Object[]{this.activeConnections, this.inactiveConnections});
            }
            if (this.onChannelCreate != null) {
                this.onChannelCreate.accept((Channel)ch);
            }
        }

        @Override
        public String toString() {
            return this.pool.getClass().getSimpleName() + '{' + "activeConnections=" + this.activeConnections + ", inactiveConnections=" + this.inactiveConnections + '}';
        }
    }

    /**
     * Strategy that builds the underlying {@link ChannelPool} wrapped by a {@link Pool}.
     */
    interface PoolFactory {
        /**
         * Creates a new channel pool.
         *
         * @param bootstrap bootstrap the pool uses to open channels
         * @param handler   handler notified of pool events
         * @param checker   health checker for pooled channels
         * @return the new pool
         */
        ChannelPool newPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker checker);
    }
}

