/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.exceptions.IncompatibleSchemaException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.ChunkedInputPlus;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Crc;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.InboundMessageCallbacks;
import org.apache.cassandra.net.InvalidSerializedSizeException;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.ShareableBytes;
import org.apache.cassandra.net.SocketFactory;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InboundMessageHandler
extends AbstractMessageHandler {
    // Class-wide SLF4J logger; used for errors that should always be reported.
    private static final Logger logger = LoggerFactory.getLogger(InboundMessageHandler.class);
    // NoSpamLogger wrapper around the logger above, created with (1, SECONDS).
    // NOTE(review): presumably rate-limits repeated statements to one per second - confirm against NoSpamLogger.
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
    // Shared serializer used here for header extraction, message size inference and deserialization.
    private static final Message.Serializer serializer = Message.serializer;
    // Connection type; only read when building the channel id strings.
    private final ConnectionType type;
    // Paired with the channel's local address when building the detailed channel id
    // (presumably this node's own address - confirm with the caller).
    private final InetAddressAndPort self;
    // Remote endpoint; handed to the serializer when extracting headers and used in the channel id.
    private final InetAddressAndPort peer;
    // Version value passed to every serializer call made by this handler.
    private final int version;
    // Callbacks notified as a message moves through arrival, dispatch, execution and failure.
    private final InboundMessageCallbacks callbacks;
    // Receives every message that was deserialized successfully and had not expired when executed.
    private final Consumer<Message<?>> consumer;

    /**
     * Creates a handler for one inbound connection.
     *
     * <p>The decoder, channel, threshold, capacity limits, wait queues and close hook are
     * forwarded unchanged to {@link AbstractMessageHandler}; the remaining arguments are
     * stored on this instance.
     *
     * @param type      connection type, used when building channel ids
     * @param self      address reported as this side of the connection in channel ids
     * @param peer      address of the remote endpoint
     * @param version   version passed to the message serializer
     * @param callbacks callbacks notified about message lifecycle events
     * @param consumer  receiver of successfully deserialized messages
     */
    InboundMessageHandler(FrameDecoder decoder,
                          ConnectionType type,
                          Channel channel,
                          InetAddressAndPort self,
                          InetAddressAndPort peer,
                          int version,
                          int largeThreshold,
                          long queueCapacity,
                          ResourceLimits.Limit endpointReserveCapacity,
                          ResourceLimits.Limit globalReserveCapacity,
                          AbstractMessageHandler.WaitQueue endpointWaitQueue,
                          AbstractMessageHandler.WaitQueue globalWaitQueue,
                          AbstractMessageHandler.OnHandlerClosed onClosed,
                          InboundMessageCallbacks callbacks,
                          Consumer<Message<?>> consumer) {
        super(decoder,
              channel,
              largeThreshold,
              queueCapacity,
              endpointReserveCapacity,
              globalReserveCapacity,
              endpointWaitQueue,
              globalWaitQueue,
              onClosed);

        // Connection identity.
        this.type = type;
        this.self = self;
        this.peer = peer;
        this.version = version;

        // Message sinks.
        this.callbacks = callbacks;
        this.consumer = consumer;
    }

    /**
     * Handles the next message contained in {@code bytes}.
     *
     * <p>An already-expired message is reported through the callbacks and skipped without
     * acquiring capacity. A live message first has to acquire capacity; when that fails
     * nothing is consumed and {@code false} is returned. Otherwise the message is counted
     * and handed to the small or large message path depending on {@code largeThreshold}.
     *
     * @return {@code true} if the message was consumed (processed or skipped),
     *         {@code false} if capacity could not be acquired
     * @throws IOException declared for failures while reading the header or size
     */
    @Override
    protected boolean processOneContainedMessage(ShareableBytes bytes, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        final ByteBuffer buffer = bytes.get();
        final long nowNanos = MonotonicClock.Global.approxTime.now();
        final Message.Header header = serializer.extractHeader(buffer, this.peer, nowNanos, this.version);
        final long ageNanos = nowNanos - header.createdAtNanos;
        final int messageSize = serializer.inferMessageSize(buffer, buffer.position(), buffer.limit(), this.version);
        final boolean alreadyExpired = MonotonicClock.Global.approxTime.isAfter(nowNanos, header.expiresAtNanos);

        // Only live messages need capacity; expired ones are dropped below.
        if (!alreadyExpired && !this.acquireCapacity(endpointReserve, globalReserve, messageSize, nowNanos, header.expiresAtNanos)) {
            return false;
        }

        this.callbacks.onHeaderArrived(messageSize, header, ageNanos, TimeUnit.NANOSECONDS);
        if (alreadyExpired) {
            this.callbacks.onArrivedExpired(messageSize, header, false, ageNanos, TimeUnit.NANOSECONDS);
        } else {
            this.callbacks.onArrived(messageSize, header, ageNanos, TimeUnit.NANOSECONDS);
        }
        this.receivedCount++;
        this.receivedBytes += messageSize;

        if (alreadyExpired) {
            bytes.skipBytes(messageSize);
        } else if (messageSize <= this.largeThreshold) {
            this.processSmallMessage(bytes, messageSize, header);
        } else {
            this.processLargeMessage(bytes, messageSize, header);
        }
        return true;
    }

    /**
     * Deserializes a message that lies entirely within {@code bytes} and dispatches it.
     *
     * <p>The buffer limit is narrowed to the message while it is read. Whatever happens, the
     * buffer is left positioned just past the message with its original limit restored.
     * If deserialization fails, or leaves unread bytes, the failure is reported to the
     * callbacks and logged, the capacity for {@code size} is released, and nothing is dispatched.
     *
     * @param bytes  bytes containing the serialized message at the current position
     * @param size   serialized size of the message
     * @param header header already extracted for this message
     */
    private void processSmallMessage(ShareableBytes bytes, int size, Message.Header header) {
        final ByteBuffer buffer = bytes.get();
        final int start = buffer.position();
        final int originalLimit = buffer.limit();
        final int messageEnd = start + size;
        buffer.limit(messageEnd);

        Message<?> decoded = null;
        try (DataInputBuffer input = new DataInputBuffer(buffer, false)) {
            Message<?> candidate = serializer.deserialize((DataInputPlus)input, header, this.version);
            int unread = input.available();
            if (unread > 0) {
                // The serializer consumed fewer bytes than the message claims to occupy.
                throw new InvalidSerializedSizeException(header.verb, size, size - unread);
            }
            decoded = candidate;
        } catch (IncompatibleSchemaException e) {
            this.callbacks.onFailedDeserialize(size, header, e);
            noSpamLogger.info("{} incompatible schema encountered while deserializing a message", this, e);
        } catch (Throwable t) {
            JVMStabilityInspector.inspectThrowable(t);
            this.callbacks.onFailedDeserialize(size, header, t);
            logger.error("{} unexpected exception caught while deserializing a message", this.id(), t);
        } finally {
            if (decoded == null) {
                this.releaseCapacity(size);
            }
            // Step over the message and undo the temporary limit, success or not.
            buffer.position(messageEnd);
            buffer.limit(originalLimit);
        }

        if (decoded != null) {
            this.dispatch(new ProcessSmallMessage(decoded, size));
        }
    }

    /**
     * Schedules a message that exceeds {@code largeThreshold} but is fully present in
     * {@code bytes}: slices {@code size} bytes off, shares the slice, and hands it to a
     * {@link LargeMessage} that is dispatched immediately.
     */
    private void processLargeMessage(ShareableBytes bytes, int size, Message.Header header) {
        final ShareableBytes payload = bytes.sliceAndConsume(size).share();
        final LargeMessage complete = new LargeMessage(size, header, payload);
        complete.schedule();
    }

    /**
     * Starts accumulating a large message from its first frame.
     *
     * <p>Capacity is only acquired for messages that have not expired yet. When it cannot
     * be acquired the frame is left unprocessed and {@code false} is returned. Otherwise
     * the header arrival is reported, the frame's remaining bytes are counted as received,
     * and a new {@link LargeMessage} is installed and supplied with the frame.
     *
     * @return {@code true} if the frame was accepted, {@code false} if capacity was unavailable
     * @throws IOException declared for failures while reading the header or size
     */
    @Override
    protected boolean processFirstFrameOfLargeMessage(FrameDecoder.IntactFrame frame, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        final ShareableBytes contents = frame.contents;
        final ByteBuffer buffer = contents.get();
        final long nowNanos = MonotonicClock.Global.approxTime.now();
        final Message.Header header = serializer.extractHeader(buffer, this.peer, nowNanos, this.version);
        final int messageSize = serializer.inferMessageSize(buffer, buffer.position(), buffer.limit(), this.version);
        final boolean alreadyExpired = MonotonicClock.Global.approxTime.isAfter(nowNanos, header.expiresAtNanos);

        // Expired messages are admitted without reserving capacity.
        final boolean admitted = alreadyExpired
                || this.acquireCapacity(endpointReserve, globalReserve, messageSize, nowNanos, header.expiresAtNanos);
        if (!admitted) {
            return false;
        }

        final long ageNanos = nowNanos - header.createdAtNanos;
        this.callbacks.onHeaderArrived(messageSize, header, ageNanos, TimeUnit.NANOSECONDS);
        this.receivedBytes += buffer.remaining();

        this.largeMessage = new LargeMessage(messageSize, header, alreadyExpired);
        this.largeMessage.supply(frame);
        return true;
    }

    /**
     * Reacts to a frame that failed its CRC check.
     *
     * <ul>
     *   <li>Unrecoverable frame: counted as unrecovered and rejected.</li>
     *   <li>Recoverable self-contained frame: counted as received and recovered, then dropped.</li>
     *   <li>Recoverable frame with no large message in progress: counted as unrecovered and rejected.</li>
     *   <li>Recoverable frame while a large message is in progress: passed on as a subsequent
     *       frame and counted as recovered.</li>
     * </ul>
     *
     * @throws Crc.InvalidCrc when the corruption cannot be recovered from
     */
    @Override
    protected void processCorruptFrame(FrameDecoder.CorruptFrame frame) throws Crc.InvalidCrc {
        if (!frame.isRecoverable()) {
            this.corruptFramesUnrecovered++;
            throw new Crc.InvalidCrc(frame.readCRC, frame.computedCRC);
        } else if (frame.isSelfContained) {
            this.receivedBytes += frame.frameSize;
            this.corruptFramesRecovered++;
            noSpamLogger.warn("{} invalid, recoverable CRC mismatch detected while reading messages (corrupted self-contained frame)", this.id());
        } else if (null == this.largeMessage) {
            // No large message is being assembled, so this must have been its first frame.
            this.receivedBytes += frame.frameSize;
            this.corruptFramesUnrecovered++;
            noSpamLogger.error("{} invalid, unrecoverable CRC mismatch detected while reading messages (corrupted first frame of a large message)", this.id());
            throw new Crc.InvalidCrc(frame.readCRC, frame.computedCRC);
        } else {
            this.processSubsequentFrameOfLargeMessage(frame);
            this.corruptFramesRecovered++;
            noSpamLogger.warn("{} invalid, recoverable CRC mismatch detected while reading a large message", this.id());
        }
    }

    /**
     * Returns an identifier for this connection.
     *
     * @param includeReal when {@code true}, the channel's actual remote and local socket
     *                    addresses are included alongside {@code peer} and {@code self};
     *                    when {@code false}, the result is the same as {@link #id()}
     */
    String id(boolean includeReal) {
        if (includeReal) {
            final InetSocketAddress realRemote = (InetSocketAddress)this.channel.remoteAddress();
            final InetSocketAddress realLocal = (InetSocketAddress)this.channel.localAddress();
            final String shortChannelId = this.channel.id().asShortText();
            return SocketFactory.channelId(this.peer, realRemote, this.self, realLocal, this.type, shortChannelId);
        }
        return this.id();
    }

    /**
     * Returns the identifier of this connection, built from the peer address, this
     * handler's own address, the connection type and the channel's short id text.
     */
    @Override
    protected String id() {
        final String shortChannelId = this.channel.id().asShortText();
        return SocketFactory.channelId(this.peer, this.self, this.type, shortChannelId);
    }

    /**
     * Returns the same text as {@link #id()}, so the handler can be passed directly as a log argument.
     */
    @Override
    public String toString() {
        final String connectionId = this.id();
        return connectionId;
    }

    /**
     * Routes an exception raised on the channel to {@link #fatalExceptionCaught(Throwable)}.
     * Anything thrown while handling it is logged instead of being propagated.
     *
     * @param ctx   handler context (unused)
     * @param cause the exception raised on the channel
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        try {
            this.fatalExceptionCaught(cause);
        } catch (Throwable handlingFailure) {
            final String handlerName = this.getClass().getSimpleName();
            logger.error("Unexpected exception in {}.exceptionCaught", handlerName, handlingFailure);
        }
    }

    /**
     * Handles an exception that ends this connection: discards the decoder's state, passes
     * the cause to {@link JVMStabilityInspector}, logs it and closes the channel.
     *
     * <p>A {@link Message.InvalidLegacyProtocolMagic} is logged without its stack trace;
     * any other cause is logged in full.
     *
     * @param cause the exception that terminated inbound processing
     */
    @Override
    protected void fatalExceptionCaught(Throwable cause) {
        this.decoder.discard();
        JVMStabilityInspector.inspectThrowable(cause);

        final boolean invalidLegacyMagic = cause instanceof Message.InvalidLegacyProtocolMagic;
        if (!invalidLegacyMagic) {
            logger.error("{} unexpected exception caught while processing inbound messages; terminating connection", this.id(), cause);
        } else {
            logger.error("{} invalid, unrecoverable CRC mismatch detected while reading messages - closing the connection", (Object)this.id());
        }

        this.channel.close();
    }

    /**
     * Submits {@code task} to the stage of its message's verb.
     *
     * <p>Tracing is initialised from the header first and, when a trace state results, the
     * receipt is traced. The callbacks are told about the dispatch before the task is
     * submitted together with executor locals created from that trace state.
     *
     * @param task the processing task to execute
     */
    private void dispatch(ProcessMessage task) {
        final Message.Header taskHeader = task.header();
        final TraceState traceState = Tracing.instance.initializeFromMessage(taskHeader);
        if (null != traceState) {
            traceState.trace("{} message received from {}", (Object)taskHeader.verb, (Object)taskHeader.from);
        }

        this.callbacks.onDispatched(task.size(), taskHeader);
        taskHeader.verb.stage.execute(ExecutorLocals.create(traceState), task);
    }

    /**
     * Processing task for a {@link LargeMessage}. Deserialization is deferred until the
     * task asks for the message, and the message's buffers are released once the task finishes.
     */
    private class ProcessLargeMessage extends ProcessMessage {
        private final LargeMessage message;

        ProcessLargeMessage(LargeMessage message) {
            this.message = message;
        }

        /** Size recorded on the wrapped large message. */
        @Override
        int size() {
            final LargeMessage source = this.message;
            return source.size;
        }

        /** Header recorded on the wrapped large message. */
        @Override
        Message.Header header() {
            final LargeMessage source = this.message;
            return source.header;
        }

        /** Deserializes the wrapped message; the result is {@code null} when deserialization fails. */
        @Override
        Message provideMessage() {
            final Message decoded = this.message.deserialize();
            return decoded;
        }

        /** Releases the buffers held by the wrapped message. */
        @Override
        void releaseResources() {
            final LargeMessage source = this.message;
            source.releaseBuffers();
        }
    }

    /**
     * Processing task for a message that has already been deserialized; it simply carries
     * the message together with its serialized size.
     */
    private class ProcessSmallMessage extends ProcessMessage {
        private final Message message;
        private final int size;

        ProcessSmallMessage(Message message, int size) {
            this.message = message;
            this.size = size;
        }

        /** Serialized size supplied at construction. */
        @Override
        int size() {
            final int serializedSize = this.size;
            return serializedSize;
        }

        /** Header of the already-deserialized message. */
        @Override
        Message.Header header() {
            final Message carried = this.message;
            return carried.header;
        }

        /** Returns the already-deserialized message. */
        @Override
        Message provideMessage() {
            final Message carried = this.message;
            return carried;
        }
    }

    /**
     * Base class of the tasks that deliver a received message to the consumer.
     * Subclasses supply the size, the header and the message itself.
     */
    private abstract class ProcessMessage implements Runnable {
        private ProcessMessage() {
        }

        /**
         * Runs the task: reports the start of execution, then either reports expiry or
         * obtains the message and passes it to the consumer.
         *
         * <p>Whatever the outcome, capacity and resources are released and the end of
         * execution is reported. The processed-capacity release is used only when the
         * consumer accepted the message without throwing.
         */
        @Override
        public void run() {
            final Message.Header header = this.header();
            final long startNanos = MonotonicClock.Global.approxTime.now();
            final boolean expired = MonotonicClock.Global.approxTime.isAfter(startNanos, header.expiresAtNanos);
            final long queuedNanos = startNanos - header.createdAtNanos;

            boolean delivered = false;
            try {
                InboundMessageHandler.this.callbacks.onExecuting(this.size(), header, queuedNanos, TimeUnit.NANOSECONDS);
                if (expired) {
                    InboundMessageHandler.this.callbacks.onExpired(this.size(), header, queuedNanos, TimeUnit.NANOSECONDS);
                } else {
                    Message message = this.provideMessage();
                    if (message != null) {
                        InboundMessageHandler.this.consumer.accept(message);
                        // Set only after accept() returned normally.
                        delivered = true;
                        InboundMessageHandler.this.callbacks.onProcessed(this.size(), header);
                    }
                }
            } finally {
                if (!delivered) {
                    InboundMessageHandler.this.releaseCapacity(this.size());
                } else {
                    InboundMessageHandler.this.releaseProcessedCapacity(this.size(), header);
                }
                this.releaseResources();

                final long executionNanos = MonotonicClock.Global.approxTime.now() - startNanos;
                InboundMessageHandler.this.callbacks.onExecuted(this.size(), header, executionNanos, TimeUnit.NANOSECONDS);
            }
        }

        /** Serialized size of the message handled by this task. */
        abstract int size();

        /** Header of the message handled by this task. */
        abstract Message.Header header();

        /** Supplies the message to deliver; {@code null} means there is nothing to deliver. */
        abstract Message provideMessage();

        /** Hook for releasing resources held by the task; does nothing by default. */
        void releaseResources() {
        }
    }

    private class LargeMessage
    extends AbstractMessageHandler.LargeMessage<Message.Header> {
        private LargeMessage(int size, Message.Header header, boolean isExpired) {
            super(size, header, header.expiresAtNanos, isExpired);
        }

        private LargeMessage(int size, Message.Header header, ShareableBytes bytes) {
            super(size, header, header.expiresAtNanos, bytes);
        }

        private void schedule() {
            InboundMessageHandler.this.dispatch(new ProcessLargeMessage(this));
        }

        @Override
        protected void onComplete() {
            long timeElapsed = MonotonicClock.Global.approxTime.now() - ((Message.Header)this.header).createdAtNanos;
            if (!this.isExpired && !this.isCorrupt) {
                InboundMessageHandler.this.callbacks.onArrived(this.size, (Message.Header)this.header, timeElapsed, TimeUnit.NANOSECONDS);
                this.schedule();
            } else if (this.isExpired) {
                InboundMessageHandler.this.callbacks.onArrivedExpired(this.size, (Message.Header)this.header, this.isCorrupt, timeElapsed, TimeUnit.NANOSECONDS);
            } else {
                InboundMessageHandler.this.callbacks.onArrivedCorrupt(this.size, (Message.Header)this.header, timeElapsed, TimeUnit.NANOSECONDS);
            }
        }

        @Override
        protected void abort() {
            if (!this.isExpired && !this.isCorrupt) {
                this.releaseBuffersAndCapacity();
            }
            InboundMessageHandler.this.callbacks.onClosedBeforeArrival(this.size, (Message.Header)this.header, this.received, this.isCorrupt, this.isExpired);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        private Message deserialize() {
            try {
                Message message;
                Throwable throwable;
                ChunkedInputPlus input;
                block20: {
                    block21: {
                        input = ChunkedInputPlus.of(this.buffers);
                        throwable = null;
                        Message m = serializer.deserialize((DataInputPlus)input, (Message.Header)this.header, InboundMessageHandler.this.version);
                        int remainder = input.remainder();
                        if (remainder > 0) {
                            throw new InvalidSerializedSizeException(((Message.Header)this.header).verb, this.size, this.size - remainder);
                        }
                        message = m;
                        if (input == null) break block20;
                        if (throwable == null) break block21;
                        try {
                            input.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                        break block20;
                    }
                    input.close();
                }
                return message;
                catch (Throwable throwable3) {
                    try {
                        try {
                            throwable = throwable3;
                            throw throwable3;
                        }
                        catch (Throwable throwable4) {
                            if (input != null) {
                                if (throwable != null) {
                                    try {
                                        input.close();
                                    }
                                    catch (Throwable throwable5) {
                                        throwable.addSuppressed(throwable5);
                                    }
                                } else {
                                    input.close();
                                }
                            }
                            throw throwable4;
                        }
                    }
                    catch (IncompatibleSchemaException e) {
                        InboundMessageHandler.this.callbacks.onFailedDeserialize(this.size, (Message.Header)this.header, e);
                        noSpamLogger.info("{} incompatible schema encountered while deserializing a message", InboundMessageHandler.this, e);
                    }
                    catch (Throwable t) {
                        JVMStabilityInspector.inspectThrowable(t);
                        InboundMessageHandler.this.callbacks.onFailedDeserialize(this.size, (Message.Header)this.header, t);
                        logger.error("{} unexpected exception caught while deserializing a message", (Object)InboundMessageHandler.this.id(), (Object)t);
                    }
                }
            }
            finally {
                this.buffers.clear();
            }
            return null;
        }
    }
}

