/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.net.ssl.SSLSession;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AcceptVersions;
import org.apache.cassandra.net.AsyncChannelPromise;
import org.apache.cassandra.net.BufferPoolAllocator;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.FrameDecoderCrc;
import org.apache.cassandra.net.FrameDecoderLZ4;
import org.apache.cassandra.net.FrameDecoderLegacy;
import org.apache.cassandra.net.FrameDecoderLegacyLZ4;
import org.apache.cassandra.net.FrameDecoderUnprotected;
import org.apache.cassandra.net.GlobalBufferPoolAllocator;
import org.apache.cassandra.net.HandshakeProtocol;
import org.apache.cassandra.net.InboundConnectionSettings;
import org.apache.cassandra.net.InboundMessageHandler;
import org.apache.cassandra.net.LocalBufferPoolAllocator;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.net.SocketFactory;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.streaming.async.StreamingInboundHandler;
import org.apache.cassandra.utils.memory.BufferPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InboundConnectionInitiator {
    private static final Logger logger = LoggerFactory.getLogger(InboundConnectionInitiator.class);

    private static ChannelFuture bind(Initializer initializer) throws ConfigurationException {
        logger.info("Listening on {}", (Object)initializer.settings);
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((Initializer)initializer).settings.socketFactory.newServerBootstrap().option(ChannelOption.SO_BACKLOG, (Object)512)).option(ChannelOption.ALLOCATOR, (Object)GlobalBufferPoolAllocator.instance)).option(ChannelOption.SO_REUSEADDR, (Object)true)).childHandler((ChannelHandler)initializer);
        int socketReceiveBufferSizeInBytes = ((Initializer)initializer).settings.socketReceiveBufferSizeInBytes;
        if (socketReceiveBufferSizeInBytes > 0) {
            bootstrap.childOption(ChannelOption.SO_RCVBUF, (Object)socketReceiveBufferSizeInBytes);
        }
        InetAddressAndPort bind = ((Initializer)initializer).settings.bindAddress;
        ChannelFuture channelFuture = bootstrap.bind((SocketAddress)new InetSocketAddress(bind.address, bind.port));
        if (!channelFuture.awaitUninterruptibly().isSuccess()) {
            if (channelFuture.channel().isOpen()) {
                channelFuture.channel().close();
            }
            Throwable failedChannelCause = channelFuture.cause();
            String causeString = "";
            if (failedChannelCause != null && failedChannelCause.getMessage() != null) {
                causeString = failedChannelCause.getMessage();
            }
            if (causeString.contains("in use")) {
                throw new ConfigurationException(bind + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
            }
            if (causeString.contains("annot assign requested address")) {
                throw new ConfigurationException("Unable to bind to address " + bind + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
            }
            throw new ConfigurationException("failed to bind to: " + bind, failedChannelCause);
        }
        return channelFuture;
    }

    public static ChannelFuture bind(InboundConnectionSettings settings, ChannelGroup channelGroup, Consumer<ChannelPipeline> pipelineInjector) {
        return InboundConnectionInitiator.bind(new Initializer(settings, channelGroup, pipelineInjector));
    }

    private static class OptionalSslHandler
    extends ByteToMessageDecoder {
        private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;

        OptionalSslHandler(EncryptionOptions.ServerEncryptionOptions encryptionOptions) {
            this.encryptionOptions = encryptionOptions;
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 5) {
                return;
            }
            if (SslHandler.isEncrypted((ByteBuf)in)) {
                SslContext sslContext = SSLFactory.getOrCreateSslContext(this.encryptionOptions, true, SSLFactory.SocketType.SERVER);
                Channel channel = ctx.channel();
                InetSocketAddress peer = this.encryptionOptions.require_endpoint_verification ? (InetSocketAddress)channel.remoteAddress() : null;
                SslHandler sslHandler = SocketFactory.newSslHandler(channel, sslContext, peer);
                ctx.pipeline().replace((ChannelHandler)this, "ssl", (ChannelHandler)sslHandler);
            } else {
                ctx.pipeline().remove((ChannelHandler)this);
            }
        }
    }

    static class Handler
    extends ByteToMessageDecoder {
        private final InboundConnectionSettings settings;
        private HandshakeProtocol.Initiate initiate;
        private HandshakeProtocol.ConfirmOutboundPre40 confirmOutboundPre40;
        private java.util.concurrent.Future<?> handshakeTimeout;

        Handler(InboundConnectionSettings settings) {
            this.settings = settings;
        }

        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            this.handshakeTimeout = ctx.executor().schedule(() -> {
                logger.error("Timeout handshaking with {} (on {})", (Object)SocketFactory.addressId(this.initiate.from, (InetSocketAddress)ctx.channel().remoteAddress()), (Object)this.settings.bindAddress);
                this.failHandshake(ctx);
            }, HandshakeProtocol.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            this.logSsl(ctx);
            this.authenticate(ctx.channel().remoteAddress());
        }

        private void authenticate(SocketAddress socketAddress) throws IOException {
            if (socketAddress.getClass().getSimpleName().equals("EmbeddedSocketAddress")) {
                return;
            }
            if (!(socketAddress instanceof InetSocketAddress)) {
                throw new IOException(String.format("Unexpected SocketAddress type: %s, %s", socketAddress.getClass(), socketAddress));
            }
            InetSocketAddress addr = (InetSocketAddress)socketAddress;
            if (!this.settings.authenticate(addr.getAddress(), addr.getPort())) {
                throw new IOException("Authentication failure for inbound connection from peer " + addr);
            }
        }

        private void logSsl(ChannelHandlerContext ctx) {
            SslHandler sslHandler = (SslHandler)ctx.pipeline().get(SslHandler.class);
            if (sslHandler != null) {
                SSLSession session = sslHandler.engine().getSession();
                logger.info("connection from peer {} to {}, protocol = {}", new Object[]{ctx.channel().remoteAddress(), ctx.channel().localAddress(), session.getProtocol()});
            }
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (this.initiate == null) {
                this.initiate(ctx, in);
            } else if (this.initiate.acceptVersions == null && this.confirmOutboundPre40 == null) {
                this.confirmPre40(ctx, in);
            } else {
                throw new IllegalStateException("Should no longer be on pipeline");
            }
        }

        void initiate(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
            this.initiate = HandshakeProtocol.Initiate.maybeDecode(in);
            if (this.initiate == null) {
                return;
            }
            logger.trace("Received handshake initiation message from peer {}, message = {}", (Object)ctx.channel().remoteAddress(), (Object)this.initiate);
            if (this.initiate.acceptVersions != null) {
                logger.trace("Connection version {} (min {}) from {}", new Object[]{this.initiate.acceptVersions.max, this.initiate.acceptVersions.min, this.initiate.from});
                AcceptVersions accept = this.initiate.type.isStreaming() ? this.settings.acceptStreaming : this.settings.acceptMessaging;
                int useMessagingVersion = Math.max(accept.min, Math.min(accept.max, this.initiate.acceptVersions.max));
                ByteBuf flush = new HandshakeProtocol.Accept(useMessagingVersion, accept.max).encode(ctx.alloc());
                AsyncChannelPromise.writeAndFlush(ctx, (Object)flush, (GenericFutureListener<? extends Future<? super Void>>)((ChannelFutureListener)future -> {
                    if (!future.isSuccess()) {
                        this.exceptionCaught(future.channel(), future.cause());
                    }
                }));
                if (this.initiate.acceptVersions.min > accept.max) {
                    logger.info("peer {} only supports messaging versions higher ({}) than this node supports ({})", new Object[]{ctx.channel().remoteAddress(), this.initiate.acceptVersions.min, 12});
                    this.failHandshake(ctx);
                } else if (this.initiate.acceptVersions.max < accept.min) {
                    logger.info("peer {} only supports messaging versions lower ({}) than this node supports ({})", new Object[]{ctx.channel().remoteAddress(), this.initiate.acceptVersions.max, 10});
                    this.failHandshake(ctx);
                } else if (this.initiate.type.isStreaming()) {
                    this.setupStreamingPipeline(this.initiate.from, ctx);
                } else {
                    this.setupMessagingPipeline(this.initiate.from, useMessagingVersion, this.initiate.acceptVersions.max, ctx.pipeline());
                }
            } else {
                int version = this.initiate.requestMessagingVersion;
                assert (version < 12 && version >= this.settings.acceptMessaging.min);
                logger.trace("Connection version {} from {}", (Object)version, (Object)ctx.channel().remoteAddress());
                if (this.initiate.type.isStreaming()) {
                    if (version != this.settings.acceptStreaming.max) {
                        logger.warn("Received stream using protocol version {} (my version {}). Terminating connection", (Object)version, (Object)this.settings.acceptStreaming.max);
                        this.failHandshake(ctx);
                    }
                    this.setupStreamingPipeline(this.initiate.from, ctx);
                } else {
                    ByteBuf response = HandshakeProtocol.Accept.respondPre40(this.settings.acceptMessaging.max, ctx.alloc());
                    AsyncChannelPromise.writeAndFlush(ctx, (Object)response, (GenericFutureListener<? extends Future<? super Void>>)((ChannelFutureListener)future -> {
                        if (!future.isSuccess()) {
                            this.exceptionCaught(future.channel(), future.cause());
                        }
                    }));
                    if (version < 10) {
                        throw new IOException(String.format("Unable to read obsolete message version %s from %s; The earliest version supported is 3.0.0", version, ctx.channel().remoteAddress()));
                    }
                }
            }
        }

        @VisibleForTesting
        void confirmPre40(ChannelHandlerContext ctx, ByteBuf in) {
            this.confirmOutboundPre40 = HandshakeProtocol.ConfirmOutboundPre40.maybeDecode(in);
            if (this.confirmOutboundPre40 == null) {
                return;
            }
            logger.trace("Received third handshake message from peer {}, message = {}", (Object)ctx.channel().remoteAddress(), (Object)this.confirmOutboundPre40);
            this.setupMessagingPipeline(this.confirmOutboundPre40.from, this.initiate.requestMessagingVersion, this.confirmOutboundPre40.maxMessagingVersion, ctx.pipeline());
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            this.exceptionCaught(ctx.channel(), cause);
        }

        private void exceptionCaught(Channel channel, Throwable cause) {
            logger.error("Failed to properly handshake with peer {}. Closing the channel.", (Object)channel.remoteAddress(), (Object)cause);
            try {
                this.failHandshake(channel);
            }
            catch (Throwable t) {
                logger.error("Unexpected exception in {}.exceptionCaught", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)t);
            }
        }

        private void failHandshake(ChannelHandlerContext ctx) {
            this.failHandshake(ctx.channel());
        }

        private void failHandshake(Channel channel) {
            channel.close();
            if (this.handshakeTimeout != null) {
                this.handshakeTimeout.cancel(true);
            }
        }

        private void setupStreamingPipeline(InetAddressAndPort from, ChannelHandlerContext ctx) {
            this.handshakeTimeout.cancel(true);
            assert (this.initiate.framing == OutboundConnectionSettings.Framing.UNPROTECTED);
            ChannelPipeline pipeline = ctx.pipeline();
            Channel channel = ctx.channel();
            if (from == null) {
                InetSocketAddress address = (InetSocketAddress)channel.remoteAddress();
                from = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort());
            }
            BufferPool.setRecycleWhenFreeForCurrentThread(false);
            pipeline.replace((ChannelHandler)this, "streamInbound", (ChannelHandler)new StreamingInboundHandler(from, 12, null));
            logger.info("{} streaming connection established, version = {}, framing = {}, encryption = {}", new Object[]{SocketFactory.channelId(from, (InetSocketAddress)channel.remoteAddress(), this.settings.bindAddress, (InetSocketAddress)channel.localAddress(), ConnectionType.STREAMING, channel.id().asShortText()), 12, this.initiate.framing, pipeline.get("ssl") != null ? SocketFactory.encryptionLogStatement(pipeline.channel(), this.settings.encryption) : "disabled"});
        }

        @VisibleForTesting
        void setupMessagingPipeline(InetAddressAndPort from, int useMessagingVersion, int maxMessagingVersion, ChannelPipeline pipeline) {
            FrameDecoder frameDecoder;
            this.handshakeTimeout.cancel(true);
            MessagingService.instance().versions.set(from, maxMessagingVersion);
            BufferPool.setRecycleWhenFreeForCurrentThread(false);
            BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
            if (this.initiate.type == ConnectionType.LARGE_MESSAGES) {
                allocator = new LocalBufferPoolAllocator(pipeline.channel().eventLoop());
                pipeline.channel().config().setAllocator((ByteBufAllocator)allocator);
            }
            switch (this.initiate.framing) {
                case LZ4: {
                    if (useMessagingVersion >= 12) {
                        frameDecoder = FrameDecoderLZ4.fast(allocator);
                        break;
                    }
                    frameDecoder = new FrameDecoderLegacyLZ4(allocator, useMessagingVersion);
                    break;
                }
                case CRC: {
                    if (useMessagingVersion >= 12) {
                        frameDecoder = FrameDecoderCrc.create(allocator);
                        break;
                    }
                }
                case UNPROTECTED: {
                    if (useMessagingVersion >= 12) {
                        frameDecoder = new FrameDecoderUnprotected(allocator);
                        break;
                    }
                    frameDecoder = new FrameDecoderLegacy(allocator, useMessagingVersion);
                    break;
                }
                default: {
                    throw new AssertionError();
                }
            }
            ((FrameDecoder)frameDecoder).addLastTo(pipeline);
            InboundMessageHandler handler = this.settings.handlers.apply(from).createHandler(frameDecoder, this.initiate.type, pipeline.channel(), useMessagingVersion);
            logger.info("{} messaging connection established, version = {}, framing = {}, encryption = {}", new Object[]{handler.id(true), useMessagingVersion, this.initiate.framing, pipeline.get("ssl") != null ? SocketFactory.encryptionLogStatement(pipeline.channel(), this.settings.encryption) : "disabled"});
            pipeline.addLast("deserialize", (ChannelHandler)handler);
            pipeline.remove((ChannelHandler)this);
        }
    }

    private static class Initializer
    extends ChannelInitializer<SocketChannel> {
        private final InboundConnectionSettings settings;
        private final ChannelGroup channelGroup;
        private final Consumer<ChannelPipeline> pipelineInjector;

        Initializer(InboundConnectionSettings settings, ChannelGroup channelGroup, Consumer<ChannelPipeline> pipelineInjector) {
            this.settings = settings;
            this.channelGroup = channelGroup;
            this.pipelineInjector = pipelineInjector;
        }

        public void initChannel(SocketChannel channel) throws Exception {
            this.channelGroup.add((Object)channel);
            channel.config().setOption(ChannelOption.ALLOCATOR, (Object)GlobalBufferPoolAllocator.instance);
            channel.config().setOption(ChannelOption.SO_KEEPALIVE, (Object)true);
            channel.config().setOption(ChannelOption.SO_REUSEADDR, (Object)true);
            channel.config().setOption(ChannelOption.TCP_NODELAY, (Object)true);
            ChannelPipeline pipeline = channel.pipeline();
            this.pipelineInjector.accept(pipeline);
            if (this.settings.encryption.optional.booleanValue()) {
                pipeline.addFirst("ssl", (ChannelHandler)new OptionalSslHandler(this.settings.encryption));
            } else {
                SslContext sslContext = SSLFactory.getOrCreateSslContext(this.settings.encryption, true, SSLFactory.SocketType.SERVER);
                InetSocketAddress peer = this.settings.encryption.require_endpoint_verification ? channel.remoteAddress() : null;
                SslHandler sslHandler = SocketFactory.newSslHandler((Channel)channel, sslContext, peer);
                logger.trace("creating inbound netty SslContext: context={}, engine={}", (Object)sslContext.getClass().getName(), (Object)sslHandler.engine().getClass().getName());
                pipeline.addFirst("ssl", (ChannelHandler)sslHandler);
            }
            channel.pipeline().addLast("handshake", (ChannelHandler)new Handler(this.settings));
        }
    }
}

