/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport.netty4;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.io.netty.bootstrap.Bootstrap;
import org.elasticsearch.io.netty.bootstrap.ServerBootstrap;
import org.elasticsearch.io.netty.channel.AdaptiveRecvByteBufAllocator;
import org.elasticsearch.io.netty.channel.Channel;
import org.elasticsearch.io.netty.channel.ChannelFuture;
import org.elasticsearch.io.netty.channel.ChannelFutureListener;
import org.elasticsearch.io.netty.channel.ChannelHandler;
import org.elasticsearch.io.netty.channel.ChannelHandlerContext;
import org.elasticsearch.io.netty.channel.ChannelInitializer;
import org.elasticsearch.io.netty.channel.ChannelOption;
import org.elasticsearch.io.netty.channel.FixedRecvByteBufAllocator;
import org.elasticsearch.io.netty.channel.RecvByteBufAllocator;
import org.elasticsearch.io.netty.channel.nio.NioEventLoopGroup;
import org.elasticsearch.io.netty.channel.oio.OioEventLoopGroup;
import org.elasticsearch.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.elasticsearch.io.netty.channel.socket.nio.NioSocketChannel;
import org.elasticsearch.io.netty.channel.socket.oio.OioServerSocketChannel;
import org.elasticsearch.io.netty.channel.socket.oio.OioSocketChannel;
import org.elasticsearch.io.netty.util.concurrent.Future;
import org.elasticsearch.io.netty.util.concurrent.GenericFutureListener;
import org.elasticsearch.log4j.message.ParameterizedMessage;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4MessageChannelHandler;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
import org.elasticsearch.transport.netty4.Netty4SizeHeaderFrameDecoder;
import org.elasticsearch.transport.netty4.Netty4Utils;

