/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.map;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.BitSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.hash.impl.util.BuildVersion;
import net.openhft.chronicle.hash.replication.RemoteNodeValidator;
import net.openhft.chronicle.hash.replication.TcpTransportAndNetworkConfig;
import net.openhft.chronicle.hash.replication.ThrottlingConfig;
import net.openhft.chronicle.hash.serialization.internal.SerializationBuilder;
import net.openhft.chronicle.map.AbstractChannelReplicator;
import net.openhft.chronicle.map.Replica;
import net.openhft.chronicle.map.StatelessChronicleMap;
import net.openhft.chronicle.map.StatelessServerConnector;
import net.openhft.chronicle.map.VanillaChronicleMap;
import net.openhft.chronicle.map.Work;
import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.io.Bytes;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TcpReplicator<K, V>
extends AbstractChannelReplicator
implements Closeable {
    public static final long TIMESTAMP_FACTOR = 10000L;
    private static final int STATELESS_CLIENT = -127;
    private static final byte NOT_SET = (byte)StatelessChronicleMap.EventId.HEARTBEAT.ordinal();
    private static final Logger LOG = LoggerFactory.getLogger((String)TcpReplicator.class.getName());
    private static final int BUFFER_SIZE = 0x100000;
    public static final long SPIN_LOOP_TIME_IN_NONOSECONDS = TimeUnit.MICROSECONDS.toNanos(500L);
    private final SelectionKey[] selectionKeysStore = new SelectionKey[128];
    private final KeyInterestUpdater opWriteUpdater = new KeyInterestUpdater(4, this.selectionKeysStore);
    private final BitSet activeKeys = new BitSet(this.selectionKeysStore.length);
    private final long heartBeatIntervalMillis;
    private long largestEntrySoFar = 128L;
    @NotNull
    private final Replica replica;
    private final byte localIdentifier;
    @NotNull
    private final Replica.EntryExternalizable externalizable;
    @NotNull
    private final TcpTransportAndNetworkConfig replicationConfig;
    @Nullable
    private final RemoteNodeValidator remoteNodeValidator;
    private final String name;
    private long selectorTimeout;
    StatelessClientParameters<K, V> statelessClientParameters;

    public TcpReplicator(@NotNull Replica replica, @NotNull Replica.EntryExternalizable externalizable, @NotNull TcpTransportAndNetworkConfig replicationConfig, @Nullable RemoteNodeValidator remoteNodeValidator, @Nullable StatelessClientParameters statelessClientParameters, String name) throws IOException {
        super("TcpSocketReplicator-" + replica.identifier(), replicationConfig.throttlingConfig());
        this.statelessClientParameters = statelessClientParameters;
        ThrottlingConfig throttlingConfig = replicationConfig.throttlingConfig();
        long throttleBucketInterval = throttlingConfig.bucketInterval(TimeUnit.MILLISECONDS);
        this.heartBeatIntervalMillis = replicationConfig.heartBeatInterval(TimeUnit.MILLISECONDS);
        this.selectorTimeout = Math.min(this.heartBeatIntervalMillis / 4L, throttleBucketInterval);
        this.replica = replica;
        this.localIdentifier = replica.identifier();
        this.externalizable = externalizable;
        this.replicationConfig = replicationConfig;
        this.remoteNodeValidator = remoteNodeValidator;
        this.name = name;
        this.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    void processEvent() throws IOException {
        try {
            InetSocketAddress serverInetSocketAddress = new InetSocketAddress(this.replicationConfig.serverPort());
            AbstractChannelReplicator.Details serverDetails = new AbstractChannelReplicator.Details(serverInetSocketAddress, this.localIdentifier);
            new ServerConnector(serverDetails).connect();
            for (InetSocketAddress client : this.replicationConfig.endpoints()) {
                AbstractChannelReplicator.Details clientDetails = new AbstractChannelReplicator.Details(client, this.localIdentifier);
                new ClientConnector(clientDetails).connect();
            }
            while (this.selector.isOpen()) {
                int i;
                this.registerPendingRegistrations();
                int nSelectedKeys = this.select();
                long approxTime = System.currentTimeMillis();
                this.checkThrottleInterval();
                this.heartBeatMonitor(approxTime);
                this.opWriteUpdater.applyUpdates();
                if (useJavaNIOSelectionKeys) {
                    if (nSelectedKeys == 0) continue;
                    Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
                    for (SelectionKey key : selectionKeys) {
                        this.processKey(approxTime, key);
                    }
                    selectionKeys.clear();
                    continue;
                }
                SelectionKey[] keys = this.selectedKeys.flip();
                try {
                    for (i = 0; i < keys.length && keys[i] != null; ++i) {
                        SelectionKey key;
                        key = keys[i];
                        try {
                            this.processKey(approxTime, key);
                            continue;
                        }
                        catch (BufferUnderflowException e) {
                            if (this.isClosed) continue;
                            LOG.error("", (Throwable)e);
                        }
                    }
                }
                catch (Throwable throwable) {
                    for (int i2 = 0; i2 < keys.length && keys[i2] != null; ++i2) {
                        keys[i2] = null;
                    }
                    throw throwable;
                    return;
                }
                for (i = 0; i < keys.length && keys[i] != null; ++i) {
                    keys[i] = null;
                }
            }
        }
        catch (ConnectException | CancelledKeyException | ClosedChannelException | ClosedSelectorException e) {
            if (!LOG.isDebugEnabled()) return;
            LOG.debug("", (Throwable)e);
            return;
        }
        catch (Exception e) {
            LOG.error("", (Throwable)e);
            return;
        }
        catch (Throwable e) {
            LOG.error("", e);
            throw e;
        }
        finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug("closing name=" + this.name);
            }
            if (!this.isClosed) {
                this.closeResources();
            }
        }
    }

    private void processKey(long approxTime, @NotNull SelectionKey key) {
        block13: {
            try {
                if (!key.isValid()) {
                    return;
                }
                if (key.isAcceptable()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("onAccept - " + this.name);
                    }
                    this.onAccept(key);
                }
                if (key.isConnectable()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("onConnect - " + this.name);
                    }
                    this.onConnect(key);
                }
                if (key.isReadable()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("onRead - " + this.name);
                    }
                    this.onRead(key, approxTime);
                }
                if (key.isWritable()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("onWrite - " + this.name);
                    }
                    this.onWrite(key, approxTime);
                }
            }
            catch (IOException | InterruptedException | BufferUnderflowException | CancelledKeyException | ClosedSelectorException e) {
                if (!this.isClosed) {
                    this.quietClose(key, e);
                }
            }
            catch (Exception e) {
                LOG.info("", (Throwable)e);
                if (this.isClosed) break block13;
                this.closeEarlyAndQuietly(key.channel());
            }
        }
    }

    private int select() throws IOException {
        long start = System.nanoTime();
        while (System.nanoTime() < start + SPIN_LOOP_TIME_IN_NONOSECONDS) {
            int keys = this.selector.selectNow();
            if (keys == 0) continue;
            return keys;
        }
        return this.selector.select(this.selectorTimeout);
    }

    void heartBeatMonitor(long approxTime) {
        int i = this.activeKeys.nextSetBit(0);
        while (i >= 0) {
            block9: {
                try {
                    SelectionKey key;
                    block10: {
                        key = this.selectionKeysStore[i];
                        if (!key.isValid() || !key.channel().isOpen()) {
                            this.activeKeys.clear(i);
                            break block9;
                        }
                        Attached attachment = (Attached)key.attachment();
                        if (attachment == null || !attachment.hasRemoteHeartbeatInterval) break block9;
                        try {
                            this.sendHeartbeatIfRequired(approxTime, key);
                        }
                        catch (Exception e) {
                            if (!LOG.isDebugEnabled()) break block10;
                            LOG.debug("", (Throwable)e);
                        }
                    }
                    try {
                        this.heartbeatCheckHasReceived(key, approxTime);
                    }
                    catch (Exception e) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("", (Throwable)e);
                        }
                    }
                }
                catch (Exception e) {
                    if (!LOG.isDebugEnabled()) break block9;
                    LOG.debug("", (Throwable)e);
                }
            }
            i = this.activeKeys.nextSetBit(i + 1);
        }
    }

    private void sendHeartbeatIfRequired(long approxTime, @NotNull SelectionKey key) {
        Attached attachment = (Attached)key.attachment();
        if (attachment.isHandShakingComplete() && attachment.entryWriter.lastSentTime + this.heartBeatIntervalMillis < approxTime) {
            attachment.entryWriter.lastSentTime = approxTime;
            attachment.entryWriter.writeHeartbeatToBuffer();
            this.enableOpWrite(key);
            if (LOG.isDebugEnabled()) {
                LOG.debug("sending heartbeat");
            }
        }
    }

    private void enableOpWrite(@NotNull SelectionKey key) {
        int ops = key.interestOps();
        if ((ops & 0x18) == 0) {
            key.interestOps(ops | 4);
        }
    }

    private void heartbeatCheckHasReceived(@NotNull SelectionKey key, long approxTimeOutTime) {
        Attached attached = (Attached)key.attachment();
        if (attached.isServer || !attached.isHandShakingComplete()) {
            return;
        }
        SocketChannel channel = (SocketChannel)key.channel();
        if (approxTimeOutTime > attached.entryReader.lastHeartBeatReceived + attached.remoteHeartbeatInterval) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("lost connection, attempting to reconnect. missed heartbeat from identifier=" + attached.remoteIdentifier);
            }
            this.activeKeys.clear(attached.remoteIdentifier);
            this.closeables.closeQuietly((Closeable)channel.socket());
            if (this.replicationConfig.autoReconnectedUponDroppedConnection()) {
                attached.connector.connectLater();
            }
        }
    }

    private void quietClose(@NotNull SelectionKey key, @NotNull Exception e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("", (Throwable)e);
        }
        this.closeEarlyAndQuietly(key.channel());
    }

    private void onConnect(@NotNull SelectionKey key) throws IOException {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel)key.channel();
        }
        finally {
            this.closeables.add((Closeable)channel);
        }
        Attached attached = (Attached)key.attachment();
        try {
            if (!channel.finishConnect()) {
                return;
            }
        }
        catch (SocketException e) {
            this.quietClose(key, e);
            attached.connector.connect();
            throw e;
        }
        attached.connector.setSuccessfullyConnected();
        if (LOG.isDebugEnabled()) {
            LOG.debug("successfully connected to {}, local-id={}", (Object)channel.socket().getInetAddress(), (Object)this.localIdentifier);
        }
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(true);
        channel.socket().setSoTimeout(0);
        channel.socket().setSoLinger(false, 0);
        attached.entryReader = new TcpSocketChannelEntryReader();
        attached.entryWriter = new TcpSocketChannelEntryWriter();
        key.interestOps(5);
        this.throttle(channel);
        attached.entryWriter.identifierToBuffer(this.localIdentifier);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onAccept(@NotNull SelectionKey key) throws IOException {
        ServerSocketChannel server = null;
        try {
            server = (ServerSocketChannel)key.channel();
        }
        finally {
            if (server != null) {
                this.closeables.add((Closeable)server);
            }
        }
        SocketChannel channel = null;
        try {
            channel = server.accept();
        }
        finally {
            if (channel != null) {
                this.closeables.add((Closeable)channel);
            }
        }
        channel.configureBlocking(false);
        channel.socket().setReuseAddress(true);
        channel.socket().setTcpNoDelay(true);
        channel.socket().setSoTimeout(0);
        channel.socket().setSoLinger(false, 0);
        Attached attached = new Attached();
        attached.entryReader = new TcpSocketChannelEntryReader();
        attached.entryWriter = new TcpSocketChannelEntryWriter();
        attached.entryWriter.identifierToBuffer(this.localIdentifier);
        attached.isServer = true;
        channel.register(this.selector, 1, attached);
        this.throttle(channel);
    }

    boolean isValidVersionNumber(String versionNumber) {
        if (versionNumber.length() <= 2) {
            return false;
        }
        for (char c : versionNumber.toCharArray()) {
            if (c >= '0' && c <= '9' || c == '.' || c == '-' || c == '_' || c >= 'A' && c <= 'Z' || c >= 'a' && c <= 'z') continue;
            return false;
        }
        return true;
    }

    private void checkVersions(Attached<K, V> attached) {
        String remoteVersion = attached.serverVersion;
        String localVersion = BuildVersion.version();
        if (!remoteVersion.equals(localVersion)) {
            byte remoteIdentifier = attached.remoteIdentifier;
            LOG.warn("DIFFERENT CHRONICLE-MAP VERSIONS : local-map=" + localVersion + ", remote-map-id-" + remoteIdentifier + "=" + remoteVersion + ", The Remote Chronicle Map with " + "identifier=" + remoteIdentifier + " and this Chronicle Map are on different " + "versions, we suggest that you use the same version.");
        }
    }

    private void doHandShaking(@NotNull SelectionKey key, @NotNull SocketChannel socketChannel) throws IOException {
        Attached attached = (Attached)key.attachment();
        TcpSocketChannelEntryWriter writer = attached.entryWriter;
        TcpSocketChannelEntryReader reader = attached.entryReader;
        socketChannel.register(this.selector, 5, attached);
        if (attached.remoteIdentifier == -128) {
            byte remoteIdentifier = reader.identifierFromBuffer();
            if (remoteIdentifier == -127) {
                attached.handShakingComplete = true;
                attached.hasRemoteHeartbeatInterval = false;
                return;
            }
            if (remoteIdentifier == -128) {
                return;
            }
            attached.remoteIdentifier = remoteIdentifier;
            this.selectionKeysStore[remoteIdentifier] = key;
            this.activeKeys.set(remoteIdentifier);
            if (LOG.isDebugEnabled()) {
                LOG.debug("server-connection id={}, remoteIdentifier={}", (Object)this.localIdentifier, (Object)remoteIdentifier);
            }
            SocketAddress remoteAddress = socketChannel.getRemoteAddress();
            if (this.remoteNodeValidator != null && !this.remoteNodeValidator.validate(remoteIdentifier, remoteAddress) || remoteIdentifier == this.localIdentifier) {
                throw new IllegalStateException("dropping connection, as the remote-identifier is already being used, identifier=" + remoteIdentifier);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("handshaking for localIdentifier=" + this.localIdentifier + "," + "remoteIdentifier=" + remoteIdentifier);
            }
            attached.remoteModificationIterator = this.replica.acquireModificationIterator(remoteIdentifier, attached);
            writer.writeRemoteBootstrapTimestamp(this.replica.lastModificationTime(remoteIdentifier));
            writer.writeServerVersion();
            writer.writeRemoteHeartbeatInterval(this.heartBeatIntervalMillis);
        }
        if (attached.remoteBootstrapTimestamp == Long.MIN_VALUE) {
            attached.remoteBootstrapTimestamp = reader.remoteBootstrapTimestamp();
            if (attached.remoteBootstrapTimestamp == Long.MIN_VALUE) {
                return;
            }
        }
        if (attached.serverVersion == null) {
            try {
                attached.serverVersion = reader.readRemoteServerVersion();
            }
            catch (IllegalStateException e1) {
                socketChannel.close();
            }
            if (attached.serverVersion == null) {
                return;
            }
            if (!this.isValidVersionNumber(attached.serverVersion)) {
                LOG.warn("Please check that you don't have a third party system incorrectly connecting to ChronicleMap, Closing the remote connection as Chronicle can not make sense of the remote version number received from the external connection, version=" + attached.serverVersion + ", Chronicle is expecting the version " + "number to only contain '.',A-Z,a-z,0-9");
                socketChannel.close();
            }
            this.checkVersions(attached);
        }
        if (!attached.hasRemoteHeartbeatInterval) {
            long value = reader.readRemoteHeartbeatIntervalFromBuffer();
            if (value == Long.MIN_VALUE) {
                return;
            }
            if (value < 0L) {
                LOG.error("value=" + value);
            }
            attached.remoteHeartbeatInterval = (long)((double)value * 1.25);
            this.selectorTimeout = Math.min(this.selectorTimeout, value);
            if (this.selectorTimeout < 0L) {
                LOG.info("");
            }
            attached.hasRemoteHeartbeatInterval = true;
            attached.remoteModificationIterator.dirtyEntries(attached.remoteBootstrapTimestamp);
            reader.entriesFromBuffer(attached, key);
            attached.handShakingComplete = true;
        }
    }

    private void onWrite(@NotNull SelectionKey key, long approxTime) throws IOException {
        SocketChannel socketChannel = (SocketChannel)key.channel();
        Attached attached = (Attached)key.attachment();
        if (attached == null) {
            LOG.info("Closing connection " + socketChannel + ", nothing attached");
            socketChannel.close();
            return;
        }
        TcpSocketChannelEntryWriter entryWriter = attached.entryWriter;
        if (entryWriter == null) {
            throw new NullPointerException("No entryWriter");
        }
        if (entryWriter.isWorkIncomplete()) {
            boolean completed = entryWriter.doWork();
            if (completed) {
                entryWriter.workCompleted();
            }
        } else if (attached.remoteModificationIterator != null) {
            entryWriter.entriesToBuffer(attached.remoteModificationIterator);
        }
        try {
            int len = entryWriter.writeBufferToSocket(socketChannel, approxTime);
            if (len == -1) {
                socketChannel.close();
            }
            if (len > 0) {
                this.contemplateThrottleWrites(len);
            }
            if (!entryWriter.hasBytesToWrite() && !entryWriter.isWorkIncomplete() && !this.hasNext(attached) && attached.isHandShakingComplete()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Disabling OP_WRITE to remoteIdentifier=" + attached.remoteIdentifier + ", localIdentifier=" + this.localIdentifier);
                }
                key.interestOps(key.interestOps() & 0xFFFFFFFB);
            }
        }
        catch (IOException e) {
            this.quietClose(key, e);
            if (!attached.isServer) {
                attached.connector.connectLater();
            }
            throw e;
        }
    }

    private boolean hasNext(Attached attached) {
        return attached.remoteModificationIterator != null && attached.remoteModificationIterator.hasNext();
    }

    private void onRead(@NotNull SelectionKey key, long approxTime) throws IOException, InterruptedException {
        SocketChannel socketChannel = (SocketChannel)key.channel();
        Attached attached = (Attached)key.attachment();
        if (attached == null) {
            LOG.info("Closing connection " + socketChannel + ", nothing attached");
            socketChannel.close();
            return;
        }
        try {
            int len = attached.entryReader.readSocketToBuffer(socketChannel);
            if (len == -1) {
                socketChannel.register(this.selector, 0);
                if (this.replicationConfig.autoReconnectedUponDroppedConnection()) {
                    AbstractChannelReplicator.AbstractConnector connector = attached.connector;
                    if (connector != null) {
                        connector.connectLater();
                    }
                } else {
                    socketChannel.close();
                }
                return;
            }
            if (len == 0) {
                return;
            }
            if (attached.entryWriter.isWorkIncomplete()) {
                return;
            }
        }
        catch (IOException e) {
            if (!attached.isServer) {
                attached.connector.connectLater();
            }
            throw e;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("heartbeat or data received.");
        }
        attached.entryReader.lastHeartBeatReceived = approxTime;
        if (attached.isHandShakingComplete()) {
            attached.entryReader.entriesFromBuffer(attached, key);
        } else {
            this.doHandShaking(key, socketChannel);
        }
    }

    @Nullable
    private ServerSocketChannel openServerSocketChannel() throws IOException {
        ServerSocketChannel result = null;
        try {
            result = ServerSocketChannel.open();
        }
        finally {
            if (result != null) {
                this.closeables.add((Closeable)result);
            }
        }
        return result;
    }

    static /* synthetic */ long access$1200(TcpReplicator x0) {
        return x0.heartBeatIntervalMillis;
    }

    class TcpSocketChannelEntryReader {
        public static final int HEADROOM = 1024;
        public long lastHeartBeatReceived = System.currentTimeMillis();
        ByteBuffer in;
        ByteBufferBytes out;
        private long sizeInBytes;
        private byte state;

        private TcpSocketChannelEntryReader() {
            this.in = ByteBuffer.allocateDirect(TcpReplicator.this.replicationConfig.tcpBufferSize());
            this.out = new ByteBufferBytes(this.in.slice());
            this.out.limit(0L);
            this.in.clear();
        }

        void resizeBuffer(long size) {
            assert (size < Integer.MAX_VALUE);
            if (size < (long)this.in.capacity()) {
                throw new IllegalStateException("it not possible to resize the buffer smaller");
            }
            ByteBuffer buffer = ByteBuffer.allocateDirect((int)size).order(ByteOrder.nativeOrder());
            int inPosition = this.in.position();
            long outPosition = this.out.position();
            long outLimit = this.out.limit();
            this.out = new ByteBufferBytes(buffer.slice());
            this.in.position(0);
            for (int i = 0; i < inPosition; ++i) {
                buffer.put(this.in.get());
            }
            this.in = buffer;
            this.in.limit(this.in.capacity());
            this.in.position(inPosition);
            this.out.limit(outLimit);
            this.out.position(outPosition);
        }

        private int readSocketToBuffer(@NotNull SocketChannel socketChannel) throws IOException {
            this.compactBuffer();
            int len = socketChannel.read(this.in);
            this.out.limit((long)this.in.position());
            return len;
        }

        void entriesFromBuffer(@NotNull Attached attached, @NotNull SelectionKey key) {
            int entriesRead = 0;
            try {
                while (true) {
                    block12: {
                        this.out.limit((long)this.in.position());
                        if (this.state != NOT_SET) break block12;
                        if (this.out.remaining() < 5L) {
                            return;
                        }
                        this.state = this.out.readByte();
                        this.sizeInBytes = this.out.readInt();
                        assert (this.sizeInBytes >= 0L);
                        long requiredSize = this.sizeInBytes + 4L + 1L;
                        if (this.out.capacity() < requiredSize) {
                            attached.entryReader.resizeBuffer(requiredSize + 1024L);
                        }
                        if (this.state != NOT_SET) break;
                        break;
                    }
                    ++entriesRead;
                }
            }
            finally {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Entries read: {}", (Object)entriesRead);
                }
            }
        }

        private void compactBuffer() {
            if (this.in.position() == 0 || (long)this.in.remaining() > TcpReplicator.this.largestEntrySoFar) {
                return;
            }
            this.in.limit(this.in.position());
            assert (this.out.position() < Integer.MAX_VALUE);
            this.in.position((int)this.out.position());
            this.in.compact();
            this.out.position(0L);
        }

        byte identifierFromBuffer() {
            return this.out.remaining() >= 1L ? this.out.readByte() : (byte)-128;
        }

        long remoteBootstrapTimestamp() {
            if (this.out.remaining() >= 8L) {
                return this.out.readLong();
            }
            return Long.MIN_VALUE;
        }

        String readRemoteServerVersion() {
            if (this.out.remaining() >= 64L) {
                char[] chars = new char[64];
                this.out.readFully(chars, 0, chars.length);
                return new String(chars).trim();
            }
            return null;
        }

        public long readRemoteHeartbeatIntervalFromBuffer() {
            return this.out.remaining() >= 8L ? this.out.readLong() : Long.MIN_VALUE;
        }
    }

    class TcpSocketChannelEntryWriter {
        @NotNull
        private final AbstractChannelReplicator.EntryCallback entryCallback;
        @Nullable
        public Work uncompletedWork;
        private long lastSentTime;
        StatelessServerConnector statelessServer;

        private TcpSocketChannelEntryWriter() {
            this.entryCallback = new AbstractChannelReplicator.EntryCallback(TcpReplicator.this.externalizable, TcpReplicator.this.replicationConfig.tcpBufferSize());
            if (TcpReplicator.this.statelessClientParameters != null) {
                this.statelessServer = new StatelessServerConnector(TcpReplicator.this.statelessClientParameters.map, this.entryCallback, TcpReplicator.this.replicationConfig.tcpBufferSize(), TcpReplicator.this.statelessClientParameters.keySerializationBuilder, TcpReplicator.this.statelessClientParameters.valueSerializationBuilder);
            }
        }

        public boolean isWorkIncomplete() {
            return this.uncompletedWork != null;
        }

        public void workCompleted() {
            this.uncompletedWork = null;
        }

        void identifierToBuffer(byte localIdentifier) {
            this.in().writeByte((int)localIdentifier);
        }

        void ensureBufferSize(long size) {
            if (this.in().remaining() < size) {
                if ((size += this.entryCallback.in().position()) > Integer.MAX_VALUE) {
                    throw new UnsupportedOperationException();
                }
                this.entryCallback.resizeBuffer((int)size);
            }
        }

        void resizeToMessage(@NotNull IllegalStateException e) {
            int i;
            String substring;
            String message = e.getMessage();
            if (message.startsWith("java.io.IOException: Not enough available space for writing ")) {
                substring = message.substring("java.io.IOException: Not enough available space for writing ".length(), message.length());
                i = substring.indexOf(32);
                if (i == -1) {
                    throw e;
                }
            } else {
                throw e;
            }
            int size = Integer.parseInt(substring.substring(0, i));
            long requiresExtra = (long)size - this.in().remaining();
            this.ensureBufferSize((int)(this.in().capacity() + requiresExtra));
        }

        Bytes in() {
            return this.entryCallback.in();
        }

        private ByteBuffer out() {
            return this.entryCallback.out();
        }

        void writeRemoteBootstrapTimestamp(long timeStampOfLastMessage) {
            this.in().writeLong(timeStampOfLastMessage);
        }

        void writeServerVersion() {
            this.in().write(String.format("%1$64s", BuildVersion.version()).toCharArray());
        }

        void entriesToBuffer(@NotNull Replica.ModificationIterator modificationIterator) {
            int entriesWritten = 0;
            try {
                while (true) {
                    long start = this.in().position();
                    boolean success = modificationIterator.nextEntry(this.entryCallback, 0);
                    if (!success) {
                        return;
                    }
                    long entrySize = this.in().position() - start;
                    if (entrySize > TcpReplicator.this.largestEntrySoFar) {
                        TcpReplicator.this.largestEntrySoFar = entrySize;
                    }
                    if (this.in().remaining() <= TcpReplicator.this.largestEntrySoFar || this.in().position() > (long)TcpReplicator.this.replicationConfig.tcpBufferSize()) {
                        return;
                    }
                    ++entriesWritten;
                }
            }
            finally {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Entries written: {}", (Object)entriesWritten);
                }
            }
        }

        private int writeBufferToSocket(@NotNull SocketChannel socketChannel, long approxTime) throws IOException {
            Bytes in = this.in();
            ByteBuffer out = this.out();
            if (in.position() == 0L) {
                return 0;
            }
            this.lastSentTime = approxTime;
            assert (in.position() <= Integer.MAX_VALUE);
            int size = (int)in.position();
            out.limit(size);
            int len = socketChannel.write(out);
            if (LOG.isDebugEnabled()) {
                LOG.debug("bytes-written=" + len);
            }
            if (len == size) {
                out.clear();
                in.clear();
            } else {
                out.compact();
                in.position((long)out.position());
                in.limit(in.capacity());
                out.clear();
            }
            return len;
        }

        private void writeHeartbeatToBuffer() {
            this.in().writeByte(StatelessChronicleMap.EventId.HEARTBEAT.ordinal());
            this.in().writeInt(0);
        }

        private void writeRemoteHeartbeatInterval(long localHeartbeatInterval) {
            this.in().writeLong(localHeartbeatInterval);
        }

        public boolean doWork() {
            return this.uncompletedWork != null && this.uncompletedWork.doWork(this.in());
        }

        public boolean hasBytesToWrite() {
            return this.in().position() > 0L;
        }
    }

    class Attached<K, V>
    implements Replica.ModificationNotifier {
        public TcpSocketChannelEntryReader entryReader;
        public TcpSocketChannelEntryWriter entryWriter;
        @Nullable
        public Replica.ModificationIterator remoteModificationIterator;
        public AbstractChannelReplicator.AbstractConnector connector;
        public long remoteBootstrapTimestamp = Long.MIN_VALUE;
        public byte remoteIdentifier = (byte)-128;
        public boolean hasRemoteHeartbeatInterval;
        public boolean isServer;
        public boolean handShakingComplete;
        public String serverVersion;
        public long remoteHeartbeatInterval = TcpReplicator.access$1200(TcpReplicator.this);

        Attached() {
        }

        boolean isHandShakingComplete() {
            return this.handShakingComplete;
        }

        @Override
        public void onChange() {
            if (this.remoteIdentifier != -128) {
                TcpReplicator.this.opWriteUpdater.set(this.remoteIdentifier);
            }
        }
    }

    private class ClientConnector
    extends AbstractChannelReplicator.AbstractConnector {
        @NotNull
        private final AbstractChannelReplicator.Details details;

        private ClientConnector(AbstractChannelReplicator.Details details) {
            super("TCP-ClientConnector-" + details.localIdentifier());
            this.details = details;
        }

        @NotNull
        public String toString() {
            return "ClientConnector{" + this.details + '}';
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        SelectableChannel doConnect() throws IOException, InterruptedException {
            boolean success = false;
            final SocketChannel socketChannel = AbstractChannelReplicator.openSocketChannel(TcpReplicator.this.closeables);
            try {
                socketChannel.configureBlocking(false);
                socketChannel.socket().setReuseAddress(true);
                socketChannel.socket().setSoLinger(false, 0);
                socketChannel.socket().setSoTimeout(0);
                try {
                    socketChannel.connect(this.details.address());
                }
                catch (UnresolvedAddressException e) {
                    this.connectLater();
                }
                Thread.sleep(10L);
                TcpReplicator.this.addPendingRegistration(new Runnable(){

                    @Override
                    public void run() {
                        block2: {
                            Attached attached = new Attached();
                            attached.connector = ClientConnector.this;
                            try {
                                socketChannel.register(TcpReplicator.this.selector, 8, attached);
                            }
                            catch (ClosedChannelException e) {
                                if (!socketChannel.isOpen()) break block2;
                                LOG.error("", (Throwable)e);
                            }
                        }
                    }
                });
                TcpReplicator.this.selector.wakeup();
                success = true;
                SocketChannel socketChannel2 = socketChannel;
                return socketChannel2;
            }
            finally {
                if (!success) {
                    try {
                        try {
                            socketChannel.socket().close();
                        }
                        catch (Exception e) {
                            LOG.error("", (Throwable)e);
                        }
                        socketChannel.close();
                    }
                    catch (IOException e) {
                        LOG.error("", (Throwable)e);
                    }
                }
            }
        }
    }

    private class ServerConnector
    extends AbstractChannelReplicator.AbstractConnector {
        @NotNull
        private final AbstractChannelReplicator.Details details;

        private ServerConnector(AbstractChannelReplicator.Details details) {
            super("TCP-ServerConnector-" + TcpReplicator.this.localIdentifier);
            this.details = details;
        }

        @NotNull
        public String toString() {
            return "ServerConnector{" + this.details + '}';
        }

        @Override
        @Nullable
        SelectableChannel doConnect() throws IOException, InterruptedException {
            ServerSocketChannel serverChannel = TcpReplicator.this.openServerSocketChannel();
            if (serverChannel == null) {
                return null;
            }
            serverChannel.socket().setReceiveBufferSize(0x100000);
            serverChannel.configureBlocking(false);
            serverChannel.register(TcpReplicator.this.selector, 0);
            ServerSocket serverSocket = null;
            try {
                serverSocket = serverChannel.socket();
            }
            finally {
                if (serverSocket != null) {
                    TcpReplicator.this.closeables.add((Closeable)serverSocket);
                }
            }
            if (serverSocket == null) {
                return null;
            }
            serverSocket.setReuseAddress(true);
            serverSocket.bind(this.details.address());
            TcpReplicator.this.addPendingRegistration(() -> {
                Attached attached = new Attached();
                attached.connector = this;
                try {
                    serverChannel.register(TcpReplicator.this.selector, 16, attached);
                }
                catch (ClosedChannelException e) {
                    LOG.debug("", (Throwable)e);
                }
            });
            TcpReplicator.this.selector.wakeup();
            return serverChannel;
        }
    }

    private static class KeyInterestUpdater {
        private final AtomicBoolean wasChanged = new AtomicBoolean();
        @NotNull
        private final BitSet changeOfOpWriteRequired;
        @NotNull
        private final SelectionKey[] selectionKeys;
        private final int op;

        KeyInterestUpdater(int op, @NotNull SelectionKey[] selectionKeys) {
            this.op = op;
            this.selectionKeys = selectionKeys;
            this.changeOfOpWriteRequired = new BitSet(selectionKeys.length);
        }

        public void applyUpdates() {
            if (this.wasChanged.getAndSet(false)) {
                int i = this.changeOfOpWriteRequired.nextSetBit(0);
                while (i >= 0) {
                    this.changeOfOpWriteRequired.clear(i);
                    SelectionKey key = this.selectionKeys[i];
                    try {
                        key.interestOps(key.interestOps() | this.op);
                    }
                    catch (Exception e) {
                        LOG.debug("", (Throwable)e);
                    }
                    i = this.changeOfOpWriteRequired.nextSetBit(i + 1);
                }
            }
        }

        public void set(int keyIndex) {
            this.changeOfOpWriteRequired.set(keyIndex);
            this.wasChanged.lazySet(true);
        }
    }

    static class StatelessClientParameters<K, V> {
        VanillaChronicleMap<K, ?, ?, V, ?, ?, ?> map;
        SerializationBuilder<K> keySerializationBuilder;
        SerializationBuilder<V> valueSerializationBuilder;

        public StatelessClientParameters(VanillaChronicleMap<K, ?, ?, V, ?, ?, ?> map, SerializationBuilder<K> keySerializationBuilder, SerializationBuilder<V> valueSerializationBuilder) {
            this.map = map;
            this.keySerializationBuilder = keySerializationBuilder;
            this.valueSerializationBuilder = valueSerializationBuilder;
        }
    }
}

