/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.connector.flink.table;

import com.oceanbase.connector.flink.table.AbstractRecordSerializationSchema;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.Record;
import com.oceanbase.connector.flink.table.SerializationRuntimeConverter;
import com.oceanbase.connector.flink.table.TableInfo;
import java.sql.Date;
import java.sql.Time;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

public class OceanBaseRowDataSerializationSchema
extends AbstractRecordSerializationSchema<RowData> {
    private static final long serialVersionUID = 1L;
    private final TableInfo tableInfo;
    private final RowData.FieldGetter[] fieldGetters;
    private final SerializationRuntimeConverter[] fieldConverters;

    public OceanBaseRowDataSerializationSchema(TableInfo tableInfo) {
        this.tableInfo = tableInfo;
        this.fieldGetters = (RowData.FieldGetter[])IntStream.range(0, tableInfo.getDataTypes().size()).boxed().map(i -> RowData.createFieldGetter((LogicalType)tableInfo.getDataTypes().get((int)i), (int)i)).toArray(RowData.FieldGetter[]::new);
        this.fieldConverters = (SerializationRuntimeConverter[])tableInfo.getDataTypes().stream().map(this::getOrCreateConverter).toArray(SerializationRuntimeConverter[]::new);
    }

    @Override
    public Record serialize(RowData rowData) {
        Object[] values = new Object[this.fieldGetters.length];
        for (int i = 0; i < this.fieldGetters.length; ++i) {
            values[i] = this.fieldConverters[i].convert(this.fieldGetters[i].getFieldOrNull(rowData));
        }
        return new DataChangeRecord(this.tableInfo, rowData.getRowKind() == RowKind.INSERT || rowData.getRowKind() == RowKind.UPDATE_AFTER ? DataChangeRecord.Type.UPSERT : DataChangeRecord.Type.DELETE, values);
    }

    @Override
    protected SerializationRuntimeConverter createNotNullConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN: 
            case TINYINT: 
            case SMALLINT: 
            case INTEGER: 
            case INTERVAL_YEAR_MONTH: 
            case BIGINT: 
            case INTERVAL_DAY_TIME: 
            case FLOAT: 
            case DOUBLE: 
            case BINARY: 
            case VARBINARY: {
                return data -> data;
            }
            case CHAR: 
            case VARCHAR: {
                return Object::toString;
            }
            case DATE: {
                return data -> Date.valueOf(LocalDate.ofEpochDay(((Integer)data).intValue()));
            }
            case TIME_WITHOUT_TIME_ZONE: {
                return data -> Time.valueOf(LocalTime.ofNanoOfDay((long)((Integer)data).intValue() * 1000000L));
            }
            case TIMESTAMP_WITHOUT_TIME_ZONE: {
                return data -> ((TimestampData)data).toTimestamp();
            }
            case DECIMAL: {
                return data -> ((DecimalData)data).toBigDecimal();
            }
            case ARRAY: {
                return data -> {
                    ArrayData arrayData = (ArrayData)data;
                    return IntStream.range(0, arrayData.size()).mapToObj(i -> arrayData.getString(i).toString()).collect(Collectors.joining(","));
                };
            }
        }
        throw new UnsupportedOperationException("Unsupported type:" + type);
    }
}

