/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.internal.Silent;
import io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputReader;
import io.streamthoughts.kafka.connect.filepulse.reader.AvroRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.IteratorManager;
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.common.utils.Time;

public class AvroFileInputReader
extends AbstractFileInputReader {
    @Override
    protected FileInputIterator<FileRecord<TypedStruct>> newIterator(FileContext context, IteratorManager iteratorManager) {
        return new AvroFileIterator(iteratorManager, context);
    }

    @VisibleForTesting
    public static class TypedStructConverter {
        private static final Map<Schema.Type, BiFunction<Schema, Object, TypedValue>> AVRO_TYPES_TO_CONVERTER = new HashMap<Schema.Type, BiFunction<Schema, Object, TypedValue>>();

        static TypedStruct fromGenericRecord(GenericRecord record) {
            TypedStruct struct = TypedStruct.create();
            Schema schema = record.getSchema();
            for (Schema.Field field : schema.getFields()) {
                String name = field.name();
                Object value = record.get(name);
                struct = struct.put(name, TypedStructConverter.fromSchemaAndValue(field.schema(), value));
            }
            return struct;
        }

        private static TypedValue fromSchemaAndValue(Schema schema, Object value) {
            Schema.Type fieldType = schema.getType();
            BiFunction<Schema, Object, TypedValue> converter = AVRO_TYPES_TO_CONVERTER.get(fieldType);
            if (converter == null) {
                throw new ReaderException("Unsupported avro type : " + fieldType);
            }
            return converter.apply(schema, value);
        }

        private static TypedValue convertEnum(Schema schema, Object value) {
            String stringValue = value != null ? ((Enum)value).name() : null;
            return TypedValue.string((String)stringValue);
        }

        private static TypedValue convertUnion(Schema schema, Object value) {
            List types = schema.getTypes();
            Optional<Schema> nonNullSchema = types.stream().filter(s -> s.getType() != Schema.Type.NULL).findFirst();
            return TypedStructConverter.fromSchemaAndValue(nonNullSchema.get(), value);
        }

        private static TypedValue convertString(Schema schema, Object value) {
            String stringValue = value != null ? value.toString() : null;
            return TypedValue.string((String)stringValue);
        }

        private static TypedValue convertBytes(Schema schema, Object value) {
            return value != null ? TypedValue.any((Object)value).as(Type.BYTES) : TypedValue.of(null, (Type)Type.BYTES);
        }

        private static TypedValue convertMap(Schema schema, Object value) {
            Map map = (Map)value;
            Schema valueSchema = schema.getValueType();
            Type mapValueType = null;
            HashMap<String, Object> converted = new HashMap<String, Object>();
            for (Map.Entry o : map.entrySet()) {
                TypedValue element = TypedStructConverter.fromSchemaAndValue(valueSchema, o.getValue());
                converted.put(o.getKey().toString(), element.value());
                mapValueType = element.type();
            }
            return mapValueType != null ? TypedValue.map(converted, mapValueType) : TypedValue.of(converted, (Type)Type.MAP);
        }

        private static TypedValue convertCollection(Schema schema, Object value) {
            Collection array = (Collection)value;
            Schema elementSchema = schema.getElementType();
            Type arrayType = null;
            ArrayList<Object> converted = new ArrayList<Object>(array.size());
            for (Object o : array) {
                TypedValue element = TypedStructConverter.fromSchemaAndValue(elementSchema, o);
                converted.add(element.value());
                arrayType = element.type();
            }
            return arrayType != null ? TypedValue.array(converted, arrayType) : TypedValue.of(converted, (Type)Type.ARRAY);
        }

        static {
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.BYTES, TypedStructConverter::convertBytes);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.ENUM, TypedStructConverter::convertEnum);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.STRING, TypedStructConverter::convertString);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.UNION, TypedStructConverter::convertUnion);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.ARRAY, TypedStructConverter::convertCollection);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.MAP, TypedStructConverter::convertMap);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.BOOLEAN, (schema, value) -> TypedValue.bool((Boolean)((Boolean)value)));
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.INT, (schema, value) -> TypedValue.int32((Integer)((Integer)value)));
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.LONG, (schema, value) -> TypedValue.int64((Long)((Long)value)));
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.FLOAT, (schema, value) -> TypedValue.float32((Float)((Float)value)));
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.DOUBLE, (schema, value) -> TypedValue.float64((Double)((Double)value)));
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.RECORD, (schema, value) -> TypedValue.struct((TypedStruct)TypedStructConverter.fromGenericRecord((GenericRecord)value)));
        }
    }

    public static class AvroFileIterator
    extends AbstractFileInputIterator<TypedStruct> {
        private long recordsReadSinceLastSync = 0L;
        private long lastSync = -1L;
        private final GenericDatumReader reader = new GenericDatumReader();
        private DataFileReader<GenericRecord> dataFileReader = (DataFileReader)Silent.unchecked(() -> new DataFileReader(context.file(), (DatumReader)this.reader), ReaderException::new);

        AvroFileIterator(IteratorManager iteratorManager, FileContext context) {
            super(iteratorManager, context);
        }

        public void seekTo(SourceOffset offset) {
            Objects.requireNonNull(offset, "offset can't be null");
            if (offset.position() != -1L) {
                Silent.unchecked(() -> this.dataFileReader.seek(offset.position()), ReaderException::new);
                this.recordsReadSinceLastSync = 0L;
                this.lastSync = this.dataFileReader.previousSync();
                this.skipRecordsUntil(offset.rows());
            }
        }

        private void skipRecordsUntil(long records) {
            while (this.recordsReadSinceLastSync < records) {
                this.nextRecord();
            }
        }

        private void updateContext() {
            SourceOffset offset = new SourceOffset(this.lastSync, this.recordsReadSinceLastSync, Time.SYSTEM.milliseconds());
            this.context = this.context.withOffset(offset);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public RecordsIterable<FileRecord<TypedStruct>> next() {
            try {
                GenericRecord record = this.nextRecord();
                TypedStruct struct = TypedStructConverter.fromGenericRecord(record);
                AvroRecordOffset offset = new AvroRecordOffset(this.lastSync, this.position(), this.recordsReadSinceLastSync);
                RecordsIterable recordsIterable = RecordsIterable.of((Object[])new FileRecord[]{new TypedFileRecord((FileRecordOffset)offset, struct)});
                return recordsIterable;
            }
            finally {
                this.updateContext();
            }
        }

        private GenericRecord nextRecord() {
            if (this.dataFileReader.previousSync() != this.lastSync) {
                this.lastSync = this.dataFileReader.previousSync();
                this.recordsReadSinceLastSync = 0L;
            }
            GenericRecord record = (GenericRecord)this.dataFileReader.next();
            ++this.recordsReadSinceLastSync;
            return record;
        }

        private long position() {
            return (Long)Silent.unchecked(() -> this.dataFileReader.tell(), ReaderException::new);
        }

        public boolean hasNext() {
            return this.dataFileReader.hasNext();
        }

        @Override
        public void close() {
            if (!this.isClose()) {
                Silent.unchecked(() -> this.dataFileReader.close(), ReaderException::new);
                super.close();
            }
        }
    }
}

