/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.common.utils;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.ArrayType;
import org.apache.flink.cdc.common.types.BigIntType;
import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.DoubleType;
import org.apache.flink.cdc.common.types.FloatType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.MapType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
import org.apache.flink.shaded.guava31.com.google.common.io.BaseEncoding;

@PublicEvolving
public class SchemaMergingUtils {
    private static final Map<Class<? extends DataType>, List<DataType>> TYPE_MERGING_TREE = SchemaMergingUtils.getTypeMergingTree();

    public static boolean isSchemaCompatible(@Nullable Schema currentSchema, Schema upcomingSchema) {
        if (currentSchema == null) {
            return false;
        }
        Map<String, DataType> currentColumnTypes = currentSchema.getColumns().stream().collect(Collectors.toMap(Column::getName, Column::getType));
        List<Column> upcomingColumns = upcomingSchema.getColumns();
        for (Column upcomingColumn : upcomingColumns) {
            String columnName = upcomingColumn.getName();
            DataType upcomingColumnType = upcomingColumn.getType();
            DataType currentColumnType = currentColumnTypes.get(columnName);
            if (SchemaMergingUtils.isDataTypeCompatible(currentColumnType, upcomingColumnType)) continue;
            return false;
        }
        return true;
    }

    public static Schema getLeastCommonSchema(@Nullable Schema currentSchema, Schema upcomingSchema) {
        if (currentSchema == null) {
            return upcomingSchema;
        }
        if (SchemaMergingUtils.isSchemaCompatible(currentSchema, upcomingSchema)) {
            return currentSchema;
        }
        HashMap<String, DataType> newTypeMapping = new HashMap<String, DataType>();
        Map<String, Column> currentColumns = currentSchema.getColumns().stream().collect(Collectors.toMap(Column::getName, col -> col));
        List<Column> upcomingColumns = upcomingSchema.getColumns();
        ArrayList<Column> appendedColumns = new ArrayList<Column>();
        for (Column upcomingColumn : upcomingColumns) {
            String columnName = upcomingColumn.getName();
            DataType upcomingColumnType = upcomingColumn.getType();
            if (currentColumns.containsKey(columnName)) {
                Column currentColumn = currentColumns.get(columnName);
                DataType currentColumnType = currentColumn.getType();
                DataType leastCommonType = SchemaMergingUtils.getLeastCommonType(currentColumnType, upcomingColumnType);
                if (Objects.equals(leastCommonType, currentColumnType)) continue;
                newTypeMapping.put(columnName, leastCommonType);
                continue;
            }
            appendedColumns.add(upcomingColumn);
        }
        ArrayList<Column> commonColumns = new ArrayList<Column>();
        for (Column column : currentSchema.getColumns()) {
            if (newTypeMapping.containsKey(column.getName())) {
                commonColumns.add(column.copy((DataType)newTypeMapping.get(column.getName())));
                continue;
            }
            commonColumns.add(column);
        }
        commonColumns.addAll(appendedColumns);
        return currentSchema.copy(commonColumns);
    }

    public static Schema getCommonSchema(List<Schema> schemas) {
        if (schemas.isEmpty()) {
            return null;
        }
        if (schemas.size() == 1) {
            return schemas.get(0);
        }
        Schema outputSchema = null;
        for (Schema schema : schemas) {
            outputSchema = SchemaMergingUtils.getLeastCommonSchema(outputSchema, schema);
        }
        return outputSchema;
    }

