/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDeserializationTask;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ProtocolHeader;
import org.apache.cassandra.net.SelectionKeyHandler;
import org.apache.cassandra.net.SelectorManager;
import org.apache.cassandra.net.TcpConnectionManager;
import org.apache.cassandra.net.io.FastSerializer;
import org.apache.cassandra.net.io.ISerializer;
import org.apache.cassandra.net.io.ProtocolState;
import org.apache.cassandra.net.io.StartState;
import org.apache.cassandra.net.io.TcpReader;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;

public class TcpConnection
extends SelectionKeyHandler
implements Comparable {
    private static Logger logger_ = Logger.getLogger(TcpConnection.class);
    private static ISerializer serializer_ = new FastSerializer();
    private SocketChannel socketChannel_;
    private SelectionKey key_;
    private TcpConnectionManager pool_;
    private boolean isIncoming_ = false;
    private TcpReader tcpReader_;
    private ReadWorkItem readWork_ = new ReadWorkItem();
    private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
    private EndPoint localEp_;
    private EndPoint remoteEp_;
    boolean inUse_ = false;
    private boolean bStream_ = false;
    private Lock lock_;
    private Condition condition_;

    TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException {
        this.socketChannel_ = SocketChannel.open();
        this.socketChannel_.configureBlocking(false);
        this.pool_ = pool;
        this.localEp_ = from;
        this.remoteEp_ = to;
        this.key_ = !this.socketChannel_.connect(this.remoteEp_.getInetAddress()) ? SelectorManager.getSelectorManager().register(this.socketChannel_, this, 8) : SelectorManager.getSelectorManager().register(this.socketChannel_, this, 1);
    }

    TcpConnection(EndPoint from, EndPoint to) throws IOException {
        this.socketChannel_ = SocketChannel.open();
        this.socketChannel_.configureBlocking(false);
        this.localEp_ = from;
        this.remoteEp_ = to;
        this.key_ = !this.socketChannel_.connect(this.remoteEp_.getInetAddress()) ? SelectorManager.getSelectorManager().register(this.socketChannel_, this, 8) : SelectorManager.getSelectorManager().register(this.socketChannel_, this, 1);
        this.bStream_ = true;
        this.lock_ = new ReentrantLock();
        this.condition_ = this.lock_.newCondition();
    }

    static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException {
        TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
        tcpConnection.registerReadInterest();
    }

    private void registerReadInterest() throws IOException {
        this.key_ = SelectorManager.getSelectorManager().register(this.socketChannel_, this, 1);
    }

    TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException {
        this.socketChannel_ = socketChannel;
        this.socketChannel_.configureBlocking(false);
        this.isIncoming_ = isIncoming;
        this.localEp_ = localEp;
    }

    EndPoint getLocalEp() {
        return this.localEp_;
    }

    public void setLocalEp(EndPoint localEp) {
        this.localEp_ = localEp;
    }

    public EndPoint getEndPoint() {
        return this.remoteEp_;
    }

    public boolean isIncoming() {
        return this.isIncoming_;
    }

    public SocketChannel getSocketChannel() {
        return this.socketChannel_;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(Message message) throws IOException {
        byte[] data = serializer_.serialize(message);
        if (data.length > 0) {
            boolean listening = !message.getFrom().equals(EndPoint.sentinelLocalEndPoint_);
            ByteBuffer buffer = MessagingService.packIt(data, false, false, listening);
            TcpConnection tcpConnection = this;
            synchronized (tcpConnection) {
                if (!this.pendingWrites_.isEmpty() || !this.socketChannel_.isConnected()) {
                    this.pendingWrites_.add(buffer);
                    return;
                }
                this.socketChannel_.write(buffer);
                if (buffer.remaining() > 0) {
                    this.pendingWrites_.add(buffer);
                    TcpConnection.turnOnInterestOps(this.key_, 4);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException {
        if (!this.bStream_) {
            throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
        }
        this.lock_.lock();
        try {
            int limit = 0x4000000;
            long total = endPosition - startPosition;
            long bytesWritten = 0L;
            RandomAccessFile raf = new RandomAccessFile(file, "r");
            FileChannel fc = raf.getChannel();
            long waitTime = 2L;
            int retry = 0;
            while (!this.socketChannel_.isConnected()) {
                if (retry == 3) {
                    throw new IOException("Unable to connect to " + this.remoteEp_ + " after " + retry + " attempts.");
                }
                this.condition_.await(waitTime, TimeUnit.SECONDS);
                ++retry;
            }
            while (bytesWritten < total) {
                if (startPosition == 0L) {
                    ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
                    this.socketChannel_.write(buffer);
                    if (buffer.remaining() > 0) {
                        this.pendingWrites_.add(buffer);
                        TcpConnection.turnOnInterestOps(this.key_, 4);
                        this.condition_.await();
                    }
                }
                long bytesTransferred = fc.transferTo(startPosition, limit, this.socketChannel_);
                if (logger_.isDebugEnabled()) {
                    logger_.debug((Object)("Bytes transferred " + bytesTransferred));
                }
                startPosition += bytesTransferred;
                if (bytesTransferred >= (long)limit || (bytesWritten += bytesTransferred) == total) continue;
                TcpConnection.turnOnInterestOps(this.key_, 4);
                this.condition_.await();
            }
        }
        finally {
            this.lock_.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void resumeStreaming() {
        if (!this.bStream_) {
            return;
        }
        this.lock_.lock();
        try {
            this.condition_.signal();
        }
        finally {
            this.lock_.unlock();
        }
    }

    public void close() {
        this.inUse_ = false;
        if (this.pool_.contains(this)) {
            this.pool_.decUsed();
        }
    }

    public boolean isConnected() {
        return this.socketChannel_.isConnected();
    }

    public boolean equals(Object o) {
        if (!(o instanceof TcpConnection)) {
            return false;
        }
        TcpConnection rhs = (TcpConnection)o;
        return this.localEp_.equals(rhs.localEp_) && this.remoteEp_.equals(rhs.remoteEp_);
    }

    public int hashCode() {
        return (this.localEp_ + ":" + this.remoteEp_).hashCode();
    }

    public String toString() {
        return this.socketChannel_.toString();
    }

    void closeSocket() {
        logger_.warn((Object)("Closing down connection " + this.socketChannel_ + " with " + this.pendingWrites_.size() + " writes remaining."));
        if (this.pool_ != null) {
            this.pool_.removeConnection(this);
        }
        this.cancel(this.key_);
        this.pendingWrites_.clear();
    }

    void errorClose() {
        logger_.warn((Object)("Closing down connection " + this.socketChannel_));
        this.pendingWrites_.clear();
        this.cancel(this.key_);
        this.pendingWrites_.clear();
        if (this.pool_ != null) {
            this.pool_.removeConnection(this);
        }
    }

    private void cancel(SelectionKey key) {
        if (key != null) {
            key.cancel();
            try {
                key.channel().close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void connect(SelectionKey key) {
        block7: {
            TcpConnection.turnOffInterestOps(key, 8);
            try {
                if (this.socketChannel_.finishConnect()) {
                    TcpConnection.turnOnInterestOps(key, 1);
                    TcpConnection tcpConnection = this;
                    synchronized (tcpConnection) {
                        if (!this.pendingWrites_.isEmpty()) {
                            TcpConnection.turnOnInterestOps(this.key_, 4);
                        }
                    }
                    this.resumeStreaming();
                    break block7;
                }
                logger_.error((Object)"Closing connection because socket channel could not finishConnect.");
                this.errorClose();
            }
            catch (IOException e) {
                logger_.error((Object)("Encountered IOException on connection: " + this.socketChannel_), (Throwable)e);
                this.errorClose();
            }
        }
    }

    @Override
    public void write(SelectionKey key) {
        TcpConnection.turnOffInterestOps(key, 4);
        this.doPendingWrites();
        this.resumeStreaming();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doPendingWrites() {
        TcpConnection tcpConnection = this;
        synchronized (tcpConnection) {
            try {
                while (!this.pendingWrites_.isEmpty()) {
                    ByteBuffer buffer = this.pendingWrites_.peek();
                    this.socketChannel_.write(buffer);
                    if (buffer.remaining() > 0) {
                        break;
                    }
                    this.pendingWrites_.remove();
                }
            }
            catch (IOException ex) {
                logger_.error((Object)LogUtil.throwableToString(ex));
                this.errorClose();
            }
            finally {
                if (!this.pendingWrites_.isEmpty()) {
                    TcpConnection.turnOnInterestOps(this.key_, 4);
                }
            }
        }
    }

    @Override
    public void read(SelectionKey key) {
        TcpConnection.turnOffInterestOps(key, 1);
        MessagingService.getReadExecutor().execute(this.readWork_);
    }

    public int pending() {
        return this.pendingWrites_.size();
    }

    public int compareTo(Object o) {
        if (o instanceof TcpConnection) {
            return this.pendingWrites_.size() - ((TcpConnection)o).pendingWrites_.size();
        }
        throw new IllegalArgumentException();
    }

    class ReadWorkItem
    implements Runnable {
        ReadWorkItem() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (TcpConnection.this.tcpReader_ == null) {
                TcpConnection.this.tcpReader_ = new TcpReader(TcpConnection.this);
                StartState nextState = TcpConnection.this.tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
                if (nextState == null) {
                    nextState = new ProtocolState(TcpConnection.this.tcpReader_);
                    TcpConnection.this.tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
                }
                TcpConnection.this.tcpReader_.morphState(nextState);
            }
            try {
                byte[] bytes = new byte[]{};
                while ((bytes = TcpConnection.this.tcpReader_.read()).length > 0) {
                    ProtocolHeader pH = TcpConnection.this.tcpReader_.getProtocolHeader();
                    if (!pH.isStreamingMode_) {
                        if (TcpConnection.this.remoteEp_ == null) {
                            int port = pH.isListening_ ? DatabaseDescriptor.getStoragePort() : 5555;
                            TcpConnection.this.remoteEp_ = new EndPoint(TcpConnection.this.socketChannel_.socket().getInetAddress().getHostAddress(), port);
                            TcpConnection.this.pool_ = MessagingService.getConnectionPool(TcpConnection.this.localEp_, TcpConnection.this.remoteEp_);
                            TcpConnection.this.pool_.addToPool(TcpConnection.this);
                        }
                        MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(pH.serializerType_, bytes));
                        TcpConnection.this.tcpReader_.resetState();
                        continue;
                    }
                    MessagingService.setStreamingMode(false);
                    TcpConnection.this.closeSocket();
                }
            }
            catch (IOException ex) {
                this.handleException(ex);
            }
            catch (Throwable th) {
                this.handleException(th);
            }
            finally {
                if (TcpConnection.this.key_.isValid()) {
                    SelectionKeyHandler.turnOnInterestOps(TcpConnection.this.key_, 1);
                }
            }
        }

        private void handleException(Throwable th) {
            logger_.warn((Object)("Problem reading from socket connected to : " + TcpConnection.this.socketChannel_));
            logger_.warn((Object)LogUtil.throwableToString(th));
            TcpConnection.this.errorClose();
        }
    }
}

