/*
 * Decompiled with CFR 0.152.
 */
package com.tc.net.core;

import com.tc.bytes.TCByteBuffer;
import com.tc.bytes.TCDirectByteBufferCache;
import com.tc.io.TCByteBufferOutputStream;
import com.tc.io.TCDirectByteBufferOutputStream;
import com.tc.net.core.BufferManager;
import com.tc.net.core.BufferManagerFactory;
import com.tc.net.core.CoreNIOServices;
import com.tc.net.core.SocketParams;
import com.tc.net.core.TCChannelReader;
import com.tc.net.core.TCChannelWriter;
import com.tc.net.core.TCConnection;
import com.tc.net.core.TCConnectionManagerImpl;
import com.tc.net.core.event.TCConnectionEventCaller;
import com.tc.net.core.event.TCConnectionEventListener;
import com.tc.net.protocol.TCNetworkMessage;
import com.tc.net.protocol.TCProtocolAdaptor;
import com.tc.net.protocol.tcm.TCActionNetworkMessage;
import com.tc.net.protocol.transport.WireProtocolGroupMessageImpl;
import com.tc.net.protocol.transport.WireProtocolHeader;
import com.tc.net.protocol.transport.WireProtocolMessage;
import com.tc.net.protocol.transport.WireProtocolMessageImpl;
import com.tc.properties.TCPropertiesImpl;
import com.tc.text.PrettyPrintable;
import com.tc.util.Assert;
import com.tc.util.TCTimeoutException;
import com.tc.util.concurrent.SetOnceFlag;
import com.tc.util.concurrent.SetOnceRef;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TCConnectionImpl
implements TCConnection,
TCChannelReader,
TCChannelWriter {
    private static final long NO_CONNECT_TIME = -1L;
    private static final Logger logger = LoggerFactory.getLogger(TCConnection.class);
    private static final long WARN_THRESHOLD = 0x800000L;
    private volatile CoreNIOServices commWorker;
    private volatile SocketChannel channel;
    private volatile BufferManager bufferManager;
    private final BufferManagerFactory bufferManagerFactory;
    private final boolean clientConnection;
    private final AtomicBoolean transportEstablished = new AtomicBoolean(false);
    private final BlockingQueue<TCNetworkMessage> writeMessages = new ArrayBlockingQueue<TCNetworkMessage>(MSG_GROUPING_MAX_COUNT);
    private final TCConnectionManagerImpl parent;
    private final TCDirectByteBufferCache buffers;
    private final TCConnectionEventCaller eventCaller = new TCConnectionEventCaller(logger);
    private final AtomicLong lastDataWriteTime = new AtomicLong(System.currentTimeMillis());
    private final LongAdder messagesWritten = new LongAdder();
    private final AtomicLong lastDataReceiveTime = new AtomicLong(System.currentTimeMillis());
    private final LongAdder messagesRead = new LongAdder();
    private final AtomicLong connectTime = new AtomicLong(-1L);
    private final List<TCConnectionEventListener> eventListeners = new CopyOnWriteArrayList<TCConnectionEventListener>();
    private final TCProtocolAdaptor protocolAdaptor;
    private final AtomicBoolean isSocketEndpoint = new AtomicBoolean(false);
    private final SetOnceFlag closed = new SetOnceFlag();
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final SetOnceRef<InetSocketAddress> localSocketAddress = new SetOnceRef();
    private final SetOnceRef<InetSocketAddress> remoteSocketAddress = new SetOnceRef();
    private final SocketParams socketParams;
    private final AtomicLong totalRead = new AtomicLong(0L);
    private final AtomicLong totalWrite = new AtomicLong(0L);
    private final Queue<WriteContext> writeContexts = new ConcurrentLinkedQueue<WriteContext>();
    private final ReentrantLock writeContextControl = new ReentrantLock();
    private static final boolean MSG_GROUPING_ENABLED = TCPropertiesImpl.getProperties().getBoolean("tc.messages.grouping.enabled");
    private static final int MSG_GROUPING_MAX_SIZE_BYTES = TCPropertiesImpl.getProperties().getInt("tc.messages.grouping.maxSizeKiloBytes", 128) * 1024;
    private static final int MSG_GROUPING_MAX_COUNT = TCPropertiesImpl.getProperties().getInt("tc.messages.grouping.maxCount", 1024);
    private static final boolean MESSAGE_PACKUP = TCPropertiesImpl.getProperties().getBoolean("tc.messages.packup.enabled", false);
    private final Object readerLock = new Object();
    private final Object writerLock = new Object();

    TCConnectionImpl(TCConnectionEventListener listener, TCProtocolAdaptor adaptor, TCConnectionManagerImpl managerJDK14, CoreNIOServices nioServiceThread, SocketParams socketParams, BufferManagerFactory bufferManagerFactory) {
        this(listener, adaptor, null, managerJDK14, nioServiceThread, socketParams, bufferManagerFactory);
    }

    TCConnectionImpl(TCConnectionEventListener listener, TCProtocolAdaptor adaptor, SocketChannel ch, TCConnectionManagerImpl parent, CoreNIOServices nioServiceThread, SocketParams socketParams, BufferManagerFactory bufferManagerFactory) {
        Assert.assertNotNull(parent);
        Assert.assertNotNull(adaptor);
        this.parent = parent;
        this.protocolAdaptor = adaptor;
        if (listener != null) {
            this.addListener(listener);
        }
        this.channel = ch;
        this.bufferManagerFactory = bufferManagerFactory;
        if (ch != null) {
            socketParams.applySocketParams(ch.socket());
            this.clientConnection = false;
        } else {
            this.clientConnection = true;
        }
        this.socketParams = socketParams;
        this.commWorker = nioServiceThread;
        this.buffers = new TCDirectByteBufferCache(parent.getBufferCache());
    }

    @Override
    public Map<String, ?> getState() {
        LinkedHashMap<String, Object> state = new LinkedHashMap<String, Object>();
        state.put("localAddress", this.getLocalAddress());
        state.put("remoteAddress", this.getRemoteAddress());
        state.put("totalRead", this.totalRead.get());
        state.put("totalWrite", this.totalWrite.get());
        state.put("connectTime", new Date(this.getConnectTime()));
        state.put("receiveIdleTime", this.getIdleReceiveTime());
        state.put("idleTime", this.getIdleTime());
        state.put("messageWritten", this.messagesWritten.longValue());
        state.put("messageRead", this.messagesRead.longValue());
        state.put("worker", this.commWorker.getName());
        state.put("closed", this.isClosed());
        state.put("connected", this.isConnected());
        state.put("transportConnected", this.isTransportEstablished());
        state.put("buffers.cached", this.buffers.size());
        state.put("buffers.referenced", this.buffers.referenced());
        if (this.bufferManager instanceof PrettyPrintable) {
            state.put("buffer", ((PrettyPrintable)((Object)this.bufferManager)).getStateMap());
        } else {
            state.put("buffer", this.bufferManager.toString());
        }
        return state;
    }

    public void setCommWorker(CoreNIOServices worker) {
        this.commWorker = worker;
    }

    private Future<Void> closeImpl(Runnable callback) {
        Assert.assertTrue(this.closed.isSet());
        this.transportEstablished.set(false);
        try {
            if (this.bufferManager != null) {
                this.bufferManager.close();
            }
        }
        catch (EOFException eof) {
            logger.debug("closed", (Throwable)eof);
        }
        catch (IOException ioe) {
            logger.warn("failed to close buffer manager", (Throwable)ioe);
        }
        if (this.channel != null) {
            CompletableFuture<Void> complete = new CompletableFuture<Void>();
            this.commWorker.cleanupChannel(this.channel, () -> {
                try {
                    callback.run();
                    this.cleanupUnsentWriteMessages();
                    complete.complete(null);
                }
                catch (Exception e) {
                    complete.completeExceptionally(e);
                }
            });
            return complete;
        }
        callback.run();
        this.cleanupUnsentWriteMessages();
        return CompletableFuture.completedFuture(null);
    }

    private void cleanupUnsentWriteMessages() {
        this.writeMessages.forEach(TCNetworkMessage::complete);
        this.writeMessages.clear();
        this.writeContexts.forEach(WriteContext::writeComplete);
        this.writeContexts.clear();
    }

    protected void finishConnect() throws IOException {
        Assert.assertNotNull("channel", this.channel);
        Assert.assertNotNull("commWorker", this.commWorker);
        this.installBufferManager();
        this.recordSocketAddress(this.channel.socket());
        this.setConnected(true);
        this.eventCaller.fireConnectEvent(this.eventListeners, this);
    }

    private void connectImpl(InetSocketAddress addr, int timeout) throws IOException, TCTimeoutException {
        SocketChannel newSocket = null;
        InetSocketAddress inetAddr = new InetSocketAddress(InetAddress.getByName(addr.getHostString()), addr.getPort());
        int i = 1;
        if (i <= 3) {
            try {
                newSocket = this.createChannel();
                newSocket.configureBlocking(true);
                newSocket.socket().connect(inetAddr, timeout);
                newSocket.configureBlocking(false);
            }
            catch (SocketTimeoutException ste) {
                Assert.eval(this.commWorker != null);
                this.commWorker.cleanupChannel(newSocket, null);
                throw new TCTimeoutException("Timeout of " + timeout + "ms occured connecting to " + addr, ste);
            }
        }
        this.channel = newSocket;
    }

    private void installBufferManager() throws IOException {
        this.bufferManager = this.bufferManagerFactory.createBufferManager(this.channel, this.clientConnection);
        if (this.bufferManager == null) {
            throw new IOException("buffer manager not provided");
        }
    }

    private SocketChannel createChannel() throws IOException, SocketException {
        SocketChannel rv = SocketChannel.open();
        Socket s = rv.socket();
        this.socketParams.applySocketParams(s);
        return rv;
    }

    private boolean asynchConnectImpl(InetSocketAddress address) throws IOException {
        SocketChannel newSocket = this.createChannel();
        newSocket.configureBlocking(false);
        InetSocketAddress inetAddr = new InetSocketAddress(address.getAddress(), address.getPort());
        boolean rv = newSocket.connect(inetAddr);
        this.setConnected(rv);
        this.channel = newSocket;
        if (!rv) {
            this.commWorker.requestConnectInterest(this, newSocket);
        }
        return rv;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int doRead() throws IOException {
        Object object = this.readerLock;
        synchronized (object) {
            return this.doReadInternal();
        }
    }

    private int doReadInternal() throws IOException {
        int read;
        try {
            this.bufferManager.recvToBuffer();
        }
        catch (IOException ioe) {
            this.closeReadOnException(ioe);
            return 0;
        }
        int totalBytesReadFromBuffer = 0;
        do {
            try {
                read = this.doReadFromBuffer();
                totalBytesReadFromBuffer += read;
            }
            catch (IOException ioe) {
                this.closeReadOnException(ioe);
                break;
            }
        } while (read != 0);
        this.totalRead.addAndGet(totalBytesReadFromBuffer);
        this.messagesRead.increment();
        return totalBytesReadFromBuffer;
    }

    public int doReadFromBuffer() throws IOException {
        return this.doReadFromBufferInternal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int doWrite() throws IOException {
        Object object = this.writerLock;
        synchronized (object) {
            return this.doWriteInternal();
        }
    }

    private int doWriteInternal() throws IOException {
        int channelWritten;
        int sent;
        int written;
        try {
            written = this.doWriteToBuffer();
        }
        catch (IOException ioe) {
            this.closeWriteOnException(ioe);
            return 0;
        }
        for (channelWritten = 0; channelWritten != written; channelWritten += sent) {
            try {
                sent = this.bufferManager.sendFromBuffer();
            }
            catch (IOException ioe) {
                this.closeWriteOnException(ioe);
                break;
            }
            if (!this.isClosed()) continue;
            logger.debug("stop write due to closed connection");
            break;
        }
        this.totalWrite.addAndGet(channelWritten);
        return channelWritten;
    }

    private int doWriteToBuffer() throws IOException {
        return this.doWriteToBufferInternal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean buildWriteContextsFromMessages(boolean failfast) {
        if (failfast) {
            if (!this.writeContextControl.tryLock()) {
                this.writeContexts.removeIf(WriteContext::isNotValid);
                return false;
            }
        } else {
            this.writeContextControl.lock();
        }
        try {
            if (!this.writeMessages.isEmpty()) {
                WireProtocolMessage ms;
                ArrayList<TCActionNetworkMessage> currentBatch = new ArrayList<TCActionNetworkMessage>();
                int batchSize = 0;
                int batchMsgCount = 0;
                TCNetworkMessage element = (TCNetworkMessage)this.writeMessages.poll();
                while (element != null) {
                    if (this.closed.isSet()) {
                        element.complete();
                        boolean bl = false;
                        return bl;
                    }
                    if (element instanceof WireProtocolMessage) {
                        ms = this.finalizeWireProtocolMessage((WireProtocolMessage)element, 1);
                        this.writeContexts.add(new WriteContext(ms));
                    } else {
                        TCActionNetworkMessage batchable = (TCActionNetworkMessage)element;
                        if (batchable.load()) {
                            int bytesToWrite = batchable.getTotalLength();
                            if ((long)bytesToWrite >= 0x800000L) {
                                logger.warn("Warning: Attempting to send a message (" + batchable.getClass().getName() + ") of size " + bytesToWrite + " bytes");
                            }
                            if (MSG_GROUPING_ENABLED) {
                                if (!this.canBatch(bytesToWrite, batchSize, batchMsgCount)) {
                                    this.writeContexts.add(new WriteContext(this.buildWireProtocolMessageGroup(currentBatch)));
                                    batchSize = 0;
                                    currentBatch = new ArrayList(batchMsgCount);
                                    batchMsgCount = 0;
                                }
                                batchSize += bytesToWrite;
                                ++batchMsgCount;
                                currentBatch.add(batchable);
                            } else {
                                this.writeContexts.add(new WriteContext(this.buildWireProtocolMessage(batchable)));
                            }
                        } else {
                            batchable.complete();
                        }
                    }
                    element = (TCNetworkMessage)this.writeMessages.poll();
                }
                if (MSG_GROUPING_ENABLED && batchMsgCount > 0) {
                    ms = this.buildWireProtocolMessageGroup(currentBatch);
                    this.writeContexts.add(new WriteContext(ms));
                }
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.writeContextControl.unlock();
        }
    }

    private boolean canBatch(int realMessageSize, int currentBatchSize, int currentBatchMsgCount) {
        return 0 == currentBatchMsgCount || currentBatchSize + realMessageSize <= MSG_GROUPING_MAX_SIZE_BYTES && currentBatchMsgCount + 1 <= 65535;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int doReadFromBufferInternal() {
        boolean debug = logger.isDebugEnabled();
        TCByteBuffer[] readBuffers = this.getReadBuffers();
        int bytesRead = 0;
        for (TCByteBuffer readBuffer : readBuffers) {
            ByteBuffer buf = readBuffer.getNioBuffer();
            try {
                if (!buf.hasRemaining()) continue;
                int read = this.bufferManager.forwardFromReadBuffer(buf);
                if (0 == read) break;
                bytesRead += read;
                if (!buf.hasRemaining()) continue;
                break;
            }
            finally {
                readBuffer.returnNioBuffer(buf);
            }
        }
        Assert.eval(bytesRead >= 0);
        if (debug) {
            logger.debug("Read " + bytesRead + " bytes on connection " + this.channel.toString());
        }
        this.addNetworkData(readBuffers, bytesRead);
        return bytesRead;
    }

    private int doWriteToBufferInternal() throws IOException {
        boolean debug = logger.isDebugEnabled();
        int totalBytesWritten = 0;
        WriteContext context = this.writeContexts.peek();
        if (context == null && this.buildWriteContextsFromMessages(true)) {
            context = this.writeContexts.peek();
        }
        while (context != null) {
            long bytesWritten = context.writeBuffers();
            if (debug) {
                logger.debug("Wrote " + bytesWritten + " bytes on connection " + this.channel.toString());
            }
            totalBytesWritten = (int)((long)totalBytesWritten + bytesWritten);
            if (context.done()) {
                if (debug) {
                    logger.debug("Complete message sent on connection " + this.channel.toString());
                }
                context.writeComplete();
                this.writeContexts.remove();
                context = this.writeContexts.peek();
                continue;
            }
            if (!debug) break;
            logger.debug("Message not yet completely sent on connection " + this.channel.toString());
            break;
        }
        if (!this.closed.isSet() && context == null && !this.buildWriteContextsFromMessages(false)) {
            this.commWorker.removeWriteInterest(this, this.channel);
        }
        return totalBytesWritten;
    }

    private void putMessageImpl(TCNetworkMessage message) {
        boolean debug = logger.isDebugEnabled();
        boolean placed = false;
        boolean newData = false;
        while (!placed) {
            if (this.closed.isSet()) {
                message.complete();
                return;
            }
            placed = this.writeMessages.offer(message);
            if (!placed) {
                this.buildWriteContextsFromMessages(true);
                continue;
            }
            newData = this.writeMessages.peek() == message;
        }
        if (debug) {
            logger.debug("Connection (" + this.channel.toString() + ") has " + this.writeMessages.size() + " messages queued");
        }
        if (newData) {
            if (debug) {
                logger.debug("New message on connection, registering for write interest");
            }
            this.commWorker.requestWriteInterest(this, this.channel);
        }
    }

    @Override
    public void close() {
        try {
            this.asynchClose().get();
        }
        catch (ExecutionException exception) {
            logger.warn("error closing connection", (Throwable)exception);
        }
        catch (InterruptedException exception) {
            logger.warn("interrupted closing connection", (Throwable)exception);
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public Future<Void> asynchClose() {
        if (this.closed.attemptSet()) {
            return this.closeImpl(this.createCloseCallback());
        }
        this.parent.removeConnection(this);
        return CompletableFuture.completedFuture(null);
    }

    private Runnable createCloseCallback() {
        final boolean fireClose = this.isConnected();
        return new Runnable(){

            @Override
            public void run() {
                TCConnectionImpl.this.setConnected(false);
                TCConnectionImpl.this.parent.connectionClosed(TCConnectionImpl.this);
                if (fireClose) {
                    TCConnectionImpl.this.eventCaller.fireCloseEvent(TCConnectionImpl.this.eventListeners, TCConnectionImpl.this);
                }
                if (TCConnectionImpl.this.buffers != null) {
                    TCConnectionImpl.this.buffers.close();
                }
                if (TCConnectionImpl.this.bufferManager != null) {
                    TCConnectionImpl.this.bufferManager.dispose();
                }
            }
        };
    }

    @Override
    public final boolean isClosed() {
        return this.closed.isSet();
    }

    @Override
    public final boolean isConnected() {
        return this.connected.get();
    }

    public final String toString() {
        StringBuffer buf = new StringBuffer();
        buf.append(this.getClass().getName()).append('@').append(this.hashCode()).append(":");
        buf.append(" connected: ").append(this.isConnected());
        buf.append(", closed: ").append(this.isClosed());
        if (this.isSocketEndpoint.get()) {
            buf.append(" local=");
            if (this.localSocketAddress.isSet()) {
                buf.append(this.localSocketAddress.get().toString());
            } else {
                buf.append("[unknown]");
            }
            buf.append(" remote=");
            if (this.remoteSocketAddress.isSet()) {
                buf.append(this.remoteSocketAddress.get().toString());
            } else {
                buf.append("[unknown]");
            }
        }
        buf.append(" connect=[");
        long connect = this.getConnectTime();
        if (connect != -1L) {
            buf.append(new Date(connect));
        } else {
            buf.append("no connect time");
        }
        buf.append(']');
        buf.append(" idle=").append(this.getIdleTime()).append("ms");
        buf.append(" [").append(this.totalRead.get()).append(" read, ").append(this.totalWrite.get()).append(" write]");
        buf.append(" buffer=").append(this.bufferManager);
        return buf.toString();
    }

    @Override
    public final void addListener(TCConnectionEventListener listener) {
        if (listener == null) {
            return;
        }
        this.eventListeners.add(listener);
    }

    @Override
    public final void removeListener(TCConnectionEventListener listener) {
        if (listener == null) {
            return;
        }
        this.eventListeners.remove(listener);
    }

    @Override
    public final long getConnectTime() {
        return this.connectTime.get();
    }

    @Override
    public final long getIdleTime() {
        return System.currentTimeMillis() - (this.lastDataWriteTime.get() > this.lastDataReceiveTime.get() ? this.lastDataWriteTime.get() : this.lastDataReceiveTime.get());
    }

    @Override
    public final long getIdleReceiveTime() {
        return System.currentTimeMillis() - this.lastDataReceiveTime.get();
    }

    @Override
    public final synchronized Socket connect(InetSocketAddress addr, int timeout) throws IOException, TCTimeoutException {
        if (this.closed.isSet() || this.connected.get()) {
            throw new IllegalStateException("Connection closed or already connected");
        }
        this.connectImpl(addr, timeout);
        this.finishConnect();
        Assert.assertNotNull(this.commWorker);
        Assert.assertNotNull(this.bufferManager);
        this.commWorker.requestReadInterest(this, this.channel);
        return this.channel.socket();
    }

    @Override
    public final synchronized boolean asynchConnect(InetSocketAddress addr) throws IOException {
        if (this.closed.isSet() || this.connected.get()) {
            throw new IllegalStateException("Connection closed or already connected");
        }
        boolean rv = this.asynchConnectImpl(addr);
        if (rv) {
            this.finishConnect();
        }
        return rv;
    }

    @Override
    public final void putMessage(TCNetworkMessage message) {
        this.lastDataWriteTime.set(System.currentTimeMillis());
        this.messagesWritten.increment();
        this.putMessageImpl(message);
    }

    @Override
    public final InetSocketAddress getLocalAddress() {
        if (this.localSocketAddress.isSet()) {
            return this.localSocketAddress.get();
        }
        return null;
    }

    @Override
    public final InetSocketAddress getRemoteAddress() {
        if (this.remoteSocketAddress.isSet()) {
            return this.remoteSocketAddress.get();
        }
        return null;
    }

    private void setConnected(boolean connected) {
        if (connected) {
            this.connectTime.set(System.currentTimeMillis());
        }
        this.connected.set(connected);
    }

    private void recordSocketAddress(Socket socket) throws IOException {
        if (socket != null) {
            InetAddress localAddress = socket.getLocalAddress();
            InetAddress remoteAddress = socket.getInetAddress();
            if (remoteAddress != null && localAddress != null) {
                this.isSocketEndpoint.set(true);
                this.localSocketAddress.set(new InetSocketAddress(TCConnectionImpl.cloneInetAddress(localAddress), socket.getLocalPort()));
                this.remoteSocketAddress.set(new InetSocketAddress(TCConnectionImpl.cloneInetAddress(remoteAddress), socket.getPort()));
            } else {
                throw new IOException("socket is not connected");
            }
        }
    }

    private static InetAddress cloneInetAddress(InetAddress addr) {
        try {
            byte[] address = addr.getAddress();
            return InetAddress.getByAddress(address);
        }
        catch (UnknownHostException e) {
            throw new AssertionError((Object)e);
        }
    }

    private void addNetworkData(TCByteBuffer[] data, int length) {
        this.lastDataReceiveTime.set(System.currentTimeMillis());
        try {
            this.protocolAdaptor.addReadData(this, data, length, MESSAGE_PACKUP ? this.buffers : null);
        }
        catch (Exception e) {
            logger.error(this.toString() + " " + e.getMessage());
            for (TCByteBuffer tcByteBuffer : data) {
                tcByteBuffer.clear();
            }
            this.eventCaller.fireErrorEvent(this.eventListeners, this, e, null);
        }
    }

    protected final TCByteBuffer[] getReadBuffers() {
        return this.protocolAdaptor.getReadBuffers();
    }

    protected final void fireErrorEvent(Exception e, TCNetworkMessage context) {
        this.eventCaller.fireErrorEvent(this.eventListeners, this, e, context);
    }

    private WireProtocolMessage buildWireProtocolMessageGroup(ArrayList<TCActionNetworkMessage> messages) {
        int messageGroupSize = messages.size();
        Assert.assertTrue("Messages count not ok to build WireProtocolMessageGroup : " + messageGroupSize, messageGroupSize > 0 && messageGroupSize <= 65535);
        if (messageGroupSize == 1) {
            return this.buildWireProtocolMessage(messages.get(0));
        }
        WireProtocolGroupMessageImpl message = WireProtocolGroupMessageImpl.wrapMessages(messages, this);
        return this.finalizeWireProtocolMessage(message, messageGroupSize);
    }

    private WireProtocolMessage buildWireProtocolMessage(TCActionNetworkMessage message) {
        Assert.eval(!(message instanceof WireProtocolMessage));
        WireProtocolMessage wireMessage = WireProtocolMessageImpl.wrapMessage(message, this);
        return this.finalizeWireProtocolMessage(wireMessage, 1);
    }

    private WireProtocolMessage finalizeWireProtocolMessage(WireProtocolMessage message, int messageCount) {
        WireProtocolHeader hdr = (WireProtocolHeader)message.getHeader();
        hdr.setSourceAddress(this.getLocalAddress().getAddress().getAddress());
        hdr.setSourcePort(this.getLocalAddress().getPort());
        hdr.setDestinationAddress(this.getLocalAddress().getAddress().getAddress());
        hdr.setDestinationPort(this.getRemoteAddress().getPort());
        hdr.setMessageCount(messageCount);
        if (logger.isDebugEnabled()) {
            logger.debug("finalize header " + hdr);
        }
        return message;
    }

    public void closeReadOnException(IOException ioe) throws IOException {
        if (ioe instanceof EOFException) {
            if (logger.isDebugEnabled()) {
                logger.debug("EOF reading from channel " + this.channel.toString());
            }
            this.eventCaller.fireEndOfFileEvent(this.eventListeners, this);
        } else {
            if (!this.isClosed()) {
                logger.info("error reading from channel " + this.channel.toString() + ": " + ioe.getMessage());
            } else if (logger.isDebugEnabled()) {
                logger.debug("error reading from channel " + this.channel.toString() + ": " + ioe.getMessage());
            }
            this.eventCaller.fireErrorEvent(this.eventListeners, this, ioe, null);
        }
    }

    public void closeWriteOnException(IOException ioe) throws IOException {
        if (ioe instanceof EOFException) {
            if (logger.isDebugEnabled()) {
                logger.debug("EOF writing to channel " + this.channel.toString());
            }
            this.eventCaller.fireEndOfFileEvent(this.eventListeners, this);
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("error writing to channel " + this.channel.toString() + ": " + ioe.getMessage());
            }
            this.eventCaller.fireErrorEvent(this.eventListeners, this, ioe, null);
        }
    }

    @Override
    public TCByteBufferOutputStream createOutput() {
        return MESSAGE_PACKUP ? new TCDirectByteBufferOutputStream(this.buffers) : new TCByteBufferOutputStream();
    }

    @Override
    public void setTransportEstablished() {
        this.commWorker.addConnection(this, this.channel);
        this.transportEstablished.set(true);
    }

    @Override
    public boolean isTransportEstablished() {
        return this.transportEstablished.get();
    }

    static {
        logger.debug("Comms Message Batching " + (MSG_GROUPING_ENABLED ? "enabled" : "disabled"));
    }

    protected class WriteContext {
        private final WireProtocolMessage message;
        private Iterator<TCByteBuffer> messageBytes;
        private TCByteBuffer current;

        WriteContext(WireProtocolMessage message) {
            this.message = message;
        }

        private void prepIfNeeded() {
            if (this.messageBytes == null) {
                this.messageBytes = this.message.prepareToSend() ? StreamSupport.stream(this.message.getEntireMessageData().spliterator(), false).map(TCByteBuffer::asReadOnlyBuffer).iterator() : Collections.emptyIterator();
                this.current = this.messageBytes.hasNext() ? this.messageBytes.next() : null;
            }
        }

        boolean done() {
            return this.messageBytes != null && this.current == null;
        }

        void writeComplete() {
            this.message.complete();
        }

        boolean isNotValid() {
            return !this.message.isValid();
        }

        long writeBuffers() {
            long bytesWritten = 0L;
            this.prepIfNeeded();
            while (this.current != null) {
                ByteBuffer buf = this.current.getNioBuffer();
                int written = TCConnectionImpl.this.bufferManager.forwardToWriteBuffer(buf);
                bytesWritten += (long)written;
                this.current.returnNioBuffer(buf);
                if (written == 0 || this.current.hasRemaining()) break;
                if (this.messageBytes.hasNext()) {
                    this.current = this.messageBytes.next();
                    continue;
                }
                this.current = null;
            }
            return bytesWritten;
        }
    }
}

