/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.message.serializer;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.broker.kafka.message.KafkaBrokerMessage;
import org.joyqueue.broker.kafka.message.compressor.KafkaCompressionCodec;
import org.joyqueue.broker.kafka.message.serializer.AbstractKafkaMessageSerializer;
import org.joyqueue.broker.kafka.util.KafkaBufferUtils;
import org.joyqueue.message.BrokerMessage;

/**
 * Serializer for Kafka messages written with magic value {@code 1}.
 *
 * <p>The on-wire layout produced by {@link #writeMessage} and consumed by
 * {@link #doReadMessage} is, in order: offset (8 bytes), size (4), CRC (4),
 * magic (1), attribute (1), timestamp (8), key, value. Key and value are
 * encoded by {@link KafkaBufferUtils}.
 *
 * <p>The class also stores and restores the Kafka-specific timestamp and
 * attribute in the {@code extension} byte array of a {@link BrokerMessage}.
 */
public class KafkaMessageV1Serializer extends AbstractKafkaMessageSerializer {
    /** Length of the older extension layout; only the timestamp is read from it. */
    private static final int EXTENSION_V0_LENGTH = 9;
    /** Length of the newer extension layout; timestamp and attribute are read from it. */
    private static final int EXTENSION_V1_LENGTH = 17;
    /** Length of the extension array created by {@link #writeExtension}. */
    private static final int CURRENT_EXTENSION_LENGTH = EXTENSION_V1_LENGTH;
    /** Magic byte written into both the extension and the message header. */
    private static final byte CURRENT_MAGIC = 1;

    /** Offset of the attribute byte from the message start: offset(8) + size(4) + crc(4) + magic(1). */
    private static final int ATTRIBUTE_OFFSET = 8 + 4 + 4 + 1;
    /**
     * Offset, from the message start, at which the compressed payload begins.
     * NOTE(review): 34 = attribute offset + attribute(1) + timestamp(8) + 8 more bytes, presumably a
     * 4-byte (null) key length and a 4-byte value length -- confirm against KafkaBufferUtils.
     */
    private static final int COMPRESSED_PAYLOAD_OFFSET = 34;

    /**
     * Builds a new extension array holding the current magic, the Kafka timestamp and the Kafka
     * attribute, and stores it on the broker message.
     *
     * @param brokerMessage      message that receives the extension bytes
     * @param kafkaBrokerMessage source of the timestamp and attribute
     */
    public static void writeExtension(BrokerMessage brokerMessage, KafkaBrokerMessage kafkaBrokerMessage) {
        byte[] extension = new byte[CURRENT_EXTENSION_LENGTH];
        writeExtensionMagic(extension, CURRENT_MAGIC);
        writeExtensionTimestamp(extension, kafkaBrokerMessage.getTimestamp());
        writeExtensionAttribute(extension, kafkaBrokerMessage.getAttribute());
        brokerMessage.setExtension(extension);
    }

    /**
     * Copies the timestamp (and, for the newer layout, the attribute) from the broker message's
     * extension into the Kafka message. An empty extension, or one whose length matches neither
     * known layout, leaves the Kafka message untouched.
     *
     * @param brokerMessage      message whose extension is read
     * @param kafkaBrokerMessage message that receives the decoded fields
     */
    public static void readExtension(BrokerMessage brokerMessage, KafkaBrokerMessage kafkaBrokerMessage) {
        byte[] extension = brokerMessage.getExtension();
        if (ArrayUtils.isEmpty(extension)) {
            return;
        }
        if (extension.length == EXTENSION_V0_LENGTH) {
            kafkaBrokerMessage.setTimestamp(readExtensionTimestamp(extension));
        } else if (extension.length == EXTENSION_V1_LENGTH) {
            kafkaBrokerMessage.setTimestamp(readExtensionTimestamp(extension));
            kafkaBrokerMessage.setAttribute(readExtensionAttribute(extension));
        }
    }

    /**
     * Writes every message, in iteration order, to the buffer.
     *
     * @param buffer   destination buffer
     * @param messages messages to serialize
     * @throws Exception if serializing any message fails
     */
    public static void writeMessages(ByteBuf buffer, List<KafkaBrokerMessage> messages) throws Exception {
        for (KafkaBrokerMessage message : messages) {
            writeMessage(buffer, message);
        }
    }

