/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.streaming;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.cassandra.db.streaming.CassandraStreamWriter;
import org.apache.cassandra.db.streaming.CompressionInfo;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream writer that copies the raw bytes of a set of compression chunks from an
 * SSTable's data file to the outgoing stream.
 *
 * <p>The chunks to send come from the supplied {@link CompressionInfo}. Chunks that sit
 * back to back in the file are merged into larger sections, and each section is then
 * written in slices of at most {@link #CHUNK_SIZE} bytes.
 */
public class CassandraCompressedStreamWriter
extends CassandraStreamWriter {
    /** Upper bound, in bytes, on a single read from the data file / write to the channel. */
    private static final int CHUNK_SIZE = 65536;
    /** Extra bytes accounted for after every chunk's {@code length} (named for the per-chunk CRC). */
    private static final int CRC_LENGTH = 4;
    private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
    private final CompressionInfo compressionInfo;

    /**
     * @param sstable         the SSTable whose data file is streamed
     * @param sections        partition position bounds, passed through to the superclass
     * @param compressionInfo supplies the chunks that are actually written by {@link #write}
     * @param session         the stream session that progress is reported to
     */
    public CassandraCompressedStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session) {
        super(sstable, sections, session);
        this.compressionInfo = compressionInfo;
    }

    /**
     * Writes every chunk listed in the compression info to {@code output}.
     *
     * <p>Adjacent chunks are fused into sections first; each section is then read from a
     * fresh channel on the data file and handed to the output in slices of at most
     * {@link #CHUNK_SIZE} bytes. Cumulative progress is reported to the session after
     * every slice.
     *
     * @param output the destination; must be an {@link AsyncStreamingOutputPlus}, otherwise
     *               the cast below fails with a {@link ClassCastException}
     * @throws IOException if writing to the output fails
     */
    @Override
    public void write(DataOutputStreamPlus output) throws IOException {
        AsyncStreamingOutputPlus out = (AsyncStreamingOutputPlus) output;
        long totalSize = this.totalSize();
        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", this.session.planId(), this.sstable.getFilename(), this.session.peer, this.sstable.getSSTableMetadata().repairedAt, totalSize);
        try (ChannelProxy fc = this.sstable.getDataChannel().newChannel()) {
            long progress = 0L;
            List<Section> sections = this.fuseAdjacentChunks(this.compressionInfo.chunks);
            int sectionIdx = 0;
            for (Section section : sections) {
                long length = section.end - section.start;
                logger.debug("[Stream #{}] Writing section {} with length {} to stream.", this.session.planId(), sectionIdx++, length);
                long bytesTransferred = 0L;
                while (bytesTransferred < length) {
                    // Declared per iteration so that both values are effectively final and
                    // may legally be captured by the lambda below.
                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                    final long position = section.start + bytesTransferred;
                    out.writeToChannel(bufferSupplier -> {
                        ByteBuffer outBuffer = bufferSupplier.get(toTransfer);
                        long read = fc.read(outBuffer, position);
                        // NOTE: only enforced when the JVM runs with assertions enabled.
                        assert (read == (long) toTransfer) : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", read, toTransfer);
                        outBuffer.flip();
                    }, this.limiter);
                    progress += toTransfer;
                    this.session.progress(this.sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                    bytesTransferred += toTransfer;
                }
            }
            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", this.session.planId(), this.sstable.getFilename(), this.session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
        }
    }

    /**
     * @return the number of bytes {@link #write} will send: the sum over all chunks of the
     *         chunk length plus {@link #CRC_LENGTH}
     */
    @Override
    protected long totalSize() {
        long size = 0L;
        for (CompressionMetadata.Chunk chunk : this.compressionInfo.chunks) {
            size += sizeOnDisk(chunk);
        }
        return size;
    }

    /**
     * Merges runs of chunks that are contiguous in the file into single sections.
     *
     * <p>A chunk is treated as contiguous with the running section when its offset equals
     * the section's current end. The chunks are processed in array order; no sorting is
     * performed here.
     *
     * @param chunks the chunks to merge; may be empty
     * @return the fused sections in encounter order, or an empty list when {@code chunks} is empty
     */
    private List<Section> fuseAdjacentChunks(CompressionMetadata.Chunk[] chunks) {
        if (chunks.length == 0) {
            return Collections.emptyList();
        }
        long start = chunks[0].offset;
        long end = start + sizeOnDisk(chunks[0]);
        List<Section> sections = new ArrayList<>();
        for (int i = 1; i < chunks.length; ++i) {
            CompressionMetadata.Chunk chunk = chunks[i];
            if (chunk.offset == end) {
                // Starts exactly where the running section stops: extend instead of splitting.
                end += sizeOnDisk(chunk);
                continue;
            }
            sections.add(new Section(start, end));
            start = chunk.offset;
            end = start + sizeOnDisk(chunk);
        }
        // Flush the section that was still open when the loop ended.
        sections.add(new Section(start, end));
        return sections;
    }

    /**
     * Bytes accounted for per chunk: its length plus {@link #CRC_LENGTH}. The length is
     * widened to {@code long} before the addition so the sum cannot overflow {@code int}.
     */
    private static long sizeOnDisk(CompressionMetadata.Chunk chunk) {
        return (long) chunk.length + CRC_LENGTH;
    }

    /** A half-open byte range {@code [start, end)} within the data file. */
    private static class Section {
        private final long start;
        private final long end;

        private Section(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }
}

