/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.http.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Queue;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientState;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.TransportConfig;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

final class Http2ConnectionProvider
extends PooledConnectionProvider<Connection> {
    final ConnectionProvider parent;
    static final Logger log = Loggers.getLogger(Http2ConnectionProvider.class);
    static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf("http2ConnectionOwner");

    Http2ConnectionProvider(ConnectionProvider parent, ConnectionProvider.Builder builder) {
        super(builder);
        this.parent = parent;
    }

    @Override
    protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(TransportConfig config, ConnectionObserver connectionObserver, long pendingAcquireTimeout, InstrumentedPool<Connection> pool, MonoSink<Connection> sink) {
        boolean acceptGzip = config instanceof HttpClientConfig && ((HttpClientConfig)config).acceptGzip;
        return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), acceptGzip, pendingAcquireTimeout, pool, sink);
    }

    @Override
    protected InstrumentedPool<Connection> createPool(TransportConfig config, PooledConnectionProvider.PoolFactory<Connection> poolFactory, SocketAddress remoteAddress, AddressResolverGroup<?> resolverGroup) {
        return new PooledConnectionAllocator((ConnectionProvider)this.parent, (TransportConfig)config, poolFactory, (Supplier<SocketAddress>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, lambda$createPool$0(java.net.SocketAddress ), ()Ljava/net/SocketAddress;)((SocketAddress)remoteAddress), resolverGroup).pool;
    }

    static void invalidate(@Nullable ConnectionObserver owner, Channel channel) {
        if (owner instanceof DisposableAcquire) {
            DisposableAcquire da = (DisposableAcquire)owner;
            da.pooledRef.invalidate().subscribe(null, null, () -> {
                if (log.isDebugEnabled()) {
                    Http2ConnectionProvider.logPoolState(channel, da.pool, "Channel removed from the pool");
                }
            });
        }
    }

    static void logStreamsState(Channel channel, Http2Connection.Endpoint<Http2LocalFlowController> localEndpoint, String msg) {
        log.debug(ReactorNetty.format(channel, "{}, now: {} active streams and {} max active streams."), msg, localEndpoint.numActiveStreams(), localEndpoint.maxActiveStreams());
    }

    static void registerClose(Channel channel) {
        ConnectionObserver owner = channel.attr(OWNER).get();
        channel.closeFuture().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)f -> {
            Channel parent = channel.parent();
            Http2FrameCodec frameCodec = parent.pipeline().get(Http2FrameCodec.class);
            Http2Connection.Endpoint<Http2LocalFlowController> localEndpoint = frameCodec.connection().local();
            if (log.isDebugEnabled()) {
                Http2ConnectionProvider.logStreamsState(channel, localEndpoint, "Stream closed");
            }
            if (localEndpoint.numActiveStreams() == 0) {
                channel.attr(OWNER).set(null);
                Http2ConnectionProvider.invalidate(owner, parent);
            }
        }));
    }

    private static /* synthetic */ SocketAddress lambda$createPool$0(SocketAddress remoteAddress) {
        return remoteAddress;
    }

    static final class PooledConnectionAllocator {
        final ConnectionProvider parent;
        final HttpClientConfig config;
        final InstrumentedPool<Connection> pool;
        final Supplier<SocketAddress> remoteAddress;
        final AddressResolverGroup<?> resolver;
        static final BiPredicate<Connection, PooledRefMetadata> DEFAULT_EVICTION_PREDICATE = (connection, metadata) -> !connection.channel().isActive() || !connection.isPersistent();
        static final Function<Connection, Publisher<Void>> DEFAULT_DESTROY_HANDLER = connection -> {
            Http2FrameCodec frameCodec;
            Channel channel = connection.channel();
            if (channel.isActive() && (frameCodec = channel.pipeline().get(Http2FrameCodec.class)) != null && frameCodec.connection().local().numActiveStreams() == 0) {
                ChannelOperations ops = connection.as(ChannelOperations.class);
                if (ops != null) {
                    ops.listener().onStateChange(ops, ConnectionObserver.State.DISCONNECTING);
                } else if (connection instanceof ConnectionObserver) {
                    ((ConnectionObserver)((Object)connection)).onStateChange((Connection)connection, ConnectionObserver.State.DISCONNECTING);
                } else {
                    connection.dispose();
                }
            }
            return Mono.empty();
        };

        PooledConnectionAllocator(ConnectionProvider parent, TransportConfig config, PooledConnectionProvider.PoolFactory<Connection> poolFactory, Supplier<SocketAddress> remoteAddress, AddressResolverGroup<?> resolver) {
            this.parent = parent;
            this.config = (HttpClientConfig)config;
            this.remoteAddress = remoteAddress;
            this.resolver = resolver;
            this.pool = poolFactory.newPool(this.connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE);
        }

        Publisher<Connection> connectChannel() {
            return this.parent.acquire(this.config, new DelegatingConnectionObserver(), this.remoteAddress, this.resolver).map(conn -> {
                if (log.isDebugEnabled()) {
                    Http2ConnectionProvider.logPoolState(conn.channel(), this.pool, "Channel acquired from the parent pool");
                }
                return conn;
            });
        }
    }

    static final class PendingConnectionObserver
    implements ConnectionObserver {
        final Queue<Pending> pendingQueue = Queues.unbounded(4).get();

        PendingConnectionObserver() {
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            this.pendingQueue.add(new Pending(connection, null, newState));
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.pendingQueue.add(new Pending(connection, error, null));
        }

        static class Pending {
            final Connection connection;
            final Throwable error;
            final ConnectionObserver.State state;

            Pending(Connection connection, @Nullable Throwable error, @Nullable ConnectionObserver.State state) {
                this.connection = connection;
                this.error = error;
                this.state = state;
            }
        }
    }

    static final class DisposableAcquire
    implements CoreSubscriber<PooledRef<Connection>>,
    ConnectionObserver,
    Disposable,
    GenericFutureListener<Future<Http2StreamChannel>> {
        final Disposable.Composite cancellations;
        final ConnectionObserver obs;
        final ChannelOperations.OnSetup opsFactory;
        final boolean acceptGzip;
        final long pendingAcquireTimeout;
        final InstrumentedPool<Connection> pool;
        final boolean retried;
        final MonoSink<Connection> sink;
        PooledRef<Connection> pooledRef;
        Subscription subscription;

        DisposableAcquire(ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, long pendingAcquireTimeout, InstrumentedPool<Connection> pool, MonoSink<Connection> sink) {
            this.cancellations = Disposables.composite();
            this.obs = obs;
            this.opsFactory = opsFactory;
            this.acceptGzip = acceptGzip;
            this.pendingAcquireTimeout = pendingAcquireTimeout;
            this.pool = pool;
            this.retried = false;
            this.sink = sink;
        }

        DisposableAcquire(DisposableAcquire parent) {
            this.cancellations = parent.cancellations;
            this.obs = parent.obs;
            this.opsFactory = parent.opsFactory;
            this.acceptGzip = parent.acceptGzip;
            this.pendingAcquireTimeout = parent.pendingAcquireTimeout;
            this.pool = parent.pool;
            this.retried = true;
            this.sink = parent.sink;
        }

        @Override
        public Context currentContext() {
            return this.sink.currentContext();
        }

        @Override
        public void dispose() {
            this.subscription.cancel();
        }

        @Override
        public void onComplete() {
        }

        @Override
        public void onError(Throwable t) {
            this.sink.error(t);
        }

        @Override
        public void onNext(PooledRef<Connection> pooledRef) {
            ConnectionObserver current;
            this.pooledRef = pooledRef;
            Channel channel = pooledRef.poolable().channel();
            if (log.isDebugEnabled()) {
                Http2ConnectionProvider.logPoolState(channel, this.pool, "Channel activated");
            }
            if ((current = channel.attr(OWNER).getAndSet(this)) instanceof PendingConnectionObserver) {
                PendingConnectionObserver.Pending p;
                PendingConnectionObserver pending = (PendingConnectionObserver)current;
                while ((p = pending.pendingQueue.poll()) != null) {
                    if (p.error != null) {
                        this.onUncaughtException(p.connection, p.error);
                        continue;
                    }
                    if (p.state == null) continue;
                    this.onStateChange(p.connection, p.state);
                }
            }
            if (this.notHttp2()) {
                return;
            }
            if (this.isH2cUpgrade()) {
                return;
            }
            HttpClientConfig.openStream(channel, this.obs, this.opsFactory, this.acceptGzip).addListener(this);
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            if (newState == HttpClientState.UPGRADE_SUCCESSFUL || newState == ConnectionObserver.State.DISCONNECTING) {
                DisposableAcquire.release(connection.channel());
            } else if (newState == HttpClientState.UPGRADE_REJECTED) {
                Http2ConnectionProvider.invalidate(connection.channel().attr(OWNER).get(), connection.channel());
            }
            this.obs.onStateChange(connection, newState);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.subscription, s)) {
                this.subscription = s;
                this.cancellations.add(this);
                if (!this.retried) {
                    this.sink.onCancel(this.cancellations);
                }
                s.request(Long.MAX_VALUE);
            }
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.obs.onUncaughtException(connection, error);
        }

        @Override
        public void operationComplete(Future<Http2StreamChannel> future) {
            Channel channel = this.pooledRef.poolable().channel();
            Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class);
            if (future.isSuccess()) {
                Http2StreamChannel ch = future.getNow();
                if (!frameCodec.connection().local().canOpenStream()) {
                    if (!this.retried) {
                        if (log.isDebugEnabled()) {
                            log.debug(ReactorNetty.format(ch, "Immediately aborted pooled channel, max active streams is reached, re-acquiring a new channel"));
                        }
                        this.pool.acquire(Duration.ofMillis(this.pendingAcquireTimeout)).subscribe(new DisposableAcquire(this));
                    } else {
                        this.sink.error(new IOException("Error while acquiring from " + this.pool + ". Max active streams is reached."));
                    }
                } else {
                    ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
                    if (ops != null) {
                        this.obs.onStateChange(ops, HttpClientState.STREAM_CONFIGURED);
                        this.sink.success(ops);
                    }
                    Http2Connection.Endpoint<Http2LocalFlowController> localEndpoint = frameCodec.connection().local();
                    if (log.isDebugEnabled()) {
                        Http2ConnectionProvider.logStreamsState(ch, localEndpoint, "Stream opened");
                    }
                }
            } else {
                this.sink.error(future.cause());
            }
            DisposableAcquire.release(this, channel);
        }

        boolean isH2cUpgrade() {
            ChannelOperations<?, ?> ops;
            Channel channel = this.pooledRef.poolable().channel();
            if (channel.pipeline().get("reactor.left.h2cUpgradeHandler") != null && channel.pipeline().get("reactor.left.h2MultiplexHandler") == null && (ops = ChannelOperations.get(channel)) != null) {
                this.sink.success(ops);
                return true;
            }
            return false;
        }

        boolean notHttp2() {
            ChannelOperations<?, ?> ops;
            Channel channel = this.pooledRef.poolable().channel();
            ChannelPipeline pipeline = channel.pipeline();
            SslHandler handler = pipeline.get(SslHandler.class);
            if (handler != null) {
                String protocol;
                String string = protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : "http/1.1";
                if ("http/1.1".equals(protocol)) {
                    ChannelOperations<?, ?> ops2 = ChannelOperations.get(channel);
                    if (ops2 != null) {
                        this.sink.success(ops2);
                        Http2ConnectionProvider.invalidate(this, channel);
                        return true;
                    }
                } else if (!"h2".equals(handler.applicationProtocol())) {
                    channel.attr(OWNER).set(null);
                    Http2ConnectionProvider.invalidate(this, channel);
                    this.sink.error(new IOException("Unknown protocol [" + protocol + "]."));
                    return true;
                }
            } else if (pipeline.get("reactor.left.h2cUpgradeHandler") == null && pipeline.get("reactor.left.h2MultiplexHandler") == null && (ops = ChannelOperations.get(channel)) != null) {
                this.sink.success(ops);
                Http2ConnectionProvider.invalidate(this, channel);
                return true;
            }
            return false;
        }

        static void release(Channel channel) {
            DisposableAcquire.release(channel.attr(OWNER).get(), channel);
        }

        static void release(@Nullable ConnectionObserver owner, Channel channel) {
            if (owner instanceof DisposableAcquire) {
                DisposableAcquire da = (DisposableAcquire)owner;
                da.pooledRef.release().subscribe(null, null, () -> {
                    if (log.isDebugEnabled()) {
                        Http2ConnectionProvider.logPoolState(channel, da.pool, "Channel deactivated");
                    }
                });
            }
        }
    }

    static final class DelegatingConnectionObserver
    implements ConnectionObserver {
        DelegatingConnectionObserver() {
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.owner(connection.channel()).onUncaughtException(connection, error);
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            this.owner(connection.channel()).onStateChange(connection, newState);
        }

        ConnectionObserver owner(Channel channel) {
            ConnectionObserver obs;
            do {
                if ((obs = channel.attr(OWNER).get()) != null) {
                    return obs;
                }
                obs = new PendingConnectionObserver();
            } while (!channel.attr(OWNER).compareAndSet(null, obs));
            return obs;
        }
    }
}