    /**
     * Appends a single message to the buffer. The size and CRC fields are first written as
     * placeholders and patched once the full message has been written.
     *
     * @param buffer  destination buffer
     * @param message message to serialize
     * @throws Exception if writing the key or value fails
     */
    public static void writeMessage(ByteBuf buffer, KafkaBrokerMessage message) throws Exception {
        buffer.writeLong(message.getOffset());

        int startIndex = buffer.writerIndex();
        buffer.writeInt(0); // size placeholder, patched below
        buffer.writeInt(0); // CRC placeholder, patched below
        buffer.writeByte(CURRENT_MAGIC);
        buffer.writeByte((int) message.getAttribute());
        buffer.writeLong(message.getTimestamp());
        KafkaBufferUtils.writeBytes(message.getKey(), buffer);
        KafkaBufferUtils.writeBytes(message.getValue(), buffer);

        // Copy everything from the size field to the end of the message so the CRC can be computed.
        int length = buffer.writerIndex() - startIndex;
        byte[] bytes = new byte[length];
        buffer.getBytes(startIndex, bytes);

        // Skip the size (4) and CRC (4) fields; the checksum covers the remainder.
        long crc = KafkaBufferUtils.crc32(bytes, 8, bytes.length - 4 - 4);

        // The size field counts the bytes that follow it.
        buffer.setInt(startIndex, length - 4);
        buffer.setInt(startIndex + 4, (int) (crc & 0xFFFFFFFFL));
    }

    /**
     * Reads messages from the buffer until it has no bytes remaining.
     *
     * @param buffer source buffer, positioned at the start of the first message
     * @return the decoded messages in the order they were read
     * @throws Exception if decoding any message fails
     */
    public static List<KafkaBrokerMessage> readMessages(ByteBuffer buffer) throws Exception {
        List<KafkaBrokerMessage> result = Lists.newLinkedList();
        while (buffer.hasRemaining()) {
            result.add(readMessage(buffer));
        }
        return result;
    }

    /**
     * Reads one message starting at the buffer's current position. If the attribute byte indicates
     * a compression codec, the payload is decompressed and the message is decoded from the
     * decompressed bytes instead.
     *
     * @param buffer source buffer, positioned at the start of a message
     * @return the decoded message
     * @throws Exception if decompression or decoding fails
     */
    public static KafkaBrokerMessage readMessage(ByteBuffer buffer) throws Exception {
        // Offsets are taken relative to the current position so that they agree with the relative
        // reads in doReadMessage when the buffer does not start at index 0 (e.g. the second and
        // later iterations of readMessages).
        int messageStart = buffer.position();
        byte attribute = buffer.get(messageStart + ATTRIBUTE_OFFSET);
        KafkaCompressionCodec compressionCodec = KafkaCompressionCodec.valueOf(getCompressionCodecType(attribute));
        if (!compressionCodec.equals(KafkaCompressionCodec.NoCompressionCodec)) {
            buffer.position(messageStart + COMPRESSED_PAYLOAD_OFFSET);
            buffer = ByteBuffer.wrap(decompress(compressionCodec, buffer, CURRENT_MAGIC));
        }
        return doReadMessage(buffer);
    }

    /**
     * Decodes one uncompressed message from the buffer's current position, advancing the buffer
     * past it.
     *
     * @param buffer source buffer, positioned at the start of a message
     * @return the decoded message
     * @throws Exception if reading the key or value fails
     */
    protected static KafkaBrokerMessage doReadMessage(ByteBuffer buffer) throws Exception {
        KafkaBrokerMessage message = new KafkaBrokerMessage();
        message.setOffset(buffer.getLong());
        message.setSize(buffer.getInt());
        message.setCrc(buffer.getInt());
        message.setMagic(buffer.get());
        message.setAttribute(buffer.get());
        // NOTE(review): the attribute read from the wire is immediately replaced with 0. This
        // looks intentional (the returned message is no longer compressed), but it also discards
        // any other attribute bits -- confirm before changing.
        message.setAttribute((short) 0);
        message.setTimestamp(buffer.getLong());
        message.setKey(KafkaBufferUtils.readBytes(buffer));
        message.setValue(KafkaBufferUtils.readBytes(buffer));
        return message;
    }
}

