/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.Certificate;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AcceptVersions;
import org.apache.cassandra.net.AsyncChannelPromise;
import org.apache.cassandra.net.BufferPoolAllocator;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.FrameDecoderCrc;
import org.apache.cassandra.net.FrameDecoderLZ4;
import org.apache.cassandra.net.FrameDecoderUnprotected;
import org.apache.cassandra.net.FrameDecoderWith8bHeader;
import org.apache.cassandra.net.GlobalBufferPoolAllocator;
import org.apache.cassandra.net.HandshakeProtocol;
import org.apache.cassandra.net.InboundConnectionSettings;
import org.apache.cassandra.net.InboundMessageHandler;
import org.apache.cassandra.net.InternodeConnectionUtils;
import org.apache.cassandra.net.LocalBufferPoolAllocator;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.net.SocketFactory;
import org.apache.cassandra.security.ISslContextFactory;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.streaming.StreamDeserializingTask;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.async.NettyStreamingChannel;
import org.apache.cassandra.utils.memory.BufferPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InboundConnectionInitiator {
    private static final Logger logger = LoggerFactory.getLogger(InboundConnectionInitiator.class);

    /**
     * Binds a server socket to {@code initializer.settings.bindAddress} and blocks
     * (uninterruptibly) until the bind attempt has completed.
     *
     * @param initializer child-channel initializer; its settings supply the socket factory,
     *                    the bind address and the optional receive buffer size
     * @return the completed, successful bind future
     * @throws ConfigurationException if the bind fails; the message is specialised for the
     *                                "address in use" and "cannot assign requested address" cases
     */
    private static ChannelFuture bind(Initializer initializer) throws ConfigurationException {
        logger.info("Listening on {}", initializer.settings);

        ServerBootstrap bootstrap = initializer.settings.socketFactory.newServerBootstrap()
                .option(ChannelOption.SO_BACKLOG, 512)
                .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childHandler(initializer);

        // Only override the receive buffer size when one has been configured.
        int socketReceiveBufferSizeInBytes = initializer.settings.socketReceiveBufferSizeInBytes;
        if (socketReceiveBufferSizeInBytes > 0) {
            bootstrap.childOption(ChannelOption.SO_RCVBUF, socketReceiveBufferSizeInBytes);
        }

        InetAddressAndPort bind = initializer.settings.bindAddress;
        ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(bind.getAddress(), bind.getPort()));
        if (channelFuture.awaitUninterruptibly().isSuccess()) {
            return channelFuture;
        }

        // The bind failed: release the half-initialised channel before reporting the problem.
        if (channelFuture.channel().isOpen()) {
            channelFuture.channel().close();
        }

        Throwable failedChannelCause = channelFuture.cause();
        String causeString = "";
        if (failedChannelCause != null && failedChannelCause.getMessage() != null) {
            causeString = failedChannelCause.getMessage();
        }

        if (causeString.contains("in use")) {
            throw new ConfigurationException(bind + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
        }
        // The message is spelled "Cannot assign requested address" or "cannot assign requested
        // address" depending on where it originates, so match without the first letter to
        // recognise both capitalisations.
        if (causeString.contains("annot assign requested address")) {
            throw new ConfigurationException("Unable to bind to address " + bind + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
        }
        throw new ConfigurationException("failed to bind to: " + bind, failedChannelCause);
    }

    /**
     * Starts listening for inbound connections using the given settings.
     *
     * @param settings         inbound connection settings handed to the channel initializer
     * @param channelGroup     group handed to the channel initializer
     * @param pipelineInjector callback handed to the channel initializer
     * @return the future produced by the underlying bind
     */
    public static ChannelFuture bind(InboundConnectionSettings settings, ChannelGroup channelGroup, Consumer<ChannelPipeline> pipelineInjector) {
        Initializer childInitializer = new Initializer(settings, channelGroup, pipelineInjector);
        return bind(childInitializer);
    }

    /**
     * Creates the server-side {@link SslHandler} for an inbound channel.
     *
     * @param description       short label used only in the trace log line
     * @param channel           channel the handler is created for
     * @param encryptionOptions server encryption options used to obtain the {@link SslContext}
     * @return a new SSL handler for {@code channel}
     * @throws IOException declared for failures while obtaining the context or creating the handler
     */
    private static SslHandler getSslHandler(String description, Channel channel, EncryptionOptions.ServerEncryptionOptions encryptionOptions) throws IOException {
        final boolean verifyPeerCertificate = true;
        SslContext context = SSLFactory.getOrCreateSslContext(encryptionOptions, verifyPeerCertificate, ISslContextFactory.SocketType.SERVER);

        // The remote address is only handed over when endpoint verification is required.
        InetSocketAddress peer = null;
        if (encryptionOptions.require_endpoint_verification) {
            peer = (InetSocketAddress) channel.remoteAddress();
        }

        SslHandler handler = SocketFactory.newSslHandler(channel, context, peer);
        String contextName = context.getClass().getName();
        String engineName = handler.engine().getClass().getName();
        logger.trace("{} inbound netty SslContext: context={}, engine={}", description, contextName, engineName);
        return handler;
    }

    private static class RejectSslHandler
    extends ByteToMessageDecoder {
        private RejectSslHandler() {
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            if (in.readableBytes() < 5) {
                return;
            }
            if (SslHandler.isEncrypted((ByteBuf)in)) {
                logger.info("Rejected incoming TLS connection before negotiating from {} to {}. TLS is explicitly disabled by configuration.", (Object)ctx.channel().remoteAddress(), (Object)ctx.channel().localAddress());
                in.readBytes(in.readableBytes());
                ctx.close();
            } else {
                ctx.pipeline().remove((ChannelHandler)this);
            }
        }
    }

    private static class OptionalSslHandler
    extends ByteToMessageDecoder {
        private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;

        OptionalSslHandler(EncryptionOptions.ServerEncryptionOptions encryptionOptions) {
            this.encryptionOptions = encryptionOptions;
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 5) {
                return;
            }
            if (SslHandler.isEncrypted((ByteBuf)in)) {
                SslHandler sslHandler = InboundConnectionInitiator.getSslHandler("replacing optional", ctx.channel(), this.encryptionOptions);
                ctx.pipeline().replace((ChannelHandler)this, InternodeConnectionUtils.SSL_HANDLER_NAME, (ChannelHandler)sslHandler);
            } else {
                ctx.pipeline().remove((ChannelHandler)this);
            }
        }
    }

    static class Handler
    extends ByteToMessageDecoder {
        private final InboundConnectionSettings settings;
        private HandshakeProtocol.Initiate initiate;
        private java.util.concurrent.Future<?> handshakeTimeout;

        Handler(InboundConnectionSettings settings) {
            this.settings = settings;
        }

        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            this.handshakeTimeout = ctx.executor().schedule(() -> {
                logger.error("Timeout handshaking with {} (on {})", (Object)SocketFactory.addressId(this.initiate.from, (InetSocketAddress)ctx.channel().remoteAddress()), (Object)this.settings.bindAddress);
                this.failHandshake(ctx);
            }, HandshakeProtocol.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (this.initiate != null) {
                throw new IllegalStateException("Should no longer be on pipeline");
            }
            this.initiate(ctx, in);
        }

        void initiate(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
            this.initiate = HandshakeProtocol.Initiate.maybeDecode(in);
            if (this.initiate == null) {
                return;
            }
            logger.trace("Received handshake initiation message from peer {}, message = {}", (Object)ctx.channel().remoteAddress(), (Object)this.initiate);
            if (this.isEncryptionRequired(this.initiate.from) && !this.isChannelEncrypted(ctx)) {
                logger.warn("peer {} attempted to establish an unencrypted connection (broadcast address {})", (Object)ctx.channel().remoteAddress(), (Object)this.initiate.from);
                this.failHandshake(ctx);
                return;
            }
            assert (this.initiate.acceptVersions != null);
            logger.trace("Connection version {} (min {}) from {}", new Object[]{this.initiate.acceptVersions.max, this.initiate.acceptVersions.min, this.initiate.from});
            AcceptVersions accept = this.initiate.type.isStreaming() ? this.settings.acceptStreaming : this.settings.acceptMessaging;
            int useMessagingVersion = Math.max(accept.min, Math.min(accept.max, this.initiate.acceptVersions.max));
            ByteBuf flush = new HandshakeProtocol.Accept(useMessagingVersion, accept.max).encode(ctx.alloc());
            AsyncChannelPromise.writeAndFlush(ctx, (Object)flush, (GenericFutureListener<? extends Future<? super Void>>)((ChannelFutureListener)future -> {
                if (!future.isSuccess()) {
                    this.exceptionCaught(future.channel(), future.cause());
                }
            }));
            if (this.initiate.acceptVersions.min > accept.max) {
                logger.info("peer {} only supports messaging versions higher ({}) than this node supports ({})", new Object[]{ctx.channel().remoteAddress(), this.initiate.acceptVersions.min, MessagingService.current_version});
                this.failHandshake(ctx);
            } else if (this.initiate.acceptVersions.max < accept.min) {
                logger.info("peer {} only supports messaging versions lower ({}) than this node supports ({})", new Object[]{ctx.channel().remoteAddress(), this.initiate.acceptVersions.max, 12});
                this.failHandshake(ctx);
            } else if (this.initiate.type.isStreaming()) {
                this.setupStreamingPipeline(this.initiate.from, ctx);
            } else {
                this.setupMessagingPipeline(this.initiate.from, useMessagingVersion, this.initiate.acceptVersions.max, ctx.pipeline());
            }
        }

        private boolean isEncryptionRequired(InetAddressAndPort peer) {
            return !this.settings.encryption.isExplicitlyOptional() && this.settings.encryption.shouldEncrypt(peer);
        }

        private boolean isChannelEncrypted(ChannelHandlerContext ctx) {
            return ctx.pipeline().get(SslHandler.class) != null;
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            this.exceptionCaught(ctx.channel(), cause);
        }

        private void exceptionCaught(Channel channel, Throwable cause) {
            block4: {
                SocketAddress remoteAddress = channel.remoteAddress();
                boolean reportingExclusion = DatabaseDescriptor.getInternodeErrorReportingExclusions().contains(remoteAddress);
                if (reportingExclusion) {
                    logger.debug("Excluding internode exception for {}; address contained in internode_error_reporting_exclusions", (Object)remoteAddress, (Object)cause);
                } else {
                    logger.error("Failed to properly handshake with peer {}. Closing the channel.", (Object)remoteAddress, (Object)cause);
                }
                try {
                    this.failHandshake(channel);
                }
                catch (Throwable t) {
                    if (reportingExclusion) break block4;
                    logger.error("Unexpected exception in {}.exceptionCaught", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)t);
                }
            }
        }

        private void failHandshake(ChannelHandlerContext ctx) {
            this.failHandshake(ctx.channel());
        }

        private void failHandshake(Channel channel) {
            if (this.handshakeTimeout != null) {
                this.handshakeTimeout.cancel(true);
            }
            try {
                channel.pipeline().remove((ChannelHandler)this);
            }
            catch (NoSuchElementException noSuchElementException) {
            }
            finally {
                channel.close();
            }
        }

        private void setupStreamingPipeline(InetAddressAndPort from, ChannelHandlerContext ctx) {
            this.handshakeTimeout.cancel(true);
            assert (this.initiate.framing == OutboundConnectionSettings.Framing.UNPROTECTED);
            ChannelPipeline pipeline = ctx.pipeline();
            Channel channel = ctx.channel();
            if (from == null) {
                InetSocketAddress address = (InetSocketAddress)channel.remoteAddress();
                from = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort());
            }
            BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
            NettyStreamingChannel streamingChannel = new NettyStreamingChannel(channel, StreamingChannel.Kind.CONTROL);
            pipeline.replace((ChannelHandler)this, "streamInbound", (ChannelHandler)streamingChannel);
            ExecutorFactory.Global.executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", from, channel.id()), new StreamDeserializingTask(null, streamingChannel, MessagingService.current_version));
            logger.info("{} streaming connection established, version = {}, framing = {}, encryption = {}", new Object[]{SocketFactory.channelId(from, (InetSocketAddress)channel.remoteAddress(), this.settings.bindAddress, (InetSocketAddress)channel.localAddress(), ConnectionType.STREAMING, channel.id().asShortText()), MessagingService.current_version, this.initiate.framing, SocketFactory.encryptionConnectionSummary(pipeline.channel())});
        }

        @VisibleForTesting
        void setupMessagingPipeline(InetAddressAndPort from, int useMessagingVersion, int maxMessagingVersion, ChannelPipeline pipeline) {
            FrameDecoderWith8bHeader frameDecoder;
            this.handshakeTimeout.cancel(true);
            MessagingService.instance().versions.set(from, maxMessagingVersion);
            BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
            BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
            if (this.initiate.type == ConnectionType.LARGE_MESSAGES) {
                allocator = new LocalBufferPoolAllocator(pipeline.channel().eventLoop());
                pipeline.channel().config().setAllocator((ByteBufAllocator)allocator);
            }
            switch (this.initiate.framing) {
                case LZ4: {
                    frameDecoder = FrameDecoderLZ4.fast(allocator);
                    break;
                }
                case CRC: {
                    frameDecoder = FrameDecoderCrc.create(allocator);
                    break;
                }
                case UNPROTECTED: {
                    frameDecoder = new FrameDecoderUnprotected(allocator);
                    break;
                }
                default: {
                    throw new AssertionError();
                }
            }
            ((FrameDecoder)frameDecoder).addLastTo(pipeline);
            InboundMessageHandler handler = this.settings.handlers.apply(from).createHandler(frameDecoder, this.initiate.type, pipeline.channel(), useMessagingVersion);
            logger.info("{} messaging connection established, version = {}, framing = {}, encryption = {}", new Object[]{handler.id(true), useMessagingVersion, this.initiate.framing, SocketFactory.encryptionConnectionSummary(pipeline.channel())});
            pipeline.addLast("deserialize", (ChannelHandler)handler);
            try {
                pipeline.remove((ChannelHandler)this);
            }
            catch (NoSuchElementException noSuchElementException) {
                // empty catch block
            }
        }
    }

    private static class ClientAuthenticationHandler
    extends ByteToMessageDecoder {
        private final IInternodeAuthenticator authenticator;

        public ClientAuthenticationHandler(IInternodeAuthenticator authenticator) {
            this.authenticator = authenticator;
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            Certificate[] certificates = InternodeConnectionUtils.certificates(channelHandlerContext.channel());
            if (!this.authenticate(channelHandlerContext.channel().remoteAddress(), certificates)) {
                logger.error("Unable to authenticate peer {} for internode authentication", (Object)channelHandlerContext.channel());
                channelHandlerContext.pipeline().replace((ChannelHandler)this, InternodeConnectionUtils.DISCARD_HANDLER_NAME, (ChannelHandler)new InternodeConnectionUtils.ByteBufDiscardHandler());
                channelHandlerContext.pipeline().close();
            } else {
                channelHandlerContext.pipeline().remove((ChannelHandler)this);
            }
        }

        private boolean authenticate(SocketAddress socketAddress, Certificate[] certificates) throws IOException {
            if (socketAddress.getClass().getSimpleName().equals("EmbeddedSocketAddress")) {
                return true;
            }
            if (!(socketAddress instanceof InetSocketAddress)) {
                throw new IOException(String.format("Unexpected SocketAddress type: %s, %s", socketAddress.getClass(), socketAddress));
            }
            InetSocketAddress addr = (InetSocketAddress)socketAddress;
            if (!this.authenticator.authenticate(addr.getAddress(), addr.getPort(), certificates, IInternodeAuthenticator.InternodeConnectionDirection.INBOUND)) {
                logger.info("Authenticate rejected inbound internode connection from {}", (Object)addr);
                return false;
            }
            return true;
        }
    }

    private static class InternodeErrorExclusionsHandler
    extends ChannelInboundHandlerAdapter {
        private InternodeErrorExclusionsHandler() {
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (DatabaseDescriptor.getInternodeErrorReportingExclusions().contains(ctx.channel().remoteAddress())) {
                logger.debug("Excluding internode exception for {}; address contained in internode_error_reporting_exclusions", (Object)ctx.channel().remoteAddress(), (Object)cause);
                return;
            }
            super.exceptionCaught(ctx, cause);
        }
    }

    private static class Initializer
    extends ChannelInitializer<SocketChannel> {
        private static final String PIPELINE_INTERNODE_ERROR_EXCLUSIONS = "Internode Error Exclusions";
        private final InboundConnectionSettings settings;
        private final ChannelGroup channelGroup;
        private final Consumer<ChannelPipeline> pipelineInjector;

        Initializer(InboundConnectionSettings settings, ChannelGroup channelGroup, Consumer<ChannelPipeline> pipelineInjector) {
            this.settings = settings;
            this.channelGroup = channelGroup;
            this.pipelineInjector = pipelineInjector;
        }

        public void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addFirst(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, (ChannelHandler)new InternodeErrorExclusionsHandler());
            this.channelGroup.add((Object)channel);
            channel.config().setOption(ChannelOption.ALLOCATOR, (Object)GlobalBufferPoolAllocator.instance);
            channel.config().setOption(ChannelOption.SO_KEEPALIVE, (Object)true);
            channel.config().setOption(ChannelOption.SO_REUSEADDR, (Object)true);
            channel.config().setOption(ChannelOption.TCP_NODELAY, (Object)true);
            ChannelPipeline pipeline = channel.pipeline();
            this.pipelineInjector.accept(pipeline);
            switch (this.settings.encryption.tlsEncryptionPolicy()) {
                case UNENCRYPTED: {
                    pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "rejectssl", (ChannelHandler)new RejectSslHandler());
                    break;
                }
                case OPTIONAL: {
                    pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, InternodeConnectionUtils.SSL_HANDLER_NAME, (ChannelHandler)new OptionalSslHandler(this.settings.encryption));
                    break;
                }
                case ENCRYPTED: {
                    SslHandler sslHandler = InboundConnectionInitiator.getSslHandler("creating", (Channel)channel, this.settings.encryption);
                    pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, InternodeConnectionUtils.SSL_HANDLER_NAME, (ChannelHandler)sslHandler);
                }
            }
            pipeline.addLast("client-authentication", (ChannelHandler)new ClientAuthenticationHandler(this.settings.authenticator));
            channel.pipeline().addLast("handshake", (ChannelHandler)new Handler(this.settings));
        }
    }
}

