/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.streaming;

import com.google.common.base.Preconditions;
import com.google.common.collect.UnmodifiableIterator;
import java.io.IOError;
import java.io.IOException;
import java.util.Collection;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.streaming.CassandraStreamHeader;
import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
import org.apache.cassandra.db.streaming.IStreamReader;
import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraStreamReader
implements IStreamReader {
    /** Logger for the debug and warning messages emitted while receiving a stream. */
    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class);
    /** Id of the table being received; copied from the stream message header. */
    protected final TableId tableId;
    /** Key-count estimate from the stream header; handed to the sstable writer in {@code createWriter}. */
    protected final long estimatedKeys;
    /** Position bounds from the stream header; {@code totalSize()} sums their lengths. */
    protected final Collection<SSTableReader.PartitionPositionBounds> sections;
    /** Session this reader belongs to; used for plan id, peer, progress reporting and receiver lookup. */
    protected final StreamSession session;
    /** Version from the stream header; passed to the {@code StreamDeserializer}. */
    protected final Version inputVersion;
    /** {@code repairedAt} value from the stream message header; forwarded to the writer. */
    protected final long repairedAt;
    /** Pending-repair id from the stream message header; forwarded to the writer. */
    protected final TimeUUID pendingRepair;
    /** SSTable format type from the stream header; forwarded to the writer. */
    protected final SSTableFormat.Type format;
    /** SSTable level from the stream header; forwarded to the writer. */
    protected final int sstableLevel;
    /** Serialization header component from the stream header; {@code getHeader} treats {@code null} as "no header". */
    protected final SerializationHeader.Component header;
    /** Sequence number from the stream message header; used in log messages and the progress name. */
    protected final int fileSeqNum;

    /**
     * Creates a reader for a single incoming stream by capturing the values it needs
     * from the message header, the Cassandra-specific stream header and the session.
     *
     * @param header       per-message header (table id, repair info, sequence number)
     * @param streamHeader Cassandra stream header (sections, version, format, level, ...)
     * @param session      the stream session this reader reports to
     */
    public CassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) {
        // When the session carries a pending-repair id, the incoming stream must carry the same one.
        if (session.getPendingRepair() != null) {
            assert session.getPendingRepair().equals(header.pendingRepair);
        }
        this.session = session;

        // Values taken from the generic stream message header.
        this.tableId = header.tableId;
        this.repairedAt = header.repairedAt;
        this.pendingRepair = header.pendingRepair;
        this.fileSeqNum = header.sequenceNumber;

        // Values taken from the Cassandra-specific stream header.
        this.estimatedKeys = streamHeader.estimatedKeys;
        this.sections = streamHeader.sections;
        this.inputVersion = streamHeader.version;
        this.format = streamHeader.format;
        this.sstableLevel = streamHeader.sstableLevel;
        this.header = streamHeader.serializationHeader;
    }

    /**
     * Receives one streamed file from {@code inputPlus} and writes it, partition by
     * partition, through a writer obtained from {@code createWriter}.
     *
     * <p>Partitions are consumed until the tracked number of bytes read reaches
     * {@code totalSize()}; after each partition the session is told the running total
     * and the delta since the previous report. The compression stream wrapped around
     * {@code inputPlus} is closed when the method exits.
     *
     * <p>On any failure a warning is logged with the key of the partition held by the
     * deserializer (or an empty string if none was created). If the writer already
     * exists it is aborted and whatever {@code abort} returns is thrown; otherwise the
     * original failure is rethrown.
     *
     * @param inputPlus the raw input carrying the streamed data
     * @return the writer holding the received data
     * @throws IllegalStateException if the table no longer exists
     * @throws Throwable any failure raised while reading or writing
     */
    @Override
    public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable {
        final long expectedBytes = this.totalSize();
        final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(this.tableId);
        if (cfs == null) {
            throw new IllegalStateException("Table " + this.tableId + " was dropped during streaming");
        }
        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
                     this.session.planId(), this.fileSeqNum, this.session.peer, this.repairedAt, expectedBytes,
                     cfs.keyspace.getName(), cfs.getTableName(), this.pendingRepair);

        // Declared outside the try so the catch block can report on / abort them.
        StreamDeserializer deserializer = null;
        SSTableMultiWriter writer = null;
        // NOTE(review): 12 is the version argument given to StreamCompressionInputStream;
        // presumably an inlined messaging-version constant - confirm against the original source.
        try (StreamCompressionInputStream decompressed = new StreamCompressionInputStream(inputPlus, 12)) {
            TrackedDataInputPlus tracked = new TrackedDataInputPlus(decompressed);
            deserializer = new StreamDeserializer(cfs.metadata(), tracked, this.inputVersion, this.getHeader(cfs.metadata()));
            writer = this.createWriter(cfs, expectedBytes, this.repairedAt, this.pendingRepair, this.format);
            String sequenceName = writer.getFilename() + '-' + this.fileSeqNum;

            long previousTotal = 0L;
            while (tracked.getBytesRead() < expectedBytes) {
                this.writePartition(deserializer, writer);
                long currentTotal = tracked.getBytesRead();
                this.session.progress(sequenceName, ProgressInfo.Direction.IN, currentTotal, currentTotal - previousTotal, expectedBytes);
                previousTotal = currentTotal;
            }

            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
                         this.session.planId(), this.fileSeqNum, this.session.peer,
                         FBUtilities.prettyPrintMemory(tracked.getBytesRead()), FBUtilities.prettyPrintMemory(expectedBytes));
            return writer;
        }
        catch (Throwable failure) {
            Object partitionKey = deserializer == null ? "" : deserializer.partitionKey();
            // The trailing Throwable argument is logged by SLF4J as the exception, not as a placeholder value.
            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
                        this.session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), failure);
            // Abort a writer that was already created; abort() hands back the throwable to propagate.
            throw writer == null ? failure : writer.abort(failure);
        }
    }

    /**
     * Converts the streamed serialization header component into a
     * {@link SerializationHeader} for the given table metadata.
     *
     * @param metadata metadata of the table being received
     * @return the converted header, or {@code null} when no header component was supplied
     * @throws UnknownColumnException propagated from the component's {@code toHeader} conversion
     */
    protected SerializationHeader getHeader(TableMetadata metadata) throws UnknownColumnException {
        if (this.header == null) {
            return null;
        }
        return this.header.toHeader(metadata);
    }

    /**
     * Creates the writer that will receive the streamed partitions.
     *
     * @param cfs           store of the table being received
     * @param totalSize     number of bytes expected; used for the disk-space check and passed to the writer
     * @param repairedAt    repairedAt value to give the writer
     * @param pendingRepair pending-repair id to give the writer
     * @param format        sstable format type to give the writer
     * @return a new {@link RangeAwareSSTableWriter}
     * @throws IOException           if no writeable location can hold {@code totalSize} bytes
     * @throws IllegalStateException if the session's receiver for this table is not a
     *                               {@link CassandraStreamReceiver}
     */
    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat.Type format) throws IOException {
        // Only used as a capacity check: a null result means no directory can take the data.
        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
        if (localDir == null) {
            throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
        }

        StreamReceiver streamReceiver = this.session.getAggregator(this.tableId);
        Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver,
                                 "Stream receiver for table %s is not a CassandraStreamReceiver: %s",
                                 this.tableId, streamReceiver);
        // Reuse the instance that was just validated instead of looking the receiver up a second time.
        LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(streamReceiver).createLifecycleNewTracker();

        return new RangeAwareSSTableWriter(cfs, this.estimatedKeys, repairedAt, pendingRepair, false, format, this.sstableLevel, totalSize, lifecycleNewTracker, this.getHeader(cfs.metadata()));
    }

    /**
     * Computes how many bytes this stream is expected to carry.
     *
     * @return the sum of {@code upperPosition - lowerPosition} over all sections
     */
    protected long totalSize() {
        return this.sections.stream()
                            .mapToLong(bounds -> bounds.upperPosition - bounds.lowerPosition)
                            .sum();
    }

    /**
     * Reads the next partition from the deserializer and appends it to the writer,
     * then rethrows any {@link IOException} the deserializer recorded while it was
     * being iterated.
     *
     * @param deserializer source of the streamed partition
     * @param writer       destination for the partition
     * @throws IOException if reading the partition header fails or the deserializer recorded a failure
     */
    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException {
        StreamDeserializer partition = deserializer.newPartition();
        writer.append(partition);
        // hasNext() cannot throw checked exceptions, so a failure seen during append surfaces here.
        deserializer.checkForExceptions();
    }

    public static class StreamDeserializer
    extends UnmodifiableIterator<Unfiltered>
    implements UnfilteredRowIterator {
        private final TableMetadata metadata;
        private final DataInputPlus in;
        private final SerializationHeader header;
        private final DeserializationHelper helper;
        private DecoratedKey key;
        private DeletionTime partitionLevelDeletion;
        private SSTableSimpleIterator iterator;
        private Row staticRow;
        private IOException exception;

        public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException {
            this.metadata = metadata;
            this.in = in;
            this.helper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.PRESERVE_SIZE);
            this.header = header;
        }

        public StreamDeserializer newPartition() throws IOException {
            this.key = this.metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(this.in));
            this.partitionLevelDeletion = DeletionTime.serializer.deserialize(this.in);
            this.iterator = SSTableSimpleIterator.create(this.metadata, this.in, this.header, this.helper, this.partitionLevelDeletion);
            this.staticRow = this.iterator.readStaticRow();
            return this;
        }

        @Override
        public TableMetadata metadata() {
            return this.metadata;
        }

        @Override
        public RegularAndStaticColumns columns() {
            return this.metadata.regularAndStaticColumns();
        }

        @Override
        public boolean isReverseOrder() {
            return false;
        }

        @Override
        public DecoratedKey partitionKey() {
            return this.key;
        }

        @Override
        public DeletionTime partitionLevelDeletion() {
            return this.partitionLevelDeletion;
        }

        @Override
        public Row staticRow() {
            return this.staticRow;
        }

        @Override
        public EncodingStats stats() {
            return this.header.stats();
        }

        @Override
        public boolean hasNext() {
            try {
                return this.iterator.hasNext();
            }
            catch (IOError e) {
                if (e.getCause() != null && e.getCause() instanceof IOException) {
                    this.exception = (IOException)e.getCause();
                    return false;
                }
                throw e;
            }
        }

        @Override
        public Unfiltered next() {
            Unfiltered unfiltered = (Unfiltered)this.iterator.next();
            return this.metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW ? this.maybeMarkLocalToBeCleared((Row)unfiltered) : unfiltered;
        }

        private Row maybeMarkLocalToBeCleared(Row row) {
            return this.metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
        }

        public void checkForExceptions() throws IOException {
            if (this.exception != null) {
                throw this.exception;
            }
        }

        @Override
        public void close() {
        }
    }
}

