/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.cassandra.net.Crc;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.ManyToOneConcurrentLinkedQueue;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.ShareableBytes;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessageHandler
extends ChannelInboundHandlerAdapter
implements FrameDecoder.FrameProcessor {
    // Class-wide logger; also used by WaitQueue.Ticket to report failures while reactivating a handler.
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageHandler.class);
    // NoSpamLogger wrapper around the logger above; not referenced anywhere else in this file.
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
    // Frame decoder feeding this handler; activated in handlerAdded() and reactivated once capacity is regained.
    protected final FrameDecoder decoder;
    // Channel this handler belongs to; capacity-regained callbacks assert that they run on its event loop.
    protected final Channel channel;
    // Not read in this class; presumably the size above which subclasses treat a message as "large" - TODO confirm.
    protected final int largeThreshold;
    // Large message currently being assembled from multiple frames, or null when none is in flight.
    protected LargeMessage<?> largeMessage;
    // Number of bytes this handler may hold before it has to borrow from the endpoint/global reserves.
    protected final long queueCapacity;
    // Bytes currently acquired by this handler; modified only through queueSizeUpdater.
    volatile long queueSize = 0L;
    private static final AtomicLongFieldUpdater<AbstractMessageHandler> queueSizeUpdater = AtomicLongFieldUpdater.newUpdater(AbstractMessageHandler.class, "queueSize");
    // Reserve capacity and its wait queue for the endpoint; used when queueCapacity alone is insufficient.
    protected final ResourceLimits.Limit endpointReserveCapacity;
    protected final WaitQueue endpointWaitQueue;
    // Reserve capacity and its wait queue shared globally; allocated before the endpoint reserve in acquireCapacity().
    protected final ResourceLimits.Limit globalReserveCapacity;
    protected final WaitQueue globalWaitQueue;
    // Callback invoked from channelInactive() once this handler has been marked closed.
    protected final OnHandlerClosed onClosed;
    // Outstanding wait-queue registration: set when capacity acquisition fails, cleared when capacity is regained.
    private WaitQueue.Ticket ticket = null;
    // Corrupt-frame counters; not updated in this class (presumably maintained by subclasses).
    protected long corruptFramesRecovered;
    protected long corruptFramesUnrecovered;
    // Completed large messages (incremented here) and bytes of subsequent large-message frames received.
    protected long receivedCount;
    protected long receivedBytes;
    // Number of failed capacity acquisitions, and total nanoseconds spent waiting on a ticket.
    protected long throttledCount;
    protected long throttledNanos;
    // Set in channelInactive(); makes late capacity-regained callbacks a no-op.
    private boolean isClosed;

    /**
     * Creates a handler bound to the given decoder and channel.
     *
     * @param decoder                 frame decoder that will deliver frames to this handler
     * @param channel                 channel this handler belongs to
     * @param largeThreshold          stored as-is for subclasses; not interpreted by this class
     * @param queueCapacity           bytes this handler may hold before borrowing from the reserves
     * @param endpointReserveCapacity reserve capacity for the endpoint
     * @param globalReserveCapacity   global reserve capacity
     * @param endpointWaitQueue       queue to wait on when the endpoint reserve is exhausted
     * @param globalWaitQueue         queue to wait on when the global reserve is exhausted
     * @param onClosed                callback invoked when the channel becomes inactive
     */
    public AbstractMessageHandler(FrameDecoder decoder, Channel channel, int largeThreshold, long queueCapacity, ResourceLimits.Limit endpointReserveCapacity, ResourceLimits.Limit globalReserveCapacity, WaitQueue endpointWaitQueue, WaitQueue globalWaitQueue, OnHandlerClosed onClosed) {
        // Pipeline wiring.
        this.channel = channel;
        this.decoder = decoder;
        this.onClosed = onClosed;

        // Sizing.
        this.largeThreshold = largeThreshold;
        this.queueCapacity = queueCapacity;

        // Each reserve capacity is paired with the wait queue used when it runs out.
        this.endpointReserveCapacity = endpointReserveCapacity;
        this.endpointWaitQueue = endpointWaitQueue;
        this.globalReserveCapacity = globalReserveCapacity;
        this.globalWaitQueue = globalWaitQueue;
    }

    /**
     * Rejects direct reads. This handler receives its input through
     * {@link #process(FrameDecoder.Frame)} after registering itself with the frame decoder in
     * {@code handlerAdded}; {@code channelRead} is never expected to be called.
     *
     * @param ctx the handler context (unused)
     * @param msg the inbound object (unused)
     * @throws IllegalStateException always
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Report the concrete handler type: this base class is shared, so a hard-coded subclass name can mislead.
        throw new IllegalStateException(this.getClass().getSimpleName() + " doesn't expect channelRead() to be invoked");
    }

    /**
     * Activates the frame decoder with this handler as its frame processor once the handler
     * has been added to the pipeline.
     *
     * @param ctx the handler context (unused)
     */
    public void handlerAdded(ChannelHandlerContext ctx) {
        decoder.activate(this);
    }

    /**
     * Entry point for frames delivered by the decoder. Intact frames are processed against the
     * handler's regular endpoint/global reserves; anything else is treated as a corrupt frame.
     *
     * @param frame the frame to process
     * @return the result of intact-frame processing, or {@code true} after handling a corrupt frame
     * @throws IOException if frame processing fails
     */
    @Override
    public boolean process(FrameDecoder.Frame frame) throws IOException {
        if (!(frame instanceof FrameDecoder.IntactFrame)) {
            processCorruptFrame((FrameDecoder.CorruptFrame) frame);
            return true;
        }
        FrameDecoder.IntactFrame intact = (FrameDecoder.IntactFrame) frame;
        return processIntactFrame(intact, endpointReserveCapacity, globalReserveCapacity);
    }

    /**
     * Dispatches an intact frame: self-contained frames go to contained-message processing, while
     * other frames either start a new large message (none in flight) or continue the current one.
     *
     * @param frame           the intact frame
     * @param endpointReserve endpoint reserve to draw from
     * @param globalReserve   global reserve to draw from
     * @return the result of the selected processing routine
     * @throws IOException if processing fails
     */
    private boolean processIntactFrame(FrameDecoder.IntactFrame frame, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        if (!frame.isSelfContained) {
            return largeMessage == null
                   ? processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve)
                   : processSubsequentFrameOfLargeMessage(frame);
        }
        return processFrameOfContainedMessages(frame.contents, endpointReserve, globalReserve);
    }

    /**
     * Processes contained messages one at a time until the bytes are exhausted or a message
     * cannot be processed.
     *
     * @param bytes           contents of a self-contained frame
     * @param endpointReserve endpoint reserve to draw from
     * @param globalReserve   global reserve to draw from
     * @return {@code true} if all bytes were consumed, {@code false} as soon as
     *         {@link #processOneContainedMessage} returns {@code false}
     * @throws IOException if processing fails
     */
    private boolean processFrameOfContainedMessages(ShareableBytes bytes, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        boolean keepGoing = true;
        // The short-circuit skips the hasRemaining() check once a message has been refused.
        while (keepGoing && bytes.hasRemaining()) {
            keepGoing = processOneContainedMessage(bytes, endpointReserve, globalReserve);
        }
        return keepGoing;
    }

    /**
     * Processes a single message from the contents of a self-contained frame.
     *
     * @param var1 the frame contents to read from
     * @param var2 the endpoint reserve to draw from
     * @param var3 the global reserve to draw from
     * @return {@code false} to stop processing the remaining bytes of the frame (the {@code false}
     *         is propagated to the caller of {@code process}); {@code true} to continue.
     *         Presumably {@code false} means capacity could not be acquired - confirm with implementations.
     * @throws IOException if processing fails
     */
    protected abstract boolean processOneContainedMessage(ShareableBytes var1, ResourceLimits.Limit var2, ResourceLimits.Limit var3) throws IOException;

    /**
     * Processes an intact, non-self-contained frame that arrives while no large message is in flight.
     * Presumably implementations set {@code largeMessage} here - confirm with implementations.
     *
     * @param var1 the first frame of the large message
     * @param var2 the endpoint reserve to draw from
     * @param var3 the global reserve to draw from
     * @return the value to hand back to the frame decoder as the result of processing this frame
     * @throws IOException if processing fails
     */
    protected abstract boolean processFirstFrameOfLargeMessage(FrameDecoder.IntactFrame var1, ResourceLimits.Limit var2, ResourceLimits.Limit var3) throws IOException;

    /**
     * Feeds a follow-up frame to the large message in flight, counting its bytes and, when the
     * message reports completion, counting the message and clearing {@code largeMessage}.
     *
     * @param frame the follow-up frame
     * @return always {@code true}
     */
    protected boolean processSubsequentFrameOfLargeMessage(FrameDecoder.Frame frame) {
        receivedBytes += frame.frameSize;
        boolean completed = largeMessage.supply(frame);
        if (completed) {
            receivedCount++;
            largeMessage = null;
        }
        return true;
    }

    /**
     * Handles a frame that is not intact. Called from {@code process} for every non-intact frame
     * and while draining the decoder backlog.
     *
     * @param var1 the corrupt frame
     * @throws Crc.InvalidCrc presumably when the corruption cannot be recovered from - confirm with implementations
     */
    protected abstract void processCorruptFrame(FrameDecoder.CorruptFrame var1) throws Crc.InvalidCrc;

    /**
     * Callback for a ticket served by the endpoint wait queue: resumes processing with the supplied
     * endpoint reserve and this handler's regular global reserve.
     *
     * @param endpointReserve endpoint capacity handed over by the wait queue
     * @param elapsedNanos    time spent waiting, in nanoseconds
     */
    private void onEndpointReserveCapacityRegained(ResourceLimits.Limit endpointReserve, long elapsedNanos) {
        ResourceLimits.Limit regularGlobalReserve = globalReserveCapacity;
        onReserveCapacityRegained(endpointReserve, regularGlobalReserve, elapsedNanos);
    }

    /**
     * Callback for a ticket served by the global wait queue: resumes processing with this handler's
     * regular endpoint reserve and the supplied global reserve.
     *
     * @param globalReserve global capacity handed over by the wait queue
     * @param elapsedNanos  time spent waiting, in nanoseconds
     */
    private void onGlobalReserveCapacityRegained(ResourceLimits.Limit globalReserve, long elapsedNanos) {
        ResourceLimits.Limit regularEndpointReserve = endpointReserveCapacity;
        onReserveCapacityRegained(regularEndpointReserve, globalReserve, elapsedNanos);
    }

    /**
     * Resumes a throttled handler: clears the outstanding ticket, records the time spent waiting,
     * processes up to one message from the decoder backlog with the supplied reserves and, if that
     * succeeds, reactivates the decoder. Does nothing once the handler has been closed.
     *
     * @param endpointReserve endpoint reserve to use for the one message
     * @param globalReserve   global reserve to use for the one message
     * @param elapsedNanos    time spent waiting, in nanoseconds
     */
    private void onReserveCapacityRegained(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve, long elapsedNanos) {
        if (!isClosed) {
            assert channel.eventLoop().inEventLoop();

            ticket = null;
            throttledNanos += elapsedNanos;

            try {
                boolean canResume = processUpToOneMessage(endpointReserve, globalReserve);
                if (canResume) {
                    decoder.reactivate();
                }
            }
            catch (Throwable t) {
                // Any failure while draining the backlog is handed to the subclass as fatal.
                fatalExceptionCaught(t);
            }
        }
    }

    /**
     * Invoked with any {@link Throwable} raised while processing the decoder backlog or reactivating
     * the decoder after reserve capacity was regained.
     *
     * @param var1 the failure
     */
    protected abstract void fatalExceptionCaught(Throwable var1);

    /**
     * Drains the decoder backlog through a one-message frame processor bound to the given reserves.
     *
     * @param endpointReserve endpoint reserve to use
     * @param globalReserve   global reserve to use
     * @return the processor's {@code isActive} flag after the backlog pass
     * @throws IOException if backlog processing fails
     */
    private boolean processUpToOneMessage(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        UpToOneMessageFrameProcessor oneMessage = new UpToOneMessageFrameProcessor(endpointReserve, globalReserve);
        decoder.processBacklog(oneMessage);
        return oneMessage.isActive;
    }

    /**
     * Tries to acquire capacity for a message; on failure registers a ticket with the wait queue
     * matching the exhausted reserve and counts the attempt as throttled.
     *
     * @param endpointReserve  endpoint reserve to draw from
     * @param globalReserve    global reserve to draw from
     * @param bytes            number of bytes to acquire
     * @param currentTimeNanos registration timestamp recorded on the ticket
     * @param expiresAtNanos   expiry timestamp recorded on the ticket
     * @return {@code true} if capacity was acquired, {@code false} otherwise
     */
    protected boolean acquireCapacity(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve, int bytes, long currentTimeNanos, long expiresAtNanos) {
        ResourceLimits.Outcome outcome = acquireCapacity(endpointReserve, globalReserve, bytes);
        if (outcome == ResourceLimits.Outcome.SUCCESS) {
            return true;
        }

        // Queue up behind whichever reserve turned out to be the bottleneck.
        if (outcome == ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT) {
            ticket = endpointWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos);
        } else if (outcome == ResourceLimits.Outcome.INSUFFICIENT_GLOBAL) {
            ticket = globalWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos);
        }

        ++throttledCount;
        return false;
    }

    /**
     * Accounts for {@code bytes} against this handler's queue, borrowing from the global and endpoint
     * reserves for whatever part does not fit under {@code queueCapacity}.
     *
     * @param endpointReserve endpoint reserve to borrow from
     * @param globalReserve   global reserve to borrow from
     * @param bytes           number of bytes to acquire
     * @return {@code SUCCESS} when the bytes were added to the queue size; {@code INSUFFICIENT_GLOBAL}
     *         or {@code INSUFFICIENT_ENDPOINT} when the respective reserve could not cover the excess
     *         (in which case the queue size is left unchanged)
     */
    protected ResourceLimits.Outcome acquireCapacity(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve, int bytes) {
        long currentQueueSize = this.queueSize;
        // Fast path: the whole message fits under the handler's own capacity, so no reserve is needed.
        // NOTE(review): the check-then-add is only safe if nothing else grows queueSize concurrently -
        // presumably this method is confined to the channel's event loop; confirm against callers.
        if (currentQueueSize + (long)bytes <= this.queueCapacity) {
            queueSizeUpdater.addAndGet(this, bytes);
            return ResourceLimits.Outcome.SUCCESS;
        }
        // The part of the message that does not fit locally (at most the whole message) must be borrowed.
        long allocatedExcess = Math.min(currentQueueSize + (long)bytes - this.queueCapacity, (long)bytes);
        if (!globalReserve.tryAllocate(allocatedExcess)) {
            return ResourceLimits.Outcome.INSUFFICIENT_GLOBAL;
        }
        if (!endpointReserve.tryAllocate(allocatedExcess)) {
            // Roll back the global allocation and wake the global queue, since capacity was just returned to it.
            globalReserve.release(allocatedExcess);
            this.globalWaitQueue.signal();
            return ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
        }
        long newQueueSize = queueSizeUpdater.addAndGet(this, bytes);
        // Recompute the excess from the size actually observed after the add; it differs from the estimate
        // when queueSize changed (e.g. via releaseCapacity) after it was read at the top of the method.
        long actualExcess = Math.max(0L, Math.min(newQueueSize - this.queueCapacity, (long)bytes));
        if (actualExcess != allocatedExcess) {
            // Return the over-borrowed portion to both reserves and wake their waiters.
            long excess = allocatedExcess - actualExcess;
            endpointReserve.release(excess);
            globalReserve.release(excess);
            this.endpointWaitQueue.signal();
            this.globalWaitQueue.signal();
        }
        return ResourceLimits.Outcome.SUCCESS;
    }

    /**
     * Returns {@code bytes} of previously acquired capacity. If the queue size was above
     * {@code queueCapacity} before the release, the part that sat above it (at most {@code bytes})
     * is given back to both reserves and both wait queues are signalled.
     *
     * @param bytes number of bytes to release
     */
    public void releaseCapacity(int bytes) {
        long previousSize = queueSizeUpdater.getAndAdd(this, -bytes);
        if (previousSize <= queueCapacity) {
            // Nothing had been borrowed from the reserves.
            return;
        }

        long borrowed = Math.min(previousSize - queueCapacity, (long) bytes);

        endpointReserveCapacity.release(borrowed);
        globalReserveCapacity.release(borrowed);

        endpointWaitQueue.signal();
        globalWaitQueue.signal();
    }

    /**
     * Releases the capacity held by a processed message. The header is not used by this
     * implementation.
     *
     * @param size   number of bytes to release
     * @param header header of the processed message (ignored here)
     */
    @VisibleForTesting
    protected void releaseProcessedCapacity(int size, Message.Header header) {
        releaseCapacity(size);
    }

    /**
     * Marks the handler closed, aborts any large message in flight, invalidates an outstanding
     * wait-queue ticket and finally notifies the {@code onClosed} callback.
     *
     * @param ctx the handler context (unused)
     */
    public void channelInactive(ChannelHandlerContext ctx) {
        isClosed = true;

        LargeMessage<?> inFlight = largeMessage;
        if (inFlight != null) {
            inFlight.abort();
        }

        WaitQueue.Ticket outstanding = ticket;
        if (outstanding != null) {
            outstanding.invalidate();
        }

        onClosed.call(this);
    }

    /**
     * @return the event loop of this handler's channel
     */
    private EventLoop eventLoop() {
        return channel.eventLoop();
    }

    /**
     * @return an identifier for this handler; used as the prefix of the error logged when
     *         reactivating the handler from a wait queue fails
     */
    protected abstract String id();

    public static interface OnHandlerClosed {
        public void call(AbstractMessageHandler var1);
    }

    public static final class WaitQueue {
        private static final int NOT_RUNNING = 0;
        private static final int RUNNING = 1;
        private static final int RUN_AGAIN = 2;
        private volatile int scheduled;
        private static final AtomicIntegerFieldUpdater<WaitQueue> scheduledUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitQueue.class, "scheduled");
        private final Kind kind;
        private final ResourceLimits.Limit reserveCapacity;
        private final ManyToOneConcurrentLinkedQueue<Ticket> queue = new ManyToOneConcurrentLinkedQueue();

        private WaitQueue(Kind kind, ResourceLimits.Limit reserveCapacity) {
            this.kind = kind;
            this.reserveCapacity = reserveCapacity;
        }

        public static WaitQueue endpoint(ResourceLimits.Limit endpointReserveCapacity) {
            return new WaitQueue(Kind.ENDPOINT, endpointReserveCapacity);
        }

        public static WaitQueue global(ResourceLimits.Limit globalReserveCapacity) {
            return new WaitQueue(Kind.GLOBAL, globalReserveCapacity);
        }

        private Ticket register(AbstractMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos) {
            Ticket ticket = new Ticket(this, handler, bytesRequested, registeredAtNanos, expiresAtNanos);
            Ticket previous = this.queue.relaxedPeekLastAndOffer(ticket);
            if (null == previous || !previous.isWaiting()) {
                this.signal();
            }
            return ticket;
        }

        @VisibleForTesting
        public void signal() {
            if (this.queue.relaxedIsEmpty()) {
                return;
            }
            if (0 == scheduledUpdater.getAndUpdate(this, i -> Math.min(2, i + 1))) {
                do {
                    this.schedule();
                } while (2 == scheduledUpdater.getAndDecrement(this));
            }
        }

        private void schedule() {
            Ticket t;
            IdentityHashMap<EventLoop, ReactivateHandlers> tasks = null;
            long currentTimeNanos = MonotonicClock.approxTime.now();
            while ((t = this.queue.peek()) != null) {
                if (!t.call()) {
                    this.queue.remove();
                    continue;
                }
                boolean isLive = t.isLive(currentTimeNanos);
                if (isLive && !this.reserveCapacity.tryAllocate(t.bytesRequested)) {
                    if (t.reset()) break;
                    this.queue.remove();
                    continue;
                }
                if (null == tasks) {
                    tasks = new IdentityHashMap<EventLoop, ReactivateHandlers>();
                }
                this.queue.remove();
                tasks.computeIfAbsent(t.handler.eventLoop(), e -> new ReactivateHandlers()).add(t, isLive);
            }
            if (null != tasks) {
                tasks.forEach(Executor::execute);
            }
        }

        private static final class Ticket {
            private static final int WAITING = 0;
            private static final int CALLED = 1;
            private static final int INVALIDATED = 2;
            private volatile int state;
            private static final AtomicIntegerFieldUpdater<Ticket> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(Ticket.class, "state");
            private final WaitQueue waitQueue;
            private final AbstractMessageHandler handler;
            private final int bytesRequested;
            private final long reigsteredAtNanos;
            private final long expiresAtNanos;

            private Ticket(WaitQueue waitQueue, AbstractMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos) {
                this.waitQueue = waitQueue;
                this.handler = handler;
                this.bytesRequested = bytesRequested;
                this.reigsteredAtNanos = registeredAtNanos;
                this.expiresAtNanos = expiresAtNanos;
            }

            private void reactivateHandler(ResourceLimits.Limit capacity) {
                long elapsedNanos = MonotonicClock.approxTime.now() - this.reigsteredAtNanos;
                try {
                    if (this.waitQueue.kind == Kind.ENDPOINT) {
                        this.handler.onEndpointReserveCapacityRegained(capacity, elapsedNanos);
                    } else {
                        this.handler.onGlobalReserveCapacityRegained(capacity, elapsedNanos);
                    }
                }
                catch (Throwable t) {
                    logger.error("{} exception caught while reactivating a handler", (Object)this.handler.id(), (Object)t);
                }
            }

            private boolean isWaiting() {
                return this.state == 0;
            }

            private boolean isLive(long currentTimeNanos) {
                return !MonotonicClock.approxTime.isAfter(currentTimeNanos, this.expiresAtNanos);
            }

            private void invalidate() {
                this.state = 2;
                this.waitQueue.signal();
            }

            private boolean call() {
                return stateUpdater.compareAndSet(this, 0, 1);
            }

            private boolean reset() {
                return stateUpdater.compareAndSet(this, 1, 0);
            }
        }

        private class ReactivateHandlers
        implements Runnable {
            List<Ticket> tickets = new ArrayList<Ticket>();
            long capacity = 0L;

            private ReactivateHandlers() {
            }

            private void add(Ticket ticket, boolean isLive) {
                this.tickets.add(ticket);
                if (isLive) {
                    this.capacity += (long)ticket.bytesRequested;
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                ResourceLimits.Basic limit = new ResourceLimits.Basic(this.capacity);
                try {
                    for (Ticket ticket : this.tickets) {
                        ticket.reactivateHandler(limit);
                    }
                }
                finally {
                    long remaining = limit.remaining();
                    if (remaining > 0L) {
                        WaitQueue.this.reserveCapacity.release(remaining);
                        WaitQueue.this.signal();
                    }
                }
            }
        }

        static enum Kind {
            ENDPOINT,
            GLOBAL;

        }
    }

    /**
     * A message spanning multiple frames, assembled frame by frame. Buffers are collected only
     * while the message is neither expired nor corrupt; on the first transition out of that state
     * the collected buffers and the message's capacity are released, and later frames are merely
     * counted towards completion.
     *
     * @param <H> type of the message header
     */
    protected abstract class LargeMessage<H> {
        /** Total size of the message; the message is complete once {@code received} equals it. */
        protected final int size;
        protected final H header;
        /** Buffers collected so far while the message is still healthy. */
        protected final List<ShareableBytes> buffers = new ArrayList<ShareableBytes>();
        /** Sum of the frame sizes supplied so far. */
        protected int received;
        protected final long expiresAtNanos;
        protected boolean isExpired;
        protected boolean isCorrupt;

        protected LargeMessage(int size, H header, long expiresAtNanos, boolean isExpired) {
            this.size = size;
            this.header = header;
            this.expiresAtNanos = expiresAtNanos;
            this.isExpired = isExpired;
        }

        protected LargeMessage(int size, H header, long expiresAtNanos, ShareableBytes bytes) {
            this(size, header, expiresAtNanos, false);
            buffers.add(bytes);
        }

        /**
         * Supplies the next frame of this message.
         *
         * @param frame the next frame, intact or corrupt
         * @return {@code true} if the bytes received now equal the message size
         */
        public boolean supply(FrameDecoder.Frame frame) {
            if (!(frame instanceof FrameDecoder.IntactFrame)) {
                onCorruptFrame();
            } else {
                onIntactFrame((FrameDecoder.IntactFrame) frame);
            }

            received += frame.frameSize;
            if (received == size) {
                onComplete();
            }
            return received == size;
        }

        private void onIntactFrame(FrameDecoder.IntactFrame frame) {
            boolean expires = MonotonicClock.approxTime.isAfter(expiresAtNanos);
            boolean healthy = !(isExpired || isCorrupt);

            if (healthy && !expires) {
                buffers.add(frame.contents.sliceAndConsume(frame.frameSize).share());
                return;
            }
            if (healthy) {
                // First frame seen after expiry: give up what has been collected so far.
                releaseBuffersAndCapacity();
            }
            frame.consume();
            isExpired |= expires;
        }

        private void onCorruptFrame() {
            boolean wasHealthy = !isExpired && !isCorrupt;
            if (wasHealthy) {
                // First corrupt frame: give up what has been collected so far.
                releaseBuffersAndCapacity();
            }
            isCorrupt = true;
            isExpired |= MonotonicClock.approxTime.isAfter(expiresAtNanos);
        }

        /** Invoked from {@link #supply} once all bytes of the message have been received. */
        protected abstract void onComplete();

        /** Invoked when the channel becomes inactive while this message is still in flight. */
        protected abstract void abort();

        protected void releaseBuffers() {
            buffers.forEach(ShareableBytes::release);
            buffers.clear();
        }

        protected void releaseBuffersAndCapacity() {
            releaseBuffers();
            AbstractMessageHandler.this.releaseCapacity(size);
        }
    }

    /**
     * Frame processor used on the decoder backlog after capacity has been regained: it processes
     * at most one message (a single contained message, or one large message across its frames)
     * using the reserves it was constructed with.
     */
    private class UpToOneMessageFrameProcessor
    implements FrameDecoder.FrameProcessor {
        private final ResourceLimits.Limit endpointReserve;
        private final ResourceLimits.Limit globalReserve;
        /** Result of processing the first frame; read by the enclosing handler after the backlog pass. */
        boolean isActive = true;
        boolean firstFrame = true;

        private UpToOneMessageFrameProcessor(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) {
            this.endpointReserve = endpointReserve;
            this.globalReserve = globalReserve;
        }

        @Override
        public boolean process(FrameDecoder.Frame frame) throws IOException {
            if (!firstFrame) {
                return processSubsequentFrame(frame);
            }
            if (!(frame instanceof FrameDecoder.IntactFrame)) {
                throw new IllegalStateException("First backlog frame must be intact");
            }
            firstFrame = false;
            return processFirstFrame((FrameDecoder.IntactFrame) frame);
        }

        private boolean processFirstFrame(FrameDecoder.IntactFrame frame) throws IOException {
            if (!frame.isSelfContained) {
                // Start of a large message: keep consuming backlog frames only if it was accepted.
                isActive = AbstractMessageHandler.this.processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve);
                return isActive;
            }
            // One contained message only; always stop the backlog pass afterwards.
            isActive = AbstractMessageHandler.this.processOneContainedMessage(frame.contents, endpointReserve, globalReserve);
            return false;
        }

        private boolean processSubsequentFrame(FrameDecoder.Frame frame) throws IOException {
            if (!(frame instanceof FrameDecoder.IntactFrame)) {
                AbstractMessageHandler.this.processCorruptFrame((FrameDecoder.CorruptFrame) frame);
            } else {
                AbstractMessageHandler.this.processSubsequentFrameOfLargeMessage(frame);
            }
            // Continue only while the large message is still being assembled.
            return AbstractMessageHandler.this.largeMessage != null;
        }
    }
}