    public static List<SchemaChangeEvent> getSchemaDifference(TableId tableId, @Nullable Schema beforeSchema, Schema afterSchema) {
        if (beforeSchema == null) {
            return Collections.singletonList(new CreateTableEvent(tableId, afterSchema));
        }
        Map<String, Column> beforeColumns = beforeSchema.getColumns().stream().collect(Collectors.toMap(Column::getName, col -> col));
        HashMap<String, DataType> oldTypeMapping = new HashMap<String, DataType>();
        HashMap<String, DataType> newTypeMapping = new HashMap<String, DataType>();
        ArrayList<AddColumnEvent.ColumnWithPosition> appendedColumns = new ArrayList<AddColumnEvent.ColumnWithPosition>();
        String afterWhichColumnPosition = null;
        for (Column afterColumn : afterSchema.getColumns()) {
            String columnName = afterColumn.getName();
            DataType afterType = afterColumn.getType();
            if (beforeColumns.containsKey(columnName)) {
                DataType beforeType = beforeColumns.get(columnName).getType();
                if (!Objects.equals(beforeType, afterType)) {
                    oldTypeMapping.put(columnName, beforeType);
                    newTypeMapping.put(columnName, afterType);
                }
            } else if (afterWhichColumnPosition == null) {
                appendedColumns.add(new AddColumnEvent.ColumnWithPosition(afterColumn, AddColumnEvent.ColumnPosition.FIRST, null));
            } else {
                appendedColumns.add(new AddColumnEvent.ColumnWithPosition(afterColumn, AddColumnEvent.ColumnPosition.AFTER, afterWhichColumnPosition));
            }
            afterWhichColumnPosition = afterColumn.getName();
        }
        ArrayList<SchemaChangeEvent> schemaChangeEvents = new ArrayList<SchemaChangeEvent>();
        if (!appendedColumns.isEmpty()) {
            schemaChangeEvents.add(new AddColumnEvent(tableId, appendedColumns));
        }
        if (!newTypeMapping.isEmpty()) {
            schemaChangeEvents.add(new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping));
        }
        return schemaChangeEvents;
    }

    public static Object[] coerceRow(String timezone, Schema currentSchema, Schema upcomingSchema, List<Object> upcomingRow) {
        return SchemaMergingUtils.coerceRow(timezone, currentSchema, upcomingSchema, upcomingRow, true);
    }

    public static Object[] coerceRow(String timezone, Schema currentSchema, Schema upcomingSchema, List<Object> upcomingRow, boolean toleranceMode) {
        List<Column> currentColumns = currentSchema.getColumns();
        Map<String, DataType> upcomingColumnTypes = upcomingSchema.getColumns().stream().collect(Collectors.toMap(Column::getName, Column::getType));
        Map<String, Object> upcomingColumnObjects = Streams.zip(upcomingSchema.getColumnNames().stream(), upcomingRow.stream(), Tuple2::of).filter(t -> t.f1 != null).collect(Collectors.toMap(t -> (String)t.f0, t -> t.f1));
        Object[] coercedRow = new Object[currentSchema.getColumnCount()];
        for (int i = 0; i < currentSchema.getColumnCount(); ++i) {
            Column currentColumn = currentColumns.get(i);
            String columnName = currentColumn.getName();
            if (upcomingColumnTypes.containsKey(columnName)) {
                DataType currentType;
                DataType upcomingType = upcomingColumnTypes.get(columnName);
                if (Objects.equals(upcomingType, currentType = currentColumn.getType())) {
                    coercedRow[i] = upcomingColumnObjects.get(columnName);
                    continue;
                }
                try {
                    coercedRow[i] = SchemaMergingUtils.coerceObject(timezone, upcomingColumnObjects.get(columnName), upcomingColumnTypes.get(columnName), currentColumn.getType());
                    continue;
                }
                catch (IllegalArgumentException e) {
                    if (toleranceMode) continue;
                    throw e;
                }
            }
            coercedRow[i] = null;
        }
        return coercedRow;
    }

    public static Schema strictlyMergeSchemas(List<Schema> schemas) {
        Preconditions.checkArgument(!schemas.isEmpty(), "Trying to merge transformed schemas %s, but got empty list", new Object[0]);
        if (schemas.size() == 1) {
            return schemas.get(0);
        }
        List primaryKeys = schemas.stream().map(Schema::primaryKeys).filter(p -> !p.isEmpty()).distinct().collect(Collectors.toList());
        List partitionKeys = schemas.stream().map(Schema::partitionKeys).filter(p -> !p.isEmpty()).distinct().collect(Collectors.toList());
        List options = schemas.stream().map(Schema::options).filter(p -> !p.isEmpty()).distinct().collect(Collectors.toList());
        List columnNames = schemas.stream().map(Schema::getColumnNames).distinct().collect(Collectors.toList());
        Preconditions.checkArgument(primaryKeys.size() <= 1, "Trying to merge transformed schemas %s, but got more than one primary key configurations: %s", schemas, primaryKeys);
        Preconditions.checkArgument(partitionKeys.size() <= 1, "Trying to merge transformed schemas %s, but got more than one partition key configurations: %s", schemas, partitionKeys);
        Preconditions.checkArgument(options.size() <= 1, "Trying to merge transformed schemas %s, but got more than one option configurations: %s", schemas, options);
        Preconditions.checkArgument(columnNames.size() == 1, "Trying to merge transformed schemas %s, but got more than one column name views: %s", schemas, columnNames);
        int arity = ((List)columnNames.get(0)).size();
        ArrayListMultimap toBeMergedColumnTypes = ArrayListMultimap.create((int)arity, (int)1);
        for (Schema schema : schemas) {
            List<DataType> columnTypes = schema.getColumnDataTypes();
            for (int colIndex = 0; colIndex < columnTypes.size(); ++colIndex) {
                toBeMergedColumnTypes.put((Object)colIndex, (Object)columnTypes.get(colIndex));
            }
        }
        List mergedColumnNames = (List)columnNames.iterator().next();
        ArrayList<DataType> mergedColumnTypes = new ArrayList<DataType>(arity);
        for (int i = 0; i < arity; ++i) {
            mergedColumnTypes.add(SchemaMergingUtils.strictlyMergeDataTypes(toBeMergedColumnTypes.get((Object)i)));
        }
        ArrayList<Column> mergedColumns = new ArrayList<Column>();
        for (int i = 0; i < mergedColumnNames.size(); ++i) {
            mergedColumns.add(Column.physicalColumn((String)mergedColumnNames.get(i), (DataType)mergedColumnTypes.get(i)));
        }
        return Schema.newBuilder().primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() : (List)primaryKeys.get(0)).partitionKey(partitionKeys.isEmpty() ? Collections.emptyList() : (List)partitionKeys.get(0)).options(options.isEmpty() ? Collections.emptyMap() : (Map)options.get(0)).setColumns(mergedColumns).build();
    }

    private static DataType strictlyMergeDataTypes(List<DataType> dataTypes) {
        Preconditions.checkArgument(!dataTypes.isEmpty(), "Trying to merge transformed data types %s, but got empty list", new Object[0]);
        List simpleMergeTypes = dataTypes.stream().distinct().collect(Collectors.toList());
        if (simpleMergeTypes.size() == 1) {
            return (DataType)simpleMergeTypes.get(0);
        }
        List typeRoots = dataTypes.stream().map(DataType::getTypeRoot).distinct().collect(Collectors.toList());
        Preconditions.checkArgument(typeRoots.size() == 1, "Trying to merge types %s, but got more than one type root: %s", dataTypes, typeRoots);
        DataType type = dataTypes.get(0);
        if (type.is(DataTypeRoot.CHAR)) {
            return DataTypes.CHAR(Integer.MAX_VALUE);
        }
        if (type.is(DataTypeRoot.VARCHAR)) {
            return DataTypes.STRING();
        }
        if (type.is(DataTypeRoot.BINARY)) {
            return DataTypes.BINARY(Integer.MAX_VALUE);
        }
        if (type.is(DataTypeRoot.VARBINARY)) {
            return DataTypes.VARBINARY(Integer.MAX_VALUE);
        }
        if (type.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
            return DataTypes.TIMESTAMP(9);
        }
        if (type.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
            return DataTypes.TIMESTAMP_TZ(9);
        }
        if (type.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
            return DataTypes.TIMESTAMP_LTZ(9);
        }
        throw new IllegalArgumentException("Unable to merge data types with different precision: " + dataTypes);
    }

    @VisibleForTesting
    static boolean isDataTypeCompatible(@Nullable DataType currentType, DataType upcomingType) {
        if (Objects.equals(currentType, upcomingType)) {
            return true;
        }
        if (currentType == null) {
            return false;
        }
        return TYPE_MERGING_TREE.get(upcomingType.getClass()).contains(currentType);
    }

    @VisibleForTesting
    static DataType getLeastCommonType(DataType currentType, DataType targetType) {
        boolean nullable = currentType.isNullable() || targetType.isNullable();
        if (Objects.equals(currentType = currentType.notNull(), targetType = targetType.notNull())) {
            return currentType.copy(nullable);
        }
        if (currentType.is(DataTypeFamily.TIMESTAMP) && targetType.is(DataTypeFamily.TIMESTAMP)) {
            return SchemaMergingUtils.mergeTimestampType(currentType, targetType).copy(nullable);
        }
        if (currentType instanceof DecimalType || targetType instanceof DecimalType) {
            return SchemaMergingUtils.mergeDecimalType(currentType, targetType).copy(nullable);
        }
        List<DataType> currentTypeTree = TYPE_MERGING_TREE.get(currentType.getClass());
        List<DataType> targetTypeTree = TYPE_MERGING_TREE.get(targetType.getClass());
        for (DataType type : currentTypeTree) {
            if (!targetTypeTree.contains(type)) continue;
            return type.copy(nullable);
        }
        return DataTypes.STRING().copy(nullable);
    }

    @VisibleForTesting
    static DataType mergeTimestampType(DataType lType, DataType rType) {
        int rightPrecision;
        int rightTypeLevel;
        int leftPrecision;
        int leftTypeLevel;
        if (lType instanceof TimestampType) {
            leftTypeLevel = 0;
            leftPrecision = ((TimestampType)lType).getPrecision();
        } else if (lType instanceof LocalZonedTimestampType) {
            leftTypeLevel = 1;
            leftPrecision = ((LocalZonedTimestampType)lType).getPrecision();
        } else if (lType instanceof ZonedTimestampType) {
            leftTypeLevel = 2;
            leftPrecision = ((ZonedTimestampType)lType).getPrecision();
        } else {
            throw new IllegalArgumentException("Unknown TIMESTAMP type: " + lType);
        }
        if (rType instanceof TimestampType) {
            rightTypeLevel = 0;
            rightPrecision = ((TimestampType)rType).getPrecision();
        } else if (rType instanceof LocalZonedTimestampType) {
            rightTypeLevel = 1;
            rightPrecision = ((LocalZonedTimestampType)rType).getPrecision();
        } else if (rType instanceof ZonedTimestampType) {
            rightTypeLevel = 2;
            rightPrecision = ((ZonedTimestampType)rType).getPrecision();
        } else {
            throw new IllegalArgumentException("Unknown TIMESTAMP type: " + lType);
        }
        int precision = Math.max(leftPrecision, rightPrecision);
        switch (Math.max(leftTypeLevel, rightTypeLevel)) {
            case 0: {
                return DataTypes.TIMESTAMP(precision);
            }
            case 1: {
                return DataTypes.TIMESTAMP_LTZ(precision);
            }
            case 2: {
                return DataTypes.TIMESTAMP_TZ(precision);
            }
        }
        throw new IllegalArgumentException("Unreachable");
    }

    @VisibleForTesting
    static DataType mergeDecimalType(DataType lType, DataType rType) {
        if (lType instanceof DecimalType && rType instanceof DecimalType) {
            int resultScale;
            DecimalType lhsDecimal = (DecimalType)lType;
            DecimalType rhsDecimal = (DecimalType)rType;
            int resultIntDigits = Math.max(lhsDecimal.getPrecision() - lhsDecimal.getScale(), rhsDecimal.getPrecision() - rhsDecimal.getScale());
            Preconditions.checkArgument(resultIntDigits + (resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale())) <= 38, String.format("Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", lType, rType, resultIntDigits + resultScale, 38), new Object[0]);
            return DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
        }
        if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
            return SchemaMergingUtils.mergeExactNumericsIntoDecimal((DecimalType)lType, rType);
        }
        if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
            return SchemaMergingUtils.mergeExactNumericsIntoDecimal((DecimalType)rType, lType);
        }
        return DataTypes.STRING();
    }

    private static DataType mergeExactNumericsIntoDecimal(DecimalType decimalType, DataType otherType) {
        int resultPrecision = Math.max(decimalType.getPrecision(), decimalType.getScale() + SchemaMergingUtils.getNumericPrecision(otherType));
        if (resultPrecision <= 38) {
            return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
        }
        return DataTypes.STRING();
    }

    @VisibleForTesting
    public static int getNumericPrecision(DataType dataType) {
        if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
            if (dataType.is(DataTypeRoot.TINYINT)) {
                return 3;
            }
            if (dataType.is(DataTypeRoot.SMALLINT)) {
                return 5;
            }
            if (dataType.is(DataTypeRoot.INTEGER)) {
                return 10;
            }
            if (dataType.is(DataTypeRoot.BIGINT)) {
                return 19;
            }
            if (dataType.is(DataTypeRoot.DECIMAL)) {
                return ((DecimalType)dataType).getPrecision();
            }
        }
        throw new IllegalArgumentException("Failed to get precision of non-exact decimal type " + dataType);
    }

    @VisibleForTesting
    static Object coerceObject(String timezone, Object originalField, DataType originalType, DataType destinationType) {
        if (originalField == null) {
            return null;
        }
        if (destinationType instanceof BooleanType) {
            return Boolean.valueOf(originalField.toString());
        }
        if (destinationType instanceof TinyIntType) {
            return SchemaMergingUtils.coerceToByte(originalField);
        }
        if (destinationType instanceof SmallIntType) {
            return SchemaMergingUtils.coerceToShort(originalField);
        }
        if (destinationType instanceof IntType) {
            return SchemaMergingUtils.coerceToInt(originalField);
        }
        if (destinationType instanceof BigIntType) {
            return SchemaMergingUtils.coerceToLong(originalField);
        }
        if (destinationType instanceof DecimalType) {
            DecimalType decimalType = (DecimalType)destinationType;
            return SchemaMergingUtils.coerceToDecimal(originalField, decimalType.getPrecision(), decimalType.getScale());
        }
        if (destinationType instanceof FloatType) {
            return Float.valueOf(SchemaMergingUtils.coerceToFloat(originalField));
        }
        if (destinationType instanceof DoubleType) {
            return SchemaMergingUtils.coerceToDouble(originalField);
        }
        if (destinationType instanceof CharType) {
            return SchemaMergingUtils.coerceToString(originalField, originalType);
        }
        if (destinationType instanceof VarCharType) {
            return SchemaMergingUtils.coerceToString(originalField, originalType);
        }
        if (destinationType instanceof BinaryType) {
            return SchemaMergingUtils.coerceToBytes(originalField);
        }
        if (destinationType instanceof VarBinaryType) {
            return SchemaMergingUtils.coerceToBytes(originalField);
        }
        if (destinationType instanceof DateType) {
            try {
                return SchemaMergingUtils.coerceToLong(originalField);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalArgumentException(String.format("Cannot fit \"%s\" into a DATE column.", originalField));
            }
        }
        if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
            return originalField;
        }
        if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE) && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
            return originalField;
        }
        if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
            return originalField;
        }
        if (destinationType instanceof TimestampType) {
            return SchemaMergingUtils.coerceToTimestamp(originalField, timezone);
        }
        if (destinationType instanceof LocalZonedTimestampType) {
            return SchemaMergingUtils.coerceToLocalZonedTimestamp(originalField, timezone);
        }
        if (destinationType instanceof ZonedTimestampType) {
            return SchemaMergingUtils.coerceToZonedTimestamp(originalField, timezone);
        }
        throw new IllegalArgumentException(String.format("Column type \"%s\" doesn't support type coercion to \"%s\"", originalType, destinationType));
    }

    private static Object coerceToString(Object originalField, DataType originalType) {
        if (originalField == null) {
            return BinaryStringData.fromString("null");
        }
        if (originalType instanceof DateType) {
            long epochOfDay = SchemaMergingUtils.coerceToLong(originalField);
            return BinaryStringData.fromString(LocalDate.ofEpochDay(epochOfDay).toString());
        }
        if (originalField instanceof StringData) {
            return originalField;
        }
        if (originalField instanceof byte[]) {
            return BinaryStringData.fromString(SchemaMergingUtils.hexlify((byte[])originalField));
        }
        return BinaryStringData.fromString(originalField.toString());
    }

    private static Object coerceToBytes(Object originalField) {
        if (originalField instanceof byte[]) {
            return originalField;
        }
        return originalField.toString().getBytes();
    }

    private static byte coerceToByte(Object o) {
        if (o instanceof Byte) {
            return (Byte)o;
        }
        throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a TINYINT column. ", o.getClass()));
    }

    private static short coerceToShort(Object o) {
        if (o instanceof Byte) {
            return ((Byte)o).shortValue();
        }
        if (o instanceof Short) {
            return (Short)o;
        }
        throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a SMALLINT column. Currently only TINYINT can be accepted by a SMALLINT column", o.getClass()));
    }

    private static int coerceToInt(Object o) {
        if (o instanceof Byte) {
            return ((Byte)o).intValue();
        }
        if (o instanceof Short) {
            return ((Short)o).intValue();
        }
        if (o instanceof Integer) {
            return (Integer)o;
        }
        throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a INT column. Currently only TINYINT / SMALLINT can be accepted by a INT column", o.getClass()));
    }

    private static long coerceToLong(Object o) {
        if (o instanceof Byte) {
            return ((Byte)o).longValue();
        }
        if (o instanceof Short) {
            return ((Short)o).longValue();
        }
        if (o instanceof Integer) {
            return ((Integer)o).longValue();
        }
        if (o instanceof Long) {
            return (Long)o;
        }
        throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a BIGINT column. Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column", o.getClass()));
    }

    private static DecimalData coerceToDecimal(Object o, int precision, int scale) {
        BigDecimal decimalValue;
        if (o instanceof Byte) {
            decimalValue = BigDecimal.valueOf(((Byte)o).longValue(), 0);
        } else if (o instanceof Short) {
            decimalValue = BigDecimal.valueOf(((Short)o).longValue(), 0);
        } else if (o instanceof Integer) {
            decimalValue = BigDecimal.valueOf(((Integer)o).longValue(), 0);
        } else if (o instanceof Long) {
            decimalValue = BigDecimal.valueOf((Long)o, 0);
        } else if (o instanceof DecimalData) {
            decimalValue = ((DecimalData)o).toBigDecimal();
        } else {
            throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a DECIMAL column. Currently only TINYINT / SMALLINT / INT / BIGINT / DECIMAL can be accepted by a DECIMAL column", o.getClass()));
        }
        return decimalValue != null ? DecimalData.fromBigDecimal(decimalValue, precision, scale) : null;
    }

    private static float coerceToFloat(Object o) {
        if (o instanceof Byte) {
            return ((Byte)o).floatValue();
        }
        if (o instanceof Short) {
            return ((Short)o).floatValue();
        }
        if (o instanceof Integer) {
            return ((Integer)o).floatValue();
        }
        if (o instanceof Long) {
            return ((Long)o).floatValue();
        }
        if (o instanceof DecimalData) {
            return ((DecimalData)o).toBigDecimal().floatValue();
        }
        if (o instanceof Float) {
            return ((Float)o).floatValue();
        }
        throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a FLOAT column. Currently only TINYINT / SMALLINT / INT / BIGINT / DECIMAL can be accepted by a FLOAT column", o.getClass()));
    }

    private static double coerceToDouble(Object o) {
        if (o instanceof Byte) {
            return ((Byte)o).doubleValue();
        }
        if (o instanceof Short) {
            return ((Short)o).doubleValue();
        }
        if (o instanceof Integer) {
            return ((Integer)o).doubleValue();
        }
        if (o instanceof Long) {
            return ((Long)o).doubleValue();
        }
        if (o instanceof DecimalData) {
            return ((DecimalData)o).toBigDecimal().doubleValue();
        }
        if (o instanceof Float) {
            return ((Float)o).doubleValue();
        }
        if (o instanceof Double) {
            return (Double)o;
        }
        throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a DOUBLE column. Currently only TINYINT / SMALLINT / INT / BIGINT / DECIMAL / FLOAT can be accepted by a DOUBLE column", o.getClass()));
    }

    private static TimestampData coerceToTimestamp(Object object, String timezone) {
        if (object == null) {
            return null;
        }
        if (object instanceof Long) {
            return TimestampData.fromLocalDateTime(LocalDate.ofEpochDay((Long)object).atStartOfDay());
        }
        if (object instanceof LocalZonedTimestampData) {
            return TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(((LocalZonedTimestampData)object).toInstant(), ZoneId.of(timezone)));
        }
        if (object instanceof ZonedTimestampData) {
            return TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(((ZonedTimestampData)object).toInstant(), ZoneId.of(timezone)));
        }
        if (object instanceof TimestampData) {
            return (TimestampData)object;
        }
        throw new IllegalArgumentException(String.format("Unable to implicitly coerce object `%s` as a TIMESTAMP.", object));
    }

    private static LocalZonedTimestampData coerceToLocalZonedTimestamp(Object object, String timezone) {
        if (object == null) {
            return null;
        }
        TimestampData timestampData = SchemaMergingUtils.coerceToTimestamp(object, timezone);
        return LocalZonedTimestampData.fromEpochMillis(timestampData.getMillisecond(), timestampData.getNanoOfMillisecond());
    }

    private static ZonedTimestampData coerceToZonedTimestamp(Object object, String timezone) {
        if (object == null) {
            return null;
        }
        TimestampData timestampData = SchemaMergingUtils.coerceToTimestamp(object, timezone);
        return ZonedTimestampData.fromZonedDateTime(ZonedDateTime.ofInstant(timestampData.toLocalDateTime().toInstant(ZoneOffset.UTC), ZoneId.of(timezone)));
    }

    private static String hexlify(byte[] bytes) {
        return BaseEncoding.base64().encode(bytes);
    }

    private static Map<Class<? extends DataType>, List<DataType>> getTypeMergingTree() {
        DataType stringType = DataTypes.STRING();
        DoubleType doubleType = DataTypes.DOUBLE();
        FloatType floatType = DataTypes.FLOAT();
        DecimalType decimalType = DataTypes.DECIMAL(38, 0);
        BigIntType bigIntType = DataTypes.BIGINT();
        IntType intType = DataTypes.INT();
        SmallIntType smallIntType = DataTypes.SMALLINT();
        TinyIntType tinyIntType = DataTypes.TINYINT();
        ZonedTimestampType timestampTzType = DataTypes.TIMESTAMP_TZ(9);
        LocalZonedTimestampType timestampLtzType = DataTypes.TIMESTAMP_LTZ(9);
        TimestampType timestampType = DataTypes.TIMESTAMP(9);
        DateType dateType = DataTypes.DATE();
        HashMap<Class<? extends DataType>, List<DataType>> mergingTree = new HashMap<Class<? extends DataType>, List<DataType>>();
        mergingTree.put((Class<? extends DataType>)VarCharType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)CharType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)BooleanType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)BinaryType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)VarBinaryType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)DoubleType.class, (List<DataType>)ImmutableList.of((Object)doubleType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)FloatType.class, (List<DataType>)ImmutableList.of((Object)floatType, (Object)doubleType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)DecimalType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)BigIntType.class, (List<DataType>)ImmutableList.of((Object)bigIntType, (Object)decimalType, (Object)doubleType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)IntType.class, (List<DataType>)ImmutableList.of((Object)intType, (Object)bigIntType, (Object)decimalType, (Object)doubleType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)SmallIntType.class, (List<DataType>)ImmutableList.of((Object)smallIntType, (Object)intType, (Object)bigIntType, (Object)decimalType, (Object)floatType, (Object)doubleType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)TinyIntType.class, (List<DataType>)ImmutableList.of((Object)tinyIntType, (Object)smallIntType, (Object)intType, (Object)bigIntType, (Object)decimalType, (Object)floatType, (Object)doubleType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)ZonedTimestampType.class, (List<DataType>)ImmutableList.of((Object)timestampTzType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)LocalZonedTimestampType.class, (List<DataType>)ImmutableList.of((Object)timestampLtzType, (Object)timestampTzType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)TimestampType.class, (List<DataType>)ImmutableList.of((Object)timestampType, (Object)timestampLtzType, (Object)timestampTzType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)DateType.class, (List<DataType>)ImmutableList.of((Object)dateType, (Object)timestampType, (Object)timestampLtzType, (Object)timestampTzType, (Object)stringType));
        mergingTree.put((Class<? extends DataType>)TimeType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)RowType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)ArrayType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        mergingTree.put((Class<? extends DataType>)MapType.class, (List<DataType>)ImmutableList.of((Object)stringType));
        return mergingTree;
    }
}

