/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.common;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helpers for relating upstream tables to the evolved tables they are routed to, rewriting schema
 * change events according to a {@link SchemaChangeBehavior}, and coercing data records written in
 * an upstream schema into an evolved schema.
 *
 * <p>All table/schema derivation helpers are static. Only {@link #coerceDataRecord} is an instance
 * method, because it relies on the per-instance reader/writer caches declared below.
 */
public class SchemaDerivator {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaDerivator.class);

    /** Cached readers and writers are evicted after this long without being accessed. */
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1L);

    /** Field getters keyed by upstream schema; built on first access via {@link SchemaUtils}. */
    private final LoadingCache<Schema, List<RecordData.FieldGetter>> upstreamRecordGetterCache =
            CacheBuilder.newBuilder()
                    .expireAfterAccess(CACHE_EXPIRE_DURATION)
                    .build(
                            new CacheLoader<Schema, List<RecordData.FieldGetter>>() {
                                @Override
                                @Nonnull
                                public List<RecordData.FieldGetter> load(@Nonnull Schema schema) {
                                    return SchemaUtils.createFieldGetters(schema);
                                }
                            });

    /** Binary record generators keyed by evolved schema; built from its column data types. */
    private final LoadingCache<Schema, BinaryRecordDataGenerator> evolvedRecordWriterCache =
            CacheBuilder.newBuilder()
                    .expireAfterAccess(CACHE_EXPIRE_DURATION)
                    .build(
                            new CacheLoader<Schema, BinaryRecordDataGenerator>() {
                                @Override
                                @Nonnull
                                public BinaryRecordDataGenerator load(@Nonnull Schema schema) {
                                    return new BinaryRecordDataGenerator(
                                            schema.getColumnDataTypes().toArray(new DataType[0]));
                                }
                            });

    /**
     * Collects every table that at least one of the given upstream tables is routed to.
     *
     * @param tableIdRouter router used to resolve the targets of each upstream table
     * @param changedUpstreamTables upstream tables to route
     * @return the union of all routing targets
     */
    public static Set<TableId> getAffectedEvolvedTables(
            TableIdRouter tableIdRouter, Set<TableId> changedUpstreamTables) {
        return changedUpstreamTables.stream()
                .flatMap(upstreamTableId -> tableIdRouter.route(upstreamTableId).stream())
                .collect(Collectors.toSet());
    }

    /**
     * Finds the upstream tables whose routing targets include the given evolved table.
     *
     * @param tableIdRouter router used to resolve the targets of each upstream table
     * @param evolvedTableId evolved table to look up
     * @param upstreamSchemaTables candidate upstream tables
     * @return the candidates that are routed to {@code evolvedTableId}
     */
    public static Set<TableId> reverseLookupDependingUpstreamTables(
            TableIdRouter tableIdRouter,
            TableId evolvedTableId,
            Set<TableId> upstreamSchemaTables) {
        return upstreamSchemaTables.stream()
                .filter(
                        upstreamTableId ->
                                tableIdRouter.route(upstreamTableId).contains(evolvedTableId))
                .collect(Collectors.toSet());
    }

    /**
     * Finds the upstream tables whose routing targets include the given evolved table, taking the
     * candidates from the row keys of {@code upstreamSchemaTable}.
     *
     * @param tableIdRouter router used to resolve the targets of each upstream table
     * @param evolvedTableId evolved table to look up
     * @param upstreamSchemaTable table whose row keys are the candidate upstream tables
     * @return the row keys that are routed to {@code evolvedTableId}
     */
    public static Set<TableId> reverseLookupDependingUpstreamTables(
            TableIdRouter tableIdRouter,
            TableId evolvedTableId,
            Table<TableId, Integer, Schema> upstreamSchemaTable) {
        return reverseLookupDependingUpstreamTables(
                tableIdRouter, evolvedTableId, upstreamSchemaTable.rowKeySet());
    }

    /**
     * Collects the latest original schema of every upstream table known to {@code schemaManager}
     * that is routed to the given evolved table.
     *
     * @param tableIdRouter router used to resolve the targets of each upstream table
     * @param evolvedTableId evolved table to look up
     * @param schemaManager source of the known upstream tables and their latest original schemas
     * @return the distinct schemas of the depending upstream tables
     * @throws NoSuchElementException if a depending upstream table has no latest original schema
     */
    public static Set<Schema> reverseLookupDependingUpstreamSchemas(
            TableIdRouter tableIdRouter, TableId evolvedTableId, SchemaManager schemaManager) {
        return reverseLookupDependingUpstreamTables(
                        tableIdRouter, evolvedTableId, schemaManager.getAllOriginalTables())
                .stream()
                .map(
                        upstreamTableId ->
                                requireLatestOriginalSchema(
                                        schemaManager, upstreamTableId, evolvedTableId))
                .collect(Collectors.toSet());
    }

    /**
     * Collects all schemas stored in {@code upstreamSchemaTable} for every upstream table (row key)
     * that is routed to the given evolved table.
     *
     * @param tableIdRouter router used to resolve the targets of each upstream table
     * @param evolvedTableId evolved table to look up
     * @param upstreamSchemaTable schemas keyed by upstream table (row) and an integer column key
     * @return the distinct schemas found in the rows of the depending upstream tables
     */
    public static Set<Schema> reverseLookupDependingUpstreamSchemas(
            TableIdRouter tableIdRouter,
            TableId evolvedTableId,
            Table<TableId, Integer, Schema> upstreamSchemaTable) {
        return reverseLookupDependingUpstreamTables(
                        tableIdRouter, evolvedTableId, upstreamSchemaTable)
                .stream()
                .flatMap(
                        upstreamTableId ->
                                upstreamSchemaTable.row(upstreamTableId).values().stream())
                .collect(Collectors.toSet());
    }

    /** Fetches the latest original schema of an upstream table, failing with a clear message. */
    private static Schema requireLatestOriginalSchema(
            SchemaManager schemaManager, TableId upstreamTableId, TableId evolvedTableId) {
        // Same exception type as the unchecked Optional.get() this replaces, but with context.
        return schemaManager
                .getLatestOriginalSchema(upstreamTableId)
                .orElseThrow(
                        () ->
                                new NoSuchElementException(
                                        "No original schema found for upstream table "
                                                + upstreamTableId
                                                + " which is routed to evolved table "
                                                + evolvedTableId
                                                + "."));
    }

    /**
     * Rewrites the given schema change events according to {@code schemaChangeBehavior}, fills in
     * {@code oldSchema} as the pre-schema of events that need one and do not have it yet, and drops
     * events whose type the {@code metadataApplier} does not accept.
     *
     * <p>Events are mutated in place when their pre-schema is filled.
     *
     * @param oldSchema schema before the changes are applied
     * @param schemaChangeEvents events to normalize
     * @param schemaChangeBehavior behavior deciding how events are rewritten
     * @param metadataApplier applier consulted for the accepted schema evolution types
     * @return a new list holding the accepted events in their original order
     * @throws IllegalArgumentException if {@code schemaChangeBehavior} is not a handled value
     */
    public static List<SchemaChangeEvent> normalizeSchemaChangeEvents(
            Schema oldSchema,
            List<SchemaChangeEvent> schemaChangeEvents,
            SchemaChangeBehavior schemaChangeBehavior,
            MetadataApplier metadataApplier) {
        List<SchemaChangeEvent> rewrittenSchemaChangeEvents =
                rewriteSchemaChangeEvents(oldSchema, schemaChangeEvents, schemaChangeBehavior);

        // First pass: make sure every event that carries a pre-schema actually has one.
        for (SchemaChangeEvent event : rewrittenSchemaChangeEvents) {
            if (event instanceof SchemaChangeEventWithPreSchema) {
                SchemaChangeEventWithPreSchema eventNeedsPreSchema =
                        (SchemaChangeEventWithPreSchema) event;
                if (!eventNeedsPreSchema.hasPreSchema()) {
                    eventNeedsPreSchema.fillPreSchema(oldSchema);
                }
            }
        }

        // Second pass: keep only the event types the applier accepts.
        List<SchemaChangeEvent> finalSchemaChangeEvents = new ArrayList<>();
        for (SchemaChangeEvent event : rewrittenSchemaChangeEvents) {
            if (metadataApplier.acceptsSchemaEvolutionType(event.getType())) {
                finalSchemaChangeEvents.add(event);
            } else {
                LOG.info("Ignored schema change {}.", event);
            }
        }
        return finalSchemaChangeEvents;
    }

    /**
     * Applies the behavior-specific rewrite: EVOLVE, TRY_EVOLVE and EXCEPTION pass events through
     * untouched, LENIENT rewrites each event via {@link #lenientizeSchemaChangeEvent}, and IGNORE
     * keeps only {@link CreateTableEvent}s.
     */
    private static List<SchemaChangeEvent> rewriteSchemaChangeEvents(
            Schema oldSchema,
            List<SchemaChangeEvent> schemaChangeEvents,
            SchemaChangeBehavior schemaChangeBehavior) {
        switch (schemaChangeBehavior) {
            case EVOLVE:
            case TRY_EVOLVE:
            case EXCEPTION:
                return schemaChangeEvents;
            case LENIENT:
                return schemaChangeEvents.stream()
                        .flatMap(event -> lenientizeSchemaChangeEvent(oldSchema, event))
                        .collect(Collectors.toList());
            case IGNORE:
                return schemaChangeEvents.stream()
                        .filter(event -> event instanceof CreateTableEvent)
                        .collect(Collectors.toList());
            default:
                throw new IllegalArgumentException(
                        "Unexpected schema change behavior: " + schemaChangeBehavior);
        }
    }

    /**
     * Rewrites add/drop/rename column events into their lenient equivalents; every other event type
     * is passed through unchanged.
     */
    private static Stream<SchemaChangeEvent> lenientizeSchemaChangeEvent(
            Schema oldSchema, SchemaChangeEvent schemaChangeEvent) {
        TableId tableId = schemaChangeEvent.tableId();
        switch (schemaChangeEvent.getType()) {
            case ADD_COLUMN:
                return lenientizeAddColumnEvent((AddColumnEvent) schemaChangeEvent, tableId);
            case DROP_COLUMN:
                return lenientizeDropColumnEvent(
                        oldSchema, (DropColumnEvent) schemaChangeEvent, tableId);
            case RENAME_COLUMN:
                return lenientizeRenameColumnEvent(
                        oldSchema, (RenameColumnEvent) schemaChangeEvent, tableId);
            default:
                return Stream.of(schemaChangeEvent);
        }
    }

    /**
     * Turns a rename into "add a nullable copy under the new name", followed by an alter-type event
     * that makes the old column nullable when it was not already.
     *
     * @throws IllegalArgumentException if a renamed column does not exist in {@code oldSchema}
     */
    private static Stream<SchemaChangeEvent> lenientizeRenameColumnEvent(
            Schema oldSchema, RenameColumnEvent schemaChangeEvent, TableId tableId) {
        List<AddColumnEvent.ColumnWithPosition> appendColumns = new ArrayList<>();
        Map<String, DataType> convertNullableColumns = new HashMap<>();

        for (Map.Entry<String, String> renaming : schemaChangeEvent.getNameMapping().entrySet()) {
            String oldColName = renaming.getKey();
            String newColName = renaming.getValue();
            Column column =
                    oldSchema
                            .getColumn(oldColName)
                            .orElseThrow(
                                    () ->
                                            new IllegalArgumentException(
                                                    "Non-existed column "
                                                            + oldColName
                                                            + " in evolved schema."));
            if (!column.getType().isNullable()) {
                // The old column is kept rather than renamed, so it has to become nullable.
                convertNullableColumns.put(oldColName, column.getType().nullable());
            }
            appendColumns.add(
                    new AddColumnEvent.ColumnWithPosition(
                            column.copy(newColName).copy(column.getType().nullable())));
        }

        List<SchemaChangeEvent> events = new ArrayList<>();
        events.add(new AddColumnEvent(tableId, appendColumns));
        if (!convertNullableColumns.isEmpty()) {
            events.add(new AlterColumnTypeEvent(tableId, convertNullableColumns));
        }
        return events.stream();
    }

    /**
     * Replaces a drop with an alter-type event that makes the dropped, non-nullable columns found
     * in {@code oldSchema} nullable. Yields nothing when there is no such column.
     */
    private static Stream<SchemaChangeEvent> lenientizeDropColumnEvent(
            Schema oldSchema, DropColumnEvent schemaChangeEvent, TableId tableId) {
        Map<String, DataType> convertNullableColumns =
                schemaChangeEvent.getDroppedColumnNames().stream()
                        .map(oldSchema::getColumn)
                        // Columns missing from the old schema are silently skipped.
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .filter(column -> !column.getType().isNullable())
                        .collect(
                                Collectors.toMap(
                                        Column::getName, column -> column.getType().nullable()));

        if (convertNullableColumns.isEmpty()) {
            return Stream.empty();
        }
        return Stream.of(new AlterColumnTypeEvent(tableId, convertNullableColumns));
    }

    /**
     * Rebuilds an add-column event so that every added column is a nullable physical column wrapped
     * in a {@code ColumnWithPosition} built from the column alone (the original position is not
     * carried over).
     */
    private static Stream<SchemaChangeEvent> lenientizeAddColumnEvent(
            AddColumnEvent schemaChangeEvent, TableId tableId) {
        List<AddColumnEvent.ColumnWithPosition> nullableColumns =
                schemaChangeEvent.getAddedColumns().stream()
                        .map(
                                added -> {
                                    Column source = added.getAddColumn();
                                    return new AddColumnEvent.ColumnWithPosition(
                                            Column.physicalColumn(
                                                    source.getName(),
                                                    source.getType().nullable(),
                                                    source.getComment(),
                                                    source.getDefaultValueExpression()));
                                })
                        .collect(Collectors.toList());
        return Stream.of(new AddColumnEvent(tableId, nullableColumns));
    }

    /**
     * Converts the before/after records of a data change event from the upstream schema to the
     * evolved schema.
     *
     * @param timezone timezone passed through to {@link SchemaMergingUtils#coerceRow}
     * @param dataChangeEvent event whose records follow {@code upstreamSchema}
     * @param upstreamSchema schema the event's records were written with
     * @param evolvedSchema schema to coerce the records into; may be {@code null}
     * @return empty if {@code evolvedSchema} is {@code null}; the event itself if both schemas are
     *     equal; otherwise the event with its non-null before/after records coerced
     */
    public Optional<DataChangeEvent> coerceDataRecord(
            String timezone,
            DataChangeEvent dataChangeEvent,
            Schema upstreamSchema,
            @Nullable Schema evolvedSchema) {
        if (evolvedSchema == null) {
            return Optional.empty();
        }
        if (upstreamSchema.equals(evolvedSchema)) {
            // Nothing to convert.
            return Optional.of(dataChangeEvent);
        }

        List<RecordData.FieldGetter> upstreamSchemaReader =
                upstreamRecordGetterCache.getUnchecked(upstreamSchema);
        BinaryRecordDataGenerator evolvedSchemaWriter =
                evolvedRecordWriterCache.getUnchecked(evolvedSchema);

        DataChangeEvent coercedEvent = dataChangeEvent;
        if (coercedEvent.before() != null) {
            coercedEvent =
                    DataChangeEvent.projectBefore(
                            coercedEvent,
                            coerceRecord(
                                    timezone,
                                    coercedEvent.before(),
                                    upstreamSchema,
                                    evolvedSchema,
                                    upstreamSchemaReader,
                                    evolvedSchemaWriter));
        }
        if (coercedEvent.after() != null) {
            coercedEvent =
                    DataChangeEvent.projectAfter(
                            coercedEvent,
                            coerceRecord(
                                    timezone,
                                    coercedEvent.after(),
                                    upstreamSchema,
                                    evolvedSchema,
                                    upstreamSchemaReader,
                                    evolvedSchemaWriter));
        }
        return Optional.of(coercedEvent);
    }

    /** Reads one upstream record, coerces its fields to the evolved schema and re-encodes it. */
    private static RecordData coerceRecord(
            String timezone,
            RecordData upstreamRecord,
            Schema upstreamSchema,
            Schema evolvedSchema,
            List<RecordData.FieldGetter> upstreamSchemaReader,
            BinaryRecordDataGenerator evolvedSchemaWriter) {
        List<Object> upstreamFields =
                SchemaUtils.restoreOriginalData(upstreamRecord, upstreamSchemaReader);
        Object[] coercedRow =
                SchemaMergingUtils.coerceRow(
                        timezone, evolvedSchema, upstreamSchema, upstreamFields);
        return evolvedSchemaWriter.generate(coercedRow);
    }

    /**
     * Derives the create-table events of the sink tables from those of the source tables.
     *
     * <p>Source tables are grouped by route rule; the schemas of each group are merged into a
     * common schema which is assigned to every table the group's members are routed to. Source
     * tables that belong to no group and are not themselves a routing target keep their own schema
     * under their own table id.
     *
     * @param router router providing the route-rule grouping and the routing targets
     * @param createTableEvents create-table events of the source tables
     * @return one create-table event per derived sink table, in no particular order
     */
    public static List<CreateTableEvent> deduceMergedCreateTableEvent(
            TableIdRouter router, List<CreateTableEvent> createTableEvents) {
        Set<TableId> originalTables =
                createTableEvents.stream()
                        .map(CreateTableEvent::tableId)
                        .collect(Collectors.toSet());
        List<Set<TableId>> sourceTablesByRouteRule =
                router.groupSourceTablesByRouteRule(originalTables);
        Map<TableId, Schema> sourceTableIdToSchemaMap =
                createTableEvents.stream()
                        .collect(
                                Collectors.toMap(
                                        CreateTableEvent::tableId, CreateTableEvent::getSchema));

        Map<TableId, Schema> sinkTableIdToSchemaMap = new HashMap<>();
        Set<TableId> routedTables = new HashSet<>();
        for (Set<TableId> sourceTables : sourceTablesByRouteRule) {
            List<Schema> toBeMergedSchemas = new ArrayList<>();
            for (TableId sourceTableId : sourceTables) {
                toBeMergedSchemas.add(sourceTableIdToSchemaMap.get(sourceTableId));
                routedTables.add(sourceTableId);
            }
            if (toBeMergedSchemas.isEmpty()) {
                continue;
            }

            Schema mergedSchema = SchemaMergingUtils.getCommonSchema(toBeMergedSchemas);
            for (TableId sourceTableId : sourceTables) {
                for (TableId sinkTableId : router.route(sourceTableId)) {
                    sinkTableIdToSchemaMap.put(sinkTableId, mergedSchema);
                }
            }
        }

        // Tables untouched by any route rule are forwarded with their own schema.
        for (TableId tableId : originalTables) {
            boolean alreadyDerived = sinkTableIdToSchemaMap.containsKey(tableId);
            if (!alreadyDerived && !routedTables.contains(tableId)) {
                sinkTableIdToSchemaMap.put(tableId, sourceTableIdToSchemaMap.get(tableId));
            }
        }

        return sinkTableIdToSchemaMap.entrySet().stream()
                .map(entry -> new CreateTableEvent(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
    }
}

