/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.message.serializer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.broker.kafka.message.KafkaBrokerMessage;
import org.joyqueue.broker.kafka.message.compressor.KafkaCompressionCodec;
import org.joyqueue.broker.kafka.message.serializer.AbstractKafkaMessageSerializer;
import org.joyqueue.broker.kafka.util.KafkaBufferUtils;
import org.joyqueue.broker.kafka.util.PureJavaCrc32C;
import org.joyqueue.message.BrokerMessage;

/**
 * Serializer for Kafka magic-2 (record batch) messages.
 *
 * <p>A whole batch is represented by a single {@link KafkaBrokerMessage} whose value holds the
 * (possibly compressed) record section and whose flag holds the record count. The batch header
 * is parsed by {@link #readMessages(ByteBuffer)} and emitted by
 * {@link #writeMessage(ByteBuf, KafkaBrokerMessage)}; individual records are decoded by the
 * {@code readMessage} overloads.
 *
 * <p>Helpers such as {@code writeExtensionMagic}, {@code readExtensionTimestamp},
 * {@code getCompressionCodecType}, {@code decompress} and {@code isTransactionl} are not
 * declared in this class; they are presumably inherited from
 * {@link AbstractKafkaMessageSerializer}.
 */
public class KafkaMessageV2Serializer extends AbstractKafkaMessageSerializer {
    private static final int EXTENSION_V0_LENGTH = 1;
    private static final int EXTENSION_V1_LENGTH = 17;
    private static final int CURRENT_EXTENSION_LENGTH = EXTENSION_V1_LENGTH;
    private static final byte CURRENT_MAGIC = 2;

    /**
     * Builds a fresh extension array carrying the current magic, the Kafka message's timestamp
     * and its attribute, and stores it on the broker message.
     *
     * @param brokerMessage      message that receives the extension bytes
     * @param kafkaBrokerMessage source of the timestamp and attribute
     */
    public static void writeExtension(BrokerMessage brokerMessage, KafkaBrokerMessage kafkaBrokerMessage) {
        byte[] extension = new byte[CURRENT_EXTENSION_LENGTH];
        writeExtensionMagic(extension, CURRENT_MAGIC);
        writeExtensionTimestamp(extension, kafkaBrokerMessage.getTimestamp());
        writeExtensionAttribute(extension, kafkaBrokerMessage.getAttribute());
        brokerMessage.setExtension(extension);
    }

    /**
     * Copies timestamp and attribute from the broker message's extension into the Kafka message.
     *
     * <p>Nothing is changed when the extension is {@code null}, empty, or has any length other
     * than the current extension length.
     *
     * @param brokerMessage      message whose extension bytes are read
     * @param kafkaBrokerMessage message that receives the decoded timestamp and attribute
     */
    public static void readExtension(BrokerMessage brokerMessage, KafkaBrokerMessage kafkaBrokerMessage) {
        byte[] extension = brokerMessage.getExtension();
        if (extension == null || extension.length != CURRENT_EXTENSION_LENGTH) {
            return;
        }
        kafkaBrokerMessage.setTimestamp(readExtensionTimestamp(extension));
        kafkaBrokerMessage.setAttribute(readExtensionAttribute(extension));
    }

    /**
     * Writes every message to the buffer, in list order, via
     * {@link #writeMessage(ByteBuf, KafkaBrokerMessage)}.
     *
     * @param buffer   destination buffer
     * @param messages batch messages to write
     * @throws Exception declared for compatibility with existing callers
     */
    public static void writeMessages(ByteBuf buffer, List<KafkaBrokerMessage> messages) throws Exception {
        for (KafkaBrokerMessage message : messages) {
            writeMessage(buffer, message);
        }
    }

