package org.apache.flink.table.planner.connectors;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;

@Internal
/* loaded from: input_file:org/apache/flink/table/planner/connectors/DynamicSourceUtils.class */
public final class DynamicSourceUtils {
    public static RelNode convertDataStreamToRel(boolean z, ReadableConfig readableConfig, FlinkRelBuilder flinkRelBuilder, ContextResolvedTable contextResolvedTable, DataStream<?> dataStream, DataType dataType, boolean z2, ChangelogMode changelogMode) {
        return convertSourceToRel(z, readableConfig, flinkRelBuilder, contextResolvedTable, FlinkStatistic.unknown(contextResolvedTable.getResolvedSchema()).build(), Collections.emptyList(), new ExternalDynamicSource(contextResolvedTable.getIdentifier(), dataStream, dataType, z2, changelogMode));
    }

    public static RelNode convertSourceToRel(boolean z, ReadableConfig readableConfig, FlinkRelBuilder flinkRelBuilder, ContextResolvedTable contextResolvedTable, FlinkStatistic flinkStatistic, List<RelHint> list, DynamicTableSource dynamicTableSource) {
        String asSummaryString = contextResolvedTable.getIdentifier().asSummaryString();
        ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable();
        ArrayList arrayList = new ArrayList();
        prepareDynamicSource(asSummaryString, resolvedTable, dynamicTableSource, z, readableConfig, arrayList);
        pushTableScan(z, flinkRelBuilder, contextResolvedTable, flinkStatistic, list, dynamicTableSource, arrayList);
        ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema();
        if (!resolvedSchema.getColumns().stream().allMatch((v0) -> {
            return v0.isPhysical();
        })) {
            pushMetadataProjection(flinkRelBuilder, resolvedSchema);
            pushGeneratedProjection(flinkRelBuilder, resolvedSchema);
        }
        if (!z && !resolvedSchema.getWatermarkSpecs().isEmpty()) {
            pushWatermarkAssigner(flinkRelBuilder, resolvedSchema);
        }
        return flinkRelBuilder.build();
    }

    public static void prepareDynamicSource(String str, ResolvedCatalogTable resolvedCatalogTable, DynamicTableSource dynamicTableSource, boolean z, ReadableConfig readableConfig, List<SourceAbilitySpec> list) {
        ResolvedSchema resolvedSchema = resolvedCatalogTable.getResolvedSchema();
        validateAndApplyMetadata(str, resolvedSchema, dynamicTableSource, list);
        if (dynamicTableSource instanceof ScanTableSource) {
            validateScanSource(str, resolvedSchema, (ScanTableSource) dynamicTableSource, z, readableConfig);
            prepareRowLevelModificationScan(dynamicTableSource);
        }
    }

    public static List<Column.MetadataColumn> createRequiredMetadataColumns(ResolvedSchema resolvedSchema, DynamicTableSource dynamicTableSource) {
        Map<String, Column.MetadataColumn> createMetadataKeysToMetadataColumnsMap = createMetadataKeysToMetadataColumnsMap(resolvedSchema);
        Stream<String> stream = extractMetadataMap(dynamicTableSource).keySet().stream();
        createMetadataKeysToMetadataColumnsMap.getClass();
        Stream<String> filter = stream.filter((v1) -> {
            return r1.containsKey(v1);
        });
        createMetadataKeysToMetadataColumnsMap.getClass();
        return (List) filter.map((v1) -> {
            return r1.get(v1);
        }).collect(Collectors.toList());
    }

    public static Map<String, Column.MetadataColumn> createMetadataKeysToMetadataColumnsMap(ResolvedSchema resolvedSchema) {
        List<Column.MetadataColumn> extractMetadataColumns = extractMetadataColumns(resolvedSchema);
        HashMap hashMap = new HashMap();
        for (Column.MetadataColumn metadataColumn : extractMetadataColumns) {
            hashMap.put((String) metadataColumn.getMetadataKey().orElse(metadataColumn.getName()), metadataColumn);
        }
        return hashMap;
    }

