/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.debezium.event;

import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.event.SchemaDataTypeInference;
import org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter;
import org.apache.flink.cdc.debezium.utils.TemporalConversions;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class DebeziumEventDeserializationSchema
extends SourceRecordEventDeserializer
implements DebeziumDeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumEventDeserializationSchema.class);
    private static final Map<DataType, DeserializationRuntimeConverter> CONVERTERS = new ConcurrentHashMap<DataType, DeserializationRuntimeConverter>();
    protected final SchemaDataTypeInference schemaDataTypeInference;
    protected final DebeziumChangelogMode changelogMode;

    /**
     * Creates a deserialization schema.
     *
     * @param schemaDataTypeInference used to infer the {@link DataType} of each before/after image
     * @param changelogMode when equal to {@link DebeziumChangelogMode#ALL}, update events also
     *     carry the "before" image
     */
    public DebeziumEventDeserializationSchema(SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) {
        this.changelogMode = changelogMode;
        this.schemaDataTypeInference = schemaDataTypeInference;
    }

    /**
     * Deserializes {@code record} and hands every resulting event to {@code out}.
     *
     * @param record the source record to deserialize
     * @param out collector receiving each produced event
     * @throws Exception if deserialization fails
     */
    @Override
    public void deserialize(SourceRecord record, Collector<Event> out) throws Exception {
        // A lambda (not a bound method reference) keeps `out` from being dereferenced
        // when no event is produced.
        this.deserialize(record).forEach(event -> out.collect(event));
    }

    /**
     * Turns a Debezium data-change record into at most one {@link DataChangeEvent}.
     *
     * <ul>
     *   <li>CREATE and READ produce an insert event built from the "after" image;
     *   <li>DELETE produces a delete event built from the "before" image;
     *   <li>UPDATE produces an update event; its "before" image is only extracted when the
     *       changelog mode is {@link DebeziumChangelogMode#ALL}, otherwise it is {@code null};
     *   <li>any other operation is logged at trace level and yields an empty list.
     * </ul>
     *
     * @param record the source record to convert
     * @return a singleton list with the event, or an empty list when the operation is skipped
     * @throws Exception if extracting or converting a row image fails
     */
    @Override
    public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) throws Exception {
        final Envelope.Operation operation = Envelope.operationFor(record);
        final TableId table = this.getTableId(record);
        final Struct payload = (Struct) record.value();
        final Schema payloadSchema = record.valueSchema();
        final Map<String, String> metadata = this.getMetadata(record);

        final DataChangeEvent event;
        if (Envelope.Operation.CREATE == operation || Envelope.Operation.READ == operation) {
            RecordData newRow = this.extractAfterDataRecord(payload, payloadSchema);
            event = DataChangeEvent.insertEvent(table, newRow, metadata);
        } else if (Envelope.Operation.DELETE == operation) {
            RecordData oldRow = this.extractBeforeDataRecord(payload, payloadSchema);
            event = DataChangeEvent.deleteEvent(table, oldRow, metadata);
        } else if (Envelope.Operation.UPDATE == operation) {
            // The "after" image is extracted first, exactly as before.
            RecordData newRow = this.extractAfterDataRecord(payload, payloadSchema);
            RecordData oldRow = this.changelogMode == DebeziumChangelogMode.ALL
                    ? this.extractBeforeDataRecord(payload, payloadSchema)
                    : null;
            event = DataChangeEvent.updateEvent(table, oldRow, newRow, metadata);
        } else {
            LOG.trace("Received {} operation, skip", (Object) operation);
            return Collections.emptyList();
        }
        return Collections.singletonList(event);
    }

    /**
     * Returns the type information of the produced events.
     *
     * @return a new {@link EventTypeInfo}
     */
    public TypeInformation<Event> getProducedType() {
        TypeInformation<Event> producedType = new EventTypeInfo();
        return producedType;
    }

    /**
     * Converts the "before" field of a change-record value into a {@link RecordData}.
     *
     * @param value the record value struct
     * @param valueSchema the schema of {@code value}
     * @return the converted "before" image
     * @throws Exception if the conversion fails
     */
    private RecordData extractBeforeDataRecord(Struct value, Schema valueSchema) throws Exception {
        final Schema imageSchema = fieldSchema(valueSchema, "before");
        final Struct image = fieldStruct(value, "before");
        return this.extractDataRecord(image, imageSchema);
    }

    /**
     * Converts the "after" field of a change-record value into a {@link RecordData}.
     *
     * @param value the record value struct
     * @param valueSchema the schema of {@code value}
     * @return the converted "after" image
     * @throws Exception if the conversion fails
     */
    private RecordData extractAfterDataRecord(Struct value, Schema valueSchema) throws Exception {
        final Schema imageSchema = fieldSchema(valueSchema, "after");
        final Struct image = fieldStruct(value, "after");
        return this.extractDataRecord(image, imageSchema);
    }

    /**
     * Infers the data type of {@code value} and converts it with the matching converter.
     *
     * @param value the struct to convert
     * @param valueSchema the schema of {@code value}
     * @return the converter result, cast to {@link RecordData}
     * @throws Exception if the conversion fails
     */
    private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
        final DataType inferredType = this.schemaDataTypeInference.infer(value, valueSchema);
        final DeserializationRuntimeConverter converter = this.getOrCreateConverter(inferredType);
        return (RecordData) converter.convert(value, valueSchema);
    }

    /**
     * Returns the cached converter for {@code type}, creating and caching it on first use.
     *
     * <p>NOTE(review): {@code CONVERTERS} is a static map, while a newly created converter is
     * bound to the instance that created it. Confirm that sharing converters between instances
     * (possibly of different subclasses) is intended.
     *
     * @param type the data type to convert
     * @return the converter registered for {@code type}
     */
    private DeserializationRuntimeConverter getOrCreateConverter(DataType type) {
        return CONVERTERS.computeIfAbsent(type, missingType -> this.createConverter(missingType));
    }

    /**
     * Creates a converter for {@code type} that maps a {@code null} input to {@code null}.
     *
     * @param type the data type to convert
     * @return the null-tolerant converter
     */
    private DeserializationRuntimeConverter createConverter(DataType type) {
        final DeserializationRuntimeConverter notNullConverter = this.createNotNullConverter(type);
        return wrapIntoNullableConverter(notNullConverter);
    }

    /**
     * Selects the converter for non-null values of {@code type}, based on its type root.
     *
     * <p>DECIMAL and ROW converters cast {@code type} to the specific type class lazily, i.e. each
     * time a value is converted.
     *
     * @param type the data type to convert
     * @return the converter for values of {@code type}
     * @throws UnsupportedOperationException if the type root has no converter here
     */
    protected DeserializationRuntimeConverter createNotNullConverter(final DataType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return this::convertToBoolean;
            case TINYINT:
                return this::convertToByte;
            case SMALLINT:
                return this::convertToShort;
            case INTEGER:
                return this::convertToInt;
            case BIGINT:
                return this::convertToLong;
            case DATE:
                return this::convertToDate;
            case TIME_WITHOUT_TIME_ZONE:
                return this::convertToTime;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return this::convertToTimestamp;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return this::convertToLocalTimeZoneTimestamp;
            case FLOAT:
                return this::convertToFloat;
            case DOUBLE:
                return this::convertToDouble;
            case CHAR:
            case VARCHAR:
                return this::convertToString;
            case BINARY:
            case VARBINARY:
                return this::convertToBinary;
            case DECIMAL:
                return (dbzObj, schema) -> this.convertToDecimal((DecimalType) type, dbzObj, schema);
            case ROW:
                return (dbzObj, schema) -> this.convertToRecord((RowType) type, dbzObj, schema);
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    /**
     * Converts a Debezium value to a {@link Boolean}.
     *
     * <p>A {@code Boolean} is returned unchanged. An integral value ({@code Byte}, {@code Short},
     * {@code Integer} or {@code Long}) is {@code true} exactly when it equals 1. Any other value is
     * parsed from its string form with {@link Boolean#parseBoolean(String)}.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the boolean value
     */
    protected Object convertToBoolean(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Boolean) {
            return dbzObj;
        }
        // Integer and Long are handled like Byte and Short; without this, an Integer 1 would
        // reach parseBoolean("1") and come out as false.
        if (dbzObj instanceof Byte
                || dbzObj instanceof Short
                || dbzObj instanceof Integer
                || dbzObj instanceof Long) {
            return ((Number) dbzObj).longValue() == 1L;
        }
        return Boolean.parseBoolean(dbzObj.toString());
    }

    /**
     * Converts a Debezium value to a {@link Byte} by parsing its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the parsed byte
     * @throws NumberFormatException if the string form is not a valid byte
     */
    protected Object convertToByte(Object dbzObj, Schema schema) {
        final String text = dbzObj.toString();
        return Byte.valueOf(text);
    }

    /**
     * Converts a Debezium value to a {@link Short} by parsing its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the parsed short
     * @throws NumberFormatException if the string form is not a valid short
     */
    protected Object convertToShort(Object dbzObj, Schema schema) {
        final String text = dbzObj.toString();
        return Short.valueOf(text);
    }

    /**
     * Converts a Debezium value to an {@link Integer}.
     *
     * <p>An {@code Integer} is returned unchanged, a {@code Long} is narrowed with
     * {@link Long#intValue()}, and anything else is parsed from its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the integer value
     * @throws NumberFormatException if the string form is not a valid int
     */
    protected Object convertToInt(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            return Integer.valueOf(((Long) dbzObj).intValue());
        }
        return dbzObj instanceof Integer ? dbzObj : Integer.valueOf(dbzObj.toString());
    }

    /**
     * Converts a Debezium value to a {@link Long}.
     *
     * <p>A {@code Long} is returned unchanged, an {@code Integer} is widened, and anything else is
     * parsed from its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the long value
     * @throws NumberFormatException if the string form is not a valid long
     */
    protected Object convertToLong(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            return dbzObj;
        }
        if (dbzObj instanceof Integer) {
            return Long.valueOf(((Integer) dbzObj).longValue());
        }
        return Long.valueOf(dbzObj.toString());
    }

    /**
     * Converts a Debezium value to a {@link Double}.
     *
     * <p>A {@code Double} is returned unchanged, a {@code Float} is widened, and anything else is
     * parsed from its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the double value
     * @throws NumberFormatException if the string form is not a valid double
     */
    protected Object convertToDouble(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Double) {
            return dbzObj;
        }
        final double result = dbzObj instanceof Float
                ? ((Float) dbzObj).doubleValue()
                : Double.parseDouble(dbzObj.toString());
        return result;
    }

    /**
     * Converts a Debezium value to a {@link Float}.
     *
     * <p>A {@code Float} is returned unchanged, a {@code Double} is narrowed with
     * {@link Double#floatValue()}, and anything else is parsed from its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the float value
     * @throws NumberFormatException if the string form is not a valid float
     */
    protected Object convertToFloat(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Float) {
            return dbzObj;
        }
        final float result = dbzObj instanceof Double
                ? ((Double) dbzObj).floatValue()
                : Float.parseFloat(dbzObj.toString());
        return result;
    }

    /**
     * Converts a Debezium date value to the epoch day of its local date, as an {@link Integer}.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the epoch day, narrowed to int
     */
    protected Object convertToDate(Object dbzObj, Schema schema) {
        final long epochDay = TemporalConversions.toLocalDate(dbzObj).toEpochDay();
        return (int) epochDay;
    }

    /**
     * Converts a Debezium time value to an {@link Integer} time of day.
     *
     * <ul>
     *   <li>An {@code Integer} is returned unchanged.
     *   <li>A {@code Long} with schema name {@code io.debezium.time.MicroTime} is divided by 1000;
     *       with {@code io.debezium.time.NanoTime} it is divided by 1000000.
     *   <li>Every other value, including a {@code Long} with a missing or different schema name,
     *       is converted with {@code TemporalConversions.toLocalTime} and returned as milliseconds
     *       of the day.
     * </ul>
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value; its name selects the unit of {@code Long} values
     * @return the time of day as an int
     */
    protected Object convertToTime(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Integer) {
            return dbzObj;
        }
        if (dbzObj instanceof Long) {
            final long raw = (Long) dbzObj;
            // Constant-first equals() tolerates a schema without a name, where switching on
            // the name would throw a NullPointerException.
            final String schemaName = schema == null ? null : schema.name();
            if ("io.debezium.time.MicroTime".equals(schemaName)) {
                return (int) (raw / 1000L);
            }
            if ("io.debezium.time.NanoTime".equals(schemaName)) {
                return (int) (raw / 1000000L);
            }
        }
        // Derive milliseconds from the nano-of-day so the sub-second part is kept;
        // toSecondOfDay() * 1000 would drop it.
        return (int) (TemporalConversions.toLocalTime(dbzObj).toNanoOfDay() / 1000000L);
    }

    /**
     * Converts a Debezium timestamp value to a {@link TimestampData}.
     *
     * <p>Only {@code Long} values are supported, and the schema name selects their unit:
     * {@code io.debezium.time.Timestamp} (used as milliseconds),
     * {@code io.debezium.time.MicroTimestamp} (split into milliseconds plus the remaining
     * microseconds expressed as nanoseconds) and {@code io.debezium.time.NanoTimestamp} (split
     * into milliseconds plus the remaining nanoseconds).
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value; its name selects the unit
     * @return the converted timestamp
     * @throws IllegalArgumentException if the value is not a {@code Long} or the schema name is
     *     missing or not one of the names above
     */
    protected Object convertToTimestamp(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            final long raw = (Long) dbzObj;
            // A missing schema name falls through to the IllegalArgumentException below
            // instead of failing with a NullPointerException inside the switch.
            final String schemaName = schema == null ? null : schema.name();
            if (schemaName != null) {
                switch (schemaName) {
                    case "io.debezium.time.Timestamp":
                        return TimestampData.fromMillis(raw);
                    case "io.debezium.time.MicroTimestamp":
                        // Floor division keeps the sub-millisecond remainder non-negative for
                        // pre-epoch values, where truncating / and % would give a negative
                        // nano-of-millisecond and a millisecond that is one too high.
                        return TimestampData.fromMillis(
                                Math.floorDiv(raw, 1000L),
                                (int) (Math.floorMod(raw, 1000L) * 1000L));
                    case "io.debezium.time.NanoTimestamp":
                        return TimestampData.fromMillis(
                                Math.floorDiv(raw, 1000000L),
                                (int) Math.floorMod(raw, 1000000L));
                    default:
                        break;
                }
            }
        }
        throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + dbzObj + "' of type " + dbzObj.getClass().getName());
    }

    /**
     * Converts a Debezium value to a {@link LocalZonedTimestampData}.
     *
     * <p>Only {@code String} values are accepted; the string is parsed with
     * {@link Instant#parse(CharSequence)}.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the converted timestamp
     * @throws IllegalArgumentException if the value is not a {@code String}
     */
    protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
        if (!(dbzObj instanceof String)) {
            throw new IllegalArgumentException("Unable to convert to TIMESTAMP_LTZ from unexpected value '" + dbzObj + "' of type " + dbzObj.getClass().getName());
        }
        final Instant parsed = Instant.parse((String) dbzObj);
        return LocalZonedTimestampData.fromInstant(parsed);
    }

    /**
     * Converts a Debezium value to a {@link BinaryStringData} built from its string form.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the string data
     */
    protected Object convertToString(Object dbzObj, Schema schema) {
        final String text = dbzObj.toString();
        return BinaryStringData.fromString(text);
    }

    /**
     * Converts a Debezium binary value to a {@code byte[]}.
     *
     * <p>A {@code byte[]} is returned as is (not copied). For a {@link ByteBuffer}, the remaining
     * bytes are copied into a new array without changing the buffer's position.
     *
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value (unused here)
     * @return the bytes of the value
     * @throws UnsupportedOperationException if the value is neither a {@code byte[]} nor a
     *     {@code ByteBuffer}
     */
    protected Object convertToBinary(Object dbzObj, Schema schema) {
        if (dbzObj instanceof byte[]) {
            return dbzObj;
        }
        if (dbzObj instanceof ByteBuffer) {
            // Read through a duplicate: a relative get() on the original would advance its
            // position, so a second read of the same value would see no remaining bytes.
            final ByteBuffer view = ((ByteBuffer) dbzObj).duplicate();
            final byte[] bytes = new byte[view.remaining()];
            view.get(bytes);
            return bytes;
        }
        throw new UnsupportedOperationException("Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
    }

    /**
     * Converts a Debezium value to a {@link DecimalData} with the precision and scale of
     * {@code decimalType}.
     *
     * <p>The intermediate {@link BigDecimal} is obtained, in this order of precedence, from:
     * a {@code byte[]} via {@code Decimal.toLogical}; a {@code String}; a {@code Double}; a struct
     * whose schema name is {@code io.debezium.data.VariableScaleDecimal} (falling back to zero when
     * it carries no decimal value); otherwise the value's string form.
     *
     * @param decimalType the target decimal type
     * @param dbzObj the non-null value to convert
     * @param schema the schema of the value
     * @return the result of {@code DecimalData.fromBigDecimal}
     */
    protected Object convertToDecimal(DecimalType decimalType, Object dbzObj, Schema schema) {
        final int targetPrecision = decimalType.getPrecision();
        final int targetScale = decimalType.getScale();

        final BigDecimal parsed;
        if (dbzObj instanceof byte[]) {
            parsed = Decimal.toLogical(schema, (byte[]) dbzObj);
        } else if (dbzObj instanceof String) {
            parsed = new BigDecimal((String) dbzObj);
        } else if (dbzObj instanceof Double) {
            parsed = BigDecimal.valueOf(((Double) dbzObj).doubleValue());
        } else if ("io.debezium.data.VariableScaleDecimal".equals(schema.name())) {
            final SpecialValueDecimal special = VariableScaleDecimal.toLogical((Struct) dbzObj);
            parsed = special.getDecimalValue().orElse(BigDecimal.ZERO);
        } else {
            parsed = new BigDecimal(dbzObj.toString());
        }
        return DecimalData.fromBigDecimal(parsed, targetPrecision, targetScale);
    }

    /**
     * Converts a Debezium struct to a binary record laid out according to {@code rowType}.
     *
     * <p>A converter is created for every field of {@code rowType} on each call. Fields of
     * {@code rowType} that are absent from {@code schema}, and fields whose value is {@code null},
     * are left as {@code null} in the generated record. Values are read with
     * {@code getWithoutDefault}.
     *
     * @param rowType the target row type
     * @param dbzObj the non-null value to convert; cast to {@link Struct}
     * @param schema the schema of the struct
     * @return the record produced by {@link BinaryRecordDataGenerator}
     * @throws Exception if converting a field fails
     */
    protected Object convertToRecord(RowType rowType, Object dbzObj, Schema schema) throws Exception {
        final DeserializationRuntimeConverter[] converters = rowType.getFields().stream()
                .map(dataField -> this.createConverter(dataField.getType()))
                .toArray(DeserializationRuntimeConverter[]::new);
        final String[] names = rowType.getFieldNames().toArray(new String[0]);
        final BinaryRecordDataGenerator recordGenerator = new BinaryRecordDataGenerator(rowType);
        final Struct row = (Struct) dbzObj;

        // Slots start out null, so a field missing from the schema needs no explicit assignment.
        final Object[] converted = new Object[names.length];
        for (int pos = 0; pos < names.length; pos++) {
            final String name = names[pos];
            final Field schemaField = schema.field(name);
            if (schemaField != null) {
                final Object rawValue = row.getWithoutDefault(name);
                converted[pos] = convertField(converters[pos], rawValue, schemaField.schema());
            }
        }
        return recordGenerator.generate(converted);
    }

    /**
     * Applies {@code fieldConverter} to a field value, passing {@code null} through unchanged.
     *
     * @param fieldConverter the converter for the field
     * @param fieldValue the field value, possibly {@code null}
     * @param fieldSchema the schema of the field
     * @return the converted value, or {@code null} when {@code fieldValue} is {@code null}
     * @throws Exception if the converter fails
     */
    private static Object convertField(DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema) throws Exception {
        return fieldValue == null ? null : fieldConverter.convert(fieldValue, fieldSchema);
    }

    /**
     * Wraps {@code converter} so that a {@code null} input yields {@code null} without invoking
     * the wrapped converter.
     *
     * @param converter the converter to delegate non-null values to
     * @return the null-tolerant converter
     */
    private static DeserializationRuntimeConverter wrapIntoNullableConverter(final DeserializationRuntimeConverter converter) {
        return (dbzObj, schema) -> dbzObj == null ? null : converter.convert(dbzObj, schema);
    }
}

