/*
 * Decompiled with CFR 0.152.
 */
package com.github.ambry.messageformat;

import com.github.ambry.messageformat.MessageFormatErrorCodes;
import com.github.ambry.messageformat.MessageFormatException;
import com.github.ambry.messageformat.MessageFormatFlags;
import com.github.ambry.messageformat.MessageFormatMetrics;
import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.messageformat.MessageMetadata;
import com.github.ambry.messageformat.MessageReadSetIndexInputStream;
import com.github.ambry.network.Send;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.ByteBufferOutputStream;
import com.github.ambry.utils.SystemTime;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageFormatSend
implements Send {
    private MessageReadSet readSet;
    private MessageFormatFlags flag;
    private ArrayList<SendInfo> sendInfoList;
    private ArrayList<MessageMetadata> messageMetadataList;
    private long totalSizeToWrite;
    private long sizeWritten;
    private int currentWriteIndex;
    private long sizeWrittenFromCurrentIndex;
    private StoreKeyFactory storeKeyFactory;
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final int BUFFERED_INPUT_STREAM_BUFFER_SIZE = 256;

    public MessageFormatSend(MessageReadSet readSet, MessageFormatFlags flag, MessageFormatMetrics metrics, StoreKeyFactory storeKeyFactory, boolean enableDataPrefetch) throws IOException, MessageFormatException {
        this.readSet = readSet;
        this.flag = flag;
        this.storeKeyFactory = storeKeyFactory;
        this.totalSizeToWrite = 0L;
        long startTime = SystemTime.getInstance().milliseconds();
        this.calculateOffsets(enableDataPrefetch);
        metrics.calculateOffsetMessageFormatSendTime.update(SystemTime.getInstance().milliseconds() - startTime);
        this.sizeWritten = 0L;
        this.currentWriteIndex = 0;
        this.sizeWrittenFromCurrentIndex = 0L;
    }

    private void calculateOffsets(boolean enableDataPrefetch) throws IOException, MessageFormatException {
        try {
            int messageCount = this.readSet.count();
            this.sendInfoList = new ArrayList(messageCount);
            this.messageMetadataList = new ArrayList(messageCount);
            this.logger.trace("Calculate offsets of messages for one partition, MessageFormatFlag : {} number of messages : {}", (Object)this.flag, (Object)messageCount);
            for (int i = 0; i < messageCount; ++i) {
                if (this.flag == MessageFormatFlags.All) {
                    this.sendInfoList.add(i, new SendInfo(0L, this.readSet.sizeInBytes(i)));
                    this.messageMetadataList.add(i, null);
                    this.totalSizeToWrite += this.readSet.sizeInBytes(i);
                    if (!enableDataPrefetch) continue;
                    this.readSet.doPrefetch(i, 0L, this.readSet.sizeInBytes(i));
                    continue;
                }
                long startTime = SystemTime.getInstance().milliseconds();
                BufferedInputStream bufferedInputStream = new BufferedInputStream(new MessageReadSetIndexInputStream(this.readSet, i, 0), 256);
                byte[] headerVersionBytes = new byte[2];
                bufferedInputStream.read(headerVersionBytes, 0, 2);
                short version = ByteBuffer.wrap(headerVersionBytes).getShort();
                if (!MessageFormatRecord.isValidHeaderVersion(version)) {
                    throw new MessageFormatException("Version not known while reading message - version " + version + ", StoreKey " + this.readSet.getKeyAt(i), MessageFormatErrorCodes.Unknown_Format_Version);
                }
                this.logger.trace("Calculate offsets, read and verify header version time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                startTime = SystemTime.getInstance().milliseconds();
                byte[] headerBytes = new byte[MessageFormatRecord.getHeaderSizeForVersion(version)];
                bufferedInputStream.read(headerBytes, 2, headerBytes.length - 2);
                ByteBuffer header = ByteBuffer.wrap(headerBytes);
                header.putShort(version);
                header.rewind();
                MessageFormatRecord.MessageHeader_Format headerFormat = MessageFormatRecord.getMessageHeader(version, header);
                headerFormat.verifyHeader();
                this.logger.trace("Calculate offsets, read and verify header time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                startTime = SystemTime.getInstance().milliseconds();
                StoreKey storeKey = this.storeKeyFactory.getStoreKey(new DataInputStream(bufferedInputStream));
                if (storeKey.compareTo((Object)this.readSet.getKeyAt(i)) != 0) {
                    throw new MessageFormatException("Id mismatch between metadata and store - metadataId " + this.readSet.getKeyAt(i) + " storeId " + storeKey, MessageFormatErrorCodes.Store_Key_Id_MisMatch);
                }
                this.logger.trace("Calculate offsets, read and verify storeKey time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                startTime = SystemTime.getInstance().milliseconds();
                if (this.flag == MessageFormatFlags.BlobProperties) {
                    this.sendInfoList.add(i, new SendInfo(headerFormat.getBlobPropertiesRecordRelativeOffset(), headerFormat.getBlobPropertiesRecordSize()));
                    this.messageMetadataList.add(null);
                    if (enableDataPrefetch) {
                        this.readSet.doPrefetch(i, (long)headerFormat.getBlobPropertiesRecordRelativeOffset(), (long)headerFormat.getBlobPropertiesRecordSize());
                    }
                    this.totalSizeToWrite += (long)headerFormat.getBlobPropertiesRecordSize();
                    this.logger.trace("Calculate offsets, get total size of blob properties time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                    this.logger.trace("Sending blob properties for message relativeOffset : {} size : {}", (Object)this.sendInfoList.get(i).relativeOffset(), (Object)this.sendInfoList.get(i).sizetoSend());
                    continue;
                }
                if (this.flag == MessageFormatFlags.BlobUserMetadata) {
                    this.messageMetadataList.add(headerFormat.hasEncryptionKeyRecord() ? new MessageMetadata(this.extractEncryptionKey(i, headerFormat.getBlobEncryptionKeyRecordRelativeOffset(), headerFormat.getBlobEncryptionKeyRecordSize())) : null);
                    this.sendInfoList.add(i, new SendInfo(headerFormat.getUserMetadataRecordRelativeOffset(), headerFormat.getUserMetadataRecordSize()));
                    if (enableDataPrefetch) {
                        this.readSet.doPrefetch(i, (long)headerFormat.getUserMetadataRecordRelativeOffset(), (long)headerFormat.getUserMetadataRecordSize());
                    }
                    this.totalSizeToWrite += (long)headerFormat.getUserMetadataRecordSize();
                    this.logger.trace("Calculate offsets, get total size of user metadata time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                    this.logger.trace("Sending user metadata for message relativeOffset : {} size : {}", (Object)this.sendInfoList.get(i).relativeOffset(), (Object)this.sendInfoList.get(i).sizetoSend());
                    continue;
                }
                if (this.flag == MessageFormatFlags.BlobInfo) {
                    this.messageMetadataList.add(headerFormat.hasEncryptionKeyRecord() ? new MessageMetadata(this.extractEncryptionKey(i, headerFormat.getBlobEncryptionKeyRecordRelativeOffset(), headerFormat.getBlobEncryptionKeyRecordSize())) : null);
                    this.sendInfoList.add(i, new SendInfo(headerFormat.getBlobPropertiesRecordRelativeOffset(), headerFormat.getBlobPropertiesRecordSize() + headerFormat.getUserMetadataRecordSize()));
                    if (enableDataPrefetch) {
                        this.readSet.doPrefetch(i, (long)headerFormat.getBlobPropertiesRecordRelativeOffset(), (long)(headerFormat.getBlobPropertiesRecordSize() + headerFormat.getUserMetadataRecordSize()));
                    }
                    this.totalSizeToWrite += (long)(headerFormat.getBlobPropertiesRecordSize() + headerFormat.getUserMetadataRecordSize());
                    this.logger.trace("Calculate offsets, get total size of blob info time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                    this.logger.trace("Sending blob info (blob properties + user metadata) for message relativeOffset : {} size : {}", (Object)this.sendInfoList.get(i).relativeOffset(), (Object)this.sendInfoList.get(i).sizetoSend());
                    continue;
                }
                if (this.flag == MessageFormatFlags.Blob) {
                    this.messageMetadataList.add(headerFormat.hasEncryptionKeyRecord() ? new MessageMetadata(this.extractEncryptionKey(i, headerFormat.getBlobEncryptionKeyRecordRelativeOffset(), headerFormat.getBlobEncryptionKeyRecordSize())) : null);
                    this.sendInfoList.add(i, new SendInfo(headerFormat.getBlobRecordRelativeOffset(), headerFormat.getBlobRecordSize()));
                    if (enableDataPrefetch) {
                        this.readSet.doPrefetch(i, (long)headerFormat.getBlobRecordRelativeOffset(), headerFormat.getBlobRecordSize());
                    }
                    this.totalSizeToWrite += headerFormat.getBlobRecordSize();
                    this.logger.trace("Calculate offsets, get total size of blob time: {}", (Object)(SystemTime.getInstance().milliseconds() - startTime));
                    this.logger.trace("Sending data for message relativeOffset : {} size : {}", (Object)this.sendInfoList.get(i).relativeOffset(), (Object)this.sendInfoList.get(i).sizetoSend());
                    continue;
                }
                throw new MessageFormatException("Unknown flag in request " + this.flag, MessageFormatErrorCodes.IO_Error);
            }
        }
        catch (IOException e) {
            this.logger.trace("IOError when calculating offsets");
            throw new MessageFormatException("IOError when calculating offsets ", e, MessageFormatErrorCodes.IO_Error);
        }
    }

    private ByteBuffer extractEncryptionKey(int readSetIndex, int encryptionKeyRelativeOffset, int encryptionKeySize) throws IOException, MessageFormatException {
        ByteBuffer serializedEncryptionKeyRecord = ByteBuffer.allocate(encryptionKeySize);
        this.readSet.writeTo(readSetIndex, Channels.newChannel((OutputStream)new ByteBufferOutputStream(serializedEncryptionKeyRecord)), (long)encryptionKeyRelativeOffset, (long)encryptionKeySize);
        serializedEncryptionKeyRecord.flip();
        return MessageFormatRecord.deserializeBlobEncryptionKey((InputStream)new ByteBufferInputStream(serializedEncryptionKeyRecord));
    }

    public List<MessageMetadata> getMessageMetadataList() {
        return this.messageMetadataList;
    }

    public long writeTo(WritableByteChannel channel) throws IOException {
        long written = 0L;
        if (!this.isSendComplete()) {
            written = this.readSet.writeTo(this.currentWriteIndex, channel, this.sendInfoList.get(this.currentWriteIndex).relativeOffset() + this.sizeWrittenFromCurrentIndex, this.sendInfoList.get(this.currentWriteIndex).sizetoSend() - this.sizeWrittenFromCurrentIndex);
            this.logger.trace("writeindex {} relativeOffset {} maxSize {} written {}", new Object[]{this.currentWriteIndex, this.sendInfoList.get(this.currentWriteIndex).relativeOffset() + this.sizeWrittenFromCurrentIndex, this.sendInfoList.get(this.currentWriteIndex).sizetoSend() - this.sizeWrittenFromCurrentIndex, written});
            this.sizeWritten += written;
            this.sizeWrittenFromCurrentIndex += written;
            this.logger.trace("size written in this loop : {} size written till now : {}", (Object)written, (Object)this.sizeWritten);
            if (this.sizeWrittenFromCurrentIndex == this.sendInfoList.get(this.currentWriteIndex).sizetoSend()) {
                ++this.currentWriteIndex;
                this.sizeWrittenFromCurrentIndex = 0L;
            }
        }
        return written;
    }

    public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
        this.readSet.writeTo(channel, callback);
    }

    public boolean isSendComplete() {
        return this.totalSizeToWrite == this.sizeWritten;
    }

    public long sizeInBytes() {
        return this.totalSizeToWrite;
    }

    private class SendInfo {
        private long relativeOffset;
        private long sizeToSend;

        public SendInfo(long relativeOffset, long sizeToSend) {
            this.relativeOffset = relativeOffset;
            this.sizeToSend = sizeToSend;
        }

        public long relativeOffset() {
            return this.relativeOffset;
        }

        public long sizetoSend() {
            return this.sizeToSend;
        }
    }
}

