/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

/**
 * Derives the schema change events to emit for routed tables from the schema change events of
 * the tables that are routed into them.
 *
 * <p>Each route is kept as a {@code (selectors, sinkTable, replaceSymbol)} triple. The derivation
 * mapping records, for every derived table, the set of original tables that have been routed into
 * it so far. While a derived table has exactly one original table, events are re-targeted
 * unchanged; once it has more than one, incoming changes are merged against the derived table's
 * latest evolved schema (missing columns are added, conflicting column types are widened).
 */
public class SchemaDerivation {
    private final SchemaManager schemaManager;
    private final Map<TableId, Set<TableId>> derivationMapping;
    private transient List<Tuple3<Selectors, String, String>> routes;

    /**
     * Creates a schema derivation.
     *
     * @param schemaManager source of the latest evolved schema of derived tables
     * @param routeRules routing rules; each one is converted into a table selector, a sink table
     *     name and an optional replace symbol
     * @param derivationMapping mapping from derived table to the original tables routed into it;
     *     the given instance is stored and mutated by {@link #applySchemaChange}
     */
    public SchemaDerivation(SchemaManager schemaManager, List<RouteRule> routeRules, Map<TableId, Set<TableId>> derivationMapping) {
        this.schemaManager = schemaManager;
        this.routes = routeRules.stream().map(SchemaDerivation::compileRoute).collect(Collectors.toList());
        this.derivationMapping = derivationMapping;
    }

    /** Converts a route rule into its {@code (selectors, sinkTable, replaceSymbol)} triple. */
    private static Tuple3<Selectors, String, String> compileRoute(RouteRule rule) {
        Selectors selectors = new Selectors.SelectorsBuilder().includeTables(rule.sourceTable).build();
        return new Tuple3<>(selectors, rule.sinkTable, rule.replaceSymbol);
    }

