package com.github.ambry.messageformat;

import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.network.Send;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.ByteBufferOutputStream;
import com.github.ambry.utils.SystemTime;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ambry/messageformat/MessageFormatSend.class */
public class MessageFormatSend implements Send {
    private MessageReadSet readSet;
    private MessageFormatFlags flag;
    private ArrayList<SendInfo> sendInfoList;
    private ArrayList<MessageMetadata> messageMetadataList;
    private long sizeWritten;
    private int currentWriteIndex;
    private long sizeWrittenFromCurrentIndex;
    private StoreKeyFactory storeKeyFactory;
    private static final int BUFFERED_INPUT_STREAM_BUFFER_SIZE = 256;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private long totalSizeToWrite = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/ambry/messageformat/MessageFormatSend$SendInfo.class */
    public class SendInfo {
        private long relativeOffset;
        private long sizeToSend;

        public SendInfo(long j, long j2) {
            this.relativeOffset = j;
            this.sizeToSend = j2;
        }

        public long relativeOffset() {
            return this.relativeOffset;
        }

        public long sizetoSend() {
            return this.sizeToSend;
        }
    }

    public MessageFormatSend(MessageReadSet messageReadSet, MessageFormatFlags messageFormatFlags, MessageFormatMetrics messageFormatMetrics, StoreKeyFactory storeKeyFactory, boolean z) throws IOException, MessageFormatException {
        this.readSet = messageReadSet;
        this.flag = messageFormatFlags;
        this.storeKeyFactory = storeKeyFactory;
        long milliseconds = SystemTime.getInstance().milliseconds();
        calculateOffsets(z);
        messageFormatMetrics.calculateOffsetMessageFormatSendTime.update(SystemTime.getInstance().milliseconds() - milliseconds);
        this.sizeWritten = 0L;
        this.currentWriteIndex = 0;
        this.sizeWrittenFromCurrentIndex = 0L;
    }

