/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.data;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.iceberg.flink.FlinkRowData;
import org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor;
import org.apache.iceberg.shaded.org.apache.parquet.io.api.Binary;
import org.apache.iceberg.shaded.org.apache.parquet.schema.GroupType;
import org.apache.iceberg.shaded.org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.iceberg.shaded.org.apache.parquet.schema.MessageType;
import org.apache.iceberg.shaded.org.apache.parquet.schema.PrimitiveType;
import org.apache.iceberg.shaded.org.apache.parquet.schema.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;

public class FlinkParquetWriters {
    private FlinkParquetWriters() {
    }

    public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, MessageType type) {
        return (ParquetValueWriter)ParquetWithFlinkSchemaVisitor.visit(schema, type, new WriteBuilder(type));
    }

    private static ParquetValueWriter<?> ints(LogicalType type, ColumnDescriptor desc) {
        if (type instanceof TinyIntType) {
            return ParquetValueWriters.tinyints(desc);
        }
        if (type instanceof SmallIntType) {
            return ParquetValueWriters.shorts(desc);
        }
        return ParquetValueWriters.ints(desc);
    }

    private static ParquetValueWriter<StringData> strings(ColumnDescriptor desc) {
        return new StringDataWriter(desc);
    }

    private static ParquetValueWriter<Integer> timeMicros(ColumnDescriptor desc) {
        return new TimeMicrosWriter(desc);
    }

    private static ParquetValueWriter<DecimalData> decimalAsInteger(ColumnDescriptor desc, int precision, int scale) {
        Preconditions.checkArgument(precision <= 9, "Cannot write decimal value as integer with precision larger than 9, wrong precision %s", precision);
        return new IntegerDecimalWriter(desc, precision, scale);
    }

    private static ParquetValueWriter<DecimalData> decimalAsLong(ColumnDescriptor desc, int precision, int scale) {
        Preconditions.checkArgument(precision <= 18, "Cannot write decimal value as long with precision larger than 18,  wrong precision %s", precision);
        return new LongDecimalWriter(desc, precision, scale);
    }

    private static ParquetValueWriter<DecimalData> decimalAsFixed(ColumnDescriptor desc, int precision, int scale) {
        return new FixedDecimalWriter(desc, precision, scale);
    }

    private static ParquetValueWriter<TimestampData> timestamps(ColumnDescriptor desc) {
        return new TimestampDataWriter(desc);
    }

    private static ParquetValueWriter<TimestampData> timestampNanos(ColumnDescriptor desc) {
        return new TimestampNanoDataWriter(desc);
    }

    private static ParquetValueWriter<byte[]> byteArrays(ColumnDescriptor desc) {
        return new ByteArrayWriter(desc);
    }

    private static class WriteBuilder
    extends ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
        private final MessageType type;

        WriteBuilder(MessageType type) {
            this.type = type;
        }

        @Override
        public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
            return this.struct(sStruct, message.asGroupType(), (List)fields);
        }

        @Override
        public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
            List flinkFields = sStruct.getFields();
            ArrayList<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
            ArrayList<LogicalType> flinkTypes = Lists.newArrayList();
            int[] fieldIndexes = new int[fieldWriters.size()];
            int fieldIndex = 0;
            for (int i = 0; i < flinkFields.size(); ++i) {
                LogicalType flinkType = ((RowType.RowField)flinkFields.get(i)).getType();
                if (flinkType.is(LogicalTypeRoot.NULL)) continue;
                writers.add(this.newOption(struct.getType(fieldIndex), fieldWriters.get(fieldIndex)));
                flinkTypes.add(flinkType);
                fieldIndexes[fieldIndex] = i;
                ++fieldIndex;
            }
            return new RowDataWriter(fieldIndexes, writers, flinkTypes);
        }

        @Override
        public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
            GroupType repeated = array.getFields().get(0).asGroupType();
            String[] repeatedPath = this.currentPath();
            int repeatedD = this.type.getMaxDefinitionLevel(repeatedPath);
            int repeatedR = this.type.getMaxRepetitionLevel(repeatedPath);
            return new ArrayDataWriter(repeatedD, repeatedR, this.newOption(repeated.getType(0), elementWriter), sArray.getElementType());
        }

        @Override
        public ParquetValueWriter<?> map(MapType sMap, GroupType map, ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
            GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
            String[] repeatedPath = this.currentPath();
            int repeatedD = this.type.getMaxDefinitionLevel(repeatedPath);
            int repeatedR = this.type.getMaxRepetitionLevel(repeatedPath);
            return new MapDataWriter(repeatedD, repeatedR, this.newOption(repeatedKeyValue.getType(0), keyWriter), this.newOption(repeatedKeyValue.getType(1), valueWriter), sMap.getKeyType(), sMap.getValueType());
        }

        private ParquetValueWriter<?> newOption(Type fieldType, ParquetValueWriter<?> writer) {
            int maxD = this.type.getMaxDefinitionLevel(this.path(fieldType.getName()));
            return ParquetValueWriters.option(fieldType, maxD, writer);
        }

        @Override
        public ParquetValueWriter<?> primitive(LogicalType fType, PrimitiveType primitive) {
            ColumnDescriptor desc = this.type.getColumnDescription(this.currentPath());
            LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
            if (annotation != null) {
                Optional writer = annotation.accept(new LogicalTypeWriterBuilder(fType, desc));
                if (writer.isPresent()) {
                    return (ParquetValueWriter)writer.get();
                }
                throw new UnsupportedOperationException("Unsupported logical type: " + String.valueOf((Object)primitive.getOriginalType()));
            }
            switch (primitive.getPrimitiveTypeName()) {
                case FIXED_LEN_BYTE_ARRAY: 
                case BINARY: {
                    return FlinkParquetWriters.byteArrays(desc);
                }
                case BOOLEAN: {
                    return ParquetValueWriters.booleans(desc);
                }
                case INT32: {
                    return FlinkParquetWriters.ints(fType, desc);
                }
                case INT64: {
                    return ParquetValueWriters.longs(desc);
                }
                case FLOAT: {
                    return ParquetValueWriters.floats(desc);
                }
                case DOUBLE: {
                    return ParquetValueWriters.doubles(desc);
                }
            }
            throw new UnsupportedOperationException("Unsupported type: " + String.valueOf(primitive));
        }
    }

    private static class StringDataWriter
    extends ParquetValueWriters.PrimitiveWriter<StringData> {
        private StringDataWriter(ColumnDescriptor desc) {
            super(desc);
        }

        @Override
        public void write(int repetitionLevel, StringData value) {
            this.column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value.toBytes()));
        }
    }

    private static class TimeMicrosWriter
    extends ParquetValueWriters.PrimitiveWriter<Integer> {
        private TimeMicrosWriter(ColumnDescriptor desc) {
            super(desc);
        }

        @Override
        public void write(int repetitionLevel, Integer value) {
            long micros = value.longValue() * 1000L;
            this.column.writeLong(repetitionLevel, micros);
        }
    }

    private static class IntegerDecimalWriter
    extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
        private final int precision;
        private final int scale;

        private IntegerDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
            super(desc);
            this.precision = precision;
            this.scale = scale;
        }

        @Override
        public void write(int repetitionLevel, DecimalData decimal) {
            Preconditions.checkArgument(decimal.scale() == this.scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", (Object)this.precision, (Object)this.scale, (Object)decimal);
            Preconditions.checkArgument(decimal.precision() <= this.precision, "Cannot write value as decimal(%s,%s), too large: %s", (Object)this.precision, (Object)this.scale, (Object)decimal);
            this.column.writeInteger(repetitionLevel, (int)decimal.toUnscaledLong());
        }
    }

    private static class LongDecimalWriter
    extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
        private final int precision;
        private final int scale;

        private LongDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
            super(desc);
            this.precision = precision;
            this.scale = scale;
        }

        @Override
        public void write(int repetitionLevel, DecimalData decimal) {
            Preconditions.checkArgument(decimal.scale() == this.scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", (Object)this.precision, (Object)this.scale, (Object)decimal);
            Preconditions.checkArgument(decimal.precision() <= this.precision, "Cannot write value as decimal(%s,%s), too large: %s", (Object)this.precision, (Object)this.scale, (Object)decimal);
            this.column.writeLong(repetitionLevel, decimal.toUnscaledLong());
        }
    }

    private static class FixedDecimalWriter
    extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
        private final int precision;
        private final int scale;
        private final ThreadLocal<byte[]> bytes;

        private FixedDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
            super(desc);
            this.precision = precision;
            this.scale = scale;
            this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]);
        }

        @Override
        public void write(int repetitionLevel, DecimalData decimal) {
            byte[] binary = DecimalUtil.toReusedFixLengthBytes(this.precision, this.scale, decimal.toBigDecimal(), this.bytes.get());
            this.column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(binary));
        }
    }

    private static class TimestampDataWriter
    extends ParquetValueWriters.PrimitiveWriter<TimestampData> {
        private TimestampDataWriter(ColumnDescriptor desc) {
            super(desc);
        }

        @Override
        public void write(int repetitionLevel, TimestampData value) {
            this.column.writeLong(repetitionLevel, value.getMillisecond() * 1000L + (long)(value.getNanoOfMillisecond() / 1000));
        }
    }

    private static class TimestampNanoDataWriter
    extends ParquetValueWriters.PrimitiveWriter<TimestampData> {
        private TimestampNanoDataWriter(ColumnDescriptor desc) {
            super(desc);
        }

        @Override
        public void write(int repetitionLevel, TimestampData value) {
            this.column.writeLong(repetitionLevel, value.getMillisecond() * 1000000L + (long)value.getNanoOfMillisecond());
        }
    }

    private static class ByteArrayWriter
    extends ParquetValueWriters.PrimitiveWriter<byte[]> {
        private ByteArrayWriter(ColumnDescriptor desc) {
            super(desc);
        }

        @Override
        public void write(int repetitionLevel, byte[] bytes) {
            this.column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(bytes));
        }
    }

    private static class RowDataWriter
    extends ParquetValueWriters.StructWriter<RowData> {
        private final RowData.FieldGetter[] fieldGetter;

        RowDataWriter(int[] fieldIndexes, List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
            super(writers);
            this.fieldGetter = new RowData.FieldGetter[types.size()];
            for (int i = 0; i < types.size(); ++i) {
                this.fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), fieldIndexes[i]);
            }
        }

        @Override
        protected Object get(RowData struct, int index) {
            return this.fieldGetter[index].getFieldOrNull(struct);
        }
    }

    private static class MapDataWriter<K, V>
    extends ParquetValueWriters.RepeatedKeyValueWriter<MapData, K, V> {
        private final LogicalType keyType;
        private final LogicalType valueType;

        private MapDataWriter(int definitionLevel, int repetitionLevel, ParquetValueWriter<K> keyWriter, ParquetValueWriter<V> valueWriter, LogicalType keyType, LogicalType valueType) {
            super(definitionLevel, repetitionLevel, keyWriter, valueWriter);
            this.keyType = keyType;
            this.valueType = valueType;
        }

        @Override
        protected Iterator<Map.Entry<K, V>> pairs(MapData map) {
            return new EntryIterator(map);
        }

        private class EntryIterator<K, V>
        implements Iterator<Map.Entry<K, V>> {
            private final int size;
            private final ArrayData keys;
            private final ArrayData values;
            private final ParquetValueReaders.ReusableEntry<K, V> entry;
            private final ArrayData.ElementGetter keyGetter;
            private final ArrayData.ElementGetter valueGetter;
            private int index;

            private EntryIterator(MapData map) {
                this.size = map.size();
                this.keys = map.keyArray();
                this.values = map.valueArray();
                this.entry = new ParquetValueReaders.ReusableEntry();
                this.keyGetter = ArrayData.createElementGetter((LogicalType)MapDataWriter.this.keyType);
                this.valueGetter = ArrayData.createElementGetter((LogicalType)MapDataWriter.this.valueType);
                this.index = 0;
            }

            @Override
            public boolean hasNext() {
                return this.index != this.size;
            }

            @Override
            public Map.Entry<K, V> next() {
                if (this.index >= this.size) {
                    throw new NoSuchElementException();
                }
                this.entry.set(this.keyGetter.getElementOrNull(this.keys, this.index), this.valueGetter.getElementOrNull(this.values, this.index));
                ++this.index;
                return this.entry;
            }
        }
    }

    private static class ArrayDataWriter<E>
    extends ParquetValueWriters.RepeatedWriter<ArrayData, E> {
        private final LogicalType elementType;

        private ArrayDataWriter(int definitionLevel, int repetitionLevel, ParquetValueWriter<E> writer, LogicalType elementType) {
            super(definitionLevel, repetitionLevel, writer);
            this.elementType = elementType;
        }

        @Override
        protected Iterator<E> elements(ArrayData list) {
            return new ElementIterator(list);
        }

        private class ElementIterator<E>
        implements Iterator<E> {
            private final int size;
            private final ArrayData list;
            private final ArrayData.ElementGetter getter;
            private int index;

            private ElementIterator(ArrayData list) {
                this.list = list;
                this.size = list.size();
                this.getter = ArrayData.createElementGetter((LogicalType)ArrayDataWriter.this.elementType);
                this.index = 0;
            }

            @Override
            public boolean hasNext() {
                return this.index != this.size;
            }

            @Override
            public E next() {
                if (this.index >= this.size) {
                    throw new NoSuchElementException();
                }
                Object element = this.getter.getElementOrNull(this.list, this.index);
                ++this.index;
                return (E)element;
            }
        }
    }

    private static class LogicalTypeWriterBuilder
    implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueWriter<?>> {
        private final LogicalType flinkType;
        private final ColumnDescriptor desc;

        private LogicalTypeWriterBuilder(LogicalType flinkType, ColumnDescriptor desc) {
            this.flinkType = flinkType;
            this.desc = desc;
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation strings) {
            return Optional.of(FlinkParquetWriters.strings(this.desc));
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enums) {
            return Optional.of(FlinkParquetWriters.strings(this.desc));
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal) {
            ParquetValueWriter<DecimalData> writer;
            switch (this.desc.getPrimitiveType().getPrimitiveTypeName()) {
                case INT32: {
                    writer = FlinkParquetWriters.decimalAsInteger(this.desc, decimal.getPrecision(), decimal.getScale());
                    break;
                }
                case INT64: {
                    writer = FlinkParquetWriters.decimalAsLong(this.desc, decimal.getPrecision(), decimal.getScale());
                    break;
                }
                case FIXED_LEN_BYTE_ARRAY: 
                case BINARY: {
                    writer = FlinkParquetWriters.decimalAsFixed(this.desc, decimal.getPrecision(), decimal.getScale());
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported base type for decimal: " + String.valueOf((Object)this.desc.getPrimitiveType().getPrimitiveTypeName()));
                }
            }
            return Optional.of(writer);
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dates) {
            return Optional.of(FlinkParquetWriters.ints(this.flinkType, this.desc));
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation times) {
            Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals((Object)times.getUnit()), "Cannot write time in %s, only MICROS is supported", (Object)times.getUnit());
            return Optional.of(FlinkParquetWriters.timeMicros(this.desc));
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamps) {
            ParquetValueWriter<TimestampData> writer;
            switch (timestamps.getUnit()) {
                case NANOS: {
                    writer = FlinkParquetWriters.timestampNanos(this.desc);
                    break;
                }
                case MICROS: {
                    writer = FlinkParquetWriters.timestamps(this.desc);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported timestamp type: " + String.valueOf(timestamps));
                }
            }
            return Optional.of(writer);
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation type) {
            Preconditions.checkArgument(type.isSigned(), "Cannot write unsigned integer type: %s", (Object)type);
            ParquetValueWriter<Long> writer = type.getBitWidth() < 64 ? FlinkParquetWriters.ints(this.flinkType, this.desc) : ParquetValueWriters.longs(this.desc);
            return Optional.of(writer);
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation ignored) {
            return Optional.of(FlinkParquetWriters.strings(this.desc));
        }

        @Override
        public Optional<ParquetValueWriter<?>> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation ignored) {
            return Optional.of(FlinkParquetWriters.byteArrays(this.desc));
        }
    }
}

