/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.tcp;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.MappingFunction;
import net.openhft.chronicle.MappingProvider;
import net.openhft.chronicle.VanillaChronicle;
import net.openhft.chronicle.tcp.Attached;
import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.chronicle.tcp.TcpConnection;
import net.openhft.chronicle.tcp.VanillaSelectionKeySet;
import net.openhft.chronicle.tcp.VanillaSelector;
import net.openhft.chronicle.tools.ResizableDirectByteBufferBytes;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.DirectByteBufferBytes;
import net.openhft.lang.io.RandomDataInput;
import net.openhft.lang.model.constraints.NotNull;
import net.openhft.lang.thread.LightPauser;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SourceTcp {
    protected final Logger logger;
    protected final String name;
    protected final AtomicBoolean running;
    protected final ChronicleQueueBuilder.ReplicaChronicleQueueBuilder builder;
    protected final ThreadPoolExecutor executor;
    protected final LightPauser pauser;

    protected SourceTcp(String name, ChronicleQueueBuilder.ReplicaChronicleQueueBuilder builder, ThreadPoolExecutor executor) {
        this.builder = builder;
        this.name = ChronicleTcp.connectionName(name, this.builder.bindAddress(), this.builder.connectAddress());
        this.logger = LoggerFactory.getLogger((String)this.name);
        this.running = new AtomicBoolean(false);
        this.executor = executor;
        this.pauser = new LightPauser(builder.busyPeriodTimeNanos(), builder.parkPeriodTimeNanos());
    }

    /**
     * Marks this source as running and submits the handler produced by
     * {@link #createHandler()} to the executor.
     *
     * @return this instance, for call chaining
     */
    public SourceTcp open() {
        running.set(true);
        final Runnable handler = createHandler();
        executor.execute(handler);
        return this;
    }

    /**
     * Stops this source: clears the running flag, shuts the executor down and
     * waits up to twice the builder's select timeout for running tasks to end.
     *
     * <p>If the waiting thread is interrupted, the wait is abandoned and the
     * thread's interrupt status is restored so callers can still observe it.
     *
     * @return {@code true} if the running flag is cleared when the method returns
     */
    public boolean close() {
        this.running.set(false);
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(this.builder.selectTimeout() * 2L, this.builder.selectTimeoutUnit());
        }
        catch (InterruptedException interrupted) {
            // Do not swallow the interruption: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
        return !this.running.get();
    }

    /**
     * @return the connection name computed in the constructor
     */
    @Override
    public String toString() {
        return name;
    }

    /**
     * Signals that new data is available by un-pausing the shared pauser.
     */
    public void dataNotification() {
        pauser.unpause();
    }

    /**
     * Implemented by subclasses.
     * NOTE(review): presumably reports whether this source is bound to / connected
     * via the local host — confirm against the concrete implementations.
     */
    public abstract boolean isLocalhost();

    /**
     * Creates the task that {@link #open()} submits to the executor.
     *
     * @return the handler to run
     */
    protected abstract Runnable createHandler();

    /**
     * Creates the per-connection session handler matching the builder's chronicle
     * type and notifies the builder's connection listener of the new connection.
     *
     * @param socketChannel the accepted/connected channel to serve
     * @return an indexed or vanilla session handler
     * @throws IllegalStateException if the builder has no chronicle, or the
     *                               chronicle is neither Indexed nor Vanilla
     */
    protected Runnable createSessionHandler(@NotNull SocketChannel socketChannel) {
        final Chronicle chronicle = builder.chronicle();
        if (chronicle == null) {
            throw new IllegalStateException("Chronicle can't be null");
        }

        final boolean indexed = chronicle instanceof IndexedChronicle;
        if (!indexed && !(chronicle instanceof VanillaChronicle)) {
            throw new IllegalStateException("Chronicle must be Indexed or Vanilla");
        }

        // The listener is told about the connection before the handler is built.
        builder.connectionListener().onConnect(socketChannel);
        if (indexed) {
            return new IndexedSessionHandler(socketChannel);
        }
        return new VanillaSessionHandler(socketChannel);
    }

    private class VanillaSessionHandler
    extends SessionHandler {
        private boolean nextIndex;
        private long index;

        /**
         * Creates a handler that starts with no explicit index ({@code -1}) and
         * advances with {@code nextIndex()} on its first write.
         */
        private VanillaSessionHandler(SocketChannel socketChannel) {
            super(socketChannel);
            index = -1L;
            nextIndex = true;
        }

        /**
         * Handles a subscribe request. {@code -1} positions the tailer at the start,
         * {@code -2} at the end (falling back to the start when the end index is
         * {@code -1}); any other value is used as the index to resume from. The
         * resolved index is sent back with size code {@code -126} and the key is
         * registered for both read and write.
         *
         * @return always {@code false}
         */
        @Override
        protected boolean onSubscribe(SelectionKey key, long data) throws IOException {
            index = data;
            if (data == -1L) {
                rewindToStart();
            } else if (data == -2L) {
                nextIndex = false;
                tailer = tailer.toEnd();
                index = tailer.index();
                if (index == -1L) {
                    rewindToStart();
                }
            } else {
                nextIndex = false;
            }

            sendSizeAndIndex(-126, index);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return false;
        }

        /** Positions the tailer at the start and makes the next write advance with nextIndex(). */
        private void rewindToStart() {
            nextIndex = true;
            tailer = tailer.toStart();
            index = -1L;
        }

        /**
         * Reads {@code size} bytes from the connection, appends them as one excerpt,
         * wakes the pauser and, when {@code ack} is set, replies with size code
         * {@code -129} and the appender's last written index.
         *
         * @return always {@code true}
         */
        @Override
        protected boolean onSubmit(SelectionKey key, long size, boolean ack) throws IOException {
            final int length = (int) size;
            readUpTo(length);

            appender.startExcerpt(length);
            appender.write((RandomDataInput) readBuffer);
            appender.finish();

            pauser.unpause();
            if (!ack) {
                return true;
            }
            sendSizeAndIndex(-129, appender.lastWrittenIndex());
            return true;
        }

        /**
         * Sends the next excerpt(s) to the peer.
         *
         * <p>The tailer is first positioned: either advanced with {@code nextIndex()}
         * (pausing once and retrying when nothing is available) or moved to the
         * index requested at subscription. Each excerpt goes out as a 4-byte size,
         * an 8-byte index and the payload.
         *
         * <p>An excerpt larger than half the write buffer is streamed on its own in
         * buffer-sized chunks. Smaller excerpts are batched: up to
         * {@code maxExcerptsPerMessage()} further excerpts are added while they fit.
         *
         * @param attached the selection key attachment, passed to {@code applyMapping}
         * @return {@code false} when no excerpt was available to send, {@code true} otherwise
         * @throws EOFException if the write buffer could not be fully flushed
         * @throws IOException  on connection failure
         */
        @Override
        protected boolean write(Object attached) throws IOException {
            if (this.nextIndex) {
                if (!this.tailer.nextIndex()) {
                    this.pause();
                    if (SourceTcp.this.running.get() && !this.tailer.nextIndex()) {
                        return false;
                    }
                }
            } else {
                if (!this.tailer.index(this.index)) {
                    return false;
                }
                // Positioned on the requested index; subsequent writes just advance.
                this.nextIndex = true;
            }
            this.pauseReset();

            Bytes bytes = this.applyMapping(this.tailer, attached);
            int size = (int)bytes.limit();
            this.writeBuffer.clear();
            this.writeBuffer.putInt(size);
            this.writeBuffer.putLong(this.tailer.index());

            if (size > this.writeBuffer.capacity() / 2) {
                // Large excerpt: stream it alone, one buffer-full at a time.
                while (size > 0) {
                    int chunk = Math.min(size, this.writeBuffer.remaining());
                    // Copy only what fits in the buffer, never the whole remaining size.
                    bytes.read(this.writeBuffer, chunk);
                    this.writeBuffer.flip();
                    this.connection.writeAll(this.writeBuffer);
                    size -= chunk;
                    if (size > 0) {
                        // Reuse the buffer for the next chunk only; leaving the last
                        // flushed buffer untouched keeps the remaining() check below valid.
                        this.writeBuffer.clear();
                    }
                }
            } else {
                bytes.read(this.writeBuffer, size);
                long previousIndex = this.tailer.index();
                long currentIndex;
                for (int count = SourceTcp.this.builder.maxExcerptsPerMessage(); count > 0 && this.tailer.nextIndex(); --count) {
                    currentIndex = this.tailer.index();
                    bytes = this.applyMapping(this.tailer, attached);
                    if (this.hasRoomForExcerpt(this.writeBuffer, bytes)) {
                        size = (int)bytes.limit();
                        previousIndex = currentIndex;
                        this.writeBuffer.putInt(size);
                        this.writeBuffer.putLong(currentIndex);
                        bytes.read(this.writeBuffer, size);
                        this.tailer.finish();
                        continue;
                    }
                    // No room: step back to the last excerpt that was batched so the
                    // one that did not fit is sent by the next call.
                    this.tailer.finish();
                    this.tailer.index(previousIndex);
                    break;
                }
                this.writeBuffer.flip();
                this.connection.writeAll(this.writeBuffer);
            }

            if (this.writeBuffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.tailer.index());
            }
            return true;
        }
    }

    private class IndexedSessionHandler
    extends SessionHandler {
        private long index;

        /**
         * Creates a handler whose current index starts at {@code -1}.
         */
        private IndexedSessionHandler(SocketChannel socketChannel) {
            super(socketChannel);
            index = -1L;
        }

        /**
         * Handles a subscribe request. {@code -2} resolves to the tailer's end
         * index; every other value (including {@code -1}) is taken as is. The
         * resolved index is sent back with size code {@code -126} and the key is
         * registered for both read and write.
         *
         * @return always {@code true}
         */
        @Override
        protected boolean onSubscribe(SelectionKey key, long data) throws IOException {
            index = (data == -2L) ? tailer.toEnd().index() : data;
            sendSizeAndIndex(-126, index);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return true;
        }

        /**
         * Does not append anything; when an acknowledgement is requested it replies
         * with size code {@code -130} and index {@code -4}.
         *
         * @return always {@code true}
         */
        @Override
        protected boolean onSubmit(SelectionKey key, long size, boolean ack) throws IOException {
            if (!ack) {
                return true;
            }
            sendSizeAndIndex(-130, -4L);
            return true;
        }

        /**
         * Sends the excerpt at the current index, and possibly following ones, to the peer.
         *
         * <p>Each excerpt is written as a 4-byte size, an 8-byte index and the payload.
         * An excerpt larger than half the write buffer is streamed alone in chunks;
         * smaller ones are batched with up to {@code maxExcerptsPerMessage()} followers
         * while they fit in the buffer.
         *
         * @param attached the selection key attachment, passed to {@code applyMapping}
         * @return {@code false} when the current index is not yet readable, {@code true} otherwise
         * @throws EOFException if the write buffer could not be fully flushed
         * @throws IOException  on connection failure
         */
        @Override
        protected boolean write(Object attached) throws IOException {
            if (!this.tailer.index(this.index)) {
                if (this.tailer.wasPadding()) {
                    // Padding entry: tell the peer (size code -127) unless the index is
                    // still negative, then skip over it.
                    if (this.index >= 0L) {
                        this.sendSizeAndIndex(-127, this.tailer.index());
                    }
                    ++this.index;
                }
                this.pause();
                // Retry once after pausing; give up for this round if still unavailable.
                if (SourceTcp.this.running.get() && !this.tailer.index(this.index)) {
                    return false;
                }
            }
            this.pauseReset();
            Bytes bytes = this.applyMapping(this.tailer, attached);
            int size = (int)bytes.limit();
            // Header of the first excerpt: size followed by index.
            this.writeBuffer.clear();
            this.writeBuffer.putInt(size);
            this.writeBuffer.putLong(this.tailer.index());
            if (size > this.writeBuffer.capacity() / 2) {
                // Large excerpt: stream it alone, one buffer-full at a time.
                while (size > 0) {
                    int minSize = Math.min(size, this.writeBuffer.remaining());
                    bytes.read(this.writeBuffer, minSize);
                    this.writeBuffer.flip();
                    this.connection.writeAll(this.writeBuffer);
                    // Only reset the buffer when another chunk follows, so the
                    // remaining() check after the loop sees the flushed state.
                    if ((size -= minSize) <= 0) continue;
                    this.writeBuffer.clear();
                }
            } else {
                bytes.read(this.writeBuffer, size);
                // Batch following excerpts while they fit and the per-message cap allows.
                int count = SourceTcp.this.builder.maxExcerptsPerMessage();
                while (count > 0 && this.tailer.index(this.index + 1L)) {
                    if (!this.tailer.wasPadding()) {
                        bytes = this.applyMapping(this.tailer, attached);
                        if (!this.hasRoomForExcerpt(this.writeBuffer, bytes)) break;
                        size = (int)bytes.limit();
                        this.writeBuffer.putInt(size);
                        this.writeBuffer.putLong(this.tailer.index());
                        bytes.read(this.writeBuffer, size);
                        ++this.index;
                        --count;
                        this.tailer.finish();
                        continue;
                    }
                    // Padding inside a batch: emit a 12-byte marker (size code -127 + index).
                    // Note that count is not decremented on this path.
                    if (!this.hasRoomFor(this.writeBuffer, 12L)) break;
                    this.writeBuffer.putInt(-127);
                    this.writeBuffer.putLong(this.index);
                    ++this.index;
                }
                this.writeBuffer.flip();
                this.connection.writeAll(this.writeBuffer);
            }
            if (this.writeBuffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.index);
            }
            // Move past the last excerpt that was sent.
            ++this.index;
            return true;
        }
    }

    private abstract class SessionHandler
    implements Runnable,
    Closeable {
        private final SocketChannel socketChannel;
        protected final TcpConnection connection;
        protected ExcerptTailer tailer;
        protected ExcerptAppender appender;
        protected long lastHeartbeat;
        private long lastUnPausedNS;
        protected final ByteBuffer writeBuffer;
        protected final ResizableDirectByteBufferBytes readBuffer;
        private ResizableDirectByteBufferBytes withMappedBuffer;

        private SessionHandler(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
            this.connection = new TcpConnection(socketChannel);
            this.tailer = null;
            this.appender = null;
            this.lastHeartbeat = 0L;
            this.lastUnPausedNS = 0L;
            this.readBuffer = new ResizableDirectByteBufferBytes(16);
            this.readBuffer.clearThreadAssociation();
            this.writeBuffer = ChronicleTcp.createBuffer(SourceTcp.this.builder.minBufferSize());
            this.writeBuffer.limit(0);
            this.withMappedBuffer = new ResizableDirectByteBufferBytes(1024);
        }

        /**
         * Releases the tailer, the appender and the socket channel.
         *
         * <p>Each resource is closed even if closing an earlier one fails, so a
         * failing tailer or appender cannot leak the socket.
         *
         * @throws IOException if closing the socket channel fails
         */
        @Override
        public void close() throws IOException {
            try {
                if (this.tailer != null) {
                    this.tailer.close();
                    this.tailer = null;
                }
            } finally {
                try {
                    if (this.appender != null) {
                        this.appender.close();
                        this.appender = null;
                    }
                } finally {
                    if (this.socketChannel.isOpen()) {
                        this.socketChannel.close();
                    }
                }
            }
        }

        /**
         * Session main loop: configures the socket as non-blocking, registers it
         * with a selector for reads, creates the tailer and appender, then runs
         * the selection loop until the source stops or the connection fails.
         *
         * <p>Failures are logged only while the source is still running. The
         * selection key set is cleared and the session closed on every exit path.
         */
        @Override
        public void run() {
            VanillaSelectionKeySet selectionKeys = null;
            try {
                this.socketChannel.configureBlocking(false);
                this.socketChannel.socket().setTcpNoDelay(true);
                this.socketChannel.socket().setSoTimeout(0);
                this.socketChannel.socket().setSoLinger(false, 0);
                if (SourceTcp.this.builder.receiveBufferSize() > 0) {
                    this.socketChannel.socket().setReceiveBufferSize(SourceTcp.this.builder.receiveBufferSize());
                }
                if (SourceTcp.this.builder.sendBufferSize() > 0) {
                    this.socketChannel.socket().setSendBufferSize(SourceTcp.this.builder.sendBufferSize());
                }
                VanillaSelector selector = new VanillaSelector().open().register(this.socketChannel, 1, new Attached());
                this.tailer = SourceTcp.this.builder.chronicle().createTailer();
                this.appender = SourceTcp.this.builder.chronicle().createAppender();
                selectionKeys = selector.vanillaSelectionKeys();
                // Use the array-backed key set when the selector provides one.
                if (selectionKeys != null) {
                    this.vanillaNioLoop(selector, selectionKeys);
                } else {
                    this.nioLoop(selector);
                }
            }
            catch (EOFException e) {
                if (SourceTcp.this.running.get()) {
                    SourceTcp.this.logger.info("Connection {} died", (Object)this.socketChannel);
                }
            }
            catch (Exception e) {
                if (SourceTcp.this.running.get()) {
                    String msg = e.getMessage();
                    if (msg != null && (msg.contains("reset by peer") || msg.contains("Broken pipe") || msg.contains("was aborted by"))) {
                        // Second placeholder added so the peer's error text is actually logged.
                        SourceTcp.this.logger.info("Connection {} closed from the other end: {}", (Object)this.socketChannel, (Object)msg);
                    } else {
                        SourceTcp.this.logger.info("Connection {} died", (Object)this.socketChannel, (Object)e);
                    }
                }
            }
            finally {
                if (selectionKeys != null) {
                    selectionKeys.clear();
                }
                // Closing here guarantees cleanup even when an Error escapes the loop.
                try {
                    this.close();
                }
                catch (IOException e) {
                    SourceTcp.this.logger.warn("", (Throwable)e);
                }
            }
        }

        /**
         * Selection loop for selectors exposing an array-backed key set. Runs while
         * the source is running; after each successful select, dispatches the ready
         * keys in order and stops at the first key whose handler returns {@code false}.
         */
        private void vanillaNioLoop(VanillaSelector selector, VanillaSelectionKeySet selectionKeys) throws IOException {
            final int spins = builder.selectorSpinLoopCount();
            final long timeout = builder.selectTimeout();

            while (running.get()) {
                if (selector.select(spins, timeout) <= 0) {
                    continue;
                }

                final SelectionKey[] ready = selectionKeys.keys();
                final int readyCount = selectionKeys.size();
                for (int i = 0; i < readyCount; i++) {
                    final SelectionKey candidate = ready[i];
                    // Null slots are skipped; a false result ends this round.
                    if (candidate != null && !onSelectionKey(candidate)) {
                        break;
                    }
                }
                selectionKeys.clear();
            }
        }

        /**
         * Selection loop using the selector's standard key set. Runs while the
         * source is running; after each successful select, dispatches the ready
         * keys and stops at the first key whose handler returns {@code false}.
         */
        private void nioLoop(VanillaSelector selector) throws IOException {
            final int spins = builder.selectorSpinLoopCount();
            final long timeout = builder.selectTimeout();

            while (running.get()) {
                if (selector.select(spins, timeout) <= 0) {
                    continue;
                }

                final Set<SelectionKey> ready = selector.selectionKeys();
                for (SelectionKey candidate : ready) {
                    if (!onSelectionKey(candidate)) {
                        break;
                    }
                }
                ready.clear();
            }
        }

        /**
         * @return whether {@code buffer} can take the remaining bytes of
         *         {@code tailer} plus a 12-byte header
         */
        protected boolean hasRoomForExcerpt(ByteBuffer buffer, Bytes tailer) {
            final long needed = tailer.remaining() + 12L;
            return hasRoomFor(buffer, needed);
        }

        /**
         * @return whether {@code buffer} has at least {@code size} bytes remaining
         */
        protected boolean hasRoomFor(ByteBuffer buffer, long size) {
            final long available = buffer.remaining();
            return size <= available;
        }

        /**
         * Records the current {@code System.nanoTime()} as the last un-paused
         * instant and resets the shared pauser.
         */
        protected void pauseReset() {
            final long nowNanos = System.nanoTime();
            lastUnPausedNS = nowNanos;
            pauser.reset();
        }

        /**
         * Pauses on the shared pauser unless {@link #pauseReset()} was called less
         * than 100 microseconds ago.
         *
         * <p>The elapsed time is computed as a difference of {@code nanoTime}
         * readings, which stays correct if the counter wraps around.
         */
        protected void pause() {
            final long elapsedNanos = System.nanoTime() - this.lastUnPausedNS;
            if (elapsedNanos < 100000L) {
                return;
            }
            SourceTcp.this.pauser.pause();
        }

        /**
         * Sets the heartbeat deadline to the current wall-clock time plus the
         * builder's heartbeat interval.
         */
        protected void setLastHeartbeat() {
            final long nowMillis = System.currentTimeMillis();
            lastHeartbeat = nowMillis + builder.heartbeatIntervalMillis();
        }

        /**
         * Sets the heartbeat deadline to {@code from} plus the builder's heartbeat interval.
         *
         * @param from base time in milliseconds
         */
        protected void setLastHeartbeat(long from) {
            lastHeartbeat = from + builder.heartbeatIntervalMillis();
        }

        /**
         * Writes a size/index pair to the connection through the write buffer and
         * then pushes the heartbeat deadline forward.
         *
         * @throws IOException if the connection write fails
         */
        protected void sendSizeAndIndex(int size, long index) throws IOException {
            connection.writeSizeAndIndex(writeBuffer, size, index);
            setLastHeartbeat();
        }

        /**
         * Resizes the read buffer to {@code size}, fills it from the connection and
         * returns it positioned at 0, ready for reading.
         *
         * @param size number of bytes the read buffer is reset to
         * @return the shared read buffer (reused by every call)
         * @throws IOException if the connection read fails or hits end of stream
         */
        protected DirectByteBufferBytes readUpTo(int size) throws IOException {
            this.readBuffer.resetToSize(size);
            this.connection.readFullyOrEOF(this.readBuffer.buffer());
            // Flip the underlying ByteBuffer, then rewind the Bytes view over it.
            this.readBuffer.buffer().flip();
            this.readBuffer.position(0L);
            // NOTE(review): this sets the limit to its own current value, which looks
            // like a no-op — possibly meant to use buffer().limit(); confirm.
            this.readBuffer.limit(this.readBuffer.limit());
            return this.readBuffer;
        }

        /**
         * Dispatches a ready key: readable keys go to {@link #onRead}, otherwise
         * writable keys go to {@link #onWrite}.
         *
         * @return the handler's result; {@code true} for a null key or a key that
         *         is neither readable nor writable
         */
        protected boolean onSelectionKey(SelectionKey key) throws IOException {
            if (key == null) {
                return true;
            }
            if (key.isReadable()) {
                return onRead(key);
            }
            if (key.isWritable()) {
                return onWrite(key);
            }
            return true;
        }

        /**
         * Reads an 8-byte action code followed by its argument and dispatches to the
         * matching handler: 1 subscribe, 2 unsubscribe, 10 query, 20 submit with
         * ack, 21 submit without ack, 30 mapping (4-byte argument; all others 8-byte).
         *
         * <p>On any {@code IOException} the key's selector is closed before the
         * exception is rethrown.
         *
         * @return the result of the invoked handler
         * @throws IOException on read failure or an unknown action code
         */
        protected boolean onRead(SelectionKey key) throws IOException {
            try {
                final long action = readUpTo(8).readLong();
                switch ((int) action) {
                    case 1:
                        return onSubscribe(key, readUpTo(8).readLong());
                    case 2:
                        return onUnsubscribe(key, readUpTo(8).readLong());
                    case 10:
                        return onQuery(key, readUpTo(8).readLong());
                    case 20:
                        return onSubmit(key, readUpTo(8).readLong(), true);
                    case 21:
                        return onSubmit(key, readUpTo(8).readLong(), false);
                    case 30:
                        return onMapping(key, readUpTo(4).readInt());
                    default:
                        throw new IOException("Unknown action received (" + action + ")");
                }
            } catch (IOException failure) {
                key.selector().close();
                throw failure;
            }
        }

        /**
         * Attempts to send data through {@link #write(Object)}; when nothing was
         * written and the heartbeat deadline had already passed at entry, sends a
         * size code {@code -128} message with index 0 instead. Does nothing when
         * the source is no longer running.
         *
         * @return always {@code true}
         */
        protected boolean onWrite(SelectionKey key) throws IOException {
            final long enteredAt = System.currentTimeMillis();
            final Object attachment = key.attachment();

            if (!running.get()) {
                return true;
            }
            if (write(attachment)) {
                return true;
            }
            if (lastHeartbeat <= enteredAt) {
                sendSizeAndIndex(-128, 0L);
            }
            return true;
        }

        /**
         * Reads a {@code MappingFunction} of {@code size} bytes from the connection
         * and installs it on the key's attachment. Nothing is read when the key has
         * no attachment.
         *
         * @return always {@code true}
         */
        protected boolean onMapping(SelectionKey key, int size) throws IOException {
            final MappingProvider provider = (MappingProvider) key.attachment();
            if (provider == null) {
                return true;
            }
            final MappingFunction received = (MappingFunction) readUpTo(size).readObject(MappingFunction.class);
            provider.withMapping(received);
            return true;
        }

        /**
         * Answers a query for the excerpt following index {@code data}.
         *
         * <p>If the tailer can be positioned on {@code data}, polls for the next
         * excerpt until one heartbeat interval has elapsed and, when found, replies
         * with size code {@code -126} and its index. In every other case (index not
         * found, or nothing arrived in time) replies with size code {@code -128}
         * and index 0.
         *
         * <p>The deadline is checked against a fresh clock reading on every
         * iteration, so the poll cannot spin past the heartbeat interval.
         *
         * @return always {@code true}
         * @throws IOException if the reply cannot be written
         */
        protected boolean onQuery(SelectionKey key, long data) throws IOException {
            if (this.tailer.index(data)) {
                // lastHeartbeat becomes the polling deadline: now + heartbeat interval.
                this.setLastHeartbeat(System.currentTimeMillis());
                do {
                    if (this.tailer.nextIndex()) {
                        this.sendSizeAndIndex(-126, this.tailer.index());
                        this.tailer.finish();
                        return true;
                    }
                } while (this.lastHeartbeat > System.currentTimeMillis());
            }
            this.sendSizeAndIndex(-128, 0L);
            return true;
        }

        /**
         * Removes write interest from the key, leaving its other interest bits intact.
         *
         * @return always {@code true}
         */
        protected boolean onUnsubscribe(SelectionKey key, long data) throws IOException {
            final int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
            key.interestOps(withoutWrite);
            return true;
        }

        /**
         * Called by {@code onRead} when action code 1 is received.
         *
         * @param var1 the selection key being served
         * @param var2 the 8-byte value read after the action code
         * @return {@code false} makes the selection loop stop processing the current batch of keys
         */
        protected abstract boolean onSubscribe(SelectionKey var1, long var2) throws IOException;

        /**
         * Called by {@code onRead} when action code 20 or 21 is received.
         *
         * @param var1 the selection key being served
         * @param var2 the 8-byte value read after the action code
         * @param var4 {@code true} for action code 20, {@code false} for 21
         * @return {@code false} makes the selection loop stop processing the current batch of keys
         */
        protected abstract boolean onSubmit(SelectionKey var1, long var2, boolean var4) throws IOException;

        /**
         * Called by {@code onWrite} while the source is running.
         *
         * @param var1 the selection key's attachment
         * @return {@code false} lets {@code onWrite} send a heartbeat message when the deadline has passed
         */
        protected abstract boolean write(Object var1) throws IOException;

        /**
         * Applies the subscriber's mapping function, if any, to the current excerpt.
         *
         * <p>When there is no attachment or no mapping function, {@code source} is
         * returned unchanged. Otherwise the function writes into a reusable buffer.
         * If it fails with an "Attempt to write" {@code IllegalArgumentException},
         * the buffer is grown by 50% (up to {@code Integer.MAX_VALUE}), the source
         * is rewound to where it started and the mapping is retried.
         *
         * @param source   the tailer positioned on the excerpt to send
         * @param attached the selection key attachment; expected to be a
         *                 {@code MappingProvider} when non-null
         * @return {@code source}, or the flipped buffer holding the mapped excerpt
         * @throws IllegalArgumentException if the mapping function fails for another
         *                                  reason, or the buffer cannot grow further
         */
        protected Bytes applyMapping(@NotNull ExcerptTailer source, @Nullable Object attached) {
            if (attached == null) {
                return source;
            }
            MappingProvider mappingProvider = (MappingProvider)attached;
            MappingFunction mappingFunction = mappingProvider.withMapping();
            if (mappingFunction == null) {
                return source;
            }
            this.withMappedBuffer.clear();
            if (this.withMappedBuffer.capacity() < source.limit()) {
                this.withMappedBuffer.resetToSize((int)source.capacity());
            }
            // Remember where the excerpt starts so a failed attempt can be replayed.
            final long sourceStart = source.position();
            while (true) {
                try {
                    mappingFunction.apply(source, (Bytes)this.withMappedBuffer);
                    return this.withMappedBuffer.flip();
                }
                catch (IllegalArgumentException e) {
                    String message = e.getMessage();
                    boolean overflow = message != null && message.contains("Attempt to write");
                    long current = this.withMappedBuffer.capacity();
                    if (!overflow || current >= Integer.MAX_VALUE) {
                        throw e;
                    }
                    // Grow by 50%, but always by at least one byte so the retry makes progress.
                    long grown = Math.min((long)Integer.MAX_VALUE, Math.max(current + 1L, (long)((double)current * 1.5)));
                    this.withMappedBuffer.resetToSize((int)grown);
                    this.withMappedBuffer.clear();
                    source.position(sourceStart);
                }
            }
        }
    }
}

