/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network.connection;

import gnu.trove.TCollections;
import gnu.trove.map.TLongObjectMap;
import gnu.trove.map.hash.TLongObjectHashMap;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesUtil;
import net.openhft.chronicle.bytes.ConnectionDroppedException;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.StackTrace;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.core.util.Time;
import net.openhft.chronicle.network.ConnectionStrategy;
import net.openhft.chronicle.network.WanSimulator;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.AsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.ClientConnectionMonitor;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.EventId;
import net.openhft.chronicle.network.connection.SocketAddressSupplier;
import net.openhft.chronicle.network.connection.TraceLock;
import net.openhft.chronicle.network.connection.TryLock;
import net.openhft.chronicle.threads.LongPauser;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.threads.PauserMonitor;
import net.openhft.chronicle.threads.Threads;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.YamlLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TcpChannelHub
extends AbstractCloseable {
    // Read by closeSocket() to decide whether the consumer's 'omap' is cleared.
    // Assigned in a static initializer that is not visible in this part of the file.
    private static final boolean hasAssert;
    // Default used for tcpBufferSize when "tcp.client.buffer.size" is not set.
    public static final int TCP_BUFFER;
    static final int SAFE_TCP_SIZE;
    // NOTE(review): "HEATBEAT" looks like a misspelling of "HEARTBEAT"; not renamed here
    // because the constants are referenced outside this part of the file.
    private static final int HEATBEAT_PING_PERIOD;
    private static final int HEATBEAT_TIMEOUT_PERIOD;
    // Presumably the byte length of a message size prefix - TODO confirm against the reader.
    private static final int SIZE_OF_SIZE = 4;
    // Every hub registers itself here in its constructor; see assertAllHubsClosed() and closeAllHubs().
    private static final Set<TcpChannelHub> hubs;
    private static final Logger LOG;
    // Taken from the "tcp.client.timeout" system property, defaulting to 10000.
    final long timeoutMs;
    // Trimmed name supplied at construction; reported to the connection monitor and in toString().
    @NotNull
    private final String name;
    // Taken from the "tcp.client.buffer.size" system property, defaulting to TCP_BUFFER.
    private final int tcpBufferSize;
    // Outbound wire of the configured wireType; accessors assert that outBytesLock is held.
    private final Wire outWire;
    @NotNull
    private final SocketAddressSupplier socketAddressSupplier;
    // Transaction ids registered through preventSubscribeUponReconnect(long).
    private final Set<Long> preventSubscribeUponReconnect = new ConcurrentSkipListSet<Long>();
    // Guards writes to outWire and to the socket.
    private final ReentrantLock outBytesLock = TraceLock.create();
    // Awaited by writeSocket() for up to 10 seconds when there is no channel and a reconnect is requested.
    private final Condition condition = this.outBytesLock.newCondition();
    // Last transaction id handed out by nextUniqueTransaction(long).
    @NotNull
    private final AtomicLong transactionID = new AtomicLong(0L);
    // Optional source of session details; when null no handshake is written.
    @Nullable
    private final SessionProvider sessionProvider;
    @NotNull
    private final TcpSocketConsumer tcpSocketConsumer;
    @NotNull
    private final EventLoop eventLoop;
    @NotNull
    private final WireType wireType;
    // TEXT wire used only by doHandShaking().
    private final Wire handShakingWire;
    // Optional listener told about connects and disconnects.
    @Nullable
    private final ClientConnectionMonitor clientConnectionMonitor;
    private final ConnectionStrategy connectionStrategy;
    @NotNull
    private final Pauser pauser;
    // Largest value computed by updateLargestChunkSoFarSize() so far.
    private long largestChunkSoFar = 0L;
    // Current socket channel; null means "not connected" (see isOpen()).
    @Nullable
    private volatile SocketChannel clientChannel;
    // Awaited (for at most 250 ms) by awaitAckOfClosedMessage() during close.
    @NotNull
    private final CountDownLatch receivedClosedAcknowledgement = new CountDownLatch(1);
    // Buffer limit seen by the previous call to updateLargestChunkSoFarSize().
    private long limitOfLast = 0L;
    // When true, performClose() schedules a close message before shutting down.
    private final boolean shouldSendCloseMessage;
    private final HandlerPriority priority;

    /**
     * Creates a hub without an explicit pauser supplier.
     * <p>
     * Delegates to the ten-argument constructor passing {@code null} as the pauser supplier,
     * which makes that constructor fall back to its built-in {@link LongPauser}.
     */
    public TcpChannelHub(@Nullable SessionProvider sessionProvider, @NotNull EventLoop eventLoop, @NotNull WireType wireType, @NotNull String name, @NotNull SocketAddressSupplier socketAddressSupplier, boolean shouldSendCloseMessage, @Nullable ClientConnectionMonitor clientConnectionMonitor, @NotNull HandlerPriority monitor, @NotNull ConnectionStrategy connectionStrategy) {
        this(sessionProvider, eventLoop, wireType, name, socketAddressSupplier, shouldSendCloseMessage, clientConnectionMonitor, monitor, connectionStrategy, null);
    }

    /**
     * Creates a hub, registers it in the static hub set and creates its socket consumer.
     *
     * @param sessionProvider         optional source of session details used for the handshake
     * @param eventLoop               event loop that receives the pauser monitor (and later the close handler)
     * @param wireType                wire format used for the outbound wire
     * @param name                    hub name; trimmed before being stored, must not be blank
     *                                (checked only when assertions are enabled)
     * @param socketAddressSupplier   supplier of the remote address
     * @param shouldSendCloseMessage  whether closing the hub first sends a close message
     * @param clientConnectionMonitor optional connect/disconnect listener
     * @param monitor                 stored as this hub's handler priority
     * @param connectionStrategy      strategy stored for later connection attempts
     * @param pauserSupplier          optional pauser factory; when {@code null} a {@link LongPauser} is used
     */
    public TcpChannelHub(@Nullable SessionProvider sessionProvider, @NotNull EventLoop eventLoop, @NotNull WireType wireType, @NotNull String name, @NotNull SocketAddressSupplier socketAddressSupplier, boolean shouldSendCloseMessage, @Nullable ClientConnectionMonitor clientConnectionMonitor, @NotNull HandlerPriority monitor, @NotNull ConnectionStrategy connectionStrategy, @Nullable Supplier<Pauser> pauserSupplier) {
        assert (!name.trim().isEmpty());
        this.connectionStrategy = connectionStrategy;
        this.priority = monitor;
        this.socketAddressSupplier = socketAddressSupplier;
        this.eventLoop = eventLoop;
        // Buffer size and timeout can be overridden through system properties.
        this.tcpBufferSize = Integer.getInteger("tcp.client.buffer.size", TCP_BUFFER);
        this.outWire = (Wire)wireType.apply((Object)Bytes.elasticByteBuffer());
        this.name = name.trim();
        this.timeoutMs = Integer.getInteger("tcp.client.timeout", 10000).intValue();
        this.wireType = wireType;
        // The handshake is always written as TEXT, independent of the configured wire type.
        this.handShakingWire = (Wire)WireType.TEXT.apply((Object)Bytes.elasticByteBuffer());
        this.sessionProvider = sessionProvider;
        this.shouldSendCloseMessage = shouldSendCloseMessage;
        this.clientConnectionMonitor = clientConnectionMonitor;
        this.pauser = pauserSupplier != null ? pauserSupplier.get() : new LongPauser(100, 100, 500L, 20000L, TimeUnit.MICROSECONDS);
        // NOTE(review): 'this' is published to the static set before tcpSocketConsumer is assigned.
        hubs.add(this);
        eventLoop.addHandler((EventHandler)new PauserMonitor(this.pauser, "async-read", 30));
        // Created last; the consumer is an inner class and may rely on the fields assigned above.
        this.tcpSocketConsumer = new TcpSocketConsumer();
    }

    /*
     * Exception decompiling
     *
     * The real implementation of this method is NOT available: CFR 0.152 failed to decompile it with
     *   org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
     *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
     * The body below is a decompiler placeholder that always throws. Restore the method from
     * the original sources (or the class file) before relying on it.
     */
    private static int getTcpBufferSize() {
        // Decompiler placeholder - see the comment above this method.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Closes every registered hub and fails if any of them had not been closed beforehand.
     * <p>
     * All hubs are closed and the registry is emptied before the error, if any, is raised.
     *
     * @throws AssertionError listing each hub that was still open when this method was called
     */
    public static void assertAllHubsClosed() {
        final StringBuilder stillOpen = new StringBuilder();
        for (final TcpChannelHub hub : hubs) {
            final boolean alreadyClosed = hub.isClosed();
            if (!alreadyClosed) {
                stillOpen.append("Connection ").append((Object) hub).append(" still open\n");
            }
            // close() is invoked on every hub, whether or not it was reported.
            hub.close();
        }
        hubs.clear();
        if (stillOpen.length() == 0) {
            return;
        }
        throw new AssertionError((Object) stillOpen.toString());
    }

    /**
     * Closes every registered hub that is not yet closed, then empties the registry.
     */
    public static void closeAllHubs() {
        // Work on a snapshot of the registry taken before any hub is closed.
        final TcpChannelHub[] snapshot = hubs.toArray(new TcpChannelHub[hubs.size()]);
        for (final TcpChannelHub hub : snapshot) {
            if (!hub.isClosed()) {
                if (Jvm.isDebugEnabled(TcpChannelHub.class)) {
                    Jvm.debug().on(TcpChannelHub.class, "Closing " + hub);
                }
                hub.close();
            }
        }
        hubs.clear();
    }

    /**
     * Logs the content of a received wire as YAML when {@code YamlLogging.showClientReads()} is on.
     * <p>
     * The write position and write limit of the underlying bytes are captured before logging
     * and restored afterwards, whether or not logging succeeds.
     *
     * @param wire the wire whose size-prefixed content is logged
     */
    private static void logToStandardOutMessageReceived(@NotNull Wire wire) {
        @NotNull final Bytes received = wire.bytes();
        if (YamlLogging.showClientReads()) {
            final long savedPosition = received.writePosition();
            final long savedLimit = received.writeLimit();
            try {
                LOG.info("\nreceives:\n```yaml\n" + Wires.fromSizePrefixedBlobs((WireIn) wire) + "```\n");
                YamlLogging.title = "";
                YamlLogging.writeMessage((String) "");
            } catch (Exception e) {
                // Logging is best effort: report the raw bytes and carry on.
                Jvm.warn().on(TcpChannelHub.class, Bytes.toString((Bytes) received), (Throwable) e);
            } finally {
                received.writeLimit(savedLimit);
                received.writePosition(savedPosition);
            }
        }
    }

    /**
     * Logs the content of a wire that was received in error, as YAML.
     * <p>
     * Unlike {@code logToStandardOutMessageReceived} this always logs. The write position and
     * write limit of the underlying bytes are restored afterwards.
     *
     * @param wire the wire whose size-prefixed content is logged
     */
    private static void logToStandardOutMessageReceivedInERROR(@NotNull Wire wire) {
        @NotNull final Bytes received = wire.bytes();
        final long savedPosition = received.writePosition();
        final long savedLimit = received.writeLimit();
        try {
            LOG.info("\nreceives IN ERROR:\n```yaml\n" + Wires.fromSizePrefixedBlobs((WireIn) wire) + "```\n");
            YamlLogging.title = "";
            YamlLogging.writeMessage((String) "");
        } catch (Exception e) {
            // Logging is best effort: report the raw bytes and carry on.
            final String rawContent = Bytes.toString((Bytes) received);
            Jvm.warn().on(TcpChannelHub.class, rawContent, (Throwable) e);
        } finally {
            received.writeLimit(savedLimit);
            received.writePosition(savedPosition);
        }
    }

    /**
     * Sanity check that the calling thread is not the consumer's read thread.
     * <p>
     * The check itself is an {@code assert}, so it only fires when assertions are enabled.
     *
     * @param tcpSocketConsumer the consumer whose {@code readThread} is compared with the current thread
     * @return always {@code true}, so that the call can itself be wrapped in an {@code assert}
     */
    private static boolean checkWritesOnReadThread(@NotNull TcpSocketConsumer tcpSocketConsumer) {
        assert (Thread.currentThread() != tcpSocketConsumer.readThread) : "if writes and reads are on the same thread this can lead to deadlocks with the server, if the server buffer becomes full";
        return true;
    }

    /**
     * Clears the given wire.
     * <p>
     * The {@code startUse()}/{@code endUse()} calls sit inside {@code assert} statements, so they
     * are only executed when assertions are enabled.
     *
     * @param wire the wire to clear
     */
    void clear(@NotNull Wire wire) {
        assert (wire.startUse());
        try {
            wire.clear();
        }
        finally {
            assert (wire.endUse());
        }
    }

    /*
     * Exception decompiling
     *
     * The real implementation of this method is NOT available: CFR 0.152 failed to decompile it with
     *   org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[TRYBLOCK]], but top level block is 4[CATCHBLOCK]
     *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
     * The body below is a decompiler placeholder that always throws. Restore the method from
     * the original sources (or the class file) before relying on it.
     */
    @Nullable
    SocketChannel openSocketChannel(InetSocketAddress socketAddress) throws IOException {
        // Decompiler placeholder - see the comment above this method.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Records a transaction id in the {@code preventSubscribeUponReconnect} set.
     * <p>
     * NOTE(review): presumably the reconnect logic skips re-subscribing these ids; that code is
     * not visible in this part of the file - confirm.
     *
     * @param tid the transaction id to record
     */
    public void preventSubscribeUponReconnect(long tid) {
        this.preventSubscribeUponReconnect.add(tid);
    }

    /**
     * Describes this hub by its name and remote address supplier.
     * <p>
     * The two fields are separated by {@code ", "} so that the name does not run straight into
     * the {@code remoteAddressSupplier=} label.
     *
     * @return a human-readable description, intended for logs and diagnostics
     */
    @NotNull
    public String toString() {
        return "TcpChannelHub{name=" + this.name + ", remoteAddressSupplier=" + this.socketAddressSupplier + '}';
    }

    /**
     * Handles a dropped connection: tells the socket consumer that the connection is closed
     * and, when a monitor is configured and the supplier yields an address, notifies the monitor.
     */
    private void onDisconnected() {
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(getClass(), "disconnected to remoteAddress=" + socketAddressSupplier);
        }
        tcpSocketConsumer.onConnectionClosed();
        if (clientConnectionMonitor == null) {
            return;
        }
        final InetSocketAddress remoteAddress = socketAddressSupplier.get();
        if (remoteAddress != null) {
            clientConnectionMonitor.onDisconnected(name, remoteAddress);
        }
    }

    /**
     * Handles a newly established connection: when a monitor is configured and the supplier
     * yields an address, notifies the monitor.
     */
    private void onConnected() {
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(getClass(), "connected to remoteAddress=" + socketAddressSupplier);
        }
        if (clientConnectionMonitor == null) {
            return;
        }
        final InetSocketAddress remoteAddress = socketAddressSupplier.get();
        if (remoteAddress != null) {
            clientConnectionMonitor.onConnected(name, remoteAddress);
        }
    }

    /**
     * Registers an asynchronous subscription, passing {@code false} for the {@code tryLock} flag.
     *
     * @param asyncSubscription the subscription to register
     */
    public void subscribe(@NotNull AsyncSubscription asyncSubscription) {
        this.subscribe(asyncSubscription, false);
    }

    /**
     * Forwards a subscription to the socket consumer.
     *
     * @param asyncSubscription the subscription to register
     * @param tryLock           passed through unchanged to the consumer's {@code subscribe}
     */
    private void subscribe(@NotNull AsyncSubscription asyncSubscription, boolean tryLock) {
        this.tcpSocketConsumer.subscribe(asyncSubscription, tryLock);
    }

    /**
     * Forwards an unsubscribe request for the given transaction id to the socket consumer.
     *
     * @param tid the transaction id of the subscription to remove
     */
    public void unsubscribe(long tid) {
        this.tcpSocketConsumer.unsubscribe(tid);
    }

    /**
     * Returns the lock guarding the outbound wire.
     * <p>
     * Methods that write to the outbound wire or the socket assert that this lock is held by
     * the calling thread.
     *
     * @return the lock guarding the outbound wire
     */
    @NotNull
    public ReentrantLock outBytesLock() {
        return this.outBytesLock;
    }

    /**
     * Writes the session handshake to the given channel, if session details are available.
     * <p>
     * The handshake is a single data document on the text handshake wire carrying the user id,
     * domain, session mode, security token, client id and this hub's wire type. Nothing is
     * written when there are no session details. The caller must hold {@code outBytesLock}.
     *
     * @param socketChannel the channel to write the handshake to
     * @throws IOException if writing to the channel fails
     */
    void doHandShaking(@NotNull SocketChannel socketChannel) throws IOException {
        assert outBytesLock.isHeldByCurrentThread();
        @Nullable final SessionDetails details = sessionDetails();
        if (details == null) {
            return;
        }
        // Start from an empty wire and an empty underlying buffer.
        handShakingWire.clear();
        @NotNull final Bytes handShakeBytes = handShakingWire.bytes();
        handShakeBytes.clear();
        handShakingWire.writeDocument(false, out -> {
            out.writeEventName((WireKey) EventId.userId).text(details.userId());
            out.writeEventName((WireKey) EventId.domain).text(details.domain());
            out.writeEventName((WireKey) EventId.sessionMode).text(details.sessionMode().toString());
            out.writeEventName((WireKey) EventId.securityToken).text(details.securityToken());
            out.writeEventName((WireKey) EventId.clientId).text(details.clientId().toString());
            out.writeEventName((WireKey) EventId.wireType).text(wireType.toString());
        });
        writeSocket1((WireOut) handShakingWire, socketChannel);
    }

    /**
     * Looks up the current session details.
     *
     * @return the details from the session provider, or {@code null} when no provider is configured
     */
    @Nullable
    private SessionDetails sessionDetails() {
        return sessionProvider == null ? null : sessionProvider.get();
    }

    /**
     * Shuts down and closes the current socket channel, if there is one.
     * <p>
     * Input and output are shut down separately (failures are ignored or logged at debug level),
     * the channel is closed quietly, the channel reference is nulled, the consumer's {@code tid}
     * is reset and the disconnect notification is fired. Does nothing when no channel is open.
     */
    synchronized void closeSocket() {
        @Nullable final SocketChannel channel = this.clientChannel;
        if (channel == null) {
            return;
        }
        try {
            channel.socket().shutdownInput();
        } catch (ClosedChannelException ignored) {
            // already closed - nothing to shut down
        } catch (IOException e) {
            Jvm.debug().on(getClass(), (Throwable) e);
        }
        try {
            channel.socket().shutdownOutput();
        } catch (ClosedChannelException ignored) {
            // already closed - nothing to shut down
        } catch (IOException e) {
            Jvm.debug().on(getClass(), (Throwable) e);
        }
        Closeable.closeQuietly((Object) channel);
        this.clientChannel = null;
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(getClass(), "closing", (Throwable) new StackTrace("only added for logging - please ignore !"));
        }
        @NotNull final TcpSocketConsumer consumer = this.tcpSocketConsumer;
        consumer.tid = 0L;
        if (hasAssert) {
            consumer.omap.clear();
        }
        onDisconnected();
    }

    /**
     * Reports whether a socket channel is currently set.
     *
     * @return {@code true} when the channel reference is non-null
     */
    public boolean isOpen() {
        return this.clientChannel != null;
    }

    /**
     * Closes this hub; simply delegates to {@code close()}.
     */
    public void notifyClosing() {
        this.close();
    }

    /**
     * Shuts the hub down.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>tell the socket consumer to prepare for shutdown;</li>
     * <li>if configured, schedule a one-shot handler on the event loop that sends the close
     * message, then wait briefly for the acknowledgement;</li>
     * <li>stop the socket consumer;</li>
     * <li>wait until the channel reference has been cleared;</li>
     * <li>release the bytes behind the outbound and handshake wires.</li>
     * </ol>
     * The wait in step 4 pauses 50 ms between checks whether or not debug logging is enabled.
     * Previously the pause was only reached with debug logging on, which turned the wait into
     * a hot spin otherwise.
     */
    protected void performClose() {
        this.tcpSocketConsumer.prepareToShutdown();
        if (this.shouldSendCloseMessage) {
            this.eventLoop.addHandler(new EventHandler(){

                public boolean action() throws InvalidEventHandlerException {
                    try {
                        TcpChannelHub.this.sendCloseMessage();
                    }
                    catch (ConnectionDroppedException e) {
                        throw new InvalidEventHandlerException((Throwable)e);
                    }
                    // Always ends by throwing, so the handler does not complete normally.
                    throw InvalidEventHandlerException.reusable();
                }

                @NotNull
                public String toString() {
                    return TcpChannelHub.class.getSimpleName() + ".close()";
                }
            });
            this.awaitAckOfClosedMessage();
        }
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(getClass(), "closing connection to " + this.socketAddressSupplier);
        }
        this.tcpSocketConsumer.stop();
        // Wait for the channel reference to be cleared, sleeping between polls.
        while (this.clientChannel != null) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(getClass(), "waiting for disconnect to " + this.socketAddressSupplier);
            }
            Jvm.pause(50L);
        }
        this.outWire.bytes().release();
        this.handShakingWire.bytes().release();
    }

    /**
     * Sends the "client closing" message under the outbound lock.
     * <p>
     * Writes a metadata document with tid 0 and a {@code null} csp, followed by a data document
     * containing the {@code onClientClosing} event with empty text. The boolean result of
     * {@code lock(...)} is ignored.
     */
    private void sendCloseMessage() {
        this.lock(() -> {
            this.writeMetaDataForKnownTID(0L, this.outWire, null, 0L);
            this.outWire.writeDocument(false, w -> w.writeEventName((WireKey)EventId.onClientClosing).text(""));
        }, TryLock.LOCK);
    }

    /**
     * Waits up to 250 ms for the close acknowledgement latch.
     * <p>
     * A timeout is only reported at debug level and the caller carries on. If the wait is
     * interrupted the thread's interrupt flag is restored.
     */
    private void awaitAckOfClosedMessage() {
        final boolean acknowledged;
        try {
            acknowledged = receivedClosedAcknowledgement.await(250L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
            return;
        }
        if (acknowledged) {
            return;
        }
        if (Jvm.isDebugEnabled(getClass())) {
            Jvm.debug().on(getClass(), "SERVER IGNORED CLOSE REQUEST: shutting down the client anyway as the server did not respond to the close() request.");
        }
    }

    /**
     * Allocates a transaction id that is strictly greater than every id handed out before.
     * <p>
     * The id is {@code timeMs} when that is greater than the last id issued, otherwise the last
     * id plus one. The update is a compare-and-set loop, retried until it succeeds.
     *
     * @param timeMs the preferred id, typically a timestamp in milliseconds
     * @return the allocated transaction id
     */
    public long nextUniqueTransaction(long timeMs) {
        long candidate = timeMs;
        while (true) {
            final long last = transactionID.get();
            if (last >= candidate) {
                // The preferred value is already taken or in the past: use the next free id.
                candidate = last + 1L;
            }
            if (transactionID.compareAndSet(last, candidate)) {
                return candidate;
            }
        }
    }

    /**
     * Writes the content of {@code wire} to the current socket channel.
     * <p>
     * The caller must hold {@code outBytesLock}. When there is no channel:
     * <ul>
     * <li>with {@code reconnectOnFailure == false} the call returns without writing;</li>
     * <li>with {@code reconnectOnFailure == true} the pending bytes are copied aside, the
     * method waits up to 10 seconds on the lock's condition, then restores the bytes into the
     * wire and attempts the write with whatever channel is set at that point.</li>
     * </ul>
     * Any failure closes the socket and pauses 500 ms before being reported.
     * <p>
     * The wire's end-of-use check is a plain {@code assert} in the {@code finally} block, so
     * exceptions raised by the catch blocks reach the caller. The decompiled form left the
     * {@code finally} block through a labelled {@code break}, which silently discarded them.
     *
     * @param wire               the wire whose bytes are sent
     * @param reconnectOnFailure whether to wait for a connection when none is available and to
     *                           report a closed channel as a dropped connection
     * @throws ConnectionDroppedException if the write fails; for a {@link ClosedChannelException}
     *                                    only when {@code reconnectOnFailure} is {@code true}
     */
    public void writeSocket(@NotNull WireOut wire, boolean reconnectOnFailure) {
        assert this.outBytesLock().isHeldByCurrentThread();
        try {
            assert wire.startUse();
            @Nullable SocketChannel clientChannel = this.clientChannel;
            if (clientChannel == null) {
                if (!reconnectOnFailure) {
                    return;
                }
                // Keep a copy of the pending bytes while the wire is handed back during the wait.
                byte[] bytes = wire.bytes().toByteArray();
                assert wire.endUse();
                this.condition.await(10L, TimeUnit.SECONDS);
                assert wire.startUse();
                wire.clear();
                wire.bytes().write(bytes);
            }
            // Re-read the field: the channel may have been set while waiting.
            this.writeSocket1(wire, this.clientChannel);
        }
        catch (ClosedChannelException e) {
            this.closeSocket();
            Jvm.pause(500L);
            if (reconnectOnFailure) {
                throw new ConnectionDroppedException((Throwable)e);
            }
        }
        catch (IOException e) {
            // "Broken pipe" is not logged as a warning.
            if (!"Broken pipe".equals(e.getMessage())) {
                Jvm.warn().on(getClass(), (Throwable)e);
            }
            this.closeSocket();
            Jvm.pause(500L);
            throw new ConnectionDroppedException((Throwable)e);
        }
        catch (ConnectionDroppedException e) {
            this.closeSocket();
            Jvm.pause(500L);
            throw e;
        }
        catch (Exception e) {
            Jvm.warn().on(getClass(), (Throwable)e);
            this.closeSocket();
            Jvm.pause(500L);
            throw new ConnectionDroppedException((Throwable)e);
        }
        finally {
            assert wire.endUse();
        }
    }

    /**
     * Blocks until the reply for the given transaction id has been read, or the timeout expires.
     * <p>
     * Delegates to the socket consumer's {@code syncBlockingReadSocket}. On any failure the
     * socket is closed before the failure is rethrown; failures other than
     * {@link ConnectionDroppedException} are also logged as warnings.
     *
     * @param timeoutTime passed through unchanged to the consumer
     * @param tid         the transaction id whose reply is awaited
     * @return the wire holding the reply
     * @throws ConnectionDroppedException if the connection was dropped while waiting
     * @throws TimeoutException           declared by this method; raised by the consumer's read
     */
    public Wire proxyReply(long timeoutTime, long tid) throws ConnectionDroppedException, TimeoutException {
        try {
            return this.tcpSocketConsumer.syncBlockingReadSocket(timeoutTime, tid);
        }
        catch (ConnectionDroppedException e) {
            this.closeSocket();
            throw e;
        }
        catch (Throwable e) {
            Jvm.warn().on(((Object)((Object)this)).getClass(), e);
            this.closeSocket();
            throw e;
        }
    }

    /**
     * Writes everything up to the wire's write position to the given channel, looping until
     * the buffer has been drained.
     * <p>
     * The caller must hold {@code outBytesLock}. The loop aborts with a
     * {@link ConnectionDroppedException} if the hub's channel changes while writing, and with an
     * {@link IORuntimeException} if the channel reports -1 or if no progress has been made for
     * more than 15 minutes (in which case thread dumps are logged and the socket is closed first).
     * On success both the byte buffer and the wire's bytes are cleared.
     * <p>
     * Decompiler note (CFR): "WARNING - Removed try catching itself - possible behaviour change."
     *
     * @param outWire       the wire whose bytes are sent
     * @param clientChannel the channel to write to; {@code null} is reported as a dropped connection
     * @throws IOException if the channel write fails; the socket is closed before rethrowing
     */
    private void writeSocket1(@NotNull WireOut outWire, @Nullable SocketChannel clientChannel) throws IOException {
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(((Object)((Object)this)).getClass(), "sending :" + Wires.fromSizePrefixedBlobs((WireIn)((Wire)outWire)));
        }
        if (clientChannel == null) {
            LOG.info("Connection Dropped");
            throw new ConnectionDroppedException("Connection Dropped");
        }
        assert (this.outBytesLock.isHeldByCurrentThread());
        // 'start' marks the last time progress was made; it drives the stalled-write timeout below.
        long start = Time.currentTimeMillis();
        assert (outWire.startUse());
        try {
            @NotNull Bytes bytes = outWire.bytes();
            // assumes the wire's bytes are backed by a ByteBuffer - the cast fails otherwise
            @Nullable ByteBuffer outBuffer = (ByteBuffer)bytes.underlyingObject();
            // Expose [0, writePosition) of the backing buffer to the channel.
            outBuffer.limit((int)bytes.writePosition());
            outBuffer.position(0);
            assert (this.outBytesLock().isHeldByCurrentThread());
            boolean isOutBufferFull = false;
            this.logToStandardOutMessageSent(outWire, outBuffer);
            this.updateLargestChunkSoFarSize(outBuffer);
            try {
                int prevRemaining = outBuffer.remaining();
                while (outBuffer.remaining() > 0) {
                    // Abort if the hub's channel was replaced or cleared while this write was in flight.
                    if (clientChannel != this.clientChannel) {
                        throw new ConnectionDroppedException("Connection has Changed");
                    }
                    int len = clientChannel.write(outBuffer);
                    if (len == -1) {
                        throw new IORuntimeException("Disconnection to server=" + this.socketAddressSupplier + ", name=" + this.name);
                    }
                    if (LOG.isDebugEnabled()) {
                        Jvm.debug().on(((Object)((Object)this)).getClass(), "W:" + len + ",socket=" + this.socketAddressSupplier.get());
                    }
                    if (prevRemaining != outBuffer.remaining()) {
                        // Progress was made: restart the stall timer and record the activity time.
                        start = Time.currentTimeMillis();
                        isOutBufferFull = false;
                        prevRemaining = outBuffer.remaining();
                        @NotNull TcpSocketConsumer tcpSocketConsumer = this.tcpSocketConsumer;
                        if (tcpSocketConsumer == null) continue;
                        this.tcpSocketConsumer.lastTimeMessageReceivedOrSent = start;
                        continue;
                    }
                    // No bytes were accepted on this pass; log the condition once per stall.
                    if (!isOutBufferFull && Jvm.isDebug() && LOG.isDebugEnabled()) {
                        Jvm.debug().on(((Object)((Object)this)).getClass(), "----> TCP write buffer is FULL! " + outBuffer.remaining() + " bytes remaining.");
                    }
                    isOutBufferFull = true;
                    long writeTime = Time.currentTimeMillis() - start;
                    // Give up after 15 minutes without progress, dumping all non-"system" threads first.
                    if (writeTime > TimeUnit.MINUTES.toMillis(15L)) {
                        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
                            Thread thread = entry.getKey();
                            // NOTE(review): getThreadGroup() returns null for a thread that has died - confirm this cannot NPE here.
                            if (thread.getThreadGroup().getName().equals("system")) continue;
                            @NotNull StringBuilder sb = new StringBuilder();
                            sb.append("\n========= THREAD DUMP =========\n");
                            sb.append(thread).append(" ").append((Object)thread.getState());
                            Jvm.trimStackTrace((StringBuilder)sb, (StackTraceElement[])entry.getValue());
                            sb.append("\n");
                            Jvm.warn().on(((Object)((Object)this)).getClass(), sb.toString());
                        }
                        this.closeSocket();
                        throw new IORuntimeException("Took " + writeTime + " ms to perform a write, remaining= " + outBuffer.remaining());
                    }
                    // Let other threads run before retrying the write.
                    Thread.yield();
                }
            }
            catch (IOException e) {
                this.closeSocket();
                throw e;
            }
            // Everything was written: reset both views of the buffer for the next message.
            outBuffer.clear();
            bytes.clear();
        }
        finally {
            assert (outWire.endUse());
        }
    }

    /**
     * Logs the outgoing wire content as YAML when {@code YamlLogging.showClientWrites()} is on.
     * <p>
     * The log entry is only produced when the wire has bytes to read; the YAML title and write
     * message are reset afterwards in either case. Any failure is reported as a warning together
     * with the raw bytes.
     *
     * @param wire      the wire about to be sent
     * @param outBuffer not used by this method
     */
    private void logToStandardOutMessageSent(@NotNull WireOut wire, @NotNull ByteBuffer outBuffer) {
        if (YamlLogging.showClientWrites()) {
            @NotNull final Bytes payload = wire.bytes();
            try {
                if (payload.readRemaining() > 0L) {
                    final StringBuilder entry = new StringBuilder();
                    if (!YamlLogging.title.isEmpty()) {
                        entry.append("### ").append(YamlLogging.title).append("\n");
                    }
                    entry.append(YamlLogging.writeMessage());
                    if (!YamlLogging.writeMessage().isEmpty()) {
                        entry.append("\n\n");
                    }
                    entry.append("sends:\n\n```yaml\n");
                    entry.append(Wires.fromSizePrefixedBlobs((Bytes) payload));
                    entry.append("```");
                    LOG.info(entry.toString());
                }
                YamlLogging.title = "";
                YamlLogging.writeMessage((String) "");
            } catch (Exception e) {
                Jvm.warn().on(getClass(), Bytes.toString((Bytes) payload), (Throwable) e);
            }
        }
    }

    /**
     * Tracks the largest difference seen between the buffer's limit and the limit recorded by
     * the previous call, then records the current limit for the next call.
     *
     * @param outBuffer the buffer about to be written
     */
    private void updateLargestChunkSoFarSize(@NotNull ByteBuffer outBuffer) {
        final int currentLimit = outBuffer.limit();
        final int chunkSize = (int) ((long) currentLimit - limitOfLast);
        largestChunkSoFar = Math.max(largestChunkSoFar, (long) chunkSize);
        limitOfLast = currentLimit;
    }

    /**
     * Returns the outbound wire.
     * <p>
     * The caller must hold {@code outBytesLock}; this is checked only when assertions are enabled.
     *
     * @return the outbound wire
     */
    public Wire outWire() {
        assert (this.outBytesLock().isHeldByCurrentThread());
        return this.outWire;
    }

    /**
     * Reports whether the outbound lock is currently held by any thread.
     *
     * @return {@code true} when {@code outBytesLock} is locked
     */
    public boolean isOutBytesLocked() {
        return this.outBytesLock.isLocked();
    }

    /**
     * Echoes a heartbeat back: reads the timestamp and sends it in a {@code heartbeatReply} event.
     * <p>
     * The reply is only attempted when the outbound lock can be taken without waiting;
     * otherwise the heartbeat is skipped (and that is noted at debug level).
     *
     * @param valueIn the value holding the heartbeat timestamp as a 64-bit integer
     */
    private void reflectServerHeartbeatMessage(@NotNull ValueIn valueIn) {
        if (outBytesLock().tryLock()) {
            try {
                final long heartbeatTime = valueIn.int64();
                writeMetaDataForKnownTID(0L, outWire, null, 0L);
                outWire.writeDocument(false, w -> w.writeEventName((WireKey) EventId.heartbeatReply).int64(heartbeatTime));
                writeSocket((WireOut) outWire(), false);
            } finally {
                outBytesLock().unlock();
                assert !outBytesLock.isHeldByCurrentThread();
            }
        } else if (Jvm.isDebug() && LOG.isDebugEnabled()) {
            Jvm.debug().on(getClass(), "skipped sending back heartbeat, because lock is held !" + outBytesLock);
        }
    }

    /**
     * Allocates a new transaction id based on {@code startTime} and writes the metadata header for it.
     * <p>
     * The caller must hold {@code outBytesLock}; this is checked only when assertions are enabled.
     *
     * @param startTime preferred transaction id, passed to {@code nextUniqueTransaction}
     * @param wire      the wire to write the metadata document to
     * @param csp       written when {@code cid} is 0
     * @param cid       written instead of {@code csp} when non-zero
     * @return the transaction id that was allocated and written
     */
    public long writeMetaDataStartTime(long startTime, @NotNull Wire wire, String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        long tid = this.nextUniqueTransaction(startTime);
        this.writeMetaDataForKnownTID(tid, wire, csp, cid);
        return tid;
    }

    /**
     * Writes a metadata document identifying the target and the transaction id.
     * <p>
     * The target is written as {@code cid} when that is non-zero, otherwise as {@code csp};
     * the {@code tid} always follows. The caller must hold {@code outBytesLock}.
     *
     * @param tid  the transaction id to write
     * @param wire the wire to write the metadata document to
     * @param csp  written when {@code cid} is 0; may be {@code null}
     * @param cid  written instead of {@code csp} when non-zero
     */
    public void writeMetaDataForKnownTID(long tid, @NotNull Wire wire, @Nullable String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        try (DocumentContext context = wire.writingDocument(true)) {
            if (cid != 0L) {
                context.wire().writeEventName((WireKey) CoreFields.cid).int64(cid);
            } else {
                context.wire().writeEventName((WireKey) CoreFields.csp).text(csp);
            }
            context.wire().writeEventName((WireKey) CoreFields.tid).int64(tid);
        }
    }

    /**
     * Writes a metadata document identifying the target only, without a transaction id.
     * <p>
     * The target is written as {@code cid} when that is non-zero, otherwise as {@code csp}.
     * The caller must hold {@code outBytesLock}.
     *
     * @param wire the wire to write the metadata document to
     * @param csp  written when {@code cid} is 0
     * @param cid  written instead of {@code csp} when non-zero
     */
    public void writeAsyncHeader(@NotNull Wire wire, String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        wire.writeDocument(true, header -> {
            if (cid != 0L) {
                header.writeEventName((WireKey) CoreFields.cid).int64(cid);
            } else {
                header.writeEventName((WireKey) CoreFields.csp).text(csp);
            }
        });
    }

    /**
     * Runs the task under the outbound lock, waiting for the lock if necessary, and then sends
     * the outbound wire.
     *
     * @param r the task that populates the outbound wire
     * @return the result of the underlying {@code lock2} call with {@code TryLock.LOCK} and no reconnect
     */
    public boolean lock(@NotNull Task r) {
        return this.lock(r, TryLock.LOCK);
    }

    /**
     * Runs the task under the outbound lock using the given locking mode, without reconnecting.
     *
     * @param r       the task that populates the outbound wire
     * @param tryLock how the lock is acquired
     * @return the result of {@code lock2(r, false, tryLock)}
     */
    private boolean lock(@NotNull Task r, @NotNull TryLock tryLock) {
        return this.lock2(r, false, tryLock);
    }

    /**
     * Acquires the outbound lock, runs the task, and writes the outbound wire to the socket.
     * <p>
     * Behaviour by case:
     * <ul>
     * <li>No channel and {@code reconnectOnFailure == false}: nothing is run; the result is
     * {@code false} for {@code TryLock.LOCK} and {@code true} for any other mode.</li>
     * <li>{@code TryLock.LOCK}: blocks until the lock is acquired.</li>
     * <li>Any other mode: uses {@code tryLock()}; if the lock is busy the method returns
     * {@code false} (logging at debug level for {@code TRY_LOCK_WARN}).</li>
     * </ul>
     * Once the lock is held the connection is checked (when reconnecting with no channel), the
     * task is run and the outbound wire is written. The lock is always released afterwards.
     * <p>
     * If the blocking acquisition itself fails, the lock is released only when the current
     * thread actually holds it. Unlocking unconditionally would raise
     * {@link IllegalMonitorStateException} and hide the original failure.
     * <p>
     * The calling thread must not already hold the lock (checked when assertions are enabled).
     *
     * @param r                  the task that populates the outbound wire
     * @param reconnectOnFailure whether to (re)establish the connection when there is none
     * @param tryLock            how the lock is acquired
     * @return {@code true} if the task ran and the wire was written; otherwise as described above
     */
    public boolean lock2(@NotNull Task r, boolean reconnectOnFailure, @NotNull TryLock tryLock) {
        assert !this.outBytesLock.isHeldByCurrentThread();
        try {
            if (this.clientChannel == null && !reconnectOnFailure) {
                return TryLock.LOCK != tryLock;
            }
            @NotNull final ReentrantLock lock = this.outBytesLock();
            if (TryLock.LOCK == tryLock) {
                try {
                    lock.lock();
                }
                catch (Throwable e) {
                    // Only undo an acquisition that actually happened.
                    if (lock.isHeldByCurrentThread()) {
                        lock.unlock();
                    }
                    throw e;
                }
            } else if (!lock.tryLock()) {
                if (tryLock.equals((Object)TryLock.TRY_LOCK_WARN) && Jvm.isDebugEnabled(getClass())) {
                    Jvm.debug().on(getClass(), "FAILED TO OBTAIN LOCK thread=" + Thread.currentThread() + " on " + lock, (Throwable)new IllegalStateException());
                }
                return false;
            }
            try {
                if (this.clientChannel == null && reconnectOnFailure) {
                    this.checkConnection();
                }
                r.run();
                assert TcpChannelHub.checkWritesOnReadThread(this.tcpSocketConsumer);
                this.writeSocket((WireOut)this.outWire(), reconnectOnFailure);
            }
            catch (ConnectionDroppedException e) {
                if (Jvm.isDebug()) {
                    Jvm.debug().on(getClass(), (Throwable)e);
                }
                throw e;
            }
            catch (Exception e) {
                Jvm.warn().on(getClass(), (Throwable)e);
                throw e;
            }
            finally {
                lock.unlock();
            }
            return true;
        }
        finally {
            assert !this.outBytesLock.isHeldByCurrentThread();
        }
    }

    /**
     * Waits, in 1 ms steps on {@code condition}, until a client channel is available or
     * {@code timeoutMs} has elapsed since this method was entered.
     * <p>
     * NOTE(review): {@code condition.await} requires the owning lock to be held; within this
     * file the method is called from {@code lock2} while the out-bytes lock is held —
     * presumably {@code condition} belongs to that lock, confirm for other callers.
     *
     * @throws IORuntimeException if the consumer has been shut down, the wait times out, the
     *                            channel is still missing after the wait, or the thread is
     *                            interrupted (the interrupt flag is restored and the
     *                            {@link InterruptedException} is kept as the cause)
     */
    public void checkConnection() {
        long start = Time.currentTimeMillis();
        while (this.clientChannel == null) {
            this.tcpSocketConsumer.checkNotShutdown();
            if (start + this.timeoutMs <= Time.currentTimeMillis()) {
                throw new IORuntimeException("Not connected to " + this.socketAddressSupplier);
            }
            try {
                this.condition.await(1L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                // keep the interrupt visible to callers further up the stack
                Thread.currentThread().interrupt();
                throw new IORuntimeException("Interrupted", e);
            }
        }
        // re-check: the channel field may have been cleared again after the loop exited
        if (this.clientChannel == null) {
            throw new IORuntimeException("Not connected to " + this.socketAddressSupplier);
        }
    }

    /**
     * Closes the current client channel via {@code Closeable.closeQuietly}.
     */
    public void forceDisconnect() {
        final SocketChannel channel = this.clientChannel;
        Closeable.closeQuietly((Object)channel);
    }

    /**
     * @return {@code true} when the out wire's bytes have nothing remaining to read
     */
    public boolean isOutBytesEmpty() {
        final long pending = this.outWire.bytes().readRemaining();
        return pending == 0L;
    }

    static {
        boolean x = false;
        if (!$assertionsDisabled) {
            x = true;
            if (!true) {
                throw new AssertionError();
            }
        }
        hasAssert = x;
        TCP_BUFFER = TcpChannelHub.getTcpBufferSize();
        SAFE_TCP_SIZE = TCP_BUFFER * 3 / 4;
        HEATBEAT_PING_PERIOD = Integer.getInteger("heartbeat.ping.period", Jvm.isDebug() ? 30000 : 5000);
        HEATBEAT_TIMEOUT_PERIOD = Integer.getInteger("heartbeat.timeout", Jvm.isDebug() ? 120000 : 15000);
        hubs = new CopyOnWriteArraySet<TcpChannelHub>();
        LOG = LoggerFactory.getLogger(TcpChannelHub.class);
    }

    private final class TcpSocketConsumer
    implements EventHandler {
        // NOTE(review): processData compares against a literal 3000L, presumably this
        // constant inlined by the compiler — confirm.
        private static final int TIME_OUT_MS = 3000;
        // Handlers keyed by transaction id, registered through registerSubscribe: either a
        // Bytes buffer awaiting a synchronous reply or an AsyncSubscription.
        @NotNull
        private final TLongObjectMap<Object> map;
        // Only allocated when TcpChannelHub.access$600() is true; processData moves completed
        // tids from map into it when hasAssert is set.
        // NOTE(review): access$600() is presumably the synthetic accessor for hasAssert — confirm.
        private final TLongObjectMap<Object> omap = TcpChannelHub.access$600() ? TCollections.synchronizedMap((TLongObjectMap)new TLongObjectHashMap(8)) : null;
        // Executor that runs the two tasks submitted by start(); shut down in stop().
        @NotNull
        private final ExecutorService service;
        // Per-thread wire (built by createWire) used by syncBlockingReadSocket.
        @NotNull
        private final ThreadLocal<Wire> syncInWireThreadLocal = ThreadLocal.withInitial(this::createWire);
        // Time (ms) the last heartbeat was sent; updated in action().
        long lastheartbeatSentTime = 0L;
        // Time (ms) the read thread started handling the current message;
        // Long.MAX_VALUE while it is not handling one.
        volatile long start = Long.MAX_VALUE;
        // Transaction id passed to processData by running(); reset to -1 there.
        private long tid;
        // Buffer reused by processServerSystemMessage; released in stop().
        private final Bytes serverHeartBeatHandler = Bytes.elasticByteBuffer();
        // Updated by onMessageReceived(); read by the heartbeat checks in readBuffer() and action().
        private volatile long lastTimeMessageReceivedOrSent = Time.currentTimeMillis();
        // Set by stop(), and by readBuffer() when a zero-byte read finds the thread interrupted.
        private volatile boolean isShutdown;
        // Stack trace captured by stop(); used as the cause in checkNotShutdown().
        @Nullable
        private volatile Throwable shutdownHere = null;
        // Read by isShuttingdown(); the code that sets it is outside the methods shown here.
        private volatile boolean prepareToShutdown;
        // The thread executing running(); assigned by the first task submitted in start().
        private volatile Thread readThread;

        /**
         * Creates the handler map and the executor, then starts the background tasks via
         * {@code start()}.
         */
        TcpSocketConsumer() {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(this.getClass(), "constructor remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
            }
            // handler map: initial capacity 32, auto-compaction factor set to 0
            TLongObjectHashMap handlers = new TLongObjectHashMap(32);
            handlers.setAutoCompactionFactor(0.0f);
            this.map = TCollections.synchronizedMap((TLongObjectMap)handlers);
            final String poolThreadName = this.threadName();
            this.service = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory(poolThreadName, Boolean.valueOf(true)));
            // must come last: start() submits tasks to the executor created above
            this.start();
        }

        /**
         * Called after a connection has been established: first unsubscribes every tid in
         * {@code preventSubscribeUponReconnect}, then re-applies every remaining
         * {@code AsyncSubscription} that is not an {@code AsyncTemporarySubscription}.
         */
        private void onReconnect() {
            TcpChannelHub.this.preventSubscribeUponReconnect.forEach(this::unsubscribe);
            this.map.forEachValue(handler -> {
                final boolean temporary = handler instanceof AsyncTemporarySubscription;
                if (!temporary && handler instanceof AsyncSubscription) {
                    ((AsyncSubscription)handler).applySubscribe();
                }
                // returning true keeps the iteration going
                return true;
            });
        }

        /**
         * Informs every registered handler that the connection has closed: threads waiting on
         * a {@code Bytes} buffer are woken, and each {@code AsyncSubscription} receives
         * {@code onClose()}.
         */
        void onConnectionClosed() {
            this.map.forEachValue(v -> {
                if (v instanceof Bytes) {
                    // wake any thread blocked in wait() on this buffer; one notifyAll is enough
                    synchronized (v) {
                        v.notifyAll();
                    }
                }
                if (v instanceof AsyncSubscription) {
                    ((AsyncSubscription)v).onClose();
                }
                // returning true keeps the iteration going
                return true;
            });
        }

        /**
         * @return the handler priority configured on the enclosing {@code TcpChannelHub}
         */
        @NotNull
        public HandlerPriority priority() {
            final HandlerPriority configured = TcpChannelHub.this.priority;
            return configured;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Registers the calling thread's wire buffer under {@code tid} and blocks until the
         * buffer has a non-zero read limit, the consumer is shut down, or the timeout expires.
         *
         * @param timeoutTimeMs maximum time to wait, in milliseconds
         * @param tid           transaction id the reply is expected under
         * @return the calling thread's wire (taken from {@code syncInWireThreadLocal})
         * @throws TimeoutException           if {@code timeoutTimeMs} or more has elapsed by the
         *                                    time the wait ends, or if the thread is interrupted
         *                                    (the interrupt flag is restored and the
         *                                    {@link InterruptedException} is the cause)
         * @throws ConnectionDroppedException if there is no client channel after a wake-up
         */
        private Wire syncBlockingReadSocket(long timeoutTimeMs, long tid) throws TimeoutException, ConnectionDroppedException {
            long start = Time.currentTimeMillis();
            Wire wire = this.syncInWireThreadLocal.get();
            assert (BytesUtil.unregister((Bytes)wire.bytes()));
            wire.clear();
            @NotNull Bytes bytes = wire.bytes();
            ((ByteBuffer)bytes.underlyingObject()).clear();
            // the buffer doubles as the monitor that processData()/onConnectionClosed() notify
            synchronized (bytes) {
                if (LOG.isDebugEnabled()) {
                    Jvm.debug().on(this.getClass(), "tid=" + tid + " of client request");
                }
                bytes.clear();
                this.registerSubscribe(tid, bytes);
                long end = start + timeoutTimeMs;
                try {
                    long delay;
                    while ((delay = end - System.currentTimeMillis()) > 0L) {
                        bytes.wait(delay);
                        if (TcpChannelHub.this.clientChannel == null) {
                            throw new ConnectionDroppedException("Connection Closed : the connection to the server has been dropped.");
                        }
                        // keep waiting while nothing has been written and we are not shut down
                        if (bytes.readLimit() == 0L && !this.isShutdown) continue;
                        break;
                    }
                }
                catch (InterruptedException ie) {
                    // keep the interrupt visible to the caller even though it is reported as a timeout
                    Thread.currentThread().interrupt();
                    @NotNull TimeoutException te = new TimeoutException();
                    te.initCause(ie);
                    throw te;
                }
            }
            TcpChannelHub.logToStandardOutMessageReceived(wire);
            if (Time.currentTimeMillis() - start >= timeoutTimeMs) {
                throw new TimeoutException("timeoutTimeMs=" + timeoutTimeMs);
            }
            return wire;
        }

        /**
         * Stores {@code bytes} in the handler map under {@code tid}; asserts that no handler
         * was registered for that tid before.
         *
         * @param tid   transaction id used as the key
         * @param bytes the handler object to register
         */
        private void registerSubscribe(long tid, Object bytes) {
            // NOTE(review): the result of this call is discarded, so it enforces nothing
            TcpChannelHub.this.outBytesLock().isHeldByCurrentThread();
            final Object displaced = this.map.put(tid, bytes);
            assert (displaced == null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Registers {@code asyncSubscription} and, when connected, applies it under the
         * out-bytes lock.
         * <p>
         * With no client channel the subscription is only registered (deferred) and the method
         * returns. Otherwise the out-bytes lock is acquired first; exceptions thrown while
         * registering or applying the subscription are logged as warnings and not rethrown.
         *
         * @param asyncSubscription the subscription to register
         * @param tryLock           when {@code true}, makes one non-blocking lock attempt and
         *                          silently returns if it fails; when {@code false}, retries in
         *                          1 second steps until the lock is obtained
         * @throws IllegalStateException if the consumer is shutting down while waiting for the
         *                               lock, or if the waiting thread is interrupted (the
         *                               interrupt flag is restored)
         */
        void subscribe(@NotNull AsyncSubscription asyncSubscription, boolean tryLock) {
            synchronized (this) {
                if (TcpChannelHub.this.clientChannel == null) {
                    // NOTE(review): the result of this call is discarded, so it enforces nothing
                    TcpChannelHub.this.outBytesLock().isHeldByCurrentThread();
                    this.registerSubscribe(asyncSubscription.tid(), asyncSubscription);
                    if (LOG.isDebugEnabled()) {
                        Jvm.debug().on(this.getClass(), "deferred subscription tid=" + asyncSubscription.tid() + ",asyncSubscription=" + asyncSubscription);
                    }
                    return;
                }
            }
            @NotNull ReentrantLock lock = TcpChannelHub.this.outBytesLock();
            if (tryLock) {
                if (!lock.tryLock()) {
                    return;
                }
            } else {
                try {
                    if (!lock.tryLock()) {
                        while (!lock.tryLock(1L, TimeUnit.SECONDS)) {
                            if (this.isShuttingdown()) {
                                throw new IllegalStateException("Shutting down");
                            }
                            LOG.info("Waiting for lock " + Jvm.lockWithStack((ReentrantLock)lock));
                        }
                    }
                }
                catch (InterruptedException e) {
                    // keep the interrupt visible to callers further up the stack
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
            try {
                this.registerSubscribe(asyncSubscription.tid(), asyncSubscription);
                asyncSubscription.applySubscribe();
            }
            catch (Exception e) {
                Jvm.warn().on(this.getClass(), (Throwable)e);
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Removes the handler registered under {@code tid}, if any.
         *
         * @param tid transaction id whose handler is dropped
         */
        public void unsubscribe(long tid) {
            final TLongObjectMap<Object> handlers = this.map;
            handlers.remove(tid);
        }

        /**
         * Submits the two background tasks to the executor: the socket reader, which runs
         * {@code running()}, and a monitor that logs the read thread's stack when a message
         * has been in progress for a long time.
         *
         * @throws IORuntimeException if the consumer has already been shut down
         */
        private void start() {
            this.checkNotShutdown();
            assert (this.shutdownHere == null);
            assert (!this.isShutdown);
            // reader task: failures are only logged when we are not preparing to shut down
            this.service.submit(() -> {
                this.readThread = Thread.currentThread();
                try {
                    this.running();
                }
                catch (ConnectionDroppedException e) {
                    if (Jvm.isDebug() && !this.prepareToShutdown) {
                        Jvm.debug().on(this.getClass(), (Throwable)e);
                    }
                }
                catch (Throwable e) {
                    if (!this.prepareToShutdown) {
                        Jvm.warn().on(this.getClass(), e);
                    }
                }
            });
            // monitor task: polls every 50 ms, skips the first 40 polls, then reports whenever
            // the current message has been in progress for at least 3150 ms
            this.service.submit(() -> {
                int count = 0;
                while (!this.isShuttingdown()) {
                    Jvm.pause((long)50L);
                    if (count++ < 40) {
                        continue;
                    }
                    // start is Long.MAX_VALUE while idle, which makes the delay negative
                    long delay = System.currentTimeMillis() - this.start;
                    if (delay < 3150L) {
                        continue;
                    }
                    StringBuilder sb = new StringBuilder().append(this.readThread).append(" at ").append(delay).append(" ms");
                    Jvm.trimStackTrace((StringBuilder)sb, (StackTraceElement[])this.readThread.getStackTrace());
                    @NotNull String msg = sb.toString();
                    // a thread sitting in the socket read is not reported
                    if (msg.contains("sun.nio.ch.SocketChannelImpl.read")) {
                        continue;
                    }
                    LOG.info(msg);
                }
            });
        }

        /**
         * Fails fast when the consumer has been shut down.
         *
         * @throws IORuntimeException if {@code isShutdown} is set; the cause is the stack trace
         *                            recorded in {@code shutdownHere}
         */
        public void checkNotShutdown() {
            if (!this.isShutdown) {
                return;
            }
            throw new IORuntimeException("Called after shutdown", this.shutdownHere);
        }

        /**
         * @return the thread name built from the hub name and the socket address supplier
         */
        @NotNull
        private String threadName() {
            final String prefix = "TcpChannelHub-Reads-" + TcpChannelHub.this.name;
            return prefix + "-" + TcpChannelHub.this.socketAddressSupplier;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Main loop of the read thread: reads one message per iteration from the socket and
         * dispatches it, until the consumer is shutting down.
         * <p>
         * Each iteration makes sure a connection exists ({@code checkConnectionState()}), reads
         * the 4-byte header and then either hands a data message to {@code processData} for the
         * current {@code tid}, or reads the rest of a non-data message and passes it to
         * {@code tidReader}. When an exception occurs and the consumer is not shutting down,
         * the socket is closed, the thread pauses and the loop continues. On exit the wire's
         * bytes are released and the socket is closed.
         */
        private void running() {
            Wire inWire = (Wire)TcpChannelHub.this.wireType.apply((Object)Bytes.elasticByteBuffer());
            try {
                assert (inWire != null);
                assert (inWire.startUse());
                while (!this.isShuttingdown()) {
                    this.checkConnectionState();
                    try {
                        @NotNull Bytes bytes = inWire.bytes();
                        // read the 4-byte header first; it carries the message length and type bits
                        this.blockingRead((WireIn)inWire, 4L);
                        int header = bytes.readVolatileInt(0L);
                        long messageSize = this.size(header);
                        // mark the start of processing; the monitor task in start() reads this field
                        this.start = System.currentTimeMillis();
                        if (Wires.isData((int)header)) {
                            assert (messageSize < Integer.MAX_VALUE);
                            boolean clearTid = this.processData(this.tid, Wires.isReady((int)header), header, (int)messageSize, inWire);
                            long timeTaken = System.currentTimeMillis() - this.start;
                            // Long.MAX_VALUE means "no message currently being processed"
                            this.start = Long.MAX_VALUE;
                            if (timeTaken > 20L) {
                                LOG.info("Processing data=" + timeTaken + "ms");
                            }
                            if (!clearTid) continue;
                            this.tid = -1L;
                            continue;
                        }
                        // non-data message: read the remainder and hand it to tidReader
                        this.blockingRead((WireIn)inWire, messageSize);
                        TcpChannelHub.logToStandardOutMessageReceived(inWire);
                        this.tid = -1L;
                        inWire.readDocument(this::tidReader, null);
                    }
                    catch (Exception e) {
                        this.start = Long.MAX_VALUE;
                        if (Jvm.isDebug() && LOG.isDebugEnabled()) {
                            Jvm.debug().on(this.getClass(), (Throwable)e);
                        }
                        this.tid = -1L;
                        if (!this.isShuttingdown()) {
                            String message = e.getMessage();
                            // log level depends on the kind of failure; every case leads to a reconnect
                            if (e instanceof ConnectionDroppedException) {
                                if (Jvm.isDebugEnabled(this.getClass())) {
                                    Jvm.debug().on(this.getClass(), "reconnecting due to dropped connection " + (message == null ? "" : message));
                                }
                            } else if (e instanceof IOException && "Connection reset by peer".equals(message)) {
                                Jvm.warn().on(this.getClass(), "reconnecting due to \"Connection reset by peer\" " + message);
                            } else {
                                Jvm.warn().on(this.getClass(), "reconnecting due to unexpected exception", (Throwable)e);
                            }
                            TcpChannelHub.this.closeSocket();
                            // 500 ms default when no connection strategy is configured
                            long pauseMs = TcpChannelHub.this.connectionStrategy == null ? 500L : TcpChannelHub.this.connectionStrategy.pauseMillisBeforeReconnect();
                            Jvm.pause((long)pauseMs);
                            continue;
                        }
                        // shutting down: leave the loop instead of reconnecting
                        break;
                    }
                    finally {
                        // runs on every iteration, including the continue/break paths above
                        this.start = Long.MAX_VALUE;
                        TcpChannelHub.this.clear(inWire);
                    }
                }
            }
            catch (Throwable e) {
                if (!this.isShuttingdown()) {
                    Jvm.warn().on(this.getClass(), e);
                }
            }
            finally {
                inWire.bytes().release();
                TcpChannelHub.this.closeSocket();
            }
        }

        /**
         * @return {@code true} once the consumer has been shut down
         */
        boolean isShutdown() {
            final boolean stopped = this.isShutdown;
            return stopped;
        }

        /**
         * @return {@code true} when the consumer is shut down or is preparing to shut down
         */
        boolean isShuttingdown() {
            if (this.isShutdown) {
                return true;
            }
            return this.prepareToShutdown;
        }

        /**
         * Extracts the message length from a header using {@code Wires.lengthOf}.
         * With assertions enabled the length must be positive and below 0x40000000.
         *
         * @param header the 4-byte message header
         * @return the length encoded in the header
         */
        private long size(int header) {
            final long length = Wires.lengthOf((int)header);
            assert (length > 0L) : "Invalid message size " + length;
            assert (length < 0x40000000L) : "Invalid message size " + length;
            return length;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Dispatches one data message to the handler registered under {@code tid}.
         * <p>
         * A {@code tid} of zero is treated as a server system message. Otherwise the handler is
         * looked up in {@code map}, retrying for up to 3000 ms if it is not there yet. An
         * {@code AsyncSubscription} receives the message through {@code onConsumer}; a
         * {@code Bytes} handler has the header and payload copied into it and its waiters
         * notified.
         *
         * @param tid         transaction id the message belongs to; must not be -1 (asserted)
         * @param isReady     the ready flag decoded from the header by the caller
         * @param header      the raw 4-byte header
         * @param messageSize payload length in bytes
         * @param inWire      wire used to read the payload for subscriptions and for logging
         * @return {@code true} if the handler was removed from {@code map} because the message
         * was ready and the handler is a {@code Bytes} or an {@code AsyncTemporarySubscription}
         * @throws IOException if reading the payload from the socket fails
         */
        private boolean processData(long tid, boolean isReady, int header, int messageSize, @NotNull Wire inWire) throws IOException {
            Object o;
            boolean isLastMessageForThisTid;
            block21: {
                assert (tid != -1L);
                isLastMessageForThisTid = false;
                long startTime = 0L;
                o = null;
                if (tid == 0L) {
                    this.processServerSystemMessage(header, messageSize);
                    return false;
                }
                @Nullable SocketChannel c = TcpChannelHub.this.clientChannel;
                if (c == null) {
                    return false;
                }
                // look the handler up, waiting up to 3000 ms for it to be registered
                while (!this.isShuttingdown() && c.isOpen()) {
                    o = this.map.get(tid);
                    if (o != null) {
                        // only a ready message for a one-shot handler removes the registration
                        if (!isReady || !(o instanceof Bytes) && !(o instanceof AsyncTemporarySubscription)) break;
                        if (hasAssert) {
                            // keep removed handlers in omap so a late duplicate can be detected below
                            this.omap.put(tid, this.map.remove(tid));
                        } else {
                            this.map.remove(tid);
                        }
                        isLastMessageForThisTid = true;
                        break;
                    }
                    if (hasAssert && (o = this.omap.get(tid)) != null) {
                        this.blockingRead((WireIn)inWire, messageSize);
                        TcpChannelHub.logToStandardOutMessageReceivedInERROR(inWire);
                        throw new AssertionError((Object)("Found tid=" + tid + " in the old map."));
                    }
                    // first miss starts the clock; later misses pause 1 ms between retries
                    if (startTime == 0L) {
                        startTime = Time.currentTimeMillis();
                    } else {
                        Jvm.pause((long)1L);
                    }
                    if (Time.currentTimeMillis() - startTime <= 3000L) continue;
                    // gave up: consume the payload so the stream stays aligned, then drop it
                    this.blockingRead((WireIn)inWire, messageSize);
                    TcpChannelHub.logToStandardOutMessageReceived(inWire);
                    if (Jvm.isDebugEnabled(this.getClass())) {
                        Jvm.debug().on(this.getClass(), "unable to respond to tid=" + tid + ", given that we have received a message we a tid which is unknown, this can occur sometime if the subscription has just become unregistered ( an the server has not yet processed the unregister event ) ");
                    }
                    return isLastMessageForThisTid;
                }
                if (o == null) {
                    return isLastMessageForThisTid;
                }
                if (o instanceof AsyncSubscription) {
                    this.blockingRead((WireIn)inWire, messageSize);
                    TcpChannelHub.logToStandardOutMessageReceived(inWire);
                    @NotNull AsyncSubscription asyncSubscription = (AsyncSubscription)o;
                    try {
                        asyncSubscription.onConsumer((WireIn)inWire);
                    }
                    catch (Exception e) {
                        // a failing consumer is only logged at debug level
                        if (LOG.isDebugEnabled()) {
                            Jvm.debug().on(this.getClass(), "Removing " + tid + " " + o, (Throwable)e);
                        }
                        if (!hasAssert) break block21;
                        this.omap.remove(tid);
                    }
                }
            }
            if (o instanceof Bytes) {
                Bytes bytes;
                Bytes bytes2 = bytes = (Bytes)o;
                // the buffer is also the monitor a thread in syncBlockingReadSocket waits on
                synchronized (bytes2) {
                    bytes.clear();
                    bytes.ensureCapacity((long)(4 + messageSize));
                    @Nullable ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
                    byteBuffer.clear();
                    // write the header back at offset 0, then read the payload after it
                    bytes.writeInt(0L, header);
                    byteBuffer.position(4);
                    byteBuffer.limit(4 + messageSize);
                    this.readBuffer(byteBuffer);
                    bytes.readLimit((long)byteBuffer.position());
                    bytes.notifyAll();
                }
                if (hasAssert) {
                    this.omap.remove(tid);
                }
            }
            return isLastMessageForThisTid;
        }

        /**
         * Reads a server system message into the reusable {@code serverHeartBeatHandler} buffer
         * and reacts to it: a heartbeat is passed to {@code reflectServerHeartbeatMessage}, and
         * an {@code onClosingReply} counts down {@code receivedClosedAcknowledgement}.
         *
         * @param header      the raw 4-byte header, written back in front of the payload
         * @param messageSize payload length in bytes
         * @throws IOException if reading the payload from the socket fails
         */
        private void processServerSystemMessage(int header, int messageSize) throws IOException {
            Bytes bytes = this.serverHeartBeatHandler;
            bytes.clear();
            // Grow the buffer before taking the underlying ByteBuffer, as processData() does;
            // otherwise limit(4 + messageSize) fails if the message exceeds the current capacity.
            bytes.ensureCapacity((long)(4 + messageSize));
            @NotNull ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
            byteBuffer.clear();
            // write the header back at offset 0, then read the payload after it
            bytes.writeInt(0L, header);
            byteBuffer.position(4);
            byteBuffer.limit(4 + messageSize);
            this.readBuffer(byteBuffer);
            bytes.readLimit((long)byteBuffer.position());
            StringBuilder eventName = Wires.acquireStringBuilder();
            Wire inWire = (Wire)TcpChannelHub.this.wireType.apply((Object)bytes);
            if (YamlLogging.showHeartBeats()) {
                TcpChannelHub.logToStandardOutMessageReceived(inWire);
            }
            inWire.readDocument(null, d -> {
                @NotNull ValueIn valueIn = d.readEventName(eventName);
                if (EventId.heartbeat.contentEquals(eventName)) {
                    TcpChannelHub.this.reflectServerHeartbeatMessage(valueIn);
                } else if (EventId.onClosingReply.contentEquals(eventName)) {
                    TcpChannelHub.this.receivedClosedAcknowledgement.countDown();
                }
            });
        }

        /**
         * Reads exactly {@code numberOfBytes} from the socket into the wire's bytes, starting
         * at the current write position, and then moves the read limit to the end of the data.
         *
         * @param wire          destination wire
         * @param numberOfBytes number of bytes to read
         * @throws IOException if reading from the socket fails
         */
        private void blockingRead(@NotNull WireIn wire, long numberOfBytes) throws IOException {
            @NotNull Bytes target = wire.bytes();
            // grow first: the underlying buffer is fetched only after the capacity is ensured
            target.ensureCapacity(target.writePosition() + numberOfBytes);
            @NotNull ByteBuffer window = (ByteBuffer)target.underlyingObject();
            final int from = (int)target.writePosition();
            final int to = (int)((long)from + numberOfBytes);
            window.position(from);
            window.limit(to);
            this.readBuffer(window);
            target.readLimit((long)window.position());
        }

        /**
         * Fills {@code buffer} from the client channel, looping until it has no space remaining.
         * <p>
         * Zero-byte reads pause via the hub's pauser. The loop gives up with an exception when
         * the channel is missing, the peer closes the stream, the heartbeat timeout is
         * exceeded, the consumer is shut down, or nothing has been received for more than
         * 60 seconds.
         *
         * @param buffer destination; its position and limit define how much is read
         * @throws IOException                if there is no client channel, or the heartbeat
         *                                    timeout elapsed during empty reads
         * @throws ConnectionDroppedException if the read returns -1, the hub is not open, the
         *                                    consumer is shut down, or no message has been
         *                                    received for more than 60 seconds
         */
        private void readBuffer(@NotNull ByteBuffer buffer) throws IOException {
            // stays true until the first byte of this call has been read
            boolean emptyRead = true;
            while (buffer.remaining() > 0) {
                @Nullable SocketChannel clientChannel = TcpChannelHub.this.clientChannel;
                if (clientChannel == null) {
                    throw new IOException("Disconnection to server=" + TcpChannelHub.this.socketAddressSupplier + " channel is closed, name=" + TcpChannelHub.this.name);
                }
                int numberOfBytesRead = clientChannel.read(buffer);
                // an interrupted thread that read nothing is treated as a shutdown request
                if (numberOfBytesRead == 0 && Thread.currentThread().isInterrupted()) {
                    this.isShutdown = true;
                }
                WanSimulator.dataRead(numberOfBytesRead);
                if (numberOfBytesRead == -1) {
                    throw new ConnectionDroppedException("Disconnection to server=" + TcpChannelHub.this.socketAddressSupplier + " read=-1 , name=" + TcpChannelHub.this.name);
                }
                if (numberOfBytesRead > 0) {
                    this.onMessageReceived();
                    emptyRead = false;
                    if (LOG.isDebugEnabled()) {
                        Jvm.debug().on(this.getClass(), "R:" + numberOfBytesRead + ",socket=" + TcpChannelHub.this.socketAddressSupplier.get());
                    }
                    TcpChannelHub.this.pauser.reset();
                } else if (numberOfBytesRead == 0 && TcpChannelHub.this.isOpen()) {
                    long millisecondsSinceLastMessageReceived = System.currentTimeMillis() - this.lastTimeMessageReceivedOrSent;
                    if (millisecondsSinceLastMessageReceived - (long)HEATBEAT_TIMEOUT_PERIOD > 0L) {
                        throw new IOException("reconnecting due to heartbeat failure, time since last message=" + millisecondsSinceLastMessageReceived + "ms dropping connection to " + TcpChannelHub.this.socketAddressSupplier);
                    }
                    // while nothing has arrived yet, the time spent pausing is not counted as
                    // processing time: start is reset before the pause and re-armed after it
                    if (emptyRead) {
                        this.start = Long.MAX_VALUE;
                    }
                    TcpChannelHub.this.pauser.pause();
                    if (this.start == Long.MAX_VALUE) {
                        this.start = System.currentTimeMillis();
                    }
                } else {
                    throw new ConnectionDroppedException(TcpChannelHub.this.name + " is shutdown, was connected to " + TcpChannelHub.this.socketAddressSupplier);
                }
                if (this.isShutdown) {
                    throw new ConnectionDroppedException(TcpChannelHub.this.name + " is shutdown, was connected to " + TcpChannelHub.this.socketAddressSupplier);
                }
                // keep looping unless nothing has been received for more than 60 seconds
                if (this.lastTimeMessageReceivedOrSent + 60000L >= System.currentTimeMillis()) continue;
                // log every thread outside the "system" thread group before dropping the connection
                for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
                    Thread thread = entry.getKey();
                    if (thread == null || thread.getThreadGroup() == null || thread.getThreadGroup().getName() == null || thread.getThreadGroup().getName().equals("system")) continue;
                    @NotNull StringBuilder sb = new StringBuilder();
                    sb.append(thread).append(" ").append((Object)thread.getState());
                    Jvm.trimStackTrace((StringBuilder)sb, (StackTraceElement[])entry.getValue());
                    sb.append("\n");
                    Jvm.warn().on(this.getClass(), "\n========= THREAD DUMP =========\n" + sb);
                }
                throw new ConnectionDroppedException(TcpChannelHub.this.name + " the client is failing to get the data from the server, so we are going to drop the connection and reconnect.");
            }
        }

        /**
         * Records the current time as the moment the last message was received or sent.
         */
        private void onMessageReceived() {
            final long now = Time.currentTimeMillis();
            this.lastTimeMessageReceivedOrSent = now;
        }

        /**
         * Attempts to send a heartbeat under the out-bytes lock using
         * {@code TryLock.TRY_LOCK_IGNORE}; the result of the lock attempt is ignored.
         */
        private void sendHeartbeat() {
            final Task heartbeatTask = this::sendHeartbeat0;
            TcpChannelHub.this.lock(heartbeatTask, TryLock.TRY_LOCK_IGNORE);
        }

        /**
         * Registers a temporary subscription that writes a heartbeat event carrying the current
         * time and, when the reply arrives, logs the round-trip time at debug level.
         * <p>
         * Nothing is sent when the out wire already holds more than 100 bytes. After
         * subscribing, the read thread's name is refreshed from {@code threadName()}.
         */
        private void sendHeartbeat0() {
            assert (TcpChannelHub.this.outWire.startUse());
            try {
                if (TcpChannelHub.this.outWire.bytes().writePosition() <= 100L) {
                    final long sentAtNanos = System.nanoTime();
                    this.subscribe(new AbstractAsyncTemporarySubscription(TcpChannelHub.this, null, TcpChannelHub.this.name){

                        @Override
                        public void onSubscribe(@NotNull WireOut wireOut) {
                            if (Jvm.isDebug()) {
                                LOG.info("sending heartbeat");
                            }
                            wireOut.writeEventName((WireKey)EventId.heartbeat).int64(Time.currentTimeMillis());
                        }

                        @Override
                        public void onConsumer(@NotNull WireIn inWire) {
                            final long elapsedNanos = System.nanoTime() - sentAtNanos;
                            final long roundTripMicros = TimeUnit.NANOSECONDS.toMicros(elapsedNanos);
                            if (LOG.isDebugEnabled()) {
                                Jvm.debug().on(this.getClass(), "heartbeat round trip time=" + roundTripMicros + " server=" + TcpChannelHub.this.socketAddressSupplier);
                            }
                            inWire.clear();
                        }
                    }, true);
                    this.readThread.setName(this.threadName());
                }
            }
            finally {
                assert (TcpChannelHub.this.outWire.endUse());
            }
        }

        /**
         * Shuts the consumer down: records where the shutdown was requested (first call only),
         * shuts down the executor, releases the heartbeat buffer and sets the shutdown flag.
         * Does nothing if the consumer is already shut down.
         */
        void stop() {
            if (!this.isShutdown) {
                if (this.shutdownHere == null) {
                    this.shutdownHere = new StackTrace(Thread.currentThread() + " Shutdown here");
                }
                Threads.shutdown((ExecutorService)this.service);
                this.serverHeartBeatHandler.release();
                // the flag is set last, after the executor and the buffer are gone
                this.isShutdown = true;
            }
        }

        /**
         * Event-loop callback: sends a heartbeat when no message has been received and no
         * heartbeat has been sent for at least {@code HEATBEAT_PING_PERIOD} milliseconds.
         *
         * @return always {@code true}
         * @throws InvalidEventHandlerException if there is no client channel
         */
        public boolean action() throws InvalidEventHandlerException {
            if (TcpChannelHub.this.clientChannel == null) {
                throw new InvalidEventHandlerException();
            }
            final long now = Time.currentTimeMillis();
            final long sinceLastMessage = now - this.lastTimeMessageReceivedOrSent;
            final long sinceLastHeartbeat = now - this.lastheartbeatSentTime;
            final long pingPeriod = (long)HEATBEAT_PING_PERIOD;
            final boolean connectionIdle = sinceLastMessage >= pingPeriod;
            final boolean heartbeatDue = sinceLastHeartbeat >= pingPeriod;
            if (connectionIdle && heartbeatDue) {
                this.lastheartbeatSentTime = Time.currentTimeMillis();
                this.sendHeartbeat();
            }
            return true;
        }

        /**
         * Starts a connection attempt when there is currently no client channel.
         */
        private void checkConnectionState() {
            if (TcpChannelHub.this.clientChannel == null) {
                this.attemptConnect();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Repeatedly tries to open a socket, perform the handshake and register this
         * consumer with the event loop, looping until one attempt succeeds.
         *
         * <p>Before the first attempt, per-connection state is cleared via
         * {@code keepSubscriptionsAndClearEverythingElse()} and the address supplier is
         * reset to its primary address. The method returns normally only after a
         * complete, successful attempt. A failed attempt is logged as a warning, the
         * socket is closed and the loop retries after a one second pause, unless the
         * {@code isShutdown} or {@code prepareToShutdown} flag is set.
         *
         * @throws IORuntimeException with message "shutting down" when an attempt fails
         *         while {@code isShutdown} or {@code prepareToShutdown} is set
         */
        private void attemptConnect() {
            this.keepSubscriptionsAndClearEverythingElse();
            TcpChannelHub.this.socketAddressSupplier.resetToPrimary();
            // Counts attempts; only used to decide when the INFO log line below is emitted.
            int i = 0;
            while (true) {
                // Labelled block: "break block17" abandons the current attempt and falls
                // through to ++i, i.e. it behaves like "continue" for the outer loop.
                block17: {
                    // Presumably throws once shutdown has completed, which is what ends
                    // the loop after the shutting-down breaks below - TODO confirm.
                    this.checkNotShutdown();
                    if (LOG.isDebugEnabled()) {
                        Jvm.debug().on(this.getClass(), "attemptConnect remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                    } else if (i >= TcpChannelHub.this.socketAddressSupplier.all().size() && !this.isShuttingdown()) {
                        // Without debug logging, INFO is emitted only once the attempt count
                        // has reached the number of configured addresses.
                        LOG.info("attemptConnect remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                    }
                    @Nullable SocketChannel socketChannel = null;
                    try {
                        if (this.isShuttingdown()) break block17;
                        socketChannel = TcpChannelHub.this.connectionStrategy.connect(TcpChannelHub.this.name, TcpChannelHub.this.socketAddressSupplier, false, TcpChannelHub.this.clientConnectionMonitor);
                        // Re-check after the connect call returns.
                        if (this.isShuttingdown()) break block17;
                        if (socketChannel == null) {
                            // The strategy produced no channel: wait a second, then retry.
                            Jvm.pause((long)1000L);
                            break block17;
                        }
                        // This throw is inside the outer try, so a lock timeout is handled
                        // by the catch below like any other failed attempt.
                        if (!TcpChannelHub.this.outBytesLock().tryLock(20L, TimeUnit.SECONDS)) {
                            throw new IORuntimeException("failed to obtain the outBytesLock " + TcpChannelHub.this.outBytesLock);
                        }
                        try {
                            TcpChannelHub.this.clear(TcpChannelHub.this.outWire);
                            this.onMessageReceived();
                            TcpSocketConsumer tcpSocketConsumer = this;
                            // The new channel is published while holding this consumer's monitor.
                            synchronized (tcpSocketConsumer) {
                                LOG.info("connected to " + socketChannel);
                                TcpChannelHub.this.clientChannel = socketChannel;
                            }
                            // The channel is already published at this point; if the handshake
                            // throws, the catch below calls closeSocket().
                            TcpChannelHub.this.doHandShaking(socketChannel);
                            TcpChannelHub.this.eventLoop.addHandler((EventHandler)this);
                            if (LOG.isDebugEnabled()) {
                                Jvm.debug().on(this.getClass(), "successfully connected to remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                            }
                            this.onReconnect();
                            // Presumably this condition belongs to outBytesLock, which is held
                            // here - TODO confirm.
                            TcpChannelHub.this.condition.signalAll();
                            TcpChannelHub.this.onConnected();
                        }
                        finally {
                            TcpChannelHub.this.outBytesLock().unlock();
                            // Only evaluated when assertions are enabled.
                            assert (!TcpChannelHub.this.outBytesLock.isHeldByCurrentThread());
                        }
                        // Successful attempt: the only normal exit from the loop.
                        return;
                    }
                    catch (Exception e) {
                        // NOTE(review): this also catches the InterruptedException that
                        // tryLock can raise, and the interrupt flag is not restored - confirm
                        // that is intended.
                        if (this.isShutdown || this.prepareToShutdown) {
                            TcpChannelHub.this.closeSocket();
                            // NOTE(review): the original exception e is not attached as the cause.
                            throw new IORuntimeException("shutting down");
                        }
                        Jvm.warn().on(this.getClass(), "failed to connect remoteAddress=" + TcpChannelHub.this.socketAddressSupplier + " so will reconnect ", (Throwable)e);
                        TcpChannelHub.this.closeSocket();
                        // Wait a second before the next attempt.
                        Jvm.pause((long)1000L);
                    }
                }
                ++i;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Resets per-connection state while leaving other registrations in place.
         *
         * <p>The transaction id is reset to zero, {@code omap} is cleared when
         * {@code hasAssert} is set, and every entry of {@code map} whose value is a
         * {@link Bytes} or an {@code AsyncTemporarySubscription} is removed. All other
         * entries of {@code map} are kept.
         */
        private void keepSubscriptionsAndClearEverythingElse() {
            this.tid = 0L;
            if (hasAssert) {
                this.omap.clear();
            }
            final TLongObjectMap<Object> registrations = this.map;
            // Hold the map's monitor for the whole scan so the check-then-remove
            // sequence is not interleaved with other users synchronizing on the map.
            synchronized (registrations) {
                // keys() returns a copy of the key set as a primitive array, so entries
                // can be removed while looping, without boxing or a temporary HashSet.
                for (final long key : registrations.keys()) {
                    final Object value = registrations.get(key);
                    if (value instanceof Bytes || value instanceof AsyncTemporarySubscription) {
                        registrations.remove(key);
                    }
                }
            }
        }

        /**
         * Flags this consumer as preparing to shut down, waits up to 100 ms on the
         * executor service and then asks the service to shut down.
         *
         * <p>If the waiting thread is interrupted, the interrupt flag is restored and
         * the service is still shut down.
         */
        void prepareToShutdown() {
            // Raise the flag first so code polling it can see the shutdown request.
            this.prepareToShutdown = true;

            // NOTE(review): the wait runs before shutdown() has been requested, so it
            // presumably acts as a grace period of up to 100 ms - confirm intent.
            try {
                this.service.awaitTermination(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }

            this.service.shutdown();
        }

        /**
         * Reads the transaction id from the given wire via {@code CoreFields.tid} and
         * stores it in {@code tid}.
         *
         * @param w the wire to read the transaction id from
         */
        private void tidReader(WireIn w) {
            final long parsedTid = CoreFields.tid(w);
            this.tid = parsedTid;
        }

        /**
         * Creates a new wire of the hub's wire type, backed by a fresh elastic byte buffer.
         *
         * @return the newly created wire
         */
        private Wire createWire() {
            final Object backingBuffer = Bytes.elasticByteBuffer();
            final Wire created = (Wire) TcpChannelHub.this.wireType.apply(backingBuffer);
            // startUse() sits inside an assert, so it only runs when assertions are enabled.
            assert created.startUse();
            return created;
        }
    }

    /**
     * A unit of work that takes no arguments, returns nothing and declares no
     * checked exceptions.
     */
    @FunctionalInterface
    public interface Task {
        /**
         * Runs the task.
         */
        void run();
    }
}