    /**
     * Translates a schema change event of an original table into the events to apply downstream.
     *
     * @param schemaChangeEvent the event emitted for the original table
     * @return a singleton list holding the unchanged event when no route matches its table;
     *     otherwise the (possibly empty) list of events derived for every matching route
     * @throws java.util.NoSuchElementException if a derived table with several original tables
     *     has no evolved schema, or a column that must already exist in it is missing
     * @throws IllegalStateException if the event type is not recognized or two column types
     *     cannot be widened to a common type
     */
    public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
        List<SchemaChangeEvent> events = new ArrayList<>();
        TableId originalTable = schemaChangeEvent.tableId();
        boolean noRouteMatched = true;
        for (Tuple3<Selectors, String, String> route : routes) {
            if (!route.f0.isMatch(originalTable)) {
                continue;
            }
            noRouteMatched = false;
            TableId derivedTable = resolveReplacement(originalTable, route);
            Set<TableId> originalTables = derivationMapping.computeIfAbsent(derivedTable, t -> new HashSet<>());
            originalTables.add(originalTable);

            if (originalTables.size() == 1) {
                // Only one original table feeds the derived table: re-target the event as is.
                events.add(ChangeEventUtils.recreateSchemaChangeEvent(schemaChangeEvent, derivedTable));
                continue;
            }

            // Several original tables are merged into the derived table: reconcile the change
            // with the schema the derived table currently has.
            Schema derivedTableSchema = schemaManager.getLatestEvolvedSchema(derivedTable)
                    .orElseThrow(() -> new java.util.NoSuchElementException(String.format(
                            "No evolved schema found for derived table %s while applying %s",
                            derivedTable, schemaChangeEvent)));

            if (schemaChangeEvent instanceof CreateTableEvent) {
                events.addAll(handleCreateTableEvent((CreateTableEvent) schemaChangeEvent, derivedTableSchema, derivedTable));
            } else if (schemaChangeEvent instanceof AddColumnEvent) {
                events.addAll(handleAddColumnEvent((AddColumnEvent) schemaChangeEvent, derivedTableSchema, derivedTable));
            } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
                events.addAll(handleAlterColumnTypeEvent((AlterColumnTypeEvent) schemaChangeEvent, derivedTableSchema, derivedTable));
            } else if (schemaChangeEvent instanceof DropColumnEvent) {
                // Deliberately emits nothing: a column dropped in one original table is not
                // dropped from the merged table (presumably because the other original tables
                // may still carry it).
                continue;
            } else if (schemaChangeEvent instanceof RenameColumnEvent) {
                events.addAll(handleRenameColumnEvent((RenameColumnEvent) schemaChangeEvent, derivedTableSchema, derivedTable));
            } else {
                throw new IllegalStateException(String.format("Unrecognized SchemaChangeEvent type: %s", schemaChangeEvent));
            }
        }
        if (noRouteMatched) {
            return Collections.singletonList(schemaChangeEvent);
        }
        return events;
    }

    /**
     * Resolves the derived table of a route: when the route has a replace symbol, every occurrence
     * of it in the sink table name is replaced with the original table's name before parsing.
     */
    private TableId resolveReplacement(TableId originalTable, Tuple3<Selectors, String, String> route) {
        String sinkTable = route.f1;
        String replaceSymbol = route.f2;
        if (replaceSymbol != null) {
            return TableId.parse(sinkTable.replace(replaceSymbol, originalTable.getTableName()));
        }
        return TableId.parse(sinkTable);
    }

    /** Returns the live (not copied) mapping from derived table to its original tables. */
    public Map<TableId, Set<TableId>> getDerivationMapping() {
        return this.derivationMapping;
    }

    /**
     * Writes the derivation mapping as: entry count, then for each entry the derived table id, the
     * number of original tables and each original table id.
     *
     * @param schemaDerivation the instance whose mapping is written
     * @param out destination stream
     * @throws IOException if writing to {@code out} fails
     */
    public static void serializeDerivationMapping(SchemaDerivation schemaDerivation, DataOutputStream out) throws IOException {
        TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
        // One view over the stream is enough; it is reused for every table id.
        DataOutputView outView = new DataOutputViewStreamWrapper(out);
        Map<TableId, Set<TableId>> derivationMapping = schemaDerivation.getDerivationMapping();
        out.writeInt(derivationMapping.size());
        for (Map.Entry<TableId, Set<TableId>> entry : derivationMapping.entrySet()) {
            tableIdSerializer.serialize(entry.getKey(), outView);
            Set<TableId> originalTableIds = entry.getValue();
            out.writeInt(originalTableIds.size());
            for (TableId originalTableId : originalTableIds) {
                tableIdSerializer.serialize(originalTableId, outView);
            }
        }
    }

    /**
     * Reads a derivation mapping in the layout produced by {@link #serializeDerivationMapping}.
     *
     * @param in source stream
     * @return a new mutable mapping from derived table to its original tables
     * @throws IOException if reading from {@code in} fails
     */
    public static Map<TableId, Set<TableId>> deserializerDerivationMapping(DataInputStream in) throws IOException {
        TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
        // One view over the stream is enough; it is reused for every table id.
        DataInputView inView = new DataInputViewStreamWrapper(in);
        int derivationMappingSize = in.readInt();
        Map<TableId, Set<TableId>> derivationMapping = new HashMap<>(derivationMappingSize);
        for (int i = 0; i < derivationMappingSize; ++i) {
            TableId routedTableId = tableIdSerializer.deserialize(inView);
            int numOriginalTables = in.readInt();
            Set<TableId> originalTableIds = new HashSet<>(numOriginalTables);
            for (int j = 0; j < numOriginalTables; ++j) {
                originalTableIds.add(tableIdSerializer.deserialize(inView));
            }
            derivationMapping.put(routedTableId, originalTableIds);
        }
        return derivationMapping;
    }

    /**
     * Handles a rename on a merged table. The old column is kept; for every rename whose new name
     * is not yet present in the derived table, a column with the new name is added that copies the
     * type and comment of the old one.
     */
    private List<SchemaChangeEvent> handleRenameColumnEvent(RenameColumnEvent renameColumnEvent, Schema derivedTableSchema, TableId derivedTable) {
        List<AddColumnEvent.ColumnWithPosition> newColumns = new ArrayList<>();
        for (Map.Entry<String, String> renaming : renameColumnEvent.getNameMapping().entrySet()) {
            String before = renaming.getKey();
            String after = renaming.getValue();
            if (derivedTableSchema.getColumn(after).isPresent()) {
                continue;
            }
            Column existedColumn = requireColumn(derivedTableSchema, before, derivedTable);
            newColumns.add(new AddColumnEvent.ColumnWithPosition(
                    new PhysicalColumn(after, existedColumn.getType(), existedColumn.getComment())));
        }
        List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
        if (!newColumns.isEmpty()) {
            schemaChangeEvents.add(new AddColumnEvent(derivedTable, newColumns));
        }
        return schemaChangeEvents;
    }

    /**
     * Handles a column type change on a merged table: each affected column of the derived table is
     * widened when its current type cannot already hold the new type.
     */
    private List<SchemaChangeEvent> handleAlterColumnTypeEvent(AlterColumnTypeEvent alterColumnTypeEvent, Schema derivedTableSchema, TableId derivedTable) {
        Map<String, DataType> typeDifference = new HashMap<>();
        for (Map.Entry<String, DataType> change : alterColumnTypeEvent.getTypeMapping().entrySet()) {
            Column existedColumnInDerivedTable = requireColumn(derivedTableSchema, change.getKey(), derivedTable);
            recordWideningIfNeeded(existedColumnInDerivedTable, change.getValue(), typeDifference);
        }
        List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
        if (!typeDifference.isEmpty()) {
            schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, typeDifference));
        }
        return schemaChangeEvents;
    }

    /**
     * Handles added columns on a merged table. Positions requested by the original event are not
     * carried over: columns new to the derived table are added with the default position.
     */
    private List<SchemaChangeEvent> handleAddColumnEvent(AddColumnEvent addColumnEvent, Schema derivedTableSchema, TableId derivedTable) {
        List<Column> addedColumns = new ArrayList<>();
        for (AddColumnEvent.ColumnWithPosition addedColumn : addColumnEvent.getAddedColumns()) {
            addedColumns.add(addedColumn.getAddColumn());
        }
        return mergeColumnsIntoDerivedTable(addedColumns, derivedTableSchema, derivedTable);
    }

    /** Handles the creation of a further original table by merging all of its columns. */
    private List<SchemaChangeEvent> handleCreateTableEvent(CreateTableEvent createTableEvent, Schema derivedTableSchema, TableId derivedTable) {
        return mergeColumnsIntoDerivedTable(createTableEvent.getSchema().getColumns(), derivedTableSchema, derivedTable);
    }

    /**
     * Merges incoming columns into the derived table: columns it does not have yet are collected
     * into one {@link AddColumnEvent}, columns it has with a narrower type into one
     * {@link AlterColumnTypeEvent}. The add event, if any, precedes the alter event.
     */
    private List<SchemaChangeEvent> mergeColumnsIntoDerivedTable(List<Column> incomingColumns, Schema derivedTableSchema, TableId derivedTable) {
        List<AddColumnEvent.ColumnWithPosition> newColumns = new ArrayList<>();
        Map<String, DataType> newTypeMapping = new HashMap<>();
        for (Column column : incomingColumns) {
            Optional<Column> optionalColumnInDerivedTable = derivedTableSchema.getColumn(column.getName());
            if (optionalColumnInDerivedTable.isPresent()) {
                recordWideningIfNeeded(optionalColumnInDerivedTable.get(), column.getType(), newTypeMapping);
            } else {
                newColumns.add(new AddColumnEvent.ColumnWithPosition(column));
            }
        }
        List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
        if (!newColumns.isEmpty()) {
            schemaChangeEvents.add(new AddColumnEvent(derivedTable, newColumns));
        }
        if (!newTypeMapping.isEmpty()) {
            schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
        }
        return schemaChangeEvents;
    }

    /**
     * Records, under the existing column's name, the wider type the column must change to. Nothing
     * is recorded when the existing type already equals the wider type (which includes the case of
     * both types being equal).
     */
    private void recordWideningIfNeeded(Column existedColumn, DataType incomingType, Map<String, DataType> typeChanges) {
        DataType existedType = existedColumn.getType();
        DataType widerType = getWiderType(existedType, incomingType);
        if (!widerType.equals(existedType)) {
            typeChanges.put(existedColumn.getName(), widerType);
        }
    }

    /**
     * Looks up a column that is expected to exist in the derived table's schema.
     *
     * @throws java.util.NoSuchElementException if the column is absent; the message names the
     *     column and the derived table
     */
    private static Column requireColumn(Schema derivedTableSchema, String columnName, TableId derivedTable) {
        return derivedTableSchema.getColumn(columnName)
                .orElseThrow(() -> new java.util.NoSuchElementException(String.format(
                        "Column \"%s\" does not exist in the schema of derived table %s",
                        columnName, derivedTable)));
    }

    /**
     * Returns a type able to hold both given types: the type itself when both are equal, otherwise
     * BIGINT for two integer numerics, STRING for two character strings and DOUBLE for two
     * approximate numerics.
     *
     * @throws IllegalStateException if the two types fall into none of these cases
     */
    private DataType getWiderType(DataType thisType, DataType thatType) {
        if (thisType.equals(thatType)) {
            return thisType;
        }
        if (thisType.is(DataTypeFamily.INTEGER_NUMERIC) && thatType.is(DataTypeFamily.INTEGER_NUMERIC)) {
            return DataTypes.BIGINT();
        }
        if (thisType.is(DataTypeFamily.CHARACTER_STRING) && thatType.is(DataTypeFamily.CHARACTER_STRING)) {
            return DataTypes.STRING();
        }
        if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC) && thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
            return DataTypes.DOUBLE();
        }
        throw new IllegalStateException(String.format("Incompatible types: \"%s\" and \"%s\"", thisType, thatType));
    }
}