    public static RowType createProducedType(ResolvedSchema resolvedSchema, DynamicTableSource dynamicTableSource) {
        Map<String, DataType> extractMetadataMap = extractMetadataMap(dynamicTableSource);
        return new RowType(false, (List) Stream.concat(resolvedSchema.toPhysicalRowDataType().getLogicalType().getFields().stream(), createRequiredMetadataColumns(resolvedSchema, dynamicTableSource).stream().map(metadataColumn -> {
            return new RowType.RowField(metadataColumn.getName(), ((DataType) extractMetadataMap.get(metadataColumn.getMetadataKey().orElse(metadataColumn.getName()))).getLogicalType());
        })).collect(Collectors.toList()));
    }

    public static boolean isUpsertSource(ResolvedSchema resolvedSchema, DynamicTableSource dynamicTableSource) {
        if (!(dynamicTableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode changelogMode = ((ScanTableSource) dynamicTableSource).getChangelogMode();
        return (changelogMode.contains(RowKind.UPDATE_AFTER) && !changelogMode.contains(RowKind.UPDATE_BEFORE)) && resolvedSchema.getPrimaryKey().isPresent();
    }

    public static boolean isSourceChangeEventsDuplicate(ResolvedSchema resolvedSchema, DynamicTableSource dynamicTableSource, TableConfig tableConfig) {
        if (dynamicTableSource instanceof ScanTableSource) {
            return (!((ScanTableSource) dynamicTableSource).getChangelogMode().containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, dynamicTableSource)) && ((Boolean) tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE)).booleanValue() && resolvedSchema.getPrimaryKey().isPresent();
        }
        return false;
    }

    public static boolean changelogNormalizeEnabled(boolean z, ResolvedSchema resolvedSchema, DynamicTableSource dynamicTableSource, TableConfig tableConfig) {
        return !z && (isUpsertSource(resolvedSchema, dynamicTableSource) || isSourceChangeEventsDuplicate(resolvedSchema, dynamicTableSource, tableConfig));
    }

    private static void pushWatermarkAssigner(FlinkRelBuilder flinkRelBuilder, ResolvedSchema resolvedSchema) {
        ExpressionConverter expressionConverter = new ExpressionConverter(flinkRelBuilder);
        RelDataType rowType = flinkRelBuilder.peek().getRowType();
        WatermarkSpec watermarkSpec = (WatermarkSpec) resolvedSchema.getWatermarkSpecs().get(0);
        flinkRelBuilder.watermark(rowType.getFieldNames().indexOf(watermarkSpec.getRowtimeAttribute()), (RexNode) watermarkSpec.getWatermarkExpression().accept(expressionConverter));
    }