public class Netty4Transport
extends TcpTransport<Channel> {
    // All static settings below are assigned in the static initializer of this class.

    /** Setting {@code transport.netty.worker_count}; its default is twice {@code EsExecutors.boundedNumberOfProcessors}. */
    public static final Setting<Integer> WORKER_COUNT;
    /** Setting {@code transport.netty.max_cumulation_buffer_capacity}; declared with a default of {@code new ByteSizeValue(-1)}. */
    public static final Setting<ByteSizeValue> NETTY_MAX_CUMULATION_BUFFER_CAPACITY;
    /** Setting {@code transport.netty.max_composite_buffer_components}; declared with {@code intSetting(key, -1, -1, ...)}. */
    public static final Setting<Integer> NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS;
    /** Setting {@code transport.netty.receive_predictor_size}; its default is derived from the JVM direct memory limit and {@link #WORKER_COUNT}. */
    public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE;
    /** Setting {@code transport.netty.receive_predictor_min}; declared with {@link #NETTY_RECEIVE_PREDICTOR_SIZE} as its second argument (presumably the fallback - TODO confirm). */
    public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MIN;
    /** Setting {@code transport.netty.receive_predictor_max}; declared with {@link #NETTY_RECEIVE_PREDICTOR_SIZE} as its second argument (presumably the fallback - TODO confirm). */
    public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX;
    /** Setting {@code transport.netty.boss_count}; declared with {@code intSetting(key, 1, 1, ...)}. Not read anywhere in this class. */
    public static final Setting<Integer> NETTY_BOSS_COUNT;
    /** Value of {@link #NETTY_MAX_CUMULATION_BUFFER_CAPACITY}, read once in the constructor. */
    protected final ByteSizeValue maxCumulationBufferCapacity;
    /** Value of {@link #NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS}, read once in the constructor. */
    protected final int maxCompositeBufferComponents;
    /** Receive-buffer allocator chosen in the constructor and applied to the client bootstrap and every server bootstrap. */
    protected final RecvByteBufAllocator recvByteBufAllocator;
    /** Value of {@link #WORKER_COUNT}; used as the thread count of the NIO client group and of every server event loop group. */
    protected final int workerCount;
    /** Value of {@link #NETTY_RECEIVE_PREDICTOR_MIN}, read once in the constructor. */
    protected final ByteSizeValue receivePredictorMin;
    /** Value of {@link #NETTY_RECEIVE_PREDICTOR_MAX}, read once in the constructor. */
    protected final ByteSizeValue receivePredictorMax;
    /** Handler installed first in every server channel pipeline; created in {@code doStart} only when the network server is enabled, otherwise {@code null}. */
    volatile Netty4OpenChannelsHandler serverOpenChannels;
    /** Client bootstrap used for outgoing connections; created in {@code doStart} and reset to {@code null} in {@code stopInternal}. */
    protected volatile Bootstrap bootstrap;
    /** Server bootstraps keyed by profile name; filled by {@code createServerBootstrap} and cleared in {@code stopInternal}. */
    protected final Map<String, ServerBootstrap> serverBootstraps = ConcurrentCollections.newConcurrentMap();

    public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
        super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
        this.workerCount = WORKER_COUNT.get(settings);
        this.maxCumulationBufferCapacity = NETTY_MAX_CUMULATION_BUFFER_CAPACITY.get(settings);
        this.maxCompositeBufferComponents = NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
        this.receivePredictorMin = NETTY_RECEIVE_PREDICTOR_MIN.get(settings);
        this.receivePredictorMax = NETTY_RECEIVE_PREDICTOR_MAX.get(settings);
        this.recvByteBufAllocator = this.receivePredictorMax.getBytes() == this.receivePredictorMin.getBytes() ? new FixedRecvByteBufAllocator((int)this.receivePredictorMax.getBytes()) : new AdaptiveRecvByteBufAllocator((int)this.receivePredictorMin.getBytes(), (int)this.receivePredictorMin.getBytes(), (int)this.receivePredictorMax.getBytes());
    }

    /**
     * Package-private accessor for the inherited {@code transportServiceAdapter} field.
     *
     * @return the current transport service adapter
     */
    TransportServiceAdapter transportServiceAdapter() {
        return transportServiceAdapter;
    }

    /**
     * Starts the transport.
     *
     * <p>Creates the client bootstrap first. When {@code NetworkService.NETWORK_SERVER} is enabled, a
     * new open-channels handler is created and, for every profile returned by
     * {@code buildProfileSettings()}, the fallback settings are merged with the profile's own settings
     * (profile values are put last), a server bootstrap is created and the server is bound. Finally
     * {@code super.doStart()} is invoked. If any step throws, {@code doStop()} is called before the
     * exception propagates.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Override
    protected void doStart() {
        boolean started = false;
        try {
            this.bootstrap = this.createBootstrap();
            final boolean serverEnabled = NetworkService.NETWORK_SERVER.get(this.settings).booleanValue();
            if (serverEnabled) {
                this.serverOpenChannels = new Netty4OpenChannelsHandler(this.logger);
                for (Map.Entry<String, Settings> profile : this.buildProfileSettings().entrySet()) {
                    final String profileName = profile.getKey();
                    // Fallback values go in first so that the profile's own values override them.
                    final Settings.Builder merged = Settings.builder();
                    merged.put(this.createFallbackSettings());
                    merged.put(profile.getValue());
                    final Settings profileSettings = merged.build();
                    this.createServerBootstrap(profileName, profileSettings);
                    this.bindServer(profileName, profileSettings);
                }
            }
            super.doStart();
            started = true;
        }
        finally {
            if (started == false) {
                this.doStop();
            }
        }
    }

    /**
     * Builds and validates the client-side bootstrap used for outgoing connections.
     *
     * <p>With {@code TCP_BLOCKING_CLIENT} enabled a single-threaded OIO group is used; otherwise an
     * NIO group with {@code workerCount} threads (thread name prefix {@code transport_client_boss}).
     * Send and receive buffer sizes are only applied when the configured value is greater than zero.
     *
     * @return the configured, validated bootstrap
     */
    private Bootstrap createBootstrap() {
        final Bootstrap clientBootstrap = new Bootstrap();

        final boolean blockingClient = (Boolean)TCP_BLOCKING_CLIENT.get(this.settings);
        if (blockingClient) {
            clientBootstrap.group(new OioEventLoopGroup(1, EsExecutors.daemonThreadFactory(this.settings, "transport_client_worker")));
            clientBootstrap.channel(OioSocketChannel.class);
        } else {
            clientBootstrap.group(new NioEventLoopGroup(this.workerCount, EsExecutors.daemonThreadFactory(this.settings, "transport_client_boss")));
            clientBootstrap.channel(NioSocketChannel.class);
        }
        clientBootstrap.handler(this.getClientChannelInitializer());

        final int connectTimeoutMillis = Math.toIntExact(this.defaultConnectionProfile.getConnectTimeout().millis());
        clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
        clientBootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(this.settings));
        clientBootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(this.settings));

        final long sendBufferBytes = ((ByteSizeValue)TCP_SEND_BUFFER_SIZE.get(this.settings)).getBytes();
        if (sendBufferBytes > 0L) {
            clientBootstrap.option(ChannelOption.SO_SNDBUF, Math.toIntExact(sendBufferBytes));
        }
        final long receiveBufferBytes = ((ByteSizeValue)TCP_RECEIVE_BUFFER_SIZE.get(this.settings)).getBytes();
        if (receiveBufferBytes > 0L) {
            clientBootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(receiveBufferBytes));
        }

        clientBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, this.recvByteBufAllocator);
        final boolean reuseAddress = (Boolean)TCP_REUSE_ADDRESS.get(this.settings);
        clientBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);

        clientBootstrap.validate();
        return clientBootstrap;
    }

    /**
     * Builds the settings that {@code doStart} merges underneath each profile's own settings.
     *
     * <p>Bind and publish hosts are copied only when non-empty. The TCP flags are read from the
     * {@code transport.netty.*} keys, defaulting to the matching {@code NetworkService.TcpSettings}
     * value. Buffer sizes are read from the {@code transport.netty.*} keys, defaulting to the
     * {@code TCP_SEND_BUFFER_SIZE} / {@code TCP_RECEIVE_BUFFER_SIZE} settings, and are only written
     * when their byte count is not negative.
     *
     * @return the fallback settings
     */
    private Settings createFallbackSettings() {
        final Settings.Builder builder = Settings.builder();

        final List<String> bindHosts = TransportSettings.BIND_HOST.get(this.settings);
        if (bindHosts.isEmpty() == false) {
            builder.putArray("bind_host", bindHosts);
        }
        final List<String> publishHosts = TransportSettings.PUBLISH_HOST.get(this.settings);
        if (publishHosts.isEmpty() == false) {
            builder.putArray("publish_host", publishHosts);
        }

        final boolean tcpNoDelay = this.settings.getAsBoolean("transport.netty.tcp_no_delay", NetworkService.TcpSettings.TCP_NO_DELAY.get(this.settings));
        builder.put("tcp_no_delay", tcpNoDelay);
        final boolean tcpKeepAlive = this.settings.getAsBoolean("transport.netty.tcp_keep_alive", NetworkService.TcpSettings.TCP_KEEP_ALIVE.get(this.settings));
        builder.put("tcp_keep_alive", tcpKeepAlive);
        final boolean reuseAddress = this.settings.getAsBoolean("transport.netty.reuse_address", NetworkService.TcpSettings.TCP_REUSE_ADDRESS.get(this.settings));
        builder.put("reuse_address", reuseAddress);

        final ByteSizeValue sendBufferSize = this.settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", (ByteSizeValue)TCP_SEND_BUFFER_SIZE.get(this.settings));
        if (sendBufferSize.getBytes() >= 0L) {
            builder.put("tcp_send_buffer_size", sendBufferSize);
        }
        final ByteSizeValue receiveBufferSize = this.settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", (ByteSizeValue)TCP_RECEIVE_BUFFER_SIZE.get(this.settings));
        if (receiveBufferSize.getBytes() >= 0L) {
            builder.put("tcp_receive_buffer_size", receiveBufferSize);
        }

        return builder.build();
    }

    /**
     * Builds and validates the server bootstrap for one profile and registers it in
     * {@code serverBootstraps} under the profile name.
     *
     * <p>NOTE(review): the child send/receive buffer sizes are read with
     * {@code Setting#getDefault(settings)} rather than {@code get(settings)}; confirm that this is
     * intended before changing it.
     *
     * @param name profile name; also part of the worker thread name
     * @param settings merged settings of the profile (the node-level settings remain available as
     *                 {@code this.settings})
     */
    private void createServerBootstrap(String name, Settings settings) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(
                "using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
                name,
                this.workerCount,
                settings.get("port"),
                settings.get("bind_host"),
                settings.get("publish_host"),
                this.compress,
                this.defaultConnectionProfile.getConnectTimeout(),
                this.defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
                this.defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
                this.defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
                this.defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
                this.defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING),
                this.receivePredictorMin,
                this.receivePredictorMax);
        }

        // The thread factory is built from the node-level settings, not the profile settings.
        final ThreadFactory workerFactory = EsExecutors.daemonThreadFactory(this.settings, "transport_server_worker", name);
        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        final boolean blockingServer = (Boolean)TCP_BLOCKING_SERVER.get(settings);
        if (blockingServer) {
            serverBootstrap.group(new OioEventLoopGroup(this.workerCount, workerFactory));
            serverBootstrap.channel(OioServerSocketChannel.class);
        } else {
            serverBootstrap.group(new NioEventLoopGroup(this.workerCount, workerFactory));
            serverBootstrap.channel(NioServerSocketChannel.class);
        }
        serverBootstrap.childHandler(this.getServerChannelInitializer(name, settings));

        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));

        final ByteSizeValue sendBufferSize = (ByteSizeValue)TCP_SEND_BUFFER_SIZE.getDefault(settings);
        if (sendBufferSize != null && sendBufferSize.getBytes() > 0L) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(sendBufferSize.getBytes()));
        }
        final ByteSizeValue receiveBufferSize = (ByteSizeValue)TCP_RECEIVE_BUFFER_SIZE.getDefault(settings);
        if (receiveBufferSize != null && receiveBufferSize.getBytes() > 0L) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(receiveBufferSize.bytesAsInt()));
        }

        // The allocator and address-reuse flag are set on both the listening channel and accepted channels.
        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, this.recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, this.recvByteBufAllocator);
        final boolean reuseAddress = (Boolean)TCP_REUSE_ADDRESS.get(settings);
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

        serverBootstrap.validate();
        this.serverBootstraps.put(name, serverBootstrap);
    }

    /**
     * Creates the handler installed as the child handler of a profile's server bootstrap.
     * Protected and non-final, so subclasses can supply a different initializer.
     *
     * @param name profile name
     * @param settings merged settings of the profile
     * @return a new {@link ServerChannelInitializer}
     */
    protected ChannelHandler getServerChannelInitializer(String name, Settings settings) {
        final ChannelHandler initializer = new ServerChannelInitializer(name, settings);
        return initializer;
    }

    /**
     * Creates the handler installed on the client bootstrap.
     * Protected and non-final, so subclasses can supply a different initializer.
     *
     * @return a new {@link ClientChannelInitializer}
     */
    protected ChannelHandler getClientChannelInitializer() {
        final ChannelHandler initializer = new ClientChannelInitializer();
        return initializer;
    }

    /**
     * Reports an exception raised on a channel to {@code onException}.
     *
     * <p>If {@code ExceptionsHelper.unwrap} finds an {@link ElasticsearchException} in {@code cause},
     * that exception is reported; otherwise {@code cause} itself is. A throwable that is not an
     * {@link Exception} is wrapped in a new {@link ElasticsearchException} first.
     *
     * @param ctx context whose channel the exception is attributed to
     * @param cause the throwable that was caught
     * @throws Exception if {@code onException} throws
     */
    protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Throwable effective = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
        if (effective == null) {
            effective = cause;
        }
        final Channel channel = ctx.channel();
        final Exception reported;
        if (effective instanceof Exception) {
            reported = (Exception)effective;
        } else {
            reported = new ElasticsearchException(effective);
        }
        this.onException(channel, reported);
    }

    /**
     * Returns the number of open channels reported by the server open-channels handler.
     *
     * @return the handler's {@code numberOfOpenChannels()}, or {@code 0} when no handler has been created
     */
    @Override
    public long serverOpen() {
        // Read the volatile field once so the null check and the call see the same instance.
        final Netty4OpenChannelsHandler openChannels = this.serverOpenChannels;
        if (openChannels == null) {
            return 0L;
        }
        return openChannels.numberOfOpenChannels();
    }

    /**
     * Opens {@code profile.getNumConnections()} channels to {@code node}.
     *
     * <p>All connection attempts are started first and then awaited one by one, each for up to 1.5
     * times the connect timeout. When the profile carries a connect timeout that differs from the
     * default profile's, a clone of the client bootstrap with that timeout is used. Every established
     * channel gets a {@link ChannelCloseListener} on its close future.
     *
     * <p>If a {@link RuntimeException} is thrown while awaiting, every attempt is cancelled and any
     * open channel is closed (close failures are added as suppressed exceptions) before rethrowing.
     * Whenever the method does not complete successfully, the node channels are closed as well.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param node the node to connect to
     * @param profile connection profile providing the channel count and optional connect timeout
     * @return the node channels backed by the established channels
     * @throws ConnectTransportException if a connection attempt did not succeed
     */
    @Override
    protected TcpTransport.NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) {
        final Channel[] channels = new Channel[profile.getNumConnections()];
        final TcpTransport.NodeChannels nodeChannels = new TcpTransport.NodeChannels(node, channels, profile);
        boolean connected = false;
        try {
            final TimeValue defaultTimeout = this.defaultConnectionProfile.getConnectTimeout();
            final TimeValue profileTimeout = profile.getConnectTimeout();
            final TimeValue connectTimeout;
            final Bootstrap connector;
            if (profileTimeout != null && profileTimeout.equals(defaultTimeout) == false) {
                // Custom timeout: clone so the shared client bootstrap keeps its default option.
                connector = this.bootstrap.clone(this.bootstrap.config().group());
                connector.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(profileTimeout.millis()));
                connectTimeout = profileTimeout;
            } else {
                connectTimeout = defaultTimeout;
                connector = this.bootstrap;
            }

            // Kick off every attempt before waiting on any of them.
            final List<ChannelFuture> attempts = new ArrayList<>(channels.length);
            final InetSocketAddress address = ((InetSocketTransportAddress)node.getAddress()).address();
            for (int i = 0; i < channels.length; ++i) {
                attempts.add(connector.connect(address));
            }

            final Iterator<ChannelFuture> pending = attempts.iterator();
            try {
                for (int i = 0; i < channels.length; ++i) {
                    assert (pending.hasNext());
                    final ChannelFuture attempt = pending.next();
                    attempt.awaitUninterruptibly((long)((double)connectTimeout.millis() * 1.5));
                    if (attempt.isSuccess() == false) {
                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", attempt.cause());
                    }
                    channels[i] = attempt.channel();
                    channels[i].closeFuture().addListener(new ChannelCloseListener(node));
                }
                assert (!pending.hasNext()) : "not all created connection have been consumed";
            }
            catch (RuntimeException e) {
                for (ChannelFuture attempt : Collections.unmodifiableList(attempts)) {
                    FutureUtils.cancel(attempt);
                    if (attempt.channel() != null && attempt.channel().isOpen()) {
                        try {
                            attempt.channel().close();
                        }
                        catch (Exception inner) {
                            e.addSuppressed(inner);
                        }
                    }
                }
                throw e;
            }
            connected = true;
        }
        finally {
            if (connected == false) {
                try {
                    nodeChannels.close();
                }
                catch (IOException e) {
                    this.logger.trace("exception while closing channels", (Throwable)e);
                }
            }
        }
        return nodeChannels;
    }

    /**
     * Writes {@code reference} to the channel, flushes it, and runs {@code sendListener} once the
     * write future completes. The listener runs on completion regardless of the future's outcome,
     * since the result is not inspected.
     *
     * @param channel channel to write to
     * @param reference bytes to send, converted with {@code Netty4Utils.toByteBuf}
     * @param sendListener callback to run when the write future completes
     */
    @Override
    protected void sendMessage(Channel channel, BytesReference reference, Runnable sendListener) {
        channel.writeAndFlush(Netty4Utils.toByteBuf(reference))
            .addListener(completed -> sendListener.run());
    }

    /**
     * Closes the given channels by delegating to {@code Netty4Utils.closeChannels}.
     *
     * @param channels channels to close
     * @throws IOException declared by this method; when it is thrown is decided by the delegate
     */
    @Override
    protected void closeChannels(List<Channel> channels) throws IOException {
        Netty4Utils.closeChannels(channels);
    }

    /**
     * Returns the channel's local address.
     *
     * @param channel channel to inspect
     * @return {@code channel.localAddress()} cast to {@link InetSocketAddress}; the cast fails with a
     *         {@link ClassCastException} for any other address type
     */
    @Override
    protected InetSocketAddress getLocalAddress(Channel channel) {
        return (InetSocketAddress)channel.localAddress();
    }

    /**
     * Binds the server bootstrap registered for {@code name} to {@code address} and waits
     * uninterruptibly for the bind to finish.
     *
     * @param name profile name; a bootstrap must already be registered for it in
     *             {@code serverBootstraps}, otherwise the lookup yields {@code null} and the call fails
     * @param address local address to bind to
     * @return the bound channel
     */
    @Override
    protected Channel bind(String name, InetSocketAddress address) {
        final ServerBootstrap profileBootstrap = this.serverBootstraps.get(name);
        final ChannelFuture bound = profileBootstrap.bind(address).syncUninterruptibly();
        return bound.channel();
    }

    /**
     * Package-private accessor for the inherited {@code scheduledPing} field.
     *
     * @return the current scheduled ping
     */
    TcpTransport.ScheduledPing getPing() {
        return scheduledPing;
    }

    /**
     * Reports whether the channel is open.
     *
     * @param channel channel to inspect
     * @return the result of {@code channel.isOpen()}
     */
    @Override
    protected boolean isOpen(Channel channel) {
        return channel.isOpen();
    }

    /**
     * Releases the Netty resources held by this transport.
     *
     * <p>Passes the server open-channels handler and a shutdown action to {@code Releasables.close}.
     * The action requests a graceful shutdown (quiet period 0, timeout 5 seconds) of every server
     * bootstrap's event loop group, waits for each one, logs failures at debug level, clears
     * {@code serverBootstraps}, and finally shuts down the client bootstrap's group and resets
     * {@code bootstrap} to {@code null}.
     *
     * <p>The failure message is logged through the {@code debug(Message, Throwable)} overload behind
     * an {@code isDebugEnabled()} check, so the message is still only built when debug logging is on.
     */
    @Override
    @SuppressForbidden(reason="debug")
    protected void stopInternal() {
        Releasables.close(this.serverOpenChannels, () -> {
            // Request shutdown of every server group before waiting on any of them, so the waits overlap.
            final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(this.serverBootstraps.size());
            for (final Map.Entry<String, ServerBootstrap> entry : this.serverBootstraps.entrySet()) {
                final Future<?> shutdown = entry.getValue().config().group().shutdownGracefully(0L, 5L, TimeUnit.SECONDS);
                serverBootstrapCloseFutures.add(Tuple.tuple(entry.getKey(), shutdown));
            }
            for (final Tuple<String, Future<?>> closeFuture : serverBootstrapCloseFutures) {
                final Future<?> shutdown = closeFuture.v2();
                shutdown.awaitUninterruptibly();
                if (shutdown.isSuccess() == false && this.logger.isDebugEnabled()) {
                    this.logger.debug(new ParameterizedMessage("Error closing server bootstrap for profile [{}]", closeFuture.v1()), shutdown.cause());
                }
            }
            this.serverBootstraps.clear();
            if (this.bootstrap != null) {
                this.bootstrap.config().group().shutdownGracefully(0L, 5L, TimeUnit.SECONDS).awaitUninterruptibly();
                this.bootstrap = null;
            }
        });
    }

    static {
        Netty4Utils.setup();
        WORKER_COUNT = new Setting<Integer>("transport.netty.worker_count", s -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2), s -> Setting.parseInt(s, 1, "transport.netty.worker_count"), Setting.Property.NodeScope, Setting.Property.Shared);
        NETTY_MAX_CUMULATION_BUFFER_CAPACITY = Setting.byteSizeSetting("transport.netty.max_cumulation_buffer_capacity", new ByteSizeValue(-1L), Setting.Property.NodeScope, Setting.Property.Shared);
        NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = Setting.intSetting("transport.netty.max_composite_buffer_components", -1, -1, Setting.Property.NodeScope, Setting.Property.Shared);
        NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("transport.netty.receive_predictor_size", settings -> {
            long defaultReceiverPredictor = 524288L;
            if (JvmInfo.jvmInfo().getMem().getDirectMemoryMax().getBytes() > 0L) {
                long l = (long)(0.3 * (double)JvmInfo.jvmInfo().getMem().getDirectMemoryMax().getBytes() / (double)WORKER_COUNT.get((Settings)settings).intValue());
                defaultReceiverPredictor = Math.min(defaultReceiverPredictor, Math.max(l, 65536L));
            }
            return new ByteSizeValue(defaultReceiverPredictor).toString();
        }, Setting.Property.NodeScope, Setting.Property.Shared);
        NETTY_RECEIVE_PREDICTOR_MIN = Setting.byteSizeSetting("transport.netty.receive_predictor_min", NETTY_RECEIVE_PREDICTOR_SIZE, Setting.Property.NodeScope, Setting.Property.Shared);
        NETTY_RECEIVE_PREDICTOR_MAX = Setting.byteSizeSetting("transport.netty.receive_predictor_max", NETTY_RECEIVE_PREDICTOR_SIZE, Setting.Property.NodeScope, Setting.Property.Shared);
        NETTY_BOSS_COUNT = Setting.intSetting("transport.netty.boss_count", 1, 1, Setting.Property.NodeScope, Setting.Property.Shared);
    }

    /**
     * Initializes channels accepted by a profile's server bootstrap. The pipeline gets, in order, the
     * transport's shared open-channels handler, a size-header frame decoder, and a message handler
     * bound to the profile name.
     */
    protected class ServerChannelInitializer
    extends ChannelInitializer<Channel> {
        /** Profile name handed to the message handler. */
        protected final String name;
        /** Settings of the profile; stored but not read by this class itself. */
        protected final Settings settings;

        /**
         * @param name profile name
         * @param settings settings of the profile
         */
        protected ServerChannelInitializer(String name, Settings settings) {
            this.name = name;
            this.settings = settings;
        }

        /**
         * Installs the three handlers on the channel's pipeline.
         *
         * @param ch the newly accepted channel
         */
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                .addLast("open_channels", Netty4Transport.this.serverOpenChannels)
                .addLast("size", new Netty4SizeHeaderFrameDecoder())
                .addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, this.name));
        }

        /**
         * Passes the cause to {@code Netty4Utils.maybeDie} first and then to the superclass handling.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Netty4Utils.maybeDie(cause);
            super.exceptionCaught(ctx, cause);
        }
    }

    /**
     * Initializes outgoing (client) channels. The pipeline gets a size-header frame decoder followed
     * by a message handler created with the profile name {@code ".client"}.
     */
    protected class ClientChannelInitializer
    extends ChannelInitializer<Channel> {
        protected ClientChannelInitializer() {
        }

        /**
         * Installs the two handlers on the channel's pipeline.
         *
         * @param ch the newly created client channel
         */
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                .addLast("size", new Netty4SizeHeaderFrameDecoder())
                .addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
        }

        /**
         * Passes the cause to {@code Netty4Utils.maybeDie} first and then to the superclass handling.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Netty4Utils.maybeDie(cause);
            super.exceptionCaught(ctx, cause);
        }
    }

    /**
     * Close-future listener attached to every outgoing channel. It reports the closed channel to the
     * transport and, if the node's registered channels still contain it, schedules a disconnect from
     * the node on the generic thread pool.
     */
    private class ChannelCloseListener
    implements ChannelFutureListener {
        /** Node the observed channel is connected to. */
        private final DiscoveryNode node;

        private ChannelCloseListener(DiscoveryNode node) {
            this.node = node;
        }

        /**
         * Invoked when the channel's close future completes.
         *
         * @param future the completed close future
         */
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            final Channel closed = future.channel();
            Netty4Transport.this.onChannelClosed(closed);
            final TcpTransport.NodeChannels nodeChannels = (TcpTransport.NodeChannels)Netty4Transport.this.connectedNodes.get(this.node);
            if (nodeChannels == null || nodeChannels.hasChannel(closed) == false) {
                // The node is no longer connected, or this channel is not one of its registered channels.
                return;
            }
            Netty4Transport.this.threadPool.generic().execute(() -> Netty4Transport.this.disconnectFromNode(this.node, closed, "channel closed event"));
        }
    }
}

