/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.ning.compress.lzf.LZFOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.streaming.StreamReply;
import org.apache.cassandra.streaming.StreamReplyVerbHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStreamTask
extends WrappedRunnable {
    private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
    public static final int CHUNK_SIZE = 65536;
    public static final int MAX_CONNECT_ATTEMPTS = 4;
    protected final StreamHeader header;
    protected final InetAddress to;
    protected Socket socket;
    private OutputStream output;
    private OutputStream compressedoutput;
    private DataInputStream input;
    private final byte[] transferBuffer = new byte[65536];
    protected final Throttle throttle;
    private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
    protected final StreamingMetrics metrics;

    public FileStreamTask(StreamHeader header, InetAddress to) {
        this.header = header;
        this.to = to;
        this.throttle = new Throttle(this.toString(), new Throttle.ThroughputFunction(){

            @Override
            public int targetThroughput() {
                if (DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() < 1) {
                    return 0;
                }
                int totalBytesPerMS = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
                return totalBytesPerMS / Math.max(1, (int)StreamingMetrics.activeStreamsOutbound.count());
            }
        });
        this.metrics = StreamingMetrics.get(to);
    }

    @Override
    public void runMayThrow() throws IOException {
        try {
            this.connectAttempt();
            this.stream();
            StreamOutSession session = StreamOutSession.get(this.to, this.header.sessionId);
            if (session == null) {
                logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
            } else if (session.getFiles().size() == 0) {
                this.receiveReply();
                logger.info("Finished streaming session to {}", (Object)this.to);
            }
        }
        catch (IOException e) {
            StreamOutSession session = StreamOutSession.get(this.to, this.header.sessionId);
            if (session != null) {
                session.close(false);
            }
            throw e;
        }
        finally {
            block15: {
                try {
                    this.close();
                }
                catch (IOException e) {
                    if (!logger.isDebugEnabled()) break block15;
                    logger.debug("error closing socket", (Throwable)e);
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Done streaming " + this.header.file);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void stream() throws IOException {
        ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(this.header, false, MessagingService.instance().getVersion(this.to));
        this.output.write(ByteBufferUtil.getArray(headerBuffer));
        if (this.header.file == null) {
            return;
        }
        RandomAccessReader file = RandomAccessReader.open(new File(this.header.file.getFilename()), true);
        this.compressedoutput = new LZFOutputStream(this.output);
        StreamingMetrics.activeStreamsOutbound.inc();
        try {
            long totalBytesTransferred = 0L;
            for (Pair<Long, Long> section : this.header.file.sections) {
                file.seek((Long)section.left);
                long length = (Long)section.right - (Long)section.left;
                long bytesTransferred = 0L;
                while (bytesTransferred < length) {
                    long lastWrite = this.write(file, length, bytesTransferred);
                    bytesTransferred += lastWrite;
                    totalBytesTransferred += lastWrite;
                    this.header.file.progress += lastWrite;
                }
                this.compressedoutput.flush();
                if (!logger.isDebugEnabled()) continue;
                logger.debug("Bytes transferred " + bytesTransferred + "/" + this.header.file.size);
            }
            StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
            this.metrics.outgoingBytes.inc(totalBytesTransferred);
            this.receiveReply();
        }
        finally {
            StreamingMetrics.activeStreamsOutbound.dec();
            FileUtils.closeQuietly(file);
        }
    }

    protected void receiveReply() throws IOException {
        MessagingService.validateMagic(this.input.readInt());
        String id = this.input.readUTF();
        this.input.readInt();
        MessageIn<StreamReply> message = MessageIn.read(this.input, 5, id);
        assert (message.verb == MessagingService.Verb.STREAM_REPLY) : "Non-reply message received on stream socket";
        this.handler.doVerb(message, id);
    }

    protected long write(RandomAccessReader reader, long length, long bytesTransferred) throws IOException {
        int toTransfer = (int)Math.min(65536L, length - bytesTransferred);
        reader.readFully(this.transferBuffer, 0, toTransfer);
        this.compressedoutput.write(this.transferBuffer, 0, toTransfer);
        this.throttle.throttleDelta(toTransfer);
        return toTransfer;
    }

    private void connectAttempt() throws IOException {
        int attempts = 0;
        while (true) {
            try {
                this.socket = MessagingService.instance().getConnectionPool(this.to).newSocket();
                this.socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
                this.output = this.socket.getOutputStream();
                this.input = new DataInputStream(this.socket.getInputStream());
                break;
            }
            catch (IOException e) {
                if (++attempts >= 4) {
                    throw e;
                }
                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2.0, attempts);
                logger.warn("Failed attempt " + attempts + " to connect to " + this.to + " to stream " + this.header.file + ". Retrying in " + waitms + " ms. (" + e + ")");
                try {
                    Thread.sleep(waitms);
                }
                catch (InterruptedException wtf) {
                    throw new RuntimeException(wtf);
                }
            }
        }
    }

    protected void close() throws IOException {
        if (this.output != null) {
            this.output.close();
        }
    }

    public String toString() {
        return String.format("FileStreamTask(session=%s, to=%s)", this.header.sessionId, this.to);
    }
}