    /**
     * Writes one batch: a fixed header followed by the message value copied verbatim.
     *
     * <p>The header fields are written in the same order in which
     * {@link #readMessages(ByteBuffer)} consumes them. The size and CRC fields are written as
     * placeholders first and patched once the end position is known.
     *
     * @param buffer  destination buffer
     * @param message batch to write; its value must be non-null
     * @throws Exception declared for compatibility with existing callers
     */
    public static void writeMessage(ByteBuf buffer, KafkaBrokerMessage message) throws Exception {
        buffer.writeLong(message.getOffset());
        int sizeIndex = buffer.writerIndex();
        buffer.writeInt(0); // size placeholder, patched below
        buffer.writeInt(-1); // partition leader epoch
        buffer.writeByte(CURRENT_MAGIC);
        int crcIndex = buffer.writerIndex();
        buffer.writeInt(0); // crc placeholder, patched below
        buffer.writeShort((int) message.getAttribute());
        buffer.writeInt(message.getFlag() - 1); // last offset delta = record count - 1
        buffer.writeLong(message.getTimestamp()); // first timestamp
        buffer.writeLong(message.getTimestamp()); // max timestamp
        buffer.writeLong(-1L); // producer id
        buffer.writeShort(-1); // producer epoch
        buffer.writeInt(0); // base sequence
        buffer.writeInt((int) message.getFlag()); // record count
        buffer.writeBytes(message.getValue());
        int endIndex = buffer.writerIndex();

        // Size counts every byte that follows the 4-byte size field itself.
        buffer.setInt(sizeIndex, endIndex - sizeIndex - 4);

        // The checksum covers everything after the 4-byte crc field up to the end of the batch.
        byte[] checksummed = new byte[endIndex - crcIndex - 4];
        buffer.getBytes(crcIndex + 4, checksummed);
        PureJavaCrc32C crc32c = new PureJavaCrc32C();
        crc32c.update(checksummed, 0, checksummed.length);
        buffer.setInt(crcIndex, (int) crc32c.getValue());
    }

    /**
     * Parses one batch header from the buffer and wraps the remaining bytes as the batch value.
     *
     * <p>Every byte left in the buffer after the header becomes the value of the returned
     * message, so the buffer is expected to contain exactly one batch. The magic byte read from
     * the buffer is ignored; the returned message always reports the current magic.
     *
     * @param buffer buffer positioned at the start of a batch
     * @return a mutable single-element list holding the batch message
     * @throws Exception declared for compatibility with existing callers
     */
    public static List<KafkaBrokerMessage> readMessages(ByteBuffer buffer) throws Exception {
        long baseOffset = buffer.getLong();
        buffer.getInt(); // size, unused
        buffer.getInt(); // partition leader epoch, unused
        buffer.get(); // magic, unused: the current magic is always reported instead
        int crc = buffer.getInt();
        short attribute = buffer.getShort();
        buffer.getInt(); // last offset delta, unused
        long firstTimestamp = buffer.getLong();
        buffer.getLong(); // max timestamp, unused
        long producerId = buffer.getLong();
        short producerEpoch = buffer.getShort();
        int baseSequence = buffer.getInt();
        int messageCount = buffer.getInt();
        byte[] value = new byte[buffer.remaining()];
        buffer.get(value);

        KafkaBrokerMessage message = new KafkaBrokerMessage();
        message.setAttribute(attribute);
        message.setTimestamp(firstTimestamp);
        message.setOffset(baseOffset);
        message.setValue(value);
        message.setBatch(true);
        message.setMagic(CURRENT_MAGIC);
        // NOTE(review): the record count is narrowed to a short, so counts above
        // Short.MAX_VALUE would wrap.
        message.setFlag((short) messageCount);
        message.setCrc(crc);
        message.setTransaction(isTransactionl(attribute));
        message.setProducerId(producerId);
        message.setProducerEpoch(producerEpoch);
        message.setBaseSequence(baseSequence);

        List<KafkaBrokerMessage> result = new ArrayList<>(1);
        result.add(message);
        return result;
    }

    /**
     * Expands a batch message into its individual records.
     *
     * <p>The batch value is decompressed first when the attribute selects a compression codec;
     * then {@code message.getFlag()} records are decoded from it in order.
     *
     * @param message batch message as produced by {@link #readMessages(ByteBuffer)}
     * @return the decoded records, in batch order
     * @throws Exception if decompression or decoding fails
     */
    public static List<KafkaBrokerMessage> readMessages(KafkaBrokerMessage message) throws Exception {
        short attribute = message.getAttribute();
        KafkaCompressionCodec compressionType = KafkaCompressionCodec.valueOf(getCompressionCodecType(attribute));
        byte[] body = message.getValue();
        if (!compressionType.equals(KafkaCompressionCodec.NoCompressionCodec)) {
            body = decompress(compressionType, ByteBuffer.wrap(body), CURRENT_MAGIC);
        }
        ByteBuffer bodyBuffer = ByteBuffer.wrap(body);
        int recordCount = message.getFlag();
        List<KafkaBrokerMessage> result = Lists.newArrayListWithCapacity(recordCount);
        for (int i = 0; i < recordCount; ++i) {
            result.add(readMessage(message, bodyBuffer));
        }
        return result;
    }

