/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.streaming;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.memory.BufferPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraStreamWriter {
    private static final int DEFAULT_CHUNK_SIZE = 65536;
    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamWriter.class);
    protected final SSTableReader sstable;
    private final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
    protected final Collection<SSTableReader.PartitionPositionBounds> sections;
    protected final StreamManager.StreamRateLimiter limiter;
    protected final StreamSession session;

    public CassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, StreamSession session) {
        this.session = session;
        this.sstable = sstable;
        this.sections = sections;
        this.limiter = StreamManager.getRateLimiter(session.peer);
    }

    public void write(DataOutputStreamPlus output) throws IOException {
        long totalSize = this.totalSize();
        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", new Object[]{this.session.planId(), this.sstable.getFilename(), this.session.peer, this.sstable.getSSTableMetadata().repairedAt, totalSize});
        AsyncStreamingOutputPlus out = (AsyncStreamingOutputPlus)output;
        try (ChannelProxy proxy = this.sstable.getDataChannel().newChannel();
             DataIntegrityMetadata.ChecksumValidator validator = new File(this.sstable.descriptor.filenameFor(Component.CRC)).exists() ? DataIntegrityMetadata.checksumValidator(this.sstable.descriptor) : null;){
            int bufferSize = validator == null ? 65536 : validator.chunkSize;
            long progress = 0L;
            for (SSTableReader.PartitionPositionBounds section : this.sections) {
                long lastBytesRead;
                long start = validator == null ? section.lowerPosition : validator.chunkStart(section.lowerPosition);
                int transferOffset = (int)(section.lowerPosition - start);
                if (validator != null) {
                    validator.seek(start);
                }
                long length = section.upperPosition - start;
                for (long bytesRead = 0L; bytesRead < length; bytesRead += lastBytesRead) {
                    int toTransfer = (int)Math.min((long)bufferSize, length - bytesRead);
                    lastBytesRead = this.write(proxy, validator, out, start, transferOffset, toTransfer, bufferSize);
                    start += lastBytesRead;
                    this.session.progress(this.sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress += lastBytesRead - (long)transferOffset, totalSize);
                    transferOffset = 0;
                }
                out.flush();
            }
            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", new Object[]{this.session.planId(), this.sstable.getFilename(), this.session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)});
        }
    }

    protected long totalSize() {
        long size = 0L;
        for (SSTableReader.PartitionPositionBounds section : this.sections) {
            size += section.upperPosition - section.lowerPosition;
        }
        return size;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected long write(ChannelProxy proxy, DataIntegrityMetadata.ChecksumValidator validator, AsyncStreamingOutputPlus output, long start, int transferOffset, int toTransfer, int bufferSize) throws IOException {
        int minReadable = (int)Math.min((long)bufferSize, proxy.size() - start);
        ByteBuffer buffer = BufferPool.get(minReadable, BufferType.OFF_HEAP);
        try {
            int readCount = proxy.read(buffer, start);
            assert (readCount == minReadable) : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", readCount, minReadable);
            buffer.flip();
            if (validator != null) {
                validator.validate(buffer);
                buffer.flip();
            }
            buffer.position(transferOffset);
            buffer.limit(transferOffset + (toTransfer - transferOffset));
            output.writeToChannel(StreamCompressionSerializer.serialize(this.compressor, buffer, 12), this.limiter);
        }
        finally {
            BufferPool.put(buffer);
        }
        return toTransfer;
    }
}

