/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network.connection;

import java.io.IOException;
import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.ConnectionDroppedException;
import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.util.Time;
import net.openhft.chronicle.network.WanSimulator;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.AsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.EventId;
import net.openhft.chronicle.network.connection.SocketAddressSupplier;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.YamlLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpChannelHub
implements Closeable {
    public static final int HEATBEAT_PING_PERIOD = Integer.getInteger("heartbeat.ping.period", 5000);
    public static final int HEATBEAT_TIMEOUT_PERIOD = Integer.getInteger("heartbeat.timeout", 20000);
    public static final int SIZE_OF_SIZE = 4;
    public static final Set<TcpChannelHub> hubs = new CopyOnWriteArraySet<TcpChannelHub>();
    private static final Logger LOG = LoggerFactory.getLogger(TcpChannelHub.class);
    public final long timeoutMs;
    @NotNull
    protected final String name;
    protected final int tcpBufferSize;
    final Wire outWire;
    final Wire inWire;
    @NotNull
    private final SocketAddressSupplier socketAddressSupplier;
    private final Set<Long> preventSubscribeUponReconnect = new ConcurrentSkipListSet<Long>();
    private final ReentrantLock outBytesLock = new ReentrantLock();
    @NotNull
    private final AtomicLong transactionID = new AtomicLong(0L);
    @NotNull
    private final SessionProvider sessionProvider;
    @NotNull
    private final TcpSocketConsumer tcpSocketConsumer;
    @NotNull
    private final EventLoop eventLoop;
    @NotNull
    private final Function<Bytes, Wire> wire;
    private final Wire handShakingWire;
    private long largestChunkSoFar = 0L;
    @Nullable
    private volatile SocketChannel clientChannel;
    private volatile boolean closed;
    private CountDownLatch receivedClosedAcknowledgement = new CountDownLatch(1);
    private long limitOfLast = 0L;

    public TcpChannelHub(@NotNull SessionProvider sessionProvider, @NotNull EventLoop eventLoop, @NotNull Function<Bytes, Wire> wire, @NotNull String name, @NotNull SocketAddressSupplier socketAddressSupplier) {
        this.socketAddressSupplier = socketAddressSupplier;
        this.eventLoop = eventLoop;
        this.tcpBufferSize = Integer.getInteger("tcp.client.buffer.size", 0x200000);
        this.outWire = wire.apply(Bytes.elasticByteBuffer());
        this.inWire = wire.apply(Bytes.elasticByteBuffer());
        this.name = name;
        this.timeoutMs = Integer.getInteger("tcp.client.timeout", 10000).intValue();
        this.wire = wire;
        this.handShakingWire = wire.apply(Bytes.elasticByteBuffer());
        this.sessionProvider = sessionProvider;
        this.tcpSocketConsumer = new TcpSocketConsumer(wire);
        hubs.add(this);
    }

    public static void assertAllHubsClosed() {
        StringBuilder errors = new StringBuilder();
        for (TcpChannelHub h : hubs) {
            if (!h.isClosed()) {
                errors.append("Connection ").append(h).append(" still open\n");
            }
            h.close();
        }
        hubs.clear();
        if (errors.length() > 0) {
            throw new AssertionError((Object)errors.toString());
        }
    }

    public static void closeAllHubs() {
        for (TcpChannelHub h : hubs) {
            if (h.isClosed()) continue;
            LOG.warn("Closing " + h);
            h.close();
        }
        hubs.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void logToStandardOutMessageReceived(@NotNull Wire wire) {
        Bytes bytes = wire.bytes();
        if (!Jvm.isDebug() || !YamlLogging.clientReads) {
            return;
        }
        long position = bytes.writePosition();
        long limit = bytes.writeLimit();
        try {
            try {
                LOG.info("\nreceives:\n```yaml\n" + Wires.fromSizePrefixedBinaryToText((Bytes)bytes) + "```\n");
                YamlLogging.title = "";
                YamlLogging.writeMessage = "";
            }
            catch (Exception e) {
                LOG.error(Bytes.toString((Bytes)bytes), (Throwable)e);
            }
        }
        finally {
            bytes.writeLimit(limit);
            bytes.writePosition(position);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void logToStandardOutMessageReceivedInERROR(@NotNull Wire wire) {
        Bytes bytes = wire.bytes();
        long position = bytes.writePosition();
        long limit = bytes.writeLimit();
        try {
            try {
                LOG.info("\nreceives IN ERROR:\n```yaml\n" + Wires.fromSizePrefixedBinaryToText((Bytes)bytes) + "```\n");
                YamlLogging.title = "";
                YamlLogging.writeMessage = "";
            }
            catch (Exception e) {
                String x = Bytes.toString((Bytes)bytes);
                LOG.error(x, (Throwable)e);
            }
        }
        finally {
            bytes.writeLimit(limit);
            bytes.writePosition(position);
        }
    }

    private void clear(@NotNull Wire wire) {
        wire.clear();
        ((ByteBuffer)wire.bytes().underlyingObject()).clear();
    }

    @Nullable
    SocketChannel openSocketChannel() throws IOException {
        SocketChannel result = SocketChannel.open();
        Socket socket = result.socket();
        socket.setTcpNoDelay(true);
        socket.setReceiveBufferSize(this.tcpBufferSize);
        socket.setSendBufferSize(this.tcpBufferSize);
        return result;
    }

    public void preventSubscribeUponReconnect(long tid) {
        this.preventSubscribeUponReconnect.add(tid);
    }

    @NotNull
    public String toString() {
        return "TcpChannelHub{name=" + this.name + "remoteAddressSupplier=" + this.socketAddressSupplier + '}';
    }

    private void onDisconnected() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("disconnected to remoteAddress=" + this.socketAddressSupplier);
        }
        this.tcpSocketConsumer.onConnectionClosed();
    }

    private void onConnected() {
    }

    public void subscribe(@NotNull AsyncSubscription asyncSubscription) {
        this.subscribe(asyncSubscription, false);
    }

    public void subscribe(@NotNull AsyncSubscription asyncSubscription, boolean tryLock) {
        this.tcpSocketConsumer.subscribe(asyncSubscription, tryLock);
    }

    public void unsubscribe(long tid) {
        this.tcpSocketConsumer.unsubscribe(tid);
    }

    @NotNull
    public ReentrantLock outBytesLock() {
        return this.outBytesLock;
    }

    private synchronized void doHandShaking(@NotNull SocketChannel socketChannel) throws IOException {
        SessionDetails sessionDetails = this.sessionDetails();
        this.handShakingWire.clear();
        this.handShakingWire.bytes().clear();
        this.handShakingWire.writeDocument(false, wireOut -> {
            if (sessionDetails == null) {
                wireOut.writeEventName((WireKey)EventId.userid).text((CharSequence)System.getProperty("user.name"));
            } else {
                wireOut.writeEventName((WireKey)EventId.userid).text((CharSequence)sessionDetails.userId());
            }
        });
        this.writeSocket1((WireOut)this.handShakingWire, this.timeoutMs, socketChannel);
    }

    @Nullable
    private SessionDetails sessionDetails() {
        return this.sessionProvider.get();
    }

    protected synchronized void closeSocket() {
        SocketChannel clientChannel = this.clientChannel;
        if (clientChannel != null) {
            try {
                clientChannel.socket().shutdownInput();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            try {
                clientChannel.socket().shutdownOutput();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            try {
                clientChannel.socket().close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            try {
                clientChannel.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            this.clientChannel = null;
            this.clear(this.inWire);
            this.clear(this.outWire);
            TcpSocketConsumer tcpSocketConsumer = this.tcpSocketConsumer;
            tcpSocketConsumer.tid = 0L;
            tcpSocketConsumer.omap.clear();
            this.onDisconnected();
        }
    }

    public boolean isOpen() {
        return this.clientChannel != null;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.sendCloseMessage();
        this.closed = true;
        this.tcpSocketConsumer.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("closing connection to " + this.socketAddressSupplier);
        }
        while (this.clientChannel != null) {
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("waiting for disconnect to " + this.socketAddressSupplier);
        }
    }

    private void sendCloseMessage() {
        this.lock(() -> {
            this.writeMetaDataForKnownTID(0L, this.outWire, null, 0L);
            this.outWire.writeDocument(false, w -> w.writeEventName((WireKey)EventId.onClientClosing).text((CharSequence)""));
            this.writeSocket((WireOut)this.outWire);
        }, false);
        try {
            boolean await = this.receivedClosedAcknowledgement.await(1L, TimeUnit.SECONDS);
            if (!await) {
                LOG.warn("SERVER IGNORED CLOSE REQUEST: shutting down the client anyway as the server did not respond to the close() request.");
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public long nextUniqueTransaction(long timeMs) {
        long old;
        long id = timeMs;
        do {
            if ((old = this.transactionID.get()) < id) continue;
            id = old + 1L;
        } while (!this.transactionID.compareAndSet(old, id));
        return id;
    }

    public void writeSocket(@NotNull WireOut wire) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        SocketChannel clientChannel = this.clientChannel;
        if (clientChannel == null) {
            throw new ConnectionDroppedException("Not Connected " + this.socketAddressSupplier);
        }
        try {
            this.writeSocket1(wire, this.timeoutMs, clientChannel);
        }
        catch (Exception e) {
            LOG.error("", (Throwable)e);
            this.closeSocket();
            throw new ConnectionDroppedException((Throwable)e);
        }
    }

    public Wire proxyReply(long timeoutTime, long tid) throws ConnectionDroppedException {
        try {
            return this.tcpSocketConsumer.syncBlockingReadSocket(timeoutTime, tid);
        }
        catch (ConnectionDroppedException e) {
            this.closeSocket();
            throw e;
        }
        catch (AssertionError | RuntimeException e) {
            LOG.error("", (Throwable)e);
            this.closeSocket();
            throw e;
        }
        catch (Exception e) {
            LOG.error("", (Throwable)e);
            this.closeSocket();
            throw Jvm.rethrow((Throwable)e);
        }
    }

    private void writeSocket1(@NotNull WireOut outWire, long timeoutTime, @NotNull SocketChannel socketChannel) throws IOException {
        Bytes bytes = outWire.bytes();
        ByteBuffer outBuffer = (ByteBuffer)bytes.underlyingObject();
        outBuffer.limit((int)bytes.writePosition());
        outBuffer.position(0);
        this.outBytesLock().isHeldByCurrentThread();
        this.logToStandardOutMessageSent(outWire, outBuffer);
        this.updateLargestChunkSoFarSize(outBuffer);
        long start = Time.currentTimeMillis();
        try {
            if (socketChannel.isOpen()) {
                socketChannel.configureBlocking(false);
            }
        }
        catch (ClosedChannelException ignored) {
            this.closeSocket();
        }
        try {
            while (outBuffer.remaining() > 0) {
                int prevRemaining = outBuffer.remaining();
                int len = socketChannel.write(outBuffer);
                if (prevRemaining != outBuffer.remaining()) {
                    start = Time.currentTimeMillis();
                }
                if (len == -1) {
                    throw new IORuntimeException("Disconnection to server=" + this.socketAddressSupplier + ", name=" + this.name);
                }
                long writeTime = Time.currentTimeMillis() - start;
                if (writeTime <= 5000L) continue;
                for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
                    Thread thread = entry.getKey();
                    if (thread.getThreadGroup().getName().equals("system")) continue;
                    StringBuilder sb = new StringBuilder();
                    sb.append(thread).append(" ").append((Object)thread.getState());
                    Jvm.trimStackTrace((StringBuilder)sb, (StackTraceElement[])entry.getValue());
                    sb.append("\n");
                    LOG.error("\n========= THREAD DUMP =========\n", (Object)sb);
                }
                this.closeSocket();
                throw new IORuntimeException("Took " + writeTime + " ms " + "to perform a write, remaining= " + outBuffer.remaining());
            }
        }
        catch (IOException e) {
            this.closeSocket();
            throw e;
        }
        finally {
            try {
                if (socketChannel.isOpen()) {
                    socketChannel.configureBlocking(true);
                }
            }
            catch (ClosedChannelException ignored) {
                this.closeSocket();
            }
        }
        outBuffer.clear();
        bytes.clear();
    }

    private void logToStandardOutMessageSent(@NotNull WireOut wire, @NotNull ByteBuffer outBuffer) {
        if (!Jvm.isDebug() || !YamlLogging.clientWrites) {
            return;
        }
        Bytes bytes = wire.bytes();
        try {
            if (bytes.readRemaining() > 0L) {
                LOG.info((!YamlLogging.title.isEmpty() ? "### " + YamlLogging.title + "\n" : "") + "" + YamlLogging.writeMessage + (YamlLogging.writeMessage.isEmpty() ? "" : "\n\n") + "sends:\n\n" + "```yaml\n" + Wires.fromSizePrefixedBinaryToText((Bytes)bytes) + "```");
            }
            YamlLogging.title = "";
            YamlLogging.writeMessage = "";
        }
        catch (Exception e) {
            LOG.error(Bytes.toString((Bytes)bytes), (Throwable)e);
        }
    }

    private void updateLargestChunkSoFarSize(@NotNull ByteBuffer outBuffer) {
        int sizeOfThisChunk = (int)((long)outBuffer.limit() - this.limitOfLast);
        if (this.largestChunkSoFar < (long)sizeOfThisChunk) {
            this.largestChunkSoFar = sizeOfThisChunk;
        }
        this.limitOfLast = outBuffer.limit();
    }

    public Wire outWire() {
        assert (this.outBytesLock().isHeldByCurrentThread());
        return this.outWire;
    }

    void reflectServerHeartbeatMessage(@NotNull ValueIn valueIn) {
        long timestamp = valueIn.int64();
        this.lock(() -> {
            this.writeMetaDataForKnownTID(0L, this.outWire, null, 0L);
            this.outWire.writeDocument(false, w -> w.writeEventName((WireKey)EventId.heartbeatReply).int64(timestamp));
            this.writeSocket((WireOut)this.outWire);
        }, true);
    }

    public long writeMetaDataStartTime(long startTime, @NotNull Wire wire, String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        long tid = this.nextUniqueTransaction(startTime);
        this.writeMetaDataForKnownTID(tid, wire, csp, cid);
        return tid;
    }

    public void writeMetaDataForKnownTID(long tid, @NotNull Wire wire, @Nullable String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        wire.writeDocument(true, wireOut -> {
            if (cid == 0L) {
                wireOut.writeEventName((WireKey)CoreFields.csp).text((CharSequence)csp);
            } else {
                wireOut.writeEventName((WireKey)CoreFields.cid).int64(cid);
            }
            wireOut.writeEventName((WireKey)CoreFields.tid).int64(tid);
        });
    }

    public void writeAsyncHeader(@NotNull Wire wire, String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        wire.writeDocument(true, wireOut -> {
            if (cid == 0L) {
                wireOut.writeEventName((WireKey)CoreFields.csp).text((CharSequence)csp);
            } else {
                wireOut.writeEventName((WireKey)CoreFields.cid).int64(cid);
            }
        });
    }

    public boolean lock(@NotNull Task r) {
        return this.lock(r, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean lock(@NotNull Task r, boolean tryLock) {
        if (this.clientChannel == null) {
            return tryLock;
        }
        ReentrantLock lock = this.outBytesLock();
        if (tryLock) {
            if (!lock.tryLock()) {
                return false;
            }
        } else {
            lock.lock();
        }
        try {
            r.run();
            this.writeSocket((WireOut)this.outWire());
        }
        catch (Exception e) {
            LOG.debug("", (Throwable)e);
            boolean bl = false;
            return bl;
        }
        finally {
            lock.unlock();
        }
        return true;
    }

    public void checkConnection() throws InterruptedException {
        long start = Time.currentTimeMillis();
        while (this.clientChannel == null) {
            this.tcpSocketConsumer.checkNotShutdown();
            if (start + this.timeoutMs > Time.currentTimeMillis()) {
                Thread.sleep(50L);
                continue;
            }
            throw new IORuntimeException("Not connected to " + this.socketAddressSupplier);
        }
    }

    private class TcpSocketConsumer
    implements EventHandler {
        @NotNull
        private final ExecutorService executorService;
        @NotNull
        private final Map<Long, Object> map = new ConcurrentHashMap<Long, Object>();
        private final Map<Long, Object> omap = new ConcurrentHashMap<Long, Object>();
        long lastheartbeatSentTime = 0L;
        private Function<Bytes, Wire> wireFunction;
        private long tid;
        @NotNull
        private ThreadLocal<Wire> syncInWireThreadLocal = ThreadLocal.withInitial(() -> (Wire)TcpChannelHub.this.wire.apply(Bytes.elasticByteBuffer()));
        private Bytes serverHeartBeatHandler = Bytes.elasticByteBuffer();
        private volatile long lastTimeMessageReceived = Time.currentTimeMillis();
        private volatile boolean isShutdown;
        @Nullable
        private volatile Throwable shutdownHere = null;

        private TcpSocketConsumer(Function<Bytes, Wire> wireFunction) {
            this.wireFunction = wireFunction;
            if (LOG.isDebugEnabled()) {
                LOG.debug("constructor remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
            }
            this.executorService = this.start();
        }

        private void reconnect() {
            TcpChannelHub.this.preventSubscribeUponReconnect.forEach(this::unsubscribe);
            this.map.values().forEach(v -> {
                if (v instanceof AsyncSubscription && !(v instanceof AsyncTemporarySubscription)) {
                    ((AsyncSubscription)v).applySubscribe();
                }
            });
        }

        public void onConnectionClosed() {
            this.map.values().forEach(v -> {
                Object object;
                if (v instanceof Bytes) {
                    object = v;
                    synchronized (object) {
                        v.notifyAll();
                    }
                }
                if (v instanceof AsyncSubscription) {
                    ((AsyncSubscription)v).onClose();
                } else if (v instanceof Bytes) {
                    object = v;
                    synchronized (object) {
                        v.notifyAll();
                    }
                }
            });
        }

        @NotNull
        public HandlerPriority priority() {
            return HandlerPriority.MONITOR;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Wire syncBlockingReadSocket(long timeoutTimeMs, long tid) throws InterruptedException, TimeoutException, ConnectionDroppedException {
            long start = Time.currentTimeMillis();
            Wire wire = this.syncInWireThreadLocal.get();
            wire.clear();
            Bytes bytes = wire.bytes();
            ((ByteBuffer)bytes.underlyingObject()).clear();
            Bytes bytes2 = bytes;
            synchronized (bytes2) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("tid=" + tid + " of client request");
                }
                bytes.clear();
                this.registerSubscribe(tid, bytes);
                do {
                    bytes.wait(timeoutTimeMs);
                    if (TcpChannelHub.this.clientChannel != null) continue;
                    throw new ConnectionDroppedException("Connection Closed : the connection to the server has been dropped.");
                } while (bytes.readLimit() == 0L && !this.isShutdown);
            }
            TcpChannelHub.logToStandardOutMessageReceived(wire);
            if (Time.currentTimeMillis() - start >= timeoutTimeMs) {
                throw new TimeoutException("timeoutTimeMs=" + timeoutTimeMs);
            }
            return wire;
        }

        private void registerSubscribe(long tid, Object bytes) {
            TcpChannelHub.this.outBytesLock().isHeldByCurrentThread();
            Object prev = this.map.put(tid, bytes);
            assert (prev == null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void subscribe(@NotNull AsyncSubscription asyncSubscription, boolean tryLock) {
            TcpSocketConsumer tcpSocketConsumer = this;
            synchronized (tcpSocketConsumer) {
                if (TcpChannelHub.this.clientChannel == null) {
                    TcpChannelHub.this.outBytesLock().isHeldByCurrentThread();
                    this.registerSubscribe(asyncSubscription.tid(), asyncSubscription);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("deferred subscription tid=" + asyncSubscription.tid() + "," + "asyncSubscription=" + asyncSubscription);
                    }
                    return;
                }
            }
            ReentrantLock reentrantLock = TcpChannelHub.this.outBytesLock();
            if (tryLock) {
                if (!reentrantLock.tryLock()) {
                    return;
                }
            } else {
                reentrantLock.lock();
            }
            try {
                this.registerSubscribe(asyncSubscription.tid(), asyncSubscription);
                asyncSubscription.applySubscribe();
            }
            catch (Exception e) {
                LOG.error("", (Throwable)e);
            }
            finally {
                reentrantLock.unlock();
            }
        }

        public void unsubscribe(long tid) {
            this.map.remove(tid);
        }

        @NotNull
        private ExecutorService start() {
            this.checkNotShutdown();
            ExecutorService executorService = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("TcpChannelHub-" + TcpChannelHub.this.socketAddressSupplier, Boolean.valueOf(true)));
            assert (this.shutdownHere == null);
            assert (!this.isShutdown);
            executorService.submit(() -> {
                block3: {
                    try {
                        this.running();
                    }
                    catch (IORuntimeException e) {
                        LOG.debug("", (Throwable)e);
                    }
                    catch (Throwable e) {
                        if (this.isShutdown()) break block3;
                        LOG.error("", e);
                    }
                }
            });
            return executorService;
        }

        public void checkNotShutdown() {
            if (this.isShutdown) {
                throw new IORuntimeException("Called after shutdown", this.shutdownHere);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void running() {
            try {
                Wire inWire = this.wireFunction.apply(Bytes.elasticByteBuffer());
                assert (inWire != null);
                while (!this.isShutdown()) {
                    this.checkConnectionState();
                    try {
                        Bytes bytes = inWire.bytes();
                        this.blockingRead((WireIn)inWire, 4L);
                        int header = bytes.readVolatileInt(0L);
                        long messageSize = this.size(header);
                        if (Wires.isData((long)header)) {
                            assert (messageSize < Integer.MAX_VALUE);
                            boolean clearTid = this.processData(this.tid, Wires.isReady((long)header), header, (int)messageSize, inWire);
                            if (!clearTid) continue;
                            this.tid = -1L;
                            continue;
                        }
                        this.blockingRead((WireIn)inWire, messageSize);
                        TcpChannelHub.logToStandardOutMessageReceived(inWire);
                        this.tid = -1L;
                        inWire.readDocument(w -> {
                            this.tid = CoreFields.tid(w);
                        }, null);
                    }
                    catch (IOException e) {
                        this.tid = -1L;
                        break;
                    }
                    catch (Exception e) {
                        this.tid = -1L;
                        if (this.isShutdown()) break;
                        LOG.warn("reconnecting due to unexpected ", (Throwable)e);
                        Thread.sleep(50L);
                    }
                    finally {
                        TcpChannelHub.this.clear(inWire);
                    }
                }
                TcpChannelHub.this.clear(inWire);
            }
            catch (Throwable e) {
                if (!this.isShutdown()) {
                    LOG.error("", e);
                }
            }
            finally {
                TcpChannelHub.this.closeSocket();
            }
        }

        private boolean isShutdown() {
            return this.isShutdown;
        }

        private long size(int header) {
            long messageSize = Wires.lengthOf((long)header);
            assert (messageSize > 0L) : "Invalid message size " + messageSize;
            assert (messageSize < 0x40000000L) : "Invalid message size " + messageSize;
            return messageSize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean processData(long tid, boolean isReady, int header, int messageSize, @NotNull Wire inWire) throws IOException, InterruptedException {
            assert (tid != -1L);
            boolean isLastMessageForThisTid = false;
            long startTime = 0L;
            Object o = null;
            if (tid != 0L) {
                SocketChannel c = TcpChannelHub.this.clientChannel;
                if (c == null) {
                    return false;
                }
                while (!this.isShutdown() && c.isOpen()) {
                    o = this.map.get(tid);
                    if (o != null) {
                        if (!isReady || !(o instanceof Bytes) && !(o instanceof AsyncTemporarySubscription)) break;
                        this.omap.put(tid, this.map.remove(tid));
                        isLastMessageForThisTid = true;
                        break;
                    }
                    o = this.omap.get(tid);
                    if (o != null) {
                        this.blockingRead((WireIn)inWire, messageSize);
                        TcpChannelHub.logToStandardOutMessageReceivedInERROR(inWire);
                        throw new AssertionError((Object)("Found tid=" + tid + " in the old map."));
                    }
                    if (startTime == 0L) {
                        startTime = Time.currentTimeMillis();
                    } else {
                        Thread.sleep(1L);
                    }
                    if (Time.currentTimeMillis() - startTime <= 3000L) continue;
                    this.blockingRead((WireIn)inWire, messageSize);
                    TcpChannelHub.logToStandardOutMessageReceived(inWire);
                    LOG.debug("unable to respond to tid=" + tid + ", given that we have " + "received a message we a tid which is unknown, this can occur " + "sometime if " + "the subscription has just become unregistered ( an the server " + "has not yet processed the unregister event ) ");
                    return isLastMessageForThisTid;
                }
                if (o == null) {
                    return isLastMessageForThisTid;
                }
            }
            if (tid == 0L) {
                this.processServerSystemMessage(header, messageSize);
                return isLastMessageForThisTid;
            }
            if (o instanceof AsyncSubscription) {
                this.blockingRead((WireIn)inWire, messageSize);
                TcpChannelHub.logToStandardOutMessageReceived(inWire);
                AsyncSubscription asyncSubscription = (AsyncSubscription)o;
                asyncSubscription.onConsumer((WireIn)inWire);
            }
            if (o instanceof Bytes) {
                Bytes bytes;
                Bytes bytes2 = bytes = (Bytes)o;
                synchronized (bytes2) {
                    bytes.clear();
                    bytes.ensureCapacity((long)(4 + messageSize));
                    ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
                    byteBuffer.clear();
                    bytes.writeInt(0L, header);
                    byteBuffer.position(4);
                    byteBuffer.limit(4 + messageSize);
                    this.readBuffer(byteBuffer);
                    bytes.readLimit((long)byteBuffer.position());
                    bytes.notifyAll();
                }
            }
            return isLastMessageForThisTid;
        }

        private void processServerSystemMessage(int header, int messageSize) throws IOException {
            this.serverHeartBeatHandler.clear();
            Bytes bytes = this.serverHeartBeatHandler;
            bytes.clear();
            ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
            byteBuffer.clear();
            bytes.writeInt(0L, header);
            byteBuffer.position(4);
            byteBuffer.limit(4 + messageSize);
            this.readBuffer(byteBuffer);
            bytes.readLimit((long)byteBuffer.position());
            StringBuilder eventName = Wires.acquireStringBuilder();
            Wire inWire = (Wire)TcpChannelHub.this.wire.apply(bytes);
            TcpChannelHub.logToStandardOutMessageReceived(inWire);
            inWire.readDocument(null, d -> {
                ValueIn valueIn = d.readEventName(eventName);
                if (EventId.heartbeat.contentEquals(eventName)) {
                    TcpChannelHub.this.reflectServerHeartbeatMessage(valueIn);
                } else if (EventId.onClosingReply.contentEquals(eventName)) {
                    TcpChannelHub.this.receivedClosedAcknowledgement.countDown();
                }
            });
        }

        private void blockingRead(@NotNull WireIn wire, long numberOfBytes) throws IOException {
            Bytes bytes = wire.bytes();
            bytes.ensureCapacity(bytes.writePosition() + numberOfBytes);
            ByteBuffer buffer = (ByteBuffer)bytes.underlyingObject();
            int start = (int)bytes.writePosition();
            buffer.position(start);
            buffer.limit((int)((long)start + numberOfBytes));
            this.readBuffer(buffer);
            bytes.readLimit((long)buffer.position());
        }

        private void readBuffer(@NotNull ByteBuffer buffer) throws IOException {
            while (buffer.remaining() > 0) {
                SocketChannel clientChannel = TcpChannelHub.this.clientChannel;
                if (clientChannel == null) {
                    throw new IOException("Disconnection to server=" + TcpChannelHub.this.socketAddressSupplier + " channel is closed, name=" + TcpChannelHub.this.name);
                }
                int numberOfBytesRead = clientChannel.read(buffer);
                WanSimulator.dataRead(numberOfBytesRead);
                if (numberOfBytesRead == -1) {
                    throw new IOException("Disconnection to server=" + TcpChannelHub.this.socketAddressSupplier + " read=-1 " + ", name=" + TcpChannelHub.this.name);
                }
                if (numberOfBytesRead > 0) {
                    this.onMessageReceived();
                }
                if (!this.isShutdown) continue;
                throw new IOException("The server" + TcpChannelHub.this.socketAddressSupplier + " was shutdown, " + "name=" + TcpChannelHub.this.name);
            }
        }

        private void onMessageReceived() {
            this.lastTimeMessageReceived = Time.currentTimeMillis();
        }

        private void sendHeartbeat() {
            final long l = System.nanoTime();
            this.subscribe(new AbstractAsyncTemporarySubscription(TcpChannelHub.this, null, TcpChannelHub.this.name){

                @Override
                public void onSubscribe(@NotNull WireOut wireOut) {
                    wireOut.writeEventName((WireKey)EventId.heartbeat).int64(Time.currentTimeMillis());
                }

                @Override
                public void onConsumer(@NotNull WireIn inWire) {
                    long roundTipTimeMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - l);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("heartbeat round trip time=" + roundTipTimeMicros + "" + " server=" + TcpChannelHub.this.socketAddressSupplier);
                    }
                    inWire.clear();
                }
            }, true);
        }

        private void stop() {
            if (this.isShutdown) {
                return;
            }
            if (this.shutdownHere == null) {
                this.shutdownHere = new Throwable(Thread.currentThread() + " Shutdown here");
            }
            this.isShutdown = true;
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
                    this.executorService.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                this.executorService.shutdownNow();
            }
        }

        public boolean action() throws InvalidEventHandlerException {
            long x;
            if (TcpChannelHub.this.clientChannel == null) {
                throw new InvalidEventHandlerException();
            }
            long currentTime = Time.currentTimeMillis();
            long millisecondsSinceLastMessageReceived = currentTime - this.lastTimeMessageReceived;
            long millisecondsSinceLastHeatbeatSend = currentTime - this.lastheartbeatSentTime;
            if (millisecondsSinceLastMessageReceived >= (long)HEATBEAT_PING_PERIOD && millisecondsSinceLastHeatbeatSend >= (long)HEATBEAT_PING_PERIOD) {
                this.lastheartbeatSentTime = Time.currentTimeMillis();
                this.sendHeartbeat();
            }
            if ((x = millisecondsSinceLastMessageReceived - (long)HEATBEAT_TIMEOUT_PERIOD) > 0L) {
                LOG.warn("reconnecting due to heartbeat failure, millisecondsSinceLastMessageReceived=" + millisecondsSinceLastMessageReceived);
                TcpChannelHub.this.closeSocket();
                throw new InvalidEventHandlerException();
            }
            return true;
        }

        private void checkConnectionState() throws IOException {
            if (TcpChannelHub.this.clientChannel != null) {
                return;
            }
            this.attemptConnect();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void attemptConnect() throws IOException {
            this.keepSubscriptionsClearEverythingElse();
            long start = System.currentTimeMillis();
            TcpChannelHub.this.socketAddressSupplier.startAtFirstAddress();
            block10: while (true) {
                this.checkNotShutdown();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("attemptConnect remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                }
                try {
                    SocketChannel socketChannel;
                    while (true) {
                        if (this.isShutdown()) continue block10;
                        if (start + TcpChannelHub.this.socketAddressSupplier.timeoutMS() < System.currentTimeMillis()) {
                            String oldAddress = TcpChannelHub.this.socketAddressSupplier.toString();
                            TcpChannelHub.this.socketAddressSupplier.failoverToNextAddress();
                            LOG.info("Connection Dropped to address=" + oldAddress + ", so will fail over to" + TcpChannelHub.this.socketAddressSupplier + ", name=" + TcpChannelHub.this.name);
                            if (TcpChannelHub.this.socketAddressSupplier.get() == null) {
                                LOG.warn("failed to establish a socket connection of any of the following servers=" + TcpChannelHub.this.socketAddressSupplier.all() + " so will re-attempt");
                                TcpChannelHub.this.socketAddressSupplier.startAtFirstAddress();
                            }
                            start = System.currentTimeMillis();
                        }
                        socketChannel = TcpChannelHub.this.openSocketChannel();
                        try {
                            if (socketChannel == null) {
                                LOG.error("Unable to open socketChannel to remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                                Jvm.pause((long)250L);
                                continue;
                            }
                            SocketAddress socketAddress = TcpChannelHub.this.socketAddressSupplier.get();
                            if (socketAddress == null) {
                                throw new IORuntimeException("failed to connect as socketAddress=null");
                            }
                            SocketAddress remote = TcpChannelHub.this.socketAddressSupplier.get();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("attempting to connect to address=" + remote);
                            }
                            if (socketChannel.connect(remote)) break;
                            LOG.error("Unable to connect to remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                            Jvm.pause((long)250L);
                        }
                        catch (ConnectException e) {
                            LOG.info("Server is unavailable, ConnectException to remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                            Jvm.pause((long)250L);
                        }
                    }
                    TcpChannelHub.this.outBytesLock().lock();
                    try {
                        TcpChannelHub.this.clear(TcpChannelHub.this.outWire);
                        this.onMessageReceived();
                        TcpChannelHub.this.doHandShaking(socketChannel);
                        TcpSocketConsumer e = this;
                        synchronized (e) {
                            TcpChannelHub.this.clientChannel = socketChannel;
                        }
                        TcpChannelHub.this.eventLoop.addHandler((EventHandler)this);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("successfully connected to remoteAddress=" + TcpChannelHub.this.socketAddressSupplier);
                        }
                        this.reconnect();
                        TcpChannelHub.this.onConnected();
                    }
                    finally {
                        TcpChannelHub.this.outBytesLock().unlock();
                    }
                }
                catch (Exception e) {
                    if (this.isShutdown) continue;
                    LOG.error("failed to connect remoteAddress=" + TcpChannelHub.this.socketAddressSupplier + " so will reconnect " + e.getMessage());
                    TcpChannelHub.this.closeSocket();
                    continue;
                }
                break;
            }
        }

        private void keepSubscriptionsClearEverythingElse() {
            this.tid = 0L;
            this.omap.clear();
            HashSet<Long> keys = new HashSet<Long>(this.map.keySet());
            keys.forEach(k -> {
                Object o = this.map.get(k);
                if (o instanceof Bytes || o instanceof AsyncTemporarySubscription) {
                    this.map.remove(k);
                }
            });
        }
    }

    public static interface Task {
        public void run();
    }
}

