/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.message;

import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.kafka.message.KafkaBrokerMessage;
import org.joyqueue.broker.kafka.message.serializer.AbstractKafkaMessageSerializer;
import org.joyqueue.broker.kafka.message.serializer.KafkaMessageV0Serializer;
import org.joyqueue.broker.kafka.message.serializer.KafkaMessageV1Serializer;
import org.joyqueue.broker.kafka.message.serializer.KafkaMessageV2Serializer;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.SourceType;

/**
 * Entry point that routes Kafka message (de)serialization to the serializer
 * matching a message's magic byte: {@link KafkaMessageV0Serializer} for 0,
 * {@link KafkaMessageV1Serializer} for 1 and {@link KafkaMessageV2Serializer}
 * for 2.
 *
 * <p>All methods are static; the class holds no state of its own.
 */
public class KafkaMessageSerializer extends AbstractKafkaMessageSerializer {

    /**
     * Writes the Kafka extension of {@code kafkaBrokerMessage} into
     * {@code brokerMessage}, delegating to the serializer selected by the
     * Kafka message's magic.
     *
     * @param brokerMessage      target broker message
     * @param kafkaBrokerMessage Kafka message whose magic selects the serializer
     * @throws UnsupportedOperationException if the magic is not 0, 1 or 2
     */
    public static void writeExtension(BrokerMessage brokerMessage, KafkaBrokerMessage kafkaBrokerMessage) {
        final byte magic = kafkaBrokerMessage.getMagic();
        switch (magic) {
            case 0:
                KafkaMessageV0Serializer.writeExtension(brokerMessage, kafkaBrokerMessage);
                break;
            case 1:
                KafkaMessageV1Serializer.writeExtension(brokerMessage, kafkaBrokerMessage);
                break;
            case 2:
                KafkaMessageV2Serializer.writeExtension(brokerMessage, kafkaBrokerMessage);
                break;
            default:
                throw new UnsupportedOperationException(String.format("writeExtension unsupported magic, magic: %s", magic));
        }
    }

    /**
     * Populates {@code kafkaBrokerMessage} from the extension carried by
     * {@code brokerMessage}.
     *
     * <p>Nothing happens when the broker message's source is not
     * {@code SourceType.KAFKA}. Otherwise the magic obtained from the
     * extension is always stored on the Kafka message first; a magic of
     * {@code -1} then ends processing without reading anything further.
     *
     * @param brokerMessage      broker message providing source and extension
     * @param kafkaBrokerMessage Kafka message to populate
     * @throws UnsupportedOperationException if the magic is not -1, 0, 1 or 2
     */
    public static void readExtension(BrokerMessage brokerMessage, KafkaBrokerMessage kafkaBrokerMessage) {
        boolean fromKafka = brokerMessage.getSource() == SourceType.KAFKA.getValue();
        if (!fromKafka) {
            return;
        }

        final byte extensionMagic = getExtensionMagic(brokerMessage.getExtension());
        // The magic is recorded even when it is -1 or unsupported.
        kafkaBrokerMessage.setMagic(extensionMagic);

        switch (extensionMagic) {
            case -1:
                // Nothing more to read for this magic value.
                return;
            case 0:
                KafkaMessageV0Serializer.readExtension(brokerMessage, kafkaBrokerMessage);
                return;
            case 1:
                KafkaMessageV1Serializer.readExtension(brokerMessage, kafkaBrokerMessage);
                return;
            case 2:
                KafkaMessageV2Serializer.readExtension(brokerMessage, kafkaBrokerMessage);
                return;
            default:
                throw new UnsupportedOperationException(String.format("readExtension unsupported magic, magic: %s", extensionMagic));
        }
    }

    /**
     * Writes every message in {@code messages} to {@code buffer}, using for
     * each one the lower of its own magic and the magic that
     * {@link #getSupportedMagic(short)} returns for {@code version}.
     *
     * <p>A message stored with magic 2 that has to be written with a lower
     * magic is first expanded through
     * {@code KafkaMessageV2Serializer.readMessages} and each resulting message
     * is written individually. A {@code null} or empty list writes nothing.
     *
     * @param buffer   destination buffer
     * @param messages messages to write; may be {@code null} or empty
     * @param version  version used to derive the highest magic to emit
     * @throws Exception if one of the delegated serializers fails
     */
    public static void writeMessages(ByteBuf buffer, List<KafkaBrokerMessage> messages, short version) throws Exception {
        if (CollectionUtils.isEmpty(messages)) {
            return;
        }

        final byte versionMagic = getSupportedMagic(version);
        for (KafkaBrokerMessage stored : messages) {
            final byte storedMagic = stored.getMagic();
            final byte targetMagic = storedMagic < versionMagic ? storedMagic : versionMagic;

            boolean mustUnpack = storedMagic == 2 && targetMagic < 2;
            if (!mustUnpack) {
                writeMessage(buffer, stored, targetMagic);
                continue;
            }

            // Magic-2 message going out in an older format: write its contents one by one.
            for (KafkaBrokerMessage unpacked : KafkaMessageV2Serializer.readMessages(stored)) {
                writeMessage(buffer, unpacked, targetMagic);
            }
        }
    }

    /**
     * Maps a version number to the highest magic emitted for it.
     *
     * @param version version number to map
     * @return 0 for versions up to 1, 1 for versions 2 and 3, otherwise 2
     */
    protected static byte getSupportedMagic(short version) {
        final byte magic;
        if (version > 3) {
            magic = 2;
        } else if (version > 1) {
            magic = 1;
        } else {
            magic = 0;
        }
        return magic;
    }

    /**
     * Writes a single message to {@code buffer} with the serializer chosen by
     * {@code magic}. A magic of {@code -1} is written with the V0 serializer.
     *
     * @param buffer  destination buffer
     * @param message message to write
     * @param magic   magic selecting the serializer
     * @throws UnsupportedOperationException if the magic is not -1, 0, 1 or 2
     * @throws Exception if the delegated serializer fails
     */
    protected static void writeMessage(ByteBuf buffer, KafkaBrokerMessage message, byte magic) throws Exception {
        switch (magic) {
            case -1:
            case 0:
                KafkaMessageV0Serializer.writeMessage(buffer, message);
                break;
            case 1:
                KafkaMessageV1Serializer.writeMessage(buffer, message);
                break;
            case 2:
                KafkaMessageV2Serializer.writeMessage(buffer, message);
                break;
            default:
                throw new UnsupportedOperationException(String.format("writeMessage unsupported magic, magic: %s", magic));
        }
    }

    /**
     * Reads messages from {@code buffer}, choosing the serializer from the
     * byte found at absolute index 16 of the buffer. The read is absolute, so
     * the buffer's position is not changed by the lookup itself.
     *
     * @param buffer buffer holding the encoded messages
     * @return the messages produced by the selected serializer
     * @throws UnsupportedOperationException if the byte at index 16 is not 0, 1 or 2
     * @throws Exception if the delegated serializer fails
     */
    public static List<KafkaBrokerMessage> readMessages(ByteBuffer buffer) throws Exception {
        final byte leadingMagic = buffer.get(16);
        switch (leadingMagic) {
            case 0:
                return KafkaMessageV0Serializer.readMessages(buffer);
            case 1:
                return KafkaMessageV1Serializer.readMessages(buffer);
            case 2:
                return KafkaMessageV2Serializer.readMessages(buffer);
            default:
                throw new UnsupportedOperationException(String.format("readMessages unsupported magic, magic: %s", leadingMagic));
        }
    }
}

