/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.codec.avro;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.util.Utf8;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.LogEventBuilder;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name="avro", pluginType=InputCodec.class)
public class AvroInputCodec
implements InputCodec {
    private static final Logger LOG = LoggerFactory.getLogger(AvroInputCodec.class);
    private final EventFactory eventFactory;

    @DataPrepperPluginConstructor
    public AvroInputCodec(EventFactory eventFactory) {
        this.eventFactory = eventFactory;
    }

    public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
        Objects.requireNonNull(inputStream);
        Objects.requireNonNull(eventConsumer);
        this.parseAvroStream(inputStream, eventConsumer);
    }

    private void parseAvroStream(InputStream inputStream, Consumer<Record<Event>> eventConsumer) {
        try {
            byte[] avroData = inputStream.readAllBytes();
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(avroData);
            DataFileStream stream = new DataFileStream((InputStream)byteArrayInputStream, (DatumReader)new GenericDatumReader());
            Schema schema = stream.getSchema();
            while (stream.hasNext()) {
                GenericRecord avroRecord = (GenericRecord)stream.next();
                Map<String, Object> eventData = AvroInputCodec.convertRecordToMap(avroRecord, schema);
                Event event = ((LogEventBuilder)this.eventFactory.eventBuilder(LogEventBuilder.class)).withData(eventData).build();
                eventConsumer.accept((Record<Event>)new Record((Object)event));
            }
        }
        catch (Exception avroException) {
            LOG.error("An exception has occurred while parsing avro InputStream ", (Throwable)avroException);
        }
    }

    private static Map<String, Object> convertRecordToMap(GenericRecord record, Schema schema) throws Exception {
        HashMap<String, Object> eventData = new HashMap<String, Object>();
        for (Schema.Field field : schema.getFields()) {
            Map<String, Object> value = record.get(field.name());
            if (value instanceof GenericRecord) {
                Schema schemaOfNestedRecord = ((GenericRecord)value).getSchema();
                value = AvroInputCodec.convertRecordToMap((GenericRecord)value, schemaOfNestedRecord);
            } else if (value instanceof GenericEnumSymbol || value instanceof GenericData.EnumSymbol) {
                value = value.toString();
            } else if (value instanceof Utf8) {
                byte[] utf8Bytes = value.toString().getBytes("UTF-8");
                value = new String(utf8Bytes, "UTF-8");
            }
            eventData.put(field.name(), value);
        }
        return eventData;
    }
}

