/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.DecoderException;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.netty.ChannelBindException;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.transport.AddressUtils;
import reactor.netty.transport.ServerTransportConfig;
import reactor.netty.transport.Transport;
import reactor.netty.transport.TransportConfig;
import reactor.netty.transport.TransportConnector;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

public abstract class ServerTransport<T extends ServerTransport<T, CONF>, CONF extends ServerTransportConfig<CONF>>
extends Transport<T, CONF> {
    static final Logger log = Loggers.getLogger(ServerTransport.class);

    public Mono<? extends DisposableServer> bind() {
        ServerTransportConfig config = (ServerTransportConfig)this.configuration();
        Objects.requireNonNull(config.bindAddress(), "bindAddress");
        Mono mono = Mono.create(sink -> {
            DisposableBind disposableServer;
            InetSocketAddress localInet;
            SocketAddress local = Objects.requireNonNull(config.bindAddress().get(), "Bind Address supplier returned null");
            if (local instanceof InetSocketAddress && (localInet = (InetSocketAddress)local).isUnresolved()) {
                local = AddressUtils.createResolved(localInet.getHostName(), localInet.getPort());
            }
            boolean isDomainSocket = false;
            if (local instanceof DomainSocketAddress) {
                isDomainSocket = true;
                disposableServer = new UdsDisposableBind((MonoSink<DisposableServer>)sink, config, local);
            } else {
                disposableServer = new InetDisposableBind((MonoSink<DisposableServer>)sink, config, local);
            }
            ChildObserver childObs = new ChildObserver(config.defaultChildObserver().then(config.childObserver()));
            Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true), config.childOptions, config.childAttrs, isDomainSocket);
            TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket).subscribe(disposableServer);
        });
        if (config.doOnBind() != null) {
            mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
        }
        return mono;
    }

    public final DisposableServer bindNow() {
        return this.bindNow(Duration.ofSeconds(45L));
    }

    public final DisposableServer bindNow(Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        try {
            return Objects.requireNonNull(this.bind().block(timeout), "aborted");
        }
        catch (IllegalStateException e) {
            if (e.getMessage().contains("blocking read")) {
                throw new IllegalStateException(this.getClass().getSimpleName() + " couldn't be started within " + timeout.toMillis() + "ms");
            }
            throw e;
        }
    }

    public final void bindUntilJavaShutdown(Duration timeout, @Nullable Consumer<DisposableServer> onStart) {
        Objects.requireNonNull(timeout, "timeout");
        DisposableServer facade = Objects.requireNonNull(this.bindNow(), "facade");
        if (onStart != null) {
            onStart.accept(facade);
        }
        Runtime.getRuntime().addShutdownHook(new Thread(() -> facade.disposeNow(timeout)));
        facade.onDispose().block();
    }

    public <A> T childAttr(AttributeKey<A> key, @Nullable A value) {
        Objects.requireNonNull(key, "key");
        ServerTransport dup = (ServerTransport)this.duplicate();
        ((ServerTransportConfig)dup.configuration()).childAttrs = TransportConfig.updateMap(((ServerTransportConfig)this.configuration()).childAttrs, key, value);
        return (T)dup;
    }

    public T childObserve(ConnectionObserver observer) {
        Objects.requireNonNull(observer, "observer");
        ServerTransport dup = (ServerTransport)this.duplicate();
        ConnectionObserver current = ((ServerTransportConfig)this.configuration()).childObserver;
        ((ServerTransportConfig)dup.configuration()).childObserver = current == null ? observer : current.then(observer);
        return (T)dup;
    }

    public <A> T childOption(ChannelOption<A> key, @Nullable A value) {
        Objects.requireNonNull(key, "key");
        if (ChannelOption.AUTO_READ == key) {
            if (value instanceof Boolean && Boolean.TRUE.equals(value)) {
                log.error("ChannelOption.AUTO_READ is configured to be [false], it cannot be set to [true]");
            }
            ServerTransport dup = this;
            return (T)dup;
        }
        ServerTransport dup = (ServerTransport)this.duplicate();
        ((ServerTransportConfig)dup.configuration()).childOptions = TransportConfig.updateMap(((ServerTransportConfig)this.configuration()).childOptions, key, value);
        return (T)dup;
    }

    public T doOnBind(Consumer<? super CONF> doOnBind) {
        Objects.requireNonNull(doOnBind, "doOnBind");
        ServerTransport dup = (ServerTransport)this.duplicate();
        Consumer current = ((ServerTransportConfig)this.configuration()).doOnBind;
        ((ServerTransportConfig)dup.configuration()).doOnBind = current == null ? doOnBind : current.andThen(doOnBind);
        return (T)dup;
    }

    public T doOnBound(Consumer<? super DisposableServer> doOnBound) {
        Objects.requireNonNull(doOnBound, "doOnBound");
        ServerTransport dup = (ServerTransport)this.duplicate();
        Consumer<? super DisposableServer> current = ((ServerTransportConfig)this.configuration()).doOnBound;
        ((ServerTransportConfig)dup.configuration()).doOnBound = current == null ? doOnBound : current.andThen(doOnBound);
        return (T)dup;
    }

    public T doOnConnection(Consumer<? super Connection> doOnConnection) {
        Objects.requireNonNull(doOnConnection, "doOnConnected");
        ServerTransport dup = (ServerTransport)this.duplicate();
        Consumer<? super Connection> current = ((ServerTransportConfig)this.configuration()).doOnConnection;
        ((ServerTransportConfig)dup.configuration()).doOnConnection = current == null ? doOnConnection : current.andThen(doOnConnection);
        return (T)dup;
    }

    public T doOnUnbound(Consumer<? super DisposableServer> doOnUnbound) {
        Objects.requireNonNull(doOnUnbound, "doOnUnbound");
        ServerTransport dup = (ServerTransport)this.duplicate();
        Consumer<? super DisposableServer> current = ((ServerTransportConfig)this.configuration()).doOnUnbound;
        ((ServerTransportConfig)dup.configuration()).doOnUnbound = current == null ? doOnUnbound : current.andThen(doOnUnbound);
        return (T)dup;
    }

    public T host(String host) {
        Objects.requireNonNull(host, "host");
        return (T)((ServerTransport)this.bindAddress(() -> AddressUtils.updateHost(((ServerTransportConfig)this.configuration()).bindAddress(), host)));
    }

    public T port(int port) {
        return (T)((ServerTransport)this.bindAddress(() -> AddressUtils.updatePort(((ServerTransportConfig)this.configuration()).bindAddress(), port)));
    }

    public Mono<Void> warmup() {
        return Mono.fromRunnable(() -> {
            ((ServerTransportConfig)this.configuration()).childEventLoopGroup();
            ((ServerTransportConfig)this.configuration()).eventLoopGroup();
        });
    }

    static final class UdsDisposableBind
    extends DisposableBind {
        UdsDisposableBind(MonoSink<DisposableServer> sink, TransportConfig config, SocketAddress bindAddress) {
            super(sink, config, bindAddress);
        }

        @Override
        public DomainSocketAddress address() {
            return (DomainSocketAddress)this.channel().localAddress();
        }

        @Override
        public String path() {
            return this.address().path();
        }
    }

    static final class InetDisposableBind
    extends DisposableBind {
        InetDisposableBind(MonoSink<DisposableServer> sink, TransportConfig config, SocketAddress bindAddress) {
            super(sink, config, bindAddress);
        }

        @Override
        public InetSocketAddress address() {
            return (InetSocketAddress)this.channel().localAddress();
        }

        @Override
        public String host() {
            return this.address().getHostString();
        }

        @Override
        public int port() {
            return this.address().getPort();
        }
    }

    static class DisposableBind
    implements CoreSubscriber<Channel>,
    DisposableServer,
    Connection {
        final MonoSink<DisposableServer> sink;
        final Context currentContext;
        final TransportConfig config;
        final SocketAddress bindAddress;
        Channel channel;
        Subscription subscription;

        DisposableBind(MonoSink<DisposableServer> sink, TransportConfig config, SocketAddress bindAddress) {
            this.sink = sink;
            this.currentContext = Context.of(sink.contextView());
            this.config = config;
            this.bindAddress = bindAddress;
        }

        @Override
        public Channel channel() {
            return this.channel;
        }

        @Override
        public Context currentContext() {
            return this.currentContext;
        }

        @Override
        public final void dispose() {
            if (this.channel != null) {
                if (this.channel.isActive()) {
                    this.channel.close();
                    LoopResources loopResources = this.config.loopResources();
                    if (loopResources instanceof ConnectionProvider) {
                        ((ConnectionProvider)((Object)loopResources)).disposeWhen(this.bindAddress);
                    }
                }
            } else {
                this.subscription.cancel();
            }
        }

        @Override
        public void disposeNow(Duration timeout) {
            if (this.isDisposed()) {
                return;
            }
            this.dispose();
            Mono<Object> terminateSignals = Mono.empty();
            if (this.config.channelGroup != null) {
                ArrayList channels = new ArrayList();
                this.config.channelGroup.forEach(channel -> {
                    ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
                    if (ops != null) {
                        channels.add(ops.onTerminate().doFinally(sig -> ops.dispose()));
                    } else {
                        channel.close();
                    }
                });
                if (!channels.isEmpty()) {
                    terminateSignals = Mono.when(channels);
                }
            }
            try {
                this.onDispose().then(terminateSignals).block(timeout);
            }
            catch (IllegalStateException e) {
                if (e.getMessage().contains("blocking read")) {
                    throw new IllegalStateException("Socket couldn't be stopped within " + timeout.toMillis() + "ms");
                }
                throw e;
            }
        }

        @Override
        public void onComplete() {
        }

        @Override
        public void onError(Throwable t) {
            this.sink.error(ChannelBindException.fail(this.bindAddress, t));
        }

        @Override
        public void onNext(Channel channel) {
            this.channel = channel;
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(channel, "Bound new server"));
            }
            this.sink.success(this);
            this.config.defaultConnectionObserver().then(this.config.connectionObserver()).onStateChange(this, ConnectionObserver.State.CONNECTED);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.subscription, s)) {
                this.subscription = s;
                this.sink.onCancel(this);
                s.request(Long.MAX_VALUE);
            }
        }
    }

    static final class ChildObserver
    implements ConnectionObserver {
        final ConnectionObserver childObs;

        ChildObserver(ConnectionObserver childObs) {
            this.childObs = childObs;
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            ChannelOperations<?, ?> ops = ChannelOperations.get(connection.channel());
            if (ops == null && (error instanceof IOException || AbortedException.isConnectionReset(error) || error instanceof DecoderException)) {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(connection.channel(), "onUncaughtException(" + connection + ")"), error);
                }
            } else {
                log.error(ReactorNetty.format(connection.channel(), "onUncaughtException(" + connection + ")"), error);
            }
            connection.dispose();
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            if (newState == ConnectionObserver.State.DISCONNECTING && connection.channel().isActive() && !connection.isPersistent()) {
                connection.dispose();
            }
            this.childObs.onStateChange(connection, newState);
        }
    }

    static final class AcceptorInitializer
    extends ChannelInitializer<Channel> {
        final Acceptor acceptor;

        AcceptorInitializer(Acceptor acceptor) {
            this.acceptor = acceptor;
        }

        @Override
        public void initChannel(Channel ch) {
            ch.eventLoop().execute(() -> ch.pipeline().addLast(this.acceptor));
        }
    }

    static class Acceptor
    extends ChannelInboundHandlerAdapter {
        final EventLoopGroup childGroup;
        final ChannelHandler childHandler;
        final Map<ChannelOption<?>, ?> childOptions;
        final Map<AttributeKey<?>, ?> childAttrs;
        final boolean isDomainSocket;
        Runnable enableAutoReadTask;

        Acceptor(EventLoopGroup childGroup, ChannelHandler childHandler, Map<ChannelOption<?>, ?> childOptions, Map<AttributeKey<?>, ?> childAttrs, boolean isDomainSocket) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
            this.isDomainSocket = isDomainSocket;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Channel child = (Channel)msg;
            child.pipeline().addLast(this.childHandler);
            TransportConnector.setChannelOptions(child, this.childOptions, this.isDomainSocket);
            TransportConnector.setAttributes(child, this.childAttrs);
            try {
                this.childGroup.register(child).addListener(future -> {
                    if (!future.isSuccess()) {
                        Acceptor.forceClose(child, future.cause());
                    }
                });
            }
            catch (Throwable t) {
                Acceptor.forceClose(child, t);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(this.enableAutoReadTask, 1L, TimeUnit.SECONDS).addListener(future -> {
                    if (!future.isSuccess() && log.isDebugEnabled()) {
                        log.debug(ReactorNetty.format(ctx.channel(), "Cannot enable auto-read"), future.cause());
                    }
                });
            }
        }

        void enableAutoReadTask(Channel channel) {
            this.enableAutoReadTask = () -> channel.config().setAutoRead(true);
        }

        static void forceClose(Channel child, Throwable t) {
            child.unsafe().closeForcibly();
            if (log.isWarnEnabled()) {
                log.warn(ReactorNetty.format(child, "Failed to register an accepted channel: {}"), child, t);
            }
        }
    }
}

