/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.oceanbase.sink;

import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.Record;
import com.oceanbase.connector.flink.table.RecordSerializationSchema;
import com.oceanbase.connector.flink.table.TableInfo;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.oceanbase.sink.OceanBaseRowConvert;
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;

/**
 * Serializes Flink CDC {@link Event}s into OceanBase connector {@link Record}s.
 *
 * <p>Schema change events are consumed to keep a per-table {@link Schema} up to date and never
 * produce a record. Data change events are converted, using the schema currently tracked for
 * their table, into a {@link DataChangeRecord}.
 */
public class OceanBaseEventSerializationSchema implements RecordSerializationSchema<Event> {
    /** Per-schema field converters, built lazily the first time a schema is serialized. */
    private final Cache<Schema, List<OceanBaseRowConvert.SerializationConverter>> cache = CacheBuilder.newBuilder().build();

    /** Latest known schema of every table seen through a schema change event. */
    private final Map<TableId, Schema> schemaMaps = new HashMap<>();

    /** Zone id supplied at construction and handed to every field converter that is created. */
    public final ZoneId pipelineZoneId;

    /**
     * Creates a serializer.
     *
     * @param zoneId zone id passed to the field converters
     */
    public OceanBaseEventSerializationSchema(ZoneId zoneId) {
        this.pipelineZoneId = zoneId;
    }

    /**
     * Converts an event into a record, or updates the tracked schema.
     *
     * @param event the event to process
     * @return the record for a {@link DataChangeEvent}; {@code null} for a
     *     {@link SchemaChangeEvent} or any other event type
     * @throws RuntimeException if a non-create schema change arrives for a table whose schema is
     *     not tracked yet
     */
    @Override
    public Record serialize(Event event) {
        if (event instanceof DataChangeEvent) {
            return applyDataChangeEvent((DataChangeEvent) event);
        }
        if (event instanceof SchemaChangeEvent) {
            trackSchemaChange((SchemaChangeEvent) event);
        }
        return null;
    }

    /** Records the schema resulting from {@code event} and drops converters of the replaced one. */
    private void trackSchemaChange(SchemaChangeEvent event) {
        TableId tableId = event.tableId();
        Schema previous = schemaMaps.get(tableId);
        Schema updated;
        if (event instanceof CreateTableEvent) {
            updated = ((CreateTableEvent) event).getSchema();
        } else {
            if (previous == null) {
                throw new RuntimeException("schema of " + tableId + " is not existed.");
            }
            updated = SchemaUtils.applySchemaChangeEvent(previous, event);
        }
        schemaMaps.put(tableId, updated);
        // The cache is unbounded, so a replaced schema would otherwise keep its converters
        // forever. If another table still uses an equal schema, its converters are simply
        // rebuilt on the next lookup.
        if (previous != null && !previous.equals(updated)) {
            cache.invalidate(previous);
        }
    }

    /** Builds the record for a data change event using the schema tracked for its table. */
    private Record applyDataChangeEvent(DataChangeEvent event) {
        TableId tableId = event.tableId();
        Schema schema = schemaMaps.get(tableId);
        Preconditions.checkNotNull(schema, event.tableId() + " is not existed");
        OperationType op = event.op();
        Object[] values;
        boolean isDelete = false;
        switch (op) {
            case INSERT:
            case UPDATE:
            case REPLACE:
                values = serializerRecord(event.after(), schema);
                break;
            case DELETE:
                // A delete is described by the row image before the change.
                values = serializerRecord(event.before(), schema);
                isDelete = true;
                break;
            default:
                throw new UnsupportedOperationException("Unsupported Operation " + op);
        }
        return buildDataChangeRecord(tableId, schema, values, isDelete);
    }

    /** Wraps serialized values into a DELETE or UPSERT {@link DataChangeRecord}. */
    private DataChangeRecord buildDataChangeRecord(TableId tableId, Schema schema, Object[] values, boolean isDelete) {
        // Only null is rejected here; an empty schema name passes despite the message wording.
        Preconditions.checkState(Objects.nonNull(tableId.getSchemaName()), "Schema name cannot be null or empty.");
        com.oceanbase.connector.flink.table.TableId oceanBaseTableId =
                new com.oceanbase.connector.flink.table.TableId(tableId.getSchemaName(), tableId.getTableName());
        TableInfo tableInfo =
                new TableInfo(oceanBaseTableId, schema.primaryKeys(), schema.getColumnNames(), Lists.newArrayList(), null);
        DataChangeRecord.Type type = isDelete ? DataChangeRecord.Type.DELETE : DataChangeRecord.Type.UPSERT;
        return new DataChangeRecord(tableInfo, type, values);
    }

    /**
     * Serializes every field of a row with the converters cached for its schema.
     *
     * @param recordData the row to serialize
     * @param schema the schema describing the row's columns
     * @return one serialized value per column, in column order
     * @throws IllegalStateException presumably, if the column count differs from the row arity
     *     (the exact type is decided by {@code Preconditions.checkState})
     * @throws RuntimeException if the converters for the schema cannot be created
     */
    public Object[] serializerRecord(RecordData recordData, Schema schema) {
        int columnCount = schema.getColumns().size();
        Preconditions.checkState(columnCount == recordData.getArity(), "Column size does not match the data size");
        List<OceanBaseRowConvert.SerializationConverter> converters;
        try {
            converters = cache.get(
                    schema,
                    () -> schema.getColumns().stream()
                            .map(column -> OceanBaseRowConvert.createNullableExternalConverter(column.getType(), pipelineZoneId))
                            .collect(Collectors.toList()));
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to obtain SerializationConverter cache", e);
        }
        Object[] values = new Object[columnCount];
        // columnCount equals the row arity here, as verified above.
        for (int i = 0; i < columnCount; i++) {
            values[i] = converters.get(i).serialize(i, recordData);
        }
        return values;
    }
}

