/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.Maths;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.network.HeartbeatListener;
import net.openhft.chronicle.network.MarshallableFunction;
import net.openhft.chronicle.network.NetworkContext;
import net.openhft.chronicle.network.NetworkLog;
import net.openhft.chronicle.network.NetworkStatsListener;
import net.openhft.chronicle.network.ServerThreadingStrategy;
import net.openhft.chronicle.network.TcpEventHandlerManager;
import net.openhft.chronicle.network.TcpHandlerBias;
import net.openhft.chronicle.network.WanSimulator;
import net.openhft.chronicle.network.api.TcpHandler;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.network.tcp.ChronicleSocket;
import net.openhft.chronicle.network.tcp.ChronicleSocketChannel;
import net.openhft.chronicle.network.tcp.ChronicleSocketChannelFactory;
import net.openhft.chronicle.threads.MediumEventLoop;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpEventHandler<T extends NetworkContext<T>>
extends AbstractCloseable
implements EventHandler,
TcpEventHandlerManager<T> {
    public static final int TARGET_WRITE_SIZE = Integer.getInteger("TcpEventHandler.targetWriteSize", 1024);
    private static final int MONITOR_POLL_EVERY_SEC = Integer.getInteger("tcp.event.monitor.secs", 10);
    private static final long NBR_WARNING_NANOS = Long.getLong("tcp.nbr.warning.nanos", 20000000L);
    private static final long NBW_WARNING_NANOS = Long.getLong("tcp.nbw.warning.nanos", 20000000L);
    private static final Logger LOG = LoggerFactory.getLogger(TcpEventHandler.class);
    private static final AtomicBoolean FIRST_HANDLER = new AtomicBoolean();
    private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x40000000;
    public static boolean DISABLE_TCP_NODELAY = Jvm.getBoolean((String)"disable.tcp_nodelay");
    private SocketReader reader = new DefaultSocketReader();
    @NotNull
    private final ChronicleSocketChannel sc;
    private final String scToString;
    @NotNull
    private final T nc;
    @NotNull
    private final NetworkLog readLog;
    @NotNull
    private final NetworkLog writeLog;
    @NotNull
    private final Bytes<ByteBuffer> inBBB;
    @NotNull
    private final Bytes<ByteBuffer> outBBB;
    private final TcpHandlerBias.BiasController bias;
    private final boolean nbWarningEnabled;
    private final StatusMonitorEventHandler statusMonitorEventHandler;
    @Nullable
    private volatile TcpHandler<T> tcpHandler;
    private long lastTickReadTime = System.currentTimeMillis() + 20000L;
    private Thread actionThread;

    public TcpEventHandler(@NotNull T nc) {
        this(nc, false);
    }

    public TcpEventHandler(@NotNull T nc, boolean fair) {
        this(nc, fair ? TcpHandlerBias.FAIR : TcpHandlerBias.READ);
    }

    public TcpEventHandler(@NotNull T nc, TcpHandlerBias bias) {
        this.sc = ChronicleSocketChannelFactory.wrapUnsafe(nc.socketChannel().socketChannel());
        this.scToString = this.sc.toString();
        this.nc = nc;
        this.bias = (TcpHandlerBias.BiasController)bias.get();
        try {
            this.sc.configureBlocking(false);
            ChronicleSocket sock = this.sc.socket();
            if (!DISABLE_TCP_NODELAY) {
                sock.setTcpNoDelay(true);
            }
            if (TcpChannelHub.TCP_BUFFER >= 65536) {
                sock.setReceiveBufferSize(TcpChannelHub.TCP_BUFFER);
                sock.setSendBufferSize(TcpChannelHub.TCP_BUFFER);
                this.checkBufSize(sock.getReceiveBufferSize(), "recv");
                this.checkBufSize(sock.getSendBufferSize(), "send");
            }
        }
        catch (IOException e) {
            if (this.isClosed() || !this.sc.isOpen()) {
                throw new IORuntimeException((Throwable)e);
            }
            Jvm.warn().on(this.getClass(), (Throwable)e);
        }
        this.inBBB = Bytes.elasticByteBuffer((int)(TcpChannelHub.TCP_BUFFER + OS.pageSize()), (int)Math.max(TcpChannelHub.TCP_BUFFER + OS.pageSize(), 0x40000000));
        this.outBBB = Bytes.elasticByteBuffer((int)TcpChannelHub.TCP_BUFFER, (int)Math.max(TcpChannelHub.TCP_BUFFER, 0x40000000));
        ((ByteBuffer)this.outBBB.underlyingObject()).limit(0);
        this.readLog = new NetworkLog(this.sc, "read");
        this.writeLog = new NetworkLog(this.sc, "write");
        this.nbWarningEnabled = Jvm.warn().isEnabled(this.getClass());
        this.statusMonitorEventHandler = new StatusMonitorEventHandler(this.getClass());
        if (FIRST_HANDLER.compareAndSet(false, true)) {
            this.warmUp();
        }
    }

    public void eventLoop(EventLoop eventLoop) {
        block3: {
            if (eventLoop == null || eventLoop instanceof MediumEventLoop) {
                return;
            }
            try {
                eventLoop.addHandler((EventHandler)this.statusMonitorEventHandler);
            }
            catch (Exception e) {
                if (eventLoop.isClosed()) break block3;
                throw Jvm.rethrow((Throwable)e);
            }
        }
    }

    public void resetUsedByThread() {
        super.resetUsedByThread();
        ((AbstractCloseable)this.nc).resetUsedByThread();
    }

    public void reader(@NotNull SocketReader reader) {
        this.throwExceptionIfClosed();
        this.reader = reader;
    }

    public boolean action() throws InvalidEventHandlerException {
        ChronicleSocketChannel c;
        Jvm.safepoint();
        if (this.isClosed()) {
            throw new InvalidEventHandlerException();
        }
        if (this.actionThread == null) {
            this.actionThread = Thread.currentThread();
        }
        if ((c = this.sc).isClosed()) {
            throw new InvalidEventHandlerException();
        }
        if (this.tcpHandler == null) {
            return false;
        }
        try {
            return this.action0();
        }
        catch (Throwable t) {
            if (this.isClosed()) {
                throw new InvalidEventHandlerException();
            }
            throw Jvm.rethrow((Throwable)t);
        }
    }

    private boolean action0() throws InvalidEventHandlerException {
        if (!this.sc.isOpen()) {
            this.tcpHandler.onEndOfConnection(false);
            Closeable.closeQuietly(this.nc);
            throw new InvalidEventHandlerException("socket is closed");
        }
        this.statusMonitorEventHandler.incrementSocketPollCount();
        boolean busy = false;
        if (this.bias.canWrite()) {
            try {
                busy = this.writeAction();
            }
            catch (Exception e) {
                Jvm.warn().on(this.getClass(), (Throwable)e);
            }
        }
        if (this.bias.canRead()) {
            try {
                busy = this.readAction(busy);
            }
            catch (ClosedChannelException e) {
                this.close();
                throw new InvalidEventHandlerException((Throwable)e);
            }
            catch (IOException e) {
                this.close();
                this.handleIOE(e, this.tcpHandler.hasClientClosed(), this.nc.heartbeatListener());
                throw new InvalidEventHandlerException();
            }
            catch (InvalidEventHandlerException e) {
                this.close();
                throw e;
            }
            catch (Exception e) {
                this.close();
                Jvm.warn().on(this.getClass(), "", (Throwable)e);
                throw new InvalidEventHandlerException((Throwable)e);
            }
        }
        return busy;
    }

    /*
     * Enabled aggressive block sorting
     */
    private boolean readAction(boolean busy) throws IOException, InvalidEventHandlerException {
        ByteBuffer inBB = (ByteBuffer)this.inBBB.underlyingObject();
        int start = inBB.position();
        assert (!this.sc.isBlocking());
        long beginNs = System.nanoTime();
        int read = inBB.remaining() > 0 ? this.reader.read(this.sc, this.inBBB) : Integer.MAX_VALUE;
        long elapsedNs = System.nanoTime() - beginNs;
        if (this.nbWarningEnabled && elapsedNs > NBR_WARNING_NANOS) {
            this.statusMonitorEventHandler.add(new ThreadLogTypeElapsedRecord(LogType.READ, elapsedNs));
        }
        if (read == Integer.MAX_VALUE) {
            this.onInBBFul();
        }
        if (read > 0) {
            WanSimulator.dataRead(read);
            this.tcpHandler.onReadTime(System.nanoTime(), inBB, start, inBB.position());
            this.lastTickReadTime = System.currentTimeMillis();
            this.readLog.log(inBB, start, inBB.position());
            this.invokeHandler();
            return true;
        }
        if (read != 0) {
            this.close();
            throw new InvalidEventHandlerException("socket closed " + this.sc);
        }
        if (this.outBBB.readRemaining() > 0L) {
            busy |= this.invokeHandler();
        }
        if (this.nc.heartbeatTimeoutMs() <= 0L) return busy;
        long tickTime = System.currentTimeMillis();
        if (tickTime <= this.lastTickReadTime + this.nc.heartbeatTimeoutMs()) return busy;
        HeartbeatListener heartbeatListener = this.nc.heartbeatListener();
        if (heartbeatListener != null && heartbeatListener.onMissedHeartbeat()) {
            this.lastTickReadTime += heartbeatListener.lingerTimeBeforeDisconnect();
            return busy;
        }
        this.tcpHandler.onEndOfConnection(true);
        this.close();
        throw new InvalidEventHandlerException("heartbeat timeout");
    }

    public String toString() {
        return "TcpEventHandler{sc=" + this.scToString + ", tcpHandler=" + this.tcpHandler + ", closed=" + this.isClosed() + '}';
    }

    public void warmUp() {
        System.out.println(TcpEventHandler.class.getSimpleName() + " - Warming up...");
        int runs = 12000;
        long beginNs = System.nanoTime();
        for (int i = 0; i < 12000; ++i) {
            this.inBBB.readPositionRemaining(8L, 1024L);
            this.compactBuffer();
            this.clearBuffer();
        }
        long elapsedNs = System.nanoTime() - beginNs;
        System.out.println(TcpEventHandler.class.getSimpleName() + " - ... warmed up - took " + (double)(elapsedNs / 12000L) / 1000.0 + " us avg");
    }

    private void checkBufSize(int bufSize, String name) {
        if (bufSize < TcpChannelHub.TCP_BUFFER) {
            LOG.warn("Attempted to set " + name + " tcp buffer to " + TcpChannelHub.TCP_BUFFER + " but kernel only allowed " + bufSize);
        }
    }

    public ChronicleSocketChannel socketChannel() {
        return this.sc;
    }

    @NotNull
    public HandlerPriority priority() {
        ServerThreadingStrategy sts = this.nc.serverThreadingStrategy();
        switch (sts) {
            case SINGLE_THREADED: {
                return this.singleThreadedPriority();
            }
            case CONCURRENT: {
                return HandlerPriority.CONCURRENT;
            }
        }
        throw new UnsupportedOperationException("todo");
    }

    @NotNull
    public HandlerPriority singleThreadedPriority() {
        return HandlerPriority.MEDIUM;
    }

    @Nullable
    public TcpHandler<T> tcpHandler() {
        return this.tcpHandler;
    }

    @Override
    public void tcpHandler(TcpHandler<T> tcpHandler) {
        this.throwExceptionIfClosedInSetter();
        this.nc.onHandlerChanged(tcpHandler);
        this.tcpHandler = tcpHandler;
    }

    protected boolean threadSafetyCheck(boolean isUsed) {
        return true;
    }

    public void loopFinished() {
        this.inBBB.releaseLast();
        this.outBBB.releaseLast();
    }

    public void onInBBFul() {
        LOG.trace("inBB is full, can't read from socketChannel");
    }

    boolean invokeHandler() throws IOException {
        ByteBuffer outBB;
        long lastInBBBReadPosition;
        Jvm.safepoint();
        boolean busy = false;
        int position = ((ByteBuffer)this.inBBB.underlyingObject()).position();
        this.inBBB.readLimit((long)position);
        this.outBBB.writePosition((long)((ByteBuffer)this.outBBB.underlyingObject()).limit());
        do {
            lastInBBBReadPosition = this.inBBB.readPosition();
            this.tcpHandler.process(this.inBBB, this.outBBB, this.nc);
            this.statusMonitorEventHandler.addBytesRead(this.inBBB.readPosition() - lastInBBBReadPosition);
            outBB = (ByteBuffer)this.outBBB.underlyingObject();
            int wpBBB = Maths.toUInt31((long)this.outBBB.writePosition());
            int length = wpBBB - outBB.limit();
            if (length < TARGET_WRITE_SIZE) continue;
            outBB.limit(wpBBB);
            boolean busy2 = this.tryWrite(outBB);
            if (!busy2) break;
            busy = busy2;
        } while (lastInBBBReadPosition != this.inBBB.readPosition());
        outBB = (ByteBuffer)this.outBBB.underlyingObject();
        if (this.outBBB.writePosition() > (long)outBB.limit() || this.outBBB.writePosition() >= 4L) {
            outBB.limit(Maths.toInt32((long)this.outBBB.writePosition()));
            busy |= this.tryWrite(outBB);
        }
        Jvm.safepoint();
        if (this.inBBB.readRemaining() == 0L) {
            this.clearBuffer();
        } else if (this.inBBB.readPosition() > (long)(TcpChannelHub.TCP_BUFFER / 4)) {
            this.compactBuffer();
            busy = true;
        }
        return busy;
    }

    private void clearBuffer() {
        this.inBBB.clear();
        @Nullable ByteBuffer inBB = (ByteBuffer)this.inBBB.underlyingObject();
        inBB.clear();
    }

    private void compactBuffer() {
        @Nullable ByteBuffer inBB = (ByteBuffer)this.inBBB.underlyingObject();
        inBB.position((int)this.inBBB.readPosition());
        inBB.limit((int)this.inBBB.readLimit());
        Jvm.safepoint();
        inBB.compact();
        Jvm.safepoint();
        this.inBBB.readPosition(0L);
        this.inBBB.readLimit((long)inBB.remaining());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleIOE(@NotNull IOException e, boolean clientIntentionallyClosed, @Nullable HeartbeatListener heartbeatListener) {
        try {
            if (clientIntentionallyClosed) {
                return;
            }
            if (e.getMessage() != null && e.getMessage().startsWith("Connection reset by peer")) {
                LOG.trace(e.getMessage(), (Throwable)e);
            } else if (e.getMessage() != null && e.getMessage().startsWith("An existing connection was forcibly closed")) {
                Jvm.debug().on(this.getClass(), e.getMessage());
            } else if (!(e instanceof ClosedByInterruptException)) {
                Jvm.warn().on(this.getClass(), "", (Throwable)e);
            }
            if (heartbeatListener != null) {
                heartbeatListener.onMissedHeartbeat();
            }
        }
        finally {
            this.close();
        }
    }

    protected void performClose() {
        Closeable.closeQuietly((Object[])new Object[]{this.tcpHandler, this.nc.networkStatsListener(), this.sc, this.nc});
    }

    boolean tryWrite(ByteBuffer outBB) throws IOException {
        if (outBB.remaining() <= 0) {
            return false;
        }
        int start = outBB.position();
        long beginNs = System.nanoTime();
        assert (!this.sc.isBlocking());
        int wrote = this.sc.write(outBB);
        long elapsedNs = System.nanoTime() - beginNs;
        if (this.nbWarningEnabled && elapsedNs > NBW_WARNING_NANOS) {
            this.statusMonitorEventHandler.add(new ThreadLogTypeElapsedRecord(LogType.WRITE, elapsedNs));
        }
        this.tcpHandler.onWriteTime(beginNs, outBB, start, outBB.position());
        this.statusMonitorEventHandler.addBytesWritten(outBB.position() - start);
        this.writeLog.log(outBB, start, outBB.position());
        if (wrote < 0) {
            this.close();
        } else if (wrote > 0) {
            outBB.compact().flip();
            this.outBBB.writePosition((long)outBB.limit());
            return true;
        }
        return false;
    }

    public boolean writeAction() {
        boolean busy;
        block5: {
            busy = false;
            try {
                ByteBuffer outBB = (ByteBuffer)this.outBBB.underlyingObject();
                int remaining = outBB.remaining();
                boolean bl = busy = remaining > 0;
                if (busy) {
                    this.tryWrite(outBB);
                }
                if (outBB.remaining() == remaining && !(busy |= this.invokeHandler())) {
                    busy = this.tryWrite(outBB);
                }
            }
            catch (ClosedChannelException cce) {
                this.close();
            }
            catch (IOException e) {
                if (this.isClosed()) break block5;
                this.handleIOE(e, this.tcpHandler.hasClientClosed(), this.nc.heartbeatListener());
            }
        }
        return busy;
    }

    static {
        if (DISABLE_TCP_NODELAY) {
            System.out.println("tcpNoDelay disabled");
        }
    }

    private final class StatusMonitorEventHandler
    implements EventHandler {
        private final String className;
        private final StringBuilder messageBuilder = new StringBuilder();
        private final AtomicInteger socketPollCount = new AtomicInteger();
        private final AtomicLong bytesReadCount = new AtomicLong();
        private final AtomicLong bytesWriteCount = new AtomicLong();
        private final Queue<ThreadLogTypeElapsedRecord> logs = new ConcurrentLinkedQueue<ThreadLogTypeElapsedRecord>();
        private long lastMonitor;

        public StatusMonitorEventHandler(Class<?> clazz) {
            this.className = clazz.getSimpleName();
        }

        public boolean action() throws InvalidEventHandlerException {
            long now;
            if (TcpEventHandler.this.isClosed()) {
                throw InvalidEventHandlerException.reusable();
            }
            if (!this.logs.isEmpty()) {
                ThreadLogTypeElapsedRecord msg;
                while ((msg = this.logs.poll()) != null) {
                    this.messageBuilder.setLength(0);
                    this.messageBuilder.append("Non blocking ").append(this.className).append(" ").append(msg.logType.label()).append(" took ").append(msg.elapsedNs / 1000L).append(" us");
                    Jvm.warn().on(this.getClass(), this.messageBuilder.toString());
                }
            }
            if ((now = System.currentTimeMillis()) > this.lastMonitor + (long)(MONITOR_POLL_EVERY_SEC * 1000)) {
                NetworkStatsListener networkStatsListener = TcpEventHandler.this.nc.networkStatsListener();
                if (networkStatsListener != null && !networkStatsListener.isClosed()) {
                    if (this.lastMonitor == 0L) {
                        networkStatsListener.onNetworkStats(0L, 0L, 0L);
                    } else {
                        networkStatsListener.onNetworkStats(this.bytesWriteCount.get() / (long)MONITOR_POLL_EVERY_SEC, this.bytesReadCount.get() / (long)MONITOR_POLL_EVERY_SEC, this.socketPollCount.get() / MONITOR_POLL_EVERY_SEC);
                        this.bytesWriteCount.set(0L);
                        this.bytesReadCount.set(0L);
                        this.socketPollCount.set(0);
                    }
                }
                this.lastMonitor = now;
            }
            return false;
        }

        private void incrementSocketPollCount() {
            this.socketPollCount.incrementAndGet();
        }

        private void addBytesRead(long delta) {
            this.bytesReadCount.addAndGet(delta);
        }

        private void addBytesWritten(long delta) {
            this.bytesReadCount.addAndGet(delta);
        }

        private void add(@NotNull ThreadLogTypeElapsedRecord logTypeTimeRecord) {
            this.logs.add(logTypeTimeRecord);
        }

        @NotNull
        public HandlerPriority priority() {
            return HandlerPriority.MONITOR;
        }
    }

    private static final class ThreadLogTypeElapsedRecord {
        private final LogType logType;
        private final long elapsedNs;

        public ThreadLogTypeElapsedRecord(@NotNull LogType logType, long elapsedNs) {
            this.logType = logType;
            this.elapsedNs = elapsedNs;
        }
    }

    public static class Factory<T extends NetworkContext<T>>
    implements MarshallableFunction<T, TcpEventHandler<T>> {
        @Override
        @NotNull
        public TcpEventHandler<T> apply(@NotNull T nc) {
            return new TcpEventHandler<T>(nc);
        }
    }

    public static final class DefaultSocketReader
    implements SocketReader {
        @Override
        public int read(@NotNull ChronicleSocketChannel socketChannel, @NotNull Bytes<ByteBuffer> bytes) throws IOException {
            return socketChannel.read((ByteBuffer)bytes.underlyingObject());
        }
    }

    @FunctionalInterface
    public static interface SocketReader {
        public int read(@NotNull ChronicleSocketChannel var1, @NotNull Bytes<ByteBuffer> var2) throws IOException;
    }

    private static enum LogType {
        READ("read"),
        WRITE("write");

        private final String label;

        private LogType(String label) {
            this.label = label;
        }

        private String label() {
            return this.label;
        }
    }
}