    private static void pushGeneratedProjection(FlinkRelBuilder flinkRelBuilder, ResolvedSchema resolvedSchema) {
        ExpressionConverter expressionConverter = new ExpressionConverter(flinkRelBuilder);
        flinkRelBuilder.projectNamed((List) resolvedSchema.getColumns().stream().map(column -> {
            return column instanceof Column.ComputedColumn ? (RexNode) ((Column.ComputedColumn) column).getExpression().accept(expressionConverter) : flinkRelBuilder.field(column.getName());
        }).collect(Collectors.toList()), (Iterable) resolvedSchema.getColumns().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList()), true);
    }

    private static void pushMetadataProjection(FlinkRelBuilder flinkRelBuilder, ResolvedSchema resolvedSchema) {
        RexBuilder rexBuilder = flinkRelBuilder.getRexBuilder();
        flinkRelBuilder.projectNamed((List) resolvedSchema.getColumns().stream().filter(column -> {
            return !(column instanceof Column.ComputedColumn);
        }).map(column2 -> {
            return column2 instanceof Column.MetadataColumn ? rexBuilder.makeAbstractCast(flinkRelBuilder.getTypeFactory().createFieldTypeFromLogicalType(column2.getDataType().getLogicalType()), flinkRelBuilder.field(((Column.MetadataColumn) column2).getName())) : flinkRelBuilder.field(column2.getName());
        }).collect(Collectors.toList()), (List) resolvedSchema.getColumns().stream().filter(column3 -> {
            return !(column3 instanceof Column.ComputedColumn);
        }).map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList()), true);
    }

    private static void pushTableScan(boolean z, FlinkRelBuilder flinkRelBuilder, ContextResolvedTable contextResolvedTable, FlinkStatistic flinkStatistic, List<RelHint> list, DynamicTableSource dynamicTableSource, List<SourceAbilitySpec> list2) {
        flinkRelBuilder.push(LogicalTableScan.create(flinkRelBuilder.getCluster(), new TableSourceTable(flinkRelBuilder.getRelOptSchema(), flinkRelBuilder.getTypeFactory().buildRelNodeRowType(createProducedType(contextResolvedTable.getResolvedSchema(), dynamicTableSource)), flinkStatistic, dynamicTableSource, !z, contextResolvedTable, ShortcutUtils.unwrapContext(flinkRelBuilder), ShortcutUtils.unwrapTypeFactory(flinkRelBuilder), (SourceAbilitySpec[]) list2.toArray(new SourceAbilitySpec[0])), list));
    }

    private static Map<String, DataType> extractMetadataMap(DynamicTableSource dynamicTableSource) {
        return dynamicTableSource instanceof SupportsReadingMetadata ? ((SupportsReadingMetadata) dynamicTableSource).listReadableMetadata() : Collections.emptyMap();
    }

    public static List<Column.MetadataColumn> extractMetadataColumns(ResolvedSchema resolvedSchema) {
        Stream stream = resolvedSchema.getColumns().stream();
        Class<Column.MetadataColumn> cls = Column.MetadataColumn.class;
        Column.MetadataColumn.class.getClass();
        Stream filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<Column.MetadataColumn> cls2 = Column.MetadataColumn.class;
        Column.MetadataColumn.class.getClass();
        return (List) filter.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList());
    }

    public static void validateAndApplyMetadata(String str, ResolvedSchema resolvedSchema, DynamicTableSource dynamicTableSource, List<SourceAbilitySpec> list) {
        List<Column.MetadataColumn> extractMetadataColumns = extractMetadataColumns(resolvedSchema);
        if (extractMetadataColumns.isEmpty()) {
            return;
        }
        if (!(dynamicTableSource instanceof SupportsReadingMetadata)) {
            throw new ValidationException(String.format("Table '%s' declares metadata columns, but the underlying %s doesn't implement the %s interface. Therefore, metadata cannot be read from the given source.", dynamicTableSource.asSummaryString(), DynamicTableSource.class.getSimpleName(), SupportsReadingMetadata.class.getSimpleName()));
        }
        SupportsReadingMetadata supportsReadingMetadata = (SupportsReadingMetadata) dynamicTableSource;
        Map listReadableMetadata = supportsReadingMetadata.listReadableMetadata();
        extractMetadataColumns.forEach(metadataColumn -> {
            String str2 = (String) metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
            LogicalType logicalType = metadataColumn.getDataType().getLogicalType();
            DataType dataType = (DataType) listReadableMetadata.get(str2);
            if (dataType == null) {
                throw new ValidationException(String.format("Invalid metadata key '%s' in column '%s' of table '%s'. The %s class '%s' supports the following metadata keys for reading:\n%s", str2, metadataColumn.getName(), str, DynamicTableSource.class.getSimpleName(), dynamicTableSource.getClass().getName(), String.join("\n", listReadableMetadata.keySet())));
            }
            if (LogicalTypeCasts.supportsExplicitCast(dataType.getLogicalType(), logicalType)) {
                return;
            }
            if (!str2.equals(metadataColumn.getName())) {
                throw new ValidationException(String.format("Invalid data type for metadata column '%s' with metadata key '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable from metadata type '%s'.", metadataColumn.getName(), str2, str, dataType.getLogicalType(), logicalType));
            }
            throw new ValidationException(String.format("Invalid data type for metadata column '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable from metadata type '%s'.", metadataColumn.getName(), str, dataType.getLogicalType(), logicalType));
        });
        List list2 = (List) createRequiredMetadataColumns(resolvedSchema, dynamicTableSource).stream().map(metadataColumn2 -> {
            return (String) metadataColumn2.getMetadataKey().orElse(metadataColumn2.getName());
        }).collect(Collectors.toList());
        DataType fromLogicalToDataType = TypeConversions.fromLogicalToDataType(createProducedType(resolvedSchema, dynamicTableSource));
        list.add(new ReadingMetadataSpec(list2, fromLogicalToDataType.getLogicalType()));
        supportsReadingMetadata.applyReadableMetadata(list2, fromLogicalToDataType);
    }

    private static void validateScanSource(String str, ResolvedSchema resolvedSchema, ScanTableSource scanTableSource, boolean z, ReadableConfig readableConfig) {
        ChangelogMode changelogMode = scanTableSource.getChangelogMode();
        validateWatermarks(str, resolvedSchema);
        if (z) {
            validateScanSourceForBatch(str, scanTableSource, changelogMode);
        } else {
            validateScanSourceForStreaming(str, resolvedSchema, scanTableSource, changelogMode, readableConfig);
        }
    }

    private static void validateScanSourceForStreaming(String str, ResolvedSchema resolvedSchema, ScanTableSource scanTableSource, ChangelogMode changelogMode, ReadableConfig readableConfig) {
        boolean z = changelogMode != null;
        boolean z2 = z && changelogMode.contains(RowKind.UPDATE_BEFORE);
        boolean z3 = z && changelogMode.contains(RowKind.UPDATE_AFTER);
        if (!z2 && z3) {
            if (!resolvedSchema.getPrimaryKey().isPresent()) {
                throw new TableException(String.format("Table '%s' produces a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE. This requires defining a primary key constraint on the table.", str));
            }
        } else {
            if (z2 && !z3) {
                throw new ValidationException(String.format("Invalid source for table '%s'. A %s doesn't support a changelog stream that contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class '%s'.", str, ScanTableSource.class.getSimpleName(), scanTableSource.getClass().getName()));
            }
            if (z && !changelogMode.containsOnly(RowKind.INSERT) && ((Boolean) readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE)).booleanValue() && !resolvedSchema.getPrimaryKey().isPresent()) {
                throw new TableException(String.format("Configuration '%s' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table '%s' doesn't have a primary key.", ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE.key(), str));
            }
        }
    }

    private static void validateScanSourceForBatch(String str, ScanTableSource scanTableSource, ChangelogMode changelogMode) {
        if (!scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBounded()) {
            throw new ValidationException(String.format("Querying an unbounded table '%s' in batch mode is not allowed. The table source is unbounded.", str));
        }
        if (!changelogMode.containsOnly(RowKind.INSERT)) {
            throw new TableException(String.format("Querying a table in batch mode is currently only possible for INSERT-only table sources. But the source for table '%s' produces other changelog messages than just INSERT.", str));
        }
    }

    private static void validateWatermarks(String str, ResolvedSchema resolvedSchema) {
        if (resolvedSchema.getWatermarkSpecs().isEmpty()) {
            return;
        }
        if (resolvedSchema.getWatermarkSpecs().size() > 1) {
            throw new TableException(String.format("Currently only at most one WATERMARK declaration is supported for table '%s'.", str));
        }
        String rowtimeAttribute = ((WatermarkSpec) resolvedSchema.getWatermarkSpecs().get(0)).getRowtimeAttribute();
        if (rowtimeAttribute.contains(".")) {
            throw new TableException(String.format("A nested field '%s' cannot be declared as rowtime attribute for table '%s' right now.", rowtimeAttribute, str));
        }
    }

    private static void prepareRowLevelModificationScan(DynamicTableSource dynamicTableSource) {
        if (RowLevelModificationContextUtils.getModificationType() == null || !(dynamicTableSource instanceof SupportsRowLevelModificationScan)) {
            return;
        }
        RowLevelModificationContextUtils.setScanContext(((SupportsRowLevelModificationScan) dynamicTableSource).applyRowLevelModificationScan(RowLevelModificationContextUtils.getModificationType(), RowLevelModificationContextUtils.getScanContext()));
    }

    private DynamicSourceUtils() {
    }
}