    /**
     * Decodes one record from the buffer, resolving its offset and timestamp deltas against
     * the enclosing batch.
     *
     * <p>The record's attribute byte is skipped and the result's attribute is set to 0. An
     * empty key is normalised to {@code null}. The result carries the batch's flag.
     *
     * @param message enclosing batch, supplying base offset, base timestamp and flag
     * @param buffer  buffer positioned at the start of a record
     * @return the decoded record
     * @throws Exception declared for compatibility with existing callers
     */
    protected static KafkaBrokerMessage readMessage(KafkaBrokerMessage message, ByteBuffer buffer) throws Exception {
        KafkaBrokerMessage result = new KafkaBrokerMessage();
        result.setSize(KafkaBufferUtils.readVarint(buffer));
        buffer.get(); // record attribute byte, skipped
        long timestampDelta = KafkaBufferUtils.readVarlong(buffer);
        result.setOffset((long) KafkaBufferUtils.readVarint(buffer) + message.getOffset());
        byte[] businessId = KafkaBufferUtils.readVarBytes(buffer);
        result.setKey(ArrayUtils.isEmpty(businessId) ? null : businessId);
        result.setValue(KafkaBufferUtils.readVarBytes(buffer));
        result.setTimestamp(timestampDelta + message.getTimestamp());
        result.setAttribute((short) 0);
        result.setFlag(message.getFlag());
        result.setBatch(true);
        HashMap<byte[], byte[]> headers = readHeaders(buffer);
        if (headers != null) {
            result.setHeader(headers);
        }
        return result;
    }

    /**
     * Decodes one record from the buffer, resolving its offset and timestamp deltas against
     * the supplied base values.
     *
     * <p>Unlike {@link #readMessage(KafkaBrokerMessage, ByteBuffer)}, this overload keeps the
     * record's attribute byte, stores the key exactly as read, and sets magic and crc.
     *
     * @param baseOffset     offset added to the record's offset delta
     * @param firstTimestamp timestamp added to the record's timestamp delta
     * @param buffer         buffer positioned at the start of a record
     * @return the decoded record
     * @throws Exception declared for compatibility with existing callers
     */
    public static KafkaBrokerMessage readMessage(long baseOffset, long firstTimestamp, ByteBuffer buffer) throws Exception {
        KafkaBrokerMessage message = new KafkaBrokerMessage();
        message.setSize(KafkaBufferUtils.readVarint(buffer));
        message.setAttribute(buffer.get());
        message.setTimestamp(KafkaBufferUtils.readVarlong(buffer) + firstTimestamp);
        message.setOffset((long) KafkaBufferUtils.readVarint(buffer) + baseOffset);
        message.setKey(KafkaBufferUtils.readVarBytes(buffer));
        message.setValue(KafkaBufferUtils.readVarBytes(buffer));
        message.setBatch(true);
        message.setMagic(CURRENT_MAGIC);
        message.setCrc(0);
        HashMap<byte[], byte[]> headers = readHeaders(buffer);
        if (headers != null) {
            message.setHeader(headers);
        }
        return message;
    }

    /**
     * Reads the header count followed by that many key/value pairs.
     *
     * @param buffer buffer positioned at a record's header count
     * @return the headers, or {@code null} when the header count is 0
     */
    private static HashMap<byte[], byte[]> readHeaders(ByteBuffer buffer) {
        int headerCount = KafkaBufferUtils.readVarint(buffer);
        if (headerCount == 0) {
            return null;
        }
        // NOTE(review): byte[] keys hash by identity, so this map is only useful for iteration,
        // not for lookup by key content.
        HashMap<byte[], byte[]> headers = Maps.newHashMap();
        for (int i = 0; i < headerCount; ++i) {
            byte[] headerKey = KafkaBufferUtils.readVarBytes(buffer);
            byte[] headerValue = KafkaBufferUtils.readVarBytes(buffer);
            headers.put(headerKey, headerValue);
        }
        return headers;
    }
}