    private void calculateOffsets(boolean z) throws IOException, MessageFormatException {
        try {
            int count = this.readSet.count();
            this.sendInfoList = new ArrayList<>(count);
            this.messageMetadataList = new ArrayList<>(count);
            this.logger.trace("Calculate offsets of messages for one partition, MessageFormatFlag : {} number of messages : {}", this.flag, Integer.valueOf(count));
            for (int i = 0; i < count; i++) {
                if (this.flag == MessageFormatFlags.All) {
                    this.sendInfoList.add(i, new SendInfo(0L, this.readSet.sizeInBytes(i)));
                    this.messageMetadataList.add(i, null);
                    this.totalSizeToWrite += this.readSet.sizeInBytes(i);
                    if (z) {
                        this.readSet.doPrefetch(i, 0L, this.readSet.sizeInBytes(i));
                    }
                } else {
                    long milliseconds = SystemTime.getInstance().milliseconds();
                    BufferedInputStream bufferedInputStream = new BufferedInputStream(new MessageReadSetIndexInputStream(this.readSet, i, 0), BUFFERED_INPUT_STREAM_BUFFER_SIZE);
                    byte[] bArr = new byte[2];
                    bufferedInputStream.read(bArr, 0, 2);
                    short s = ByteBuffer.wrap(bArr).getShort();
                    if (!MessageFormatRecord.isValidHeaderVersion(s)) {
                        throw new MessageFormatException("Version not known while reading message - version " + ((int) s) + ", StoreKey " + this.readSet.getKeyAt(i), MessageFormatErrorCodes.Unknown_Format_Version);
                    }
                    this.logger.trace("Calculate offsets, read and verify header version time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds));
                    long milliseconds2 = SystemTime.getInstance().milliseconds();
                    byte[] bArr2 = new byte[MessageFormatRecord.getHeaderSizeForVersion(s)];
                    bufferedInputStream.read(bArr2, 2, bArr2.length - 2);
                    ByteBuffer wrap = ByteBuffer.wrap(bArr2);
                    wrap.putShort(s);
                    wrap.rewind();
                    MessageFormatRecord.MessageHeader_Format messageHeader = MessageFormatRecord.getMessageHeader(s, wrap);
                    messageHeader.verifyHeader();
                    this.logger.trace("Calculate offsets, read and verify header time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds2));
                    long milliseconds3 = SystemTime.getInstance().milliseconds();
                    StoreKey storeKey = this.storeKeyFactory.getStoreKey(new DataInputStream(bufferedInputStream));
                    if (storeKey.compareTo(this.readSet.getKeyAt(i)) != 0) {
                        throw new MessageFormatException("Id mismatch between metadata and store - metadataId " + this.readSet.getKeyAt(i) + " storeId " + storeKey, MessageFormatErrorCodes.Store_Key_Id_MisMatch);
                    }
                    this.logger.trace("Calculate offsets, read and verify storeKey time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds3));
                    long milliseconds4 = SystemTime.getInstance().milliseconds();
                    if (this.flag == MessageFormatFlags.BlobProperties) {
                        this.sendInfoList.add(i, new SendInfo(messageHeader.getBlobPropertiesRecordRelativeOffset(), messageHeader.getBlobPropertiesRecordSize()));
                        this.messageMetadataList.add(null);
                        if (z) {
                            this.readSet.doPrefetch(i, messageHeader.getBlobPropertiesRecordRelativeOffset(), messageHeader.getBlobPropertiesRecordSize());
                        }
                        this.totalSizeToWrite += messageHeader.getBlobPropertiesRecordSize();
                        this.logger.trace("Calculate offsets, get total size of blob properties time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds4));
                        this.logger.trace("Sending blob properties for message relativeOffset : {} size : {}", Long.valueOf(this.sendInfoList.get(i).relativeOffset()), Long.valueOf(this.sendInfoList.get(i).sizetoSend()));
                    } else if (this.flag == MessageFormatFlags.BlobUserMetadata) {
                        this.messageMetadataList.add(messageHeader.hasEncryptionKeyRecord() ? new MessageMetadata(extractEncryptionKey(i, messageHeader.getBlobEncryptionKeyRecordRelativeOffset(), messageHeader.getBlobEncryptionKeyRecordSize())) : null);
                        this.sendInfoList.add(i, new SendInfo(messageHeader.getUserMetadataRecordRelativeOffset(), messageHeader.getUserMetadataRecordSize()));
                        if (z) {
                            this.readSet.doPrefetch(i, messageHeader.getUserMetadataRecordRelativeOffset(), messageHeader.getUserMetadataRecordSize());
                        }
                        this.totalSizeToWrite += messageHeader.getUserMetadataRecordSize();
                        this.logger.trace("Calculate offsets, get total size of user metadata time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds4));
                        this.logger.trace("Sending user metadata for message relativeOffset : {} size : {}", Long.valueOf(this.sendInfoList.get(i).relativeOffset()), Long.valueOf(this.sendInfoList.get(i).sizetoSend()));
                    } else if (this.flag == MessageFormatFlags.BlobInfo) {
                        this.messageMetadataList.add(messageHeader.hasEncryptionKeyRecord() ? new MessageMetadata(extractEncryptionKey(i, messageHeader.getBlobEncryptionKeyRecordRelativeOffset(), messageHeader.getBlobEncryptionKeyRecordSize())) : null);
                        this.sendInfoList.add(i, new SendInfo(messageHeader.getBlobPropertiesRecordRelativeOffset(), messageHeader.getBlobPropertiesRecordSize() + messageHeader.getUserMetadataRecordSize()));
                        if (z) {
                            this.readSet.doPrefetch(i, messageHeader.getBlobPropertiesRecordRelativeOffset(), messageHeader.getBlobPropertiesRecordSize() + messageHeader.getUserMetadataRecordSize());
                        }
                        this.totalSizeToWrite += messageHeader.getBlobPropertiesRecordSize() + messageHeader.getUserMetadataRecordSize();
                        this.logger.trace("Calculate offsets, get total size of blob info time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds4));
                        this.logger.trace("Sending blob info (blob properties + user metadata) for message relativeOffset : {} size : {}", Long.valueOf(this.sendInfoList.get(i).relativeOffset()), Long.valueOf(this.sendInfoList.get(i).sizetoSend()));
                    } else {
                        if (this.flag != MessageFormatFlags.Blob) {
                            throw new MessageFormatException("Unknown flag in request " + this.flag, MessageFormatErrorCodes.IO_Error);
                        }
                        this.messageMetadataList.add(messageHeader.hasEncryptionKeyRecord() ? new MessageMetadata(extractEncryptionKey(i, messageHeader.getBlobEncryptionKeyRecordRelativeOffset(), messageHeader.getBlobEncryptionKeyRecordSize())) : null);
                        this.sendInfoList.add(i, new SendInfo(messageHeader.getBlobRecordRelativeOffset(), messageHeader.getBlobRecordSize()));
                        if (z) {
                            this.readSet.doPrefetch(i, messageHeader.getBlobRecordRelativeOffset(), messageHeader.getBlobRecordSize());
                        }
                        this.totalSizeToWrite += messageHeader.getBlobRecordSize();
                        this.logger.trace("Calculate offsets, get total size of blob time: {}", Long.valueOf(SystemTime.getInstance().milliseconds() - milliseconds4));
                        this.logger.trace("Sending data for message relativeOffset : {} size : {}", Long.valueOf(this.sendInfoList.get(i).relativeOffset()), Long.valueOf(this.sendInfoList.get(i).sizetoSend()));
                    }
                }
            }
        } catch (IOException e) {
            this.logger.trace("IOError when calculating offsets");
            throw new MessageFormatException("IOError when calculating offsets ", e, MessageFormatErrorCodes.IO_Error);
        }
    }

    private ByteBuffer extractEncryptionKey(int i, int i2, int i3) throws IOException, MessageFormatException {
        ByteBuffer allocate = ByteBuffer.allocate(i3);
        this.readSet.writeTo(i, Channels.newChannel((OutputStream) new ByteBufferOutputStream(allocate)), i2, i3);
        allocate.flip();
        return MessageFormatRecord.deserializeBlobEncryptionKey(new ByteBufferInputStream(allocate));
    }

    public List<MessageMetadata> getMessageMetadataList() {
        return this.messageMetadataList;
    }

    public long writeTo(WritableByteChannel writableByteChannel) throws IOException {
        long j = 0;
        if (!isSendComplete()) {
            j = this.readSet.writeTo(this.currentWriteIndex, writableByteChannel, this.sendInfoList.get(this.currentWriteIndex).relativeOffset() + this.sizeWrittenFromCurrentIndex, this.sendInfoList.get(this.currentWriteIndex).sizetoSend() - this.sizeWrittenFromCurrentIndex);
            this.logger.trace("writeindex {} relativeOffset {} maxSize {} written {}", new Object[]{Integer.valueOf(this.currentWriteIndex), Long.valueOf(this.sendInfoList.get(this.currentWriteIndex).relativeOffset() + this.sizeWrittenFromCurrentIndex), Long.valueOf(this.sendInfoList.get(this.currentWriteIndex).sizetoSend() - this.sizeWrittenFromCurrentIndex), Long.valueOf(j)});
            this.sizeWritten += j;
            this.sizeWrittenFromCurrentIndex += j;
            this.logger.trace("size written in this loop : {} size written till now : {}", Long.valueOf(j), Long.valueOf(this.sizeWritten));
            if (this.sizeWrittenFromCurrentIndex == this.sendInfoList.get(this.currentWriteIndex).sizetoSend()) {
                this.currentWriteIndex++;
                this.sizeWrittenFromCurrentIndex = 0L;
            }
        }
        return j;
    }

    public void writeTo(AsyncWritableChannel asyncWritableChannel, Callback<Long> callback) {
        this.readSet.writeTo(asyncWritableChannel, callback);
    }

    public boolean isSendComplete() {
        return this.totalSizeToWrite == this.sizeWritten;
    }

    public long sizeInBytes() {
        return this.totalSizeToWrite;
    }
}
