/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.ChannelPool;
import io.atomix.cluster.messaging.impl.ClientConnection;
import io.atomix.cluster.messaging.impl.Connection;
import io.atomix.cluster.messaging.impl.HandlerRegistry;
import io.atomix.cluster.messaging.impl.LocalClientConnection;
import io.atomix.cluster.messaging.impl.MessagingProtocol;
import io.atomix.cluster.messaging.impl.ProtocolMessage;
import io.atomix.cluster.messaging.impl.ProtocolReply;
import io.atomix.cluster.messaging.impl.ProtocolRequest;
import io.atomix.cluster.messaging.impl.ProtocolVersion;
import io.atomix.cluster.messaging.impl.RemoteClientConnection;
import io.atomix.cluster.messaging.impl.RemoteServerConnection;
import io.atomix.utils.AtomixRuntimeException;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.OrderedFuture;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.security.KeyStore;
import java.security.MessageDigest;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyMessagingService
implements ManagedMessagingService {
    /** Logger named after the runtime class of this instance. */
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Address supplied at construction; returned by {@code address()} and used as the sender of outgoing requests. */
    private final Address returnAddress;
    /** Hash code of the cluster name; written at the start of every handshake and compared on receipt. */
    private final int preamble;
    /** Messaging configuration (TLS settings, port, interfaces, connection pool size). */
    private final MessagingConfig config;
    /** Protocol version this node writes when it opens a client connection. */
    private final ProtocolVersion protocolVersion;
    /** Set to {@code true} once the server has been bound in {@code start()}; cleared by {@code stop()}. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Registry of message handlers keyed by message type. */
    private final HandlerRegistry handlers = new HandlerRegistry();
    /** Connection used when the destination equals {@code returnAddress}; created in {@code start()}. */
    private volatile LocalClientConnection localConnection;
    /** Client connections keyed by their channel; entries are removed when the channel closes. */
    private final Map<Channel, RemoteClientConnection> connections = Maps.newConcurrentMap();
    /** Source of message ids for outgoing requests. */
    private final AtomicLong messageIdGenerator = new AtomicLong(0L);
    /** Pool of outbound channels, opened on demand through {@code openChannel}. */
    private final ChannelPool channelPool;
    /** Event loop group passed as the parent group of the server bootstrap. */
    private EventLoopGroup serverGroup;
    /** Event loop group used by client bootstraps and as the child group of the server bootstrap. */
    private EventLoopGroup clientGroup;
    /** Server channel implementation (epoll or NIO) chosen in {@code initEventLoopGroup()}. */
    private Class<? extends ServerChannel> serverChannelClass;
    /** Client channel implementation (epoll or NIO) chosen in {@code initEventLoopGroup()}. */
    private Class<? extends Channel> clientChannelClass;
    /** Scheduler handed to the local and remote client connections; created in {@code start()}. */
    private ScheduledExecutorService timeoutExecutor;
    /** Channel of the most recently bound server socket; closed in {@code stop()}. */
    private Channel serverChannel;
    /** Result of {@code loadKeyStores()}: whether the SSL channel initializers are installed. */
    protected boolean enableNettyTls;
    /** Trust manager factory initialised from the configured trust store when TLS is enabled. */
    protected TrustManagerFactory trustManager;
    /** Key manager factory initialised from the configured key store when TLS is enabled. */
    protected KeyManagerFactory keyManager;

    /**
     * Creates a messaging service that speaks the latest protocol version.
     *
     * @param cluster cluster name; its hash code becomes the handshake preamble
     * @param address address reported by {@code address()} and used as the sender of requests
     * @param config messaging configuration
     */
    public NettyMessagingService(String cluster, Address address, MessagingConfig config) {
        this(cluster, address, config, ProtocolVersion.latest());
    }

    /**
     * Creates a messaging service pinned to an explicit protocol version.
     *
     * @param cluster cluster name; its hash code becomes the handshake preamble
     * @param address address reported by {@code address()} and used as the sender of requests
     * @param config messaging configuration; also supplies the connection pool size
     * @param protocolVersion protocol version written by this node when it connects as a client
     */
    NettyMessagingService(String cluster, Address address, MessagingConfig config, ProtocolVersion protocolVersion) {
        this.returnAddress = address;
        this.protocolVersion = protocolVersion;
        this.config = config;
        this.preamble = cluster.hashCode();
        this.channelPool = new ChannelPool(this::openChannel, config.getConnectionPoolSize());
    }

    /**
     * Returns the address this service was constructed with.
     *
     * @return the local return address
     */
    @Override
    public Address address() {
        return returnAddress;
    }

    /**
     * Starts the service: loads the TLS key material (if enabled), creates the event loop
     * groups, binds the server and finally creates the timeout executor and the local
     * connection.
     *
     * <p>If the service is already running, a warning is logged and a completed future is
     * returned. If bootstrapping fails, the event loop groups created for this attempt are
     * shut down so that their threads are not leaked.
     *
     * @return a future completed with this service once the server is bound, or completed
     *     exceptionally if binding fails
     * @throws AtomixRuntimeException if TLS is enabled and the key or trust store cannot be loaded
     */
    public CompletableFuture<MessagingService> start() {
        if (this.started.get()) {
            this.log.warn("Already running at local address: {}", (Object)this.returnAddress);
            return CompletableFuture.completedFuture(this);
        }
        this.enableNettyTls = this.loadKeyStores();
        this.initEventLoopGroup();
        // Capture the groups of this attempt so a failure only tears down what it created.
        EventLoopGroup attemptServerGroup = this.serverGroup;
        EventLoopGroup attemptClientGroup = this.clientGroup;
        return this.bootstrapServer().thenRun(() -> {
            this.timeoutExecutor = Executors.newScheduledThreadPool(4, Threads.namedThreads("netty-messaging-timeout-%d", this.log));
            this.localConnection = new LocalClientConnection(this.timeoutExecutor, this.handlers);
            this.started.set(true);
            this.log.info("Started");
        }).<MessagingService>thenApply(v -> this).whenComplete((service, error) -> {
            if (error != null) {
                // Startup failed: release the event loop threads (and any channel already bound).
                attemptServerGroup.shutdownGracefully();
                attemptClientGroup.shutdownGracefully();
            }
        });
    }

    /**
     * Reports whether the service has been started and not yet stopped.
     *
     * @return the current value of the {@code started} flag
     */
    public boolean isRunning() {
        return started.get();
    }

    /**
     * Loads the configured key store and trust store and initialises the key and trust
     * manager factories from them.
     *
     * @return {@code false} if TLS is disabled in the configuration, {@code true} once both
     *     stores have been loaded
     * @throws AtomixRuntimeException if a store file is missing or cannot be loaded
     */
    private boolean loadKeyStores() {
        KeyManagerFactory kmf;
        TrustManagerFactory tmf;
        if (!this.config.getTlsConfig().isEnabled()) {
            return false;
        }
        try {
            String ksLocation = this.config.getTlsConfig().getKeyStore();
            String tsLocation = this.config.getTlsConfig().getTrustStore();
            char[] ksPwd = this.config.getTlsConfig().getKeyStorePassword().toCharArray();
            char[] tsPwd = this.config.getTlsConfig().getTrustStorePassword().toCharArray();
            tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
            try (FileInputStream trustStoreStream = new FileInputStream(tsLocation)) {
                ts.load(trustStoreStream, tsPwd);
            }
            tmf.init(ts);
            kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            try (FileInputStream keyStoreStream = new FileInputStream(ksLocation)) {
                ks.load(keyStoreStream, ksPwd);
            }
            kmf.init(ks, ksPwd);
            if (this.log.isInfoEnabled()) {
                this.logKeyStore(ks, ksLocation);
            }
        }
        catch (FileNotFoundException e) {
            // Build the message directly ("{}" is a logger placeholder, not an exception-message
            // one) and keep the original exception as the cause.
            throw new AtomixRuntimeException("Could not load cluster keystore: " + e.getMessage(), (Throwable)e);
        }
        catch (Exception e) {
            throw new AtomixRuntimeException("Error loading cluster keystore", (Throwable)e);
        }
        this.trustManager = tmf;
        this.keyManager = kmf;
        return true;
    }

    /**
     * Logs, at info level, the location of the key store and a colon-separated SHA1
     * fingerprint for every alias it contains. Does nothing when info logging is disabled.
     *
     * <p>Any exception raised while reading the store is logged as a warning and ends the
     * listing; it is not propagated.
     *
     * @param ks the loaded key store
     * @param ksLocation the location the key store was loaded from (used in log output only)
     */
    private void logKeyStore(KeyStore ks, String ksLocation) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("Loaded cluster key store from: {}", ksLocation);
        try {
            Enumeration<String> aliases = ks.aliases();
            while (aliases.hasMoreElements()) {
                String alias = aliases.nextElement();
                Certificate[] chain = ks.getCertificateChain(alias);
                log.debug("{} -> {}", (Object) alias, (Object) chain);

                // Fingerprint the first certificate of the chain, or the alias' own certificate
                // when no chain is stored.
                byte[] encoded;
                if (chain == null || chain.length == 0) {
                    log.info("Could not find cert chain for {}, using fingerprint of key instead...", alias);
                    encoded = ks.getCertificate(alias).getEncoded();
                } else {
                    encoded = chain[0].getEncoded();
                }

                MessageDigest sha1 = MessageDigest.getInstance("SHA1");
                StringJoiner fingerprint = new StringJoiner(":");
                for (byte digestByte : sha1.digest(encoded)) {
                    fingerprint.add(String.format("%02X", digestByte));
                }
                log.info("{} -> {}", alias, fingerprint);
            }
        } catch (Exception failure) {
            log.warn("Unable to print contents of key store: {}", ksLocation, failure);
        }
    }

    /**
     * Creates the client and server event loop groups and selects the matching channel
     * classes. The epoll transport is tried first; if anything is thrown while setting it
     * up, the reason is logged at debug level and the NIO transport is used instead.
     */
    private void initEventLoopGroup() {
        try {
            clientGroup = new EpollEventLoopGroup(0, Threads.namedThreads("netty-messaging-event-epoll-client-%d", log));
            serverGroup = new EpollEventLoopGroup(0, Threads.namedThreads("netty-messaging-event-epoll-server-%d", log));
            serverChannelClass = EpollServerSocketChannel.class;
            clientChannelClass = EpollSocketChannel.class;
        } catch (Throwable epollFailure) {
            log.debug("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", epollFailure.getMessage());
            clientGroup = new NioEventLoopGroup(0, Threads.namedThreads("netty-messaging-event-nio-client-%d", log));
            serverGroup = new NioEventLoopGroup(0, Threads.namedThreads("netty-messaging-event-nio-server-%d", log));
            serverChannelClass = NioServerSocketChannel.class;
            clientChannelClass = NioSocketChannel.class;
        }
    }

    /**
     * Sends a one-way message over a pooled connection, completing the returned future on
     * the calling thread of the underlying completion (direct executor).
     *
     * @param address destination address
     * @param type message type
     * @param payload message payload
     * @param keepAlive not consulted by this method; a pooled connection is always used
     * @return a future completed when the send completes
     */
    @Override
    public CompletableFuture<Void> sendAsync(Address address, String type, byte[] payload, boolean keepAlive) {
        long id = messageIdGenerator.incrementAndGet();
        ProtocolRequest request = new ProtocolRequest(id, returnAddress, type, payload);
        return executeOnPooledConnection(address, type, connection -> connection.sendAsync(request), MoreExecutors.directExecutor());
    }

    /**
     * Sends a request and returns the reply, with no explicit timeout and a direct executor.
     *
     * @param address destination address
     * @param type message type
     * @param payload request payload
     * @param keepAlive {@code true} to use a pooled connection, {@code false} for a transient one
     * @return a future completed with the reply payload
     */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive) {
        Duration noTimeout = null;
        return sendAndReceive(address, type, payload, keepAlive, noTimeout, MoreExecutors.directExecutor());
    }

    /**
     * Sends a request and returns the reply, with no explicit timeout; the returned future is
     * completed on the given executor.
     *
     * @param address destination address
     * @param type message type
     * @param payload request payload
     * @param keepAlive {@code true} to use a pooled connection, {@code false} for a transient one
     * @param executor executor on which the returned future is completed
     * @return a future completed with the reply payload
     */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive, Executor executor) {
        Duration noTimeout = null;
        return sendAndReceive(address, type, payload, keepAlive, noTimeout, executor);
    }

    /**
     * Sends a request and returns the reply, using the given timeout and a direct executor.
     *
     * @param address destination address
     * @param type message type
     * @param payload request payload
     * @param keepAlive {@code true} to use a pooled connection, {@code false} for a transient one
     * @param timeout timeout handed to the connection's {@code sendAndReceive}
     * @return a future completed with the reply payload
     */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive, Duration timeout) {
        Executor direct = MoreExecutors.directExecutor();
        return sendAndReceive(address, type, payload, keepAlive, timeout, direct);
    }

    /**
     * Sends a request and returns the reply.
     *
     * @param address destination address
     * @param type message type
     * @param payload request payload
     * @param keepAlive {@code true} to use a pooled connection, {@code false} to open a
     *     connection for this request only
     * @param timeout timeout handed to the connection's {@code sendAndReceive}
     * @param executor executor on which the returned future is completed
     * @return a future completed with the reply payload
     */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive, Duration timeout, Executor executor) {
        long id = messageIdGenerator.incrementAndGet();
        ProtocolRequest request = new ProtocolRequest(id, returnAddress, type, payload);
        if (!keepAlive) {
            return executeOnTransientConnection(address, connection -> connection.sendAndReceive(request, timeout), executor);
        }
        return executeOnPooledConnection(address, type, connection -> connection.sendAndReceive(request, timeout), executor);
    }

    /**
     * Runs {@code callback} against a pooled connection to {@code address} and returns a new
     * future that receives its outcome.
     *
     * @param address destination address
     * @param type message type, used to select the pooled channel
     * @param callback operation to perform on the connection
     * @param executor executor on which the returned future is completed
     * @return the future carrying the callback's result or failure
     */
    private <T> CompletableFuture<T> executeOnPooledConnection(Address address, String type, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor) {
        CompletableFuture<T> outcome = new CompletableFuture<>();
        executeOnPooledConnection(address, type, callback, executor, outcome);
        return outcome;
    }

    /**
     * Runs {@code callback} against the local connection (when {@code address} is this node's
     * own address) or against a pooled channel, and transfers the outcome to {@code future}
     * on {@code executor}.
     *
     * <p>When a remote send fails with a root cause other than {@link TimeoutException} or
     * {@code MessagingException}, the channel is closed and its connection is discarded.
     *
     * @param address destination address
     * @param type message type, used to select the pooled channel
     * @param callback operation to perform on the connection
     * @param executor executor on which {@code future} is completed
     * @param future future that receives the callback's result or failure
     */
    private <T> void executeOnPooledConnection(Address address, String type, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor, CompletableFuture<T> future) {
        // Messages addressed to this node bypass the network entirely.
        if (address.equals(returnAddress)) {
            callback.apply(localConnection).whenComplete((response, failure) -> {
                if (failure != null) {
                    executor.execute(() -> future.completeExceptionally(failure));
                } else {
                    executor.execute(() -> future.complete(response));
                }
            });
            return;
        }

        channelPool.getChannel(address, type).whenComplete((channel, channelError) -> {
            if (channelError != null) {
                executor.execute(() -> future.completeExceptionally((Throwable) channelError));
                return;
            }
            RemoteClientConnection connection = getOrCreateClientConnection((Channel) channel);
            callback.apply(connection).whenComplete((response, sendError) -> {
                if (sendError == null) {
                    executor.execute(() -> future.complete(response));
                    return;
                }
                Throwable rootCause = Throwables.getRootCause(sendError);
                boolean keepChannel = rootCause instanceof TimeoutException || rootCause instanceof MessagingException;
                if (!keepChannel) {
                    channel.close().addListener(closed -> {
                        log.debug("Closing connection to {}", channel.remoteAddress());
                        connection.close();
                        connections.remove(channel);
                    });
                }
                executor.execute(() -> future.completeExceptionally(sendError));
            });
        });
    }

    /**
     * Runs {@code callback} against the local connection (when {@code address} is this node's
     * own address) or against a freshly opened channel that is closed once the callback's
     * future completes.
     *
     * @param address destination address
     * @param callback operation to perform on the connection
     * @param executor executor on which the returned future is completed
     * @return the future carrying the callback's result or failure
     */
    private <T> CompletableFuture<T> executeOnTransientConnection(Address address, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor) {
        CompletableFuture<T> outcome = new CompletableFuture<>();

        // Messages addressed to this node bypass the network entirely.
        if (address.equals(returnAddress)) {
            callback.apply(localConnection).whenComplete((response, failure) -> {
                if (failure != null) {
                    executor.execute(() -> outcome.completeExceptionally(failure));
                } else {
                    executor.execute(() -> outcome.complete(response));
                }
            });
            return outcome;
        }

        openChannel(address).whenComplete((channel, channelError) -> {
            if (channelError != null) {
                executor.execute(() -> outcome.completeExceptionally(channelError));
                return;
            }
            callback.apply(getOrCreateClientConnection(channel)).whenComplete((response, sendError) -> {
                if (sendError != null) {
                    executor.execute(() -> outcome.completeExceptionally(sendError));
                } else {
                    executor.execute(() -> outcome.complete(response));
                }
                // The channel was opened for this exchange only.
                channel.close();
            });
        });
        return outcome;
    }

    /**
     * Returns the client connection registered for {@code channel}, creating and registering
     * one if none exists. When this call found no existing entry, it also attaches a listener
     * to the channel's close future that removes and closes the registered connection.
     *
     * @param channel the channel to look up
     * @return the connection associated with the channel
     */
    private RemoteClientConnection getOrCreateClientConnection(Channel channel) {
        RemoteClientConnection existing = connections.get(channel);
        if (existing != null) {
            return existing;
        }
        RemoteClientConnection registered = connections.computeIfAbsent(channel, c -> new RemoteClientConnection(timeoutExecutor, c));
        channel.closeFuture().addListener(closed -> {
            RemoteClientConnection removed = connections.remove(channel);
            if (removed != null) {
                removed.close();
            }
        });
        return registered;
    }

    /**
     * Registers a one-way handler for {@code type}. Each incoming message is handed to
     * {@code handler} on {@code executor}; no reply is sent.
     *
     * @param type message type
     * @param handler consumer of the sender address and payload
     * @param executor executor on which the handler runs
     */
    @Override
    public void registerHandler(String type, BiConsumer<Address, byte[]> handler, Executor executor) {
        handlers.register(type, (message, connection) -> {
            executor.execute(() -> handler.accept(message.sender(), message.payload()));
        });
    }

    /**
     * Registers a request/reply handler for {@code type}. Each incoming request is handed to
     * {@code handler} on {@code executor} and its return value is sent back as the reply.
     *
     * <p>If the handler throws an exception, it is logged with its stack trace and the reply
     * carries status {@code ERROR_HANDLER_EXCEPTION} with no payload.
     *
     * @param type message type
     * @param handler function from sender address and payload to the reply payload
     * @param executor executor on which the handler runs
     */
    @Override
    public void registerHandler(String type, BiFunction<Address, byte[], byte[]> handler, Executor executor) {
        this.handlers.register(type, (message, connection) -> executor.execute(() -> {
            byte[] responsePayload = null;
            ProtocolReply.Status status = ProtocolReply.Status.OK;
            try {
                responsePayload = handler.apply(message.sender(), message.payload());
            }
            catch (Exception e) {
                // The throwable is passed as the exception argument, so the message must not
                // contain a "{}" placeholder (it would be printed literally).
                this.log.warn("An error occurred in a message handler", (Throwable)e);
                status = ProtocolReply.Status.ERROR_HANDLER_EXCEPTION;
            }
            connection.reply((ProtocolRequest)message, status, Optional.ofNullable(responsePayload));
        }));
    }

    /**
     * Registers an asynchronous request/reply handler for {@code type}. The handler is invoked
     * on the dispatching thread and the reply is sent when the future it returns completes.
     *
     * <p>If the handler throws synchronously or its future completes exceptionally, the error
     * is logged and the reply carries status {@code ERROR_HANDLER_EXCEPTION} with no payload,
     * matching the behaviour of the executor-based overload.
     *
     * @param type message type
     * @param handler function from sender address and payload to a future of the reply payload
     */
    @Override
    public void registerHandler(String type, BiFunction<Address, byte[], CompletableFuture<byte[]>> handler) {
        this.handlers.register(type, (message, connection) -> {
            CompletableFuture<byte[]> response;
            try {
                response = handler.apply(message.sender(), message.payload());
            }
            catch (Exception e) {
                // Without this the requester would get no reply at all for a handler that
                // fails before returning its future.
                this.log.warn("An error occurred in a message handler", (Throwable)e);
                connection.reply((ProtocolRequest)message, ProtocolReply.Status.ERROR_HANDLER_EXCEPTION, Optional.empty());
                return;
            }
            response.whenComplete((result, error) -> {
                ProtocolReply.Status status;
                if (error == null) {
                    status = ProtocolReply.Status.OK;
                } else {
                    this.log.warn("An error occurred in a message handler", error);
                    status = ProtocolReply.Status.ERROR_HANDLER_EXCEPTION;
                }
                connection.reply((ProtocolRequest)message, status, Optional.ofNullable(result));
            });
        });
    }

    /**
     * Removes the handler registered for {@code type} from the handler registry.
     *
     * @param type message type
     */
    @Override
    public void unregisterHandler(String type) {
        handlers.unregister(type);
    }

    /**
     * Opens a new client channel to {@code address}; delegates to {@code bootstrapClient}.
     *
     * @param address remote address
     * @return a future completed with the channel once the handshake has finished
     */
    private CompletableFuture<Channel> openChannel(Address address) {
        return bootstrapClient(address);
    }

    /**
     * Configures a client bootstrap for {@code address}, installs the plain or SSL channel
     * initializer depending on {@code enableNettyTls}, and starts connecting.
     *
     * <p>The returned future is completed by the handshake handler installed by the
     * initializer; a failed connect attempt completes it exceptionally.
     *
     * @param address remote address
     * @return a future for the connected channel, or an already-failed future if the SSL
     *     context cannot be built
     */
    private CompletableFuture<Channel> bootstrapClient(Address address) {
        OrderedFuture<Channel> channelFuture = new OrderedFuture<>();
        Bootstrap client = new Bootstrap()
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(327680, 655360))
            .option(ChannelOption.SO_RCVBUF, 0x100000)
            .option(ChannelOption.SO_SNDBUF, 0x100000)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
            .group(clientGroup)
            .channel(clientChannelClass)
            .remoteAddress(address.address(true), address.port());

        if (!enableNettyTls) {
            client.handler(new BasicClientChannelInitializer(channelFuture));
        } else {
            try {
                client.handler(new SslClientChannelInitializer(channelFuture, address));
            } catch (SSLException sslFailure) {
                return Futures.exceptionalFuture(sslFailure);
            }
        }

        // Success is reported by the handshake handler; only a failed connect is reported here.
        client.connect().addListener(connectResult -> {
            if (!connectResult.isSuccess()) {
                channelFuture.completeExceptionally(connectResult.cause());
            }
        });
        return channelFuture;
    }

    /**
     * Configures the server bootstrap, installs the plain or SSL child initializer depending
     * on {@code enableNettyTls}, and binds it.
     *
     * @return the future returned by {@code bind}, or an already-failed future if the SSL
     *     context cannot be built
     */
    private CompletableFuture<Void> bootstrapServer() {
        ServerBootstrap server = new ServerBootstrap()
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8192, 32768))
            .childOption(ChannelOption.SO_RCVBUF, 0x100000)
            .childOption(ChannelOption.SO_SNDBUF, 0x100000)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .group(serverGroup, clientGroup)
            .channel(serverChannelClass);

        if (!enableNettyTls) {
            server.childHandler(new BasicServerChannelInitializer());
        } else {
            try {
                server.childHandler(new SslServerChannelInitializer());
            } catch (SSLException sslFailure) {
                return Futures.exceptionalFuture(sslFailure);
            }
        }
        return bind(server);
    }

    /**
     * Binds the server to every configured interface, or to {@code 0.0.0.0} when none is
     * configured. The port is the configured port if set, otherwise the port of the return
     * address.
     *
     * @param bootstrap the configured server bootstrap
     * @return a future completed once all interfaces are bound, or failed on the first bind error
     */
    private CompletableFuture<Void> bind(ServerBootstrap bootstrap) {
        CompletableFuture<Void> bound = new CompletableFuture<>();
        int port = config.getPort() != null ? config.getPort().intValue() : returnAddress.port();
        Iterator<String> ifaces = config.getInterfaces().isEmpty()
            ? Lists.newArrayList("0.0.0.0").iterator()
            : config.getInterfaces().iterator();
        bind(bootstrap, ifaces, port, bound);
        return bound;
    }

    /**
     * Binds the server to the next interface of {@code ifaces} and, on success, recurses for
     * the remaining ones. Completes {@code future} when the iterator is exhausted, or fails
     * it with the bind error of the first interface that cannot be bound.
     *
     * @param bootstrap the configured server bootstrap
     * @param ifaces remaining interfaces to bind
     * @param port port to bind on every interface
     * @param future future reporting the overall outcome
     */
    private void bind(ServerBootstrap bootstrap, Iterator<String> ifaces, int port, CompletableFuture<Void> future) {
        if (!ifaces.hasNext()) {
            future.complete(null);
            return;
        }
        String iface = ifaces.next();
        bootstrap.bind(iface, port).addListener((ChannelFutureListener) bindResult -> {
            if (!bindResult.isSuccess()) {
                log.warn("Failed to bind TCP server to port {}:{} due to {}", iface, port, bindResult.cause());
                future.completeExceptionally(bindResult.cause());
                return;
            }
            log.info("TCP server listening for connections on {}:{}", iface, port);
            serverChannel = bindResult.channel();
            bind(bootstrap, ifaces, port, future);
        });
    }

    /**
     * Stops the service if it is running: closes the server channel, shuts down both event
     * loop groups and waits for them, then shuts down the timeout executor. The work runs
     * asynchronously via {@link CompletableFuture#supplyAsync}.
     *
     * <p>An interrupt received while waiting does not abort the remaining steps; the thread's
     * interrupt flag is restored at the end.
     *
     * @return a future completed when shutdown has finished, or an already-completed future
     *     if the service was not running
     */
    public CompletableFuture<Void> stop() {
        if (!started.compareAndSet(true, false)) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.supplyAsync(() -> {
            boolean wasInterrupted = false;
            try {
                try {
                    serverChannel.close().sync();
                } catch (InterruptedException e) {
                    wasInterrupted = true;
                }

                // Start both shutdowns before waiting on either of them.
                Future<?> serverDone = serverGroup.shutdownGracefully();
                Future<?> clientDone = clientGroup.shutdownGracefully();
                try {
                    serverDone.sync();
                } catch (InterruptedException e) {
                    wasInterrupted = true;
                }
                try {
                    clientDone.sync();
                } catch (InterruptedException e) {
                    wasInterrupted = true;
                }
                timeoutExecutor.shutdown();
            } finally {
                log.info("Stopped");
                if (wasInterrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            return null;
        });
    }

    /**
     * Connect listener body used by {@code bootstrapClient}: propagates a failed connect
     * attempt to {@code future}. A successful connect is left for the handshake to report.
     *
     * @param future the channel future handed out by {@code bootstrapClient}
     * @param f the completed connect future
     */
    private static /* synthetic */ void lambda$bootstrapClient$30(CompletableFuture future, Future f) throws Exception {
        if (f.isSuccess()) {
            return;
        }
        future.completeExceptionally(f.cause());
    }

    /**
     * Initializer for outbound TLS channels: installs an SSL handler built from the service's
     * key and trust managers, followed by the client handshake handler.
     */
    private class SslClientChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final CompletableFuture<Channel> future;
        private final Address address;
        private final SslContext sslContext;

        /**
         * @param future future handed to the client handshake handler
         * @param address remote address; its host and port are passed to the SSL handler
         * @throws SSLException if the client SSL context cannot be built
         */
        SslClientChannelInitializer(CompletableFuture<Channel> future, Address address) throws SSLException {
            this.future = future;
            this.address = address;
            this.sslContext = SslContextBuilder.forClient()
                .keyManager(keyManager)
                .trustManager(trustManager)
                .build();
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast("ssl", sslContext.newHandler(channel.alloc(), address.host(), address.port()));
            channel.pipeline().addLast("handshake", new ClientHandshakeHandlerAdapter(future));
        }
    }

    /** Initializer for outbound plaintext channels: installs only the client handshake handler. */
    private class BasicClientChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final CompletableFuture<Channel> future;

        /**
         * @param future future handed to the client handshake handler
         */
        BasicClientChannelInitializer(CompletableFuture<Channel> future) {
            this.future = future;
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            ClientHandshakeHandlerAdapter handshake = new ClientHandshakeHandlerAdapter(future);
            channel.pipeline().addLast("handshake", handshake);
        }
    }

    /**
     * Initializer for inbound TLS channels: installs an SSL handler that requires client
     * authentication, followed by the server handshake handler.
     */
    private class SslServerChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final SslContext sslContext;

        /**
         * @throws SSLException if the server SSL context cannot be built
         */
        private SslServerChannelInitializer() throws SSLException {
            this.sslContext = SslContextBuilder.forServer(keyManager)
                .clientAuth(ClientAuth.REQUIRE)
                .trustManager(trustManager)
                .build();
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast("ssl", sslContext.newHandler(channel.alloc()));
            channel.pipeline().addLast("handshake", new ServerHandshakeHandlerAdapter());
        }
    }

    /** Initializer for inbound plaintext channels: installs only the server handshake handler. */
    private class BasicServerChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private BasicServerChannelInitializer() {
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            ServerHandshakeHandlerAdapter handshake = new ServerHandshakeHandlerAdapter();
            channel.pipeline().addLast("handshake", handshake);
        }
    }

    private class MessageDispatcher<M extends ProtocolMessage>
    extends SimpleChannelInboundHandler<Object> {
        private final Connection<M> connection;

        MessageDispatcher(Connection<M> connection) {
            this.connection = connection;
        }

        protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
            try {
                this.connection.dispatch((ProtocolMessage)message);
            }
            catch (RejectedExecutionException e) {
                NettyMessagingService.this.log.warn("Unable to dispatch message due to {}", (Object)e.getMessage());
            }
        }

        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
            NettyMessagingService.this.log.error("Exception inside channel handling pipeline", cause);
            this.connection.close();
            context.close();
        }

        public void channelInactive(ChannelHandlerContext context) throws Exception {
            this.connection.close();
            context.close();
        }

        public boolean acceptInboundMessage(Object msg) {
            return msg instanceof ProtocolMessage;
        }
    }

    /**
     * Server side of the handshake: reads the client's protocol version, answers with the
     * version that will be used, and switches the pipeline to that protocol.
     */
    private class ServerHandshakeHandlerAdapter
    extends HandshakeHandlerAdapter<ProtocolRequest> {
        private ServerHandshakeHandlerAdapter() {
        }

        /**
         * Handles the client's handshake message. A version number that does not map to a
         * known protocol version is answered with the latest version.
         */
        public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
            OptionalInt requested = readProtocolVersion(context, (ByteBuf) message);
            if (!requested.isPresent()) {
                return;
            }
            ProtocolVersion known = ProtocolVersion.valueOf(requested.getAsInt());
            ProtocolVersion negotiated = known != null ? known : ProtocolVersion.latest();
            writeProtocolVersion(context, negotiated);
            activateProtocolVersion(context, new RemoteServerConnection(handlers, context.channel()), negotiated);
        }

        @Override
        void activateProtocolVersion(ChannelHandlerContext context, Connection<ProtocolRequest> connection, ProtocolVersion protocolVersion) {
            log.debug("Activating server protocol version {} for connection to {}", protocolVersion, context.channel().remoteAddress());
            super.activateProtocolVersion(context, connection, protocolVersion);
        }
    }

    /**
     * Client side of the handshake: writes this node's protocol version when the channel
     * becomes active, reads the server's answer, and switches the pipeline to the negotiated
     * protocol.
     *
     * <p>The channel future is completed with the channel on success. On every failure path
     * (unknown version, channel closed before the handshake finished, pipeline exception) it
     * is completed exceptionally, so callers waiting for the channel never hang.
     */
    private class ClientHandshakeHandlerAdapter
    extends HandshakeHandlerAdapter<ProtocolReply> {
        private final CompletableFuture<Channel> future;

        /**
         * @param future future to complete with the channel once the handshake has finished
         */
        ClientHandshakeHandlerAdapter(CompletableFuture<Channel> future) {
            this.future = future;
        }

        /** Sends this node's protocol version as soon as the channel is connected. */
        public void channelActive(ChannelHandlerContext context) throws Exception {
            NettyMessagingService.this.log.debug("Writing client protocol version {} for connection to {}", (Object)NettyMessagingService.this.protocolVersion, (Object)context.channel().remoteAddress());
            this.writeProtocolVersion(context, NettyMessagingService.this.protocolVersion);
        }

        /** Reads the server's answer and activates the protocol, or fails the handshake. */
        public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
            this.readProtocolVersion(context, (ByteBuf)message).ifPresent(version -> {
                ProtocolVersion protocolVersion = ProtocolVersion.valueOf(version);
                if (protocolVersion != null) {
                    this.activateProtocolVersion(context, NettyMessagingService.this.getOrCreateClientConnection(context.channel()), protocolVersion);
                } else {
                    NettyMessagingService.this.log.error("Failed to negotiate protocol version");
                    // Fail the pending channel future; otherwise the caller waits forever.
                    this.future.completeExceptionally(new java.net.ConnectException("Failed to negotiate protocol version " + version + " with " + context.channel().remoteAddress()));
                    context.close();
                }
            });
        }

        /**
         * Fails the channel future if the channel closes while this handler is still in the
         * pipeline, i.e. before the handshake completed (for example after an invalid
         * preamble). Has no effect on an already-completed future.
         */
        public void channelInactive(ChannelHandlerContext context) throws Exception {
            this.future.completeExceptionally(new java.net.ConnectException("Connection to " + context.channel().remoteAddress() + " closed before the protocol handshake completed"));
            super.channelInactive(context);
        }

        /** Fails the channel future with the error and closes the now-unusable channel. */
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            this.future.completeExceptionally(cause);
            // Nobody holds a reference to a channel whose handshake failed, so close it here.
            ctx.close();
        }

        @Override
        void activateProtocolVersion(ChannelHandlerContext context, Connection<ProtocolReply> connection, ProtocolVersion protocolVersion) {
            NettyMessagingService.this.log.debug("Activating client protocol version {} for connection to {}", (Object)protocolVersion, (Object)context.channel().remoteAddress());
            super.activateProtocolVersion(context, connection, protocolVersion);
            this.future.complete(context.channel());
        }
    }

    /**
     * Base class of the client and server handshake handlers. A handshake message consists of
     * the 4-byte cluster preamble followed by a 2-byte protocol version.
     */
    private abstract class HandshakeHandlerAdapter<M extends ProtocolMessage>
    extends ChannelInboundHandlerAdapter {
        private HandshakeHandlerAdapter() {
        }

        /**
         * Writes and flushes a handshake message carrying this service's preamble and the
         * given version.
         *
         * @param context channel handler context to write to
         * @param version protocol version to announce
         */
        void writeProtocolVersion(ChannelHandlerContext context, ProtocolVersion version) {
            ByteBuf handshake = context.alloc().buffer(6);
            handshake.writeInt(preamble);
            handshake.writeShort((int) version.version());
            context.writeAndFlush(handshake);
        }

        /**
         * Reads a handshake message from {@code buffer} and releases the buffer.
         *
         * @param context channel handler context; closed if the preamble does not match
         * @param buffer buffer holding the handshake message
         * @return the version number read, or empty if the preamble did not match this
         *     service's preamble
         */
        OptionalInt readProtocolVersion(ChannelHandlerContext context, ByteBuf buffer) {
            try {
                if (buffer.readInt() == NettyMessagingService.this.preamble) {
                    return OptionalInt.of(buffer.readShort());
                }
                log.warn("Received invalid handshake, closing connection");
                context.close();
                return OptionalInt.empty();
            } finally {
                buffer.release();
            }
        }

        /**
         * Replaces this handler in the pipeline with the encoder, decoder and message
         * dispatcher of the given protocol version.
         *
         * @param context channel handler context whose pipeline is reconfigured
         * @param connection connection that will receive dispatched messages
         * @param protocolVersion protocol version to activate
         */
        void activateProtocolVersion(ChannelHandlerContext context, Connection<M> connection, ProtocolVersion protocolVersion) {
            MessagingProtocol protocol = protocolVersion.createProtocol(returnAddress);
            context.pipeline().remove(this);
            context.pipeline().addLast("encoder", protocol.newEncoder());
            context.pipeline().addLast("decoder", protocol.newDecoder());
            context.pipeline().addLast("handler", new MessageDispatcher<M>(connection));
        }
    }
}

