/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.SnappyOutputStream;

public class OutboundTcpConnection
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
    private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
    private volatile boolean isStopped = false;
    private static final int OPEN_RETRY_DELAY = 100;
    private volatile BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<QueuedMessage>();
    private volatile BlockingQueue<QueuedMessage> active = new LinkedBlockingQueue<QueuedMessage>();
    private final OutboundTcpConnectionPool poolReference;
    private DataOutputStream out;
    private Socket socket;
    private volatile long completed;
    private final AtomicLong dropped = new AtomicLong();
    private int targetVersion;

    public OutboundTcpConnection(OutboundTcpConnectionPool pool) {
        super("WRITE-" + pool.endPoint());
        this.poolReference = pool;
    }

    private static boolean isLocalDC(InetAddress targetHost) {
        String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
        return remoteDC.equals(localDC);
    }

    public void enqueue(MessageOut<?> message, String id) {
        this.expireMessages();
        try {
            this.backlog.put(new QueuedMessage(message, id));
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)e);
        }
    }

    void closeSocket(boolean destroyThread) {
        this.active.clear();
        this.backlog.clear();
        this.isStopped = destroyThread;
        this.enqueue(CLOSE_SENTINEL, null);
    }

    void softCloseSocket() {
        this.enqueue(CLOSE_SENTINEL, null);
    }

    public int getTargetVersion() {
        return this.targetVersion;
    }

    @Override
    public void run() {
        while (true) {
            MessageOut<?> m;
            QueuedMessage qm;
            if ((qm = (QueuedMessage)this.active.poll()) == null) {
                try {
                    qm = this.backlog.take();
                }
                catch (InterruptedException e) {
                    throw new AssertionError((Object)e);
                }
                BlockingQueue<QueuedMessage> tmp = this.backlog;
                this.backlog = this.active;
                this.active = tmp;
            }
            if ((m = qm.message) == CLOSE_SENTINEL) {
                this.disconnect();
                if (!this.isStopped) continue;
                break;
            }
            if (qm.timestamp < System.currentTimeMillis() - m.getTimeout()) {
                this.dropped.incrementAndGet();
                continue;
            }
            if (this.socket != null || this.connect()) {
                this.writeConnected(qm);
                continue;
            }
            this.active.clear();
        }
    }

    public int getPendingMessages() {
        return this.active.size() + this.backlog.size();
    }

    public long getCompletedMesssages() {
        return this.completed;
    }

    public long getDroppedMessages() {
        return this.dropped.get();
    }

    private boolean shouldCompressConnection() {
        return DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all || DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !OutboundTcpConnection.isLocalDC(this.poolReference.endPoint());
    }

    private void writeConnected(QueuedMessage qm) {
        block11: {
            try {
                byte[] sessionBytes = qm.message.parameters.get("TraceSession");
                if (sessionBytes != null) {
                    UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                    TraceState state = Tracing.instance().get(sessionId);
                    String message = String.format("Sending message to %s", this.poolReference.endPoint());
                    if (state == null) {
                        TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
                    } else {
                        state.trace(message);
                        if (qm.message.verb == MessagingService.Verb.REQUEST_RESPONSE) {
                            Tracing.instance().stopNonLocal(state);
                        }
                    }
                }
                OutboundTcpConnection.write(qm.message, qm.id, qm.timestamp, this.out, this.targetVersion);
                ++this.completed;
                if (this.active.peek() == null) {
                    this.out.flush();
                }
            }
            catch (Exception e) {
                this.disconnect();
                if (e instanceof IOException) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("error writing to " + this.poolReference.endPoint(), (Throwable)e);
                    }
                    if (!(e instanceof SocketException) || !qm.shouldRetry()) break block11;
                    try {
                        this.backlog.put(new RetriedQueuedMessage(qm));
                    }
                    catch (InterruptedException e1) {
                        throw new AssertionError((Object)e1);
                    }
                }
                logger.error("error writing to " + this.poolReference.endPoint(), (Throwable)e);
            }
        }
    }

    public static void write(MessageOut message, String id, long timestamp, DataOutputStream out, int version) throws IOException {
        out.writeInt(-900387334);
        if (version < 6) {
            OutboundTcpConnection.writeHeader(out, version, false);
            out.writeInt(-1);
        }
        out.writeUTF(id);
        if (version >= 6) {
            out.writeInt((int)timestamp);
        }
        message.serialize(out, version);
    }

    private static void writeHeader(DataOutputStream out, int version, boolean compressionEnabled) throws IOException {
        int header = 0;
        if (compressionEnabled) {
            header |= 4;
        }
        out.writeInt(header |= version << 8);
    }

    private void disconnect() {
        if (this.socket != null) {
            block3: {
                try {
                    this.socket.close();
                }
                catch (IOException e) {
                    if (!logger.isTraceEnabled()) break block3;
                    logger.trace("exception closing connection to " + this.poolReference.endPoint(), (Throwable)e);
                }
            }
            this.out = null;
            this.socket = null;
        }
    }

    private boolean connect() {
        if (logger.isDebugEnabled()) {
            logger.debug("attempting to connect to " + this.poolReference.endPoint());
        }
        this.targetVersion = MessagingService.instance().getVersion(this.poolReference.endPoint());
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout()) {
            try {
                this.socket = this.poolReference.newSocket();
                this.socket.setKeepAlive(true);
                if (OutboundTcpConnection.isLocalDC(this.poolReference.endPoint())) {
                    this.socket.setTcpNoDelay(true);
                } else {
                    this.socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay());
                }
                if (DatabaseDescriptor.getInternodeSendBufferSize() != null) {
                    try {
                        this.socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize());
                    }
                    catch (SocketException se) {
                        logger.warn("Failed to set send buffer size on internode socket.", (Throwable)se);
                    }
                }
                this.out = new DataOutputStream(new BufferedOutputStream(this.socket.getOutputStream(), 4096));
                if (this.targetVersion >= 6) {
                    this.out.writeInt(-900387334);
                    OutboundTcpConnection.writeHeader(this.out, this.targetVersion, this.shouldCompressConnection());
                    this.out.flush();
                    DataInputStream in = new DataInputStream(this.socket.getInputStream());
                    int maxTargetVersion = in.readInt();
                    if (this.targetVersion > maxTargetVersion) {
                        logger.debug("Target max version is {}; will reconnect with that version", (Object)maxTargetVersion);
                        MessagingService.instance().setVersion(this.poolReference.endPoint(), maxTargetVersion);
                        this.disconnect();
                        return false;
                    }
                    if (this.targetVersion < maxTargetVersion && this.targetVersion < 6) {
                        logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done", (Object)maxTargetVersion, (Object)this.targetVersion);
                        MessagingService.instance().setVersion(this.poolReference.endPoint(), Math.min(6, maxTargetVersion));
                        this.softCloseSocket();
                    }
                    this.out.writeInt(6);
                    CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), this.out);
                    if (this.shouldCompressConnection()) {
                        this.out.flush();
                        logger.trace("Upgrading OutputStream to be compressed");
                        this.out = new DataOutputStream((OutputStream)new SnappyOutputStream((OutputStream)new BufferedOutputStream(this.socket.getOutputStream())));
                    }
                }
                return true;
            }
            catch (IOException e) {
                this.socket = null;
                if (logger.isTraceEnabled()) {
                    logger.trace("unable to connect to " + this.poolReference.endPoint(), (Throwable)e);
                }
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e1) {
                    throw new AssertionError((Object)e1);
                }
            }
        }
        return false;
    }

    private void expireMessages() {
        QueuedMessage qm;
        while ((qm = (QueuedMessage)this.backlog.peek()) != null && qm.timestamp < System.currentTimeMillis() - qm.message.getTimeout()) {
            QueuedMessage qm2 = (QueuedMessage)this.backlog.poll();
            if (qm2 != qm) {
                if (qm2 == null) break;
                this.active.add(qm2);
                break;
            }
            this.dropped.incrementAndGet();
        }
    }

    private static class RetriedQueuedMessage
    extends QueuedMessage {
        RetriedQueuedMessage(QueuedMessage msg) {
            super(msg.message, msg.id);
        }

        @Override
        boolean shouldRetry() {
            return false;
        }
    }

    private static class QueuedMessage {
        final MessageOut<?> message;
        final String id;
        final long timestamp;

        QueuedMessage(MessageOut<?> message, String id) {
            this.message = message;
            this.id = id;
            this.timestamp = System.currentTimeMillis();
        }

        boolean shouldRetry() {
            return !MessagingService.DROPPABLE_VERBS.contains((Object)this.message.verb);
        }
    }
}

