/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.utils.FilterUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import scala.collection.Seq;

public final class TestValuesTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    private static final AtomicInteger idCounter = new AtomicInteger(0);
    private static final Map<String, Collection<Row>> registeredData = new HashMap<String, Collection<Row>>();
    private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<String, Collection<RowData>>();
    public static final AtomicInteger RESOURCE_COUNTER = new AtomicInteger();
    public static final String IDENTIFIER = "values";
    private static final ConfigOption<String> DATA_ID = ConfigOptions.key((String)"data-id").stringType().noDefaultValue();
    private static final ConfigOption<Boolean> BOUNDED = ConfigOptions.key((String)"bounded").booleanType().defaultValue((Object)false);
    private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions.key((String)"changelog-mode").stringType().defaultValue((Object)"I");
    private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions.key((String)"runtime-source").stringType().defaultValue((Object)"SourceFunction");
    private static final ConfigOption<Boolean> FAILING_SOURCE = ConfigOptions.key((String)"failing-source").booleanType().defaultValue((Object)false);
    private static final ConfigOption<String> RUNTIME_SINK = ConfigOptions.key((String)"runtime-sink").stringType().defaultValue((Object)"SinkFunction");
    private static final ConfigOption<String> TABLE_SOURCE_CLASS = ConfigOptions.key((String)"table-source-class").stringType().defaultValue((Object)"DEFAULT");
    private static final ConfigOption<String> TABLE_SINK_CLASS = ConfigOptions.key((String)"table-sink-class").stringType().defaultValue((Object)"DEFAULT");
    private static final ConfigOption<String> LOOKUP_FUNCTION_CLASS = ConfigOptions.key((String)"lookup-function-class").stringType().noDefaultValue();
    private static final ConfigOption<Boolean> ASYNC_ENABLED = ConfigOptions.key((String)"async").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> DISABLE_LOOKUP = ConfigOptions.key((String)"disable-lookup").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> SINK_INSERT_ONLY = ConfigOptions.key((String)"sink-insert-only").booleanType().defaultValue((Object)true);
    private static final ConfigOption<Integer> SINK_EXPECTED_MESSAGES_NUM = ConfigOptions.key((String)"sink-expected-messages-num").intType().defaultValue((Object)-1);
    private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions.key((String)"nested-projection-supported").booleanType().defaultValue((Object)false);
    private static final ConfigOption<List<String>> FILTERABLE_FIELDS = ConfigOptions.key((String)"filterable-fields").stringType().asList().noDefaultValue();
    private static final ConfigOption<Boolean> ENABLE_WATERMARK_PUSH_DOWN = ConfigOptions.key((String)"enable-watermark-push-down").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> INTERNAL_DATA = ConfigOptions.key((String)"register-internal-data").booleanType().defaultValue((Object)false).withDescription("The registered data is internal type data, which can be collected by the source directly.");
    private static final ConfigOption<Map<String, String>> READABLE_METADATA = ConfigOptions.key((String)"readable-metadata").mapType().defaultValue(Collections.emptyMap()).withDescription("Optional map of 'metadata_key:data_type,...'. The order will be alphabetically. The metadata is part of the data when enabled.");
    private static final ConfigOption<Map<String, String>> WRITABLE_METADATA = ConfigOptions.key((String)"writable-metadata").mapType().defaultValue(Collections.emptyMap()).withDescription("Optional map of 'metadata_key:data_type'. The order will be alphabetically. The metadata is part of the data when enabled.");
    private static final ConfigOption<Boolean> SINK_DROP_LATE_EVENT = ConfigOptions.key((String)"sink.drop-late-event").booleanType().defaultValue((Object)false).withDeprecatedKeys(new String[]{"Option to determine whether to discard the late event."});
    private static final ConfigOption<Integer> SOURCE_NUM_ELEMENT_TO_SKIP = ConfigOptions.key((String)"source.num-element-to-skip").intType().defaultValue((Object)-1).withDeprecatedKeys(new String[]{"Option to define the number of elements to skip."});
    private static final ConfigOption<List<String>> PARTITION_LIST = ConfigOptions.key((String)"partition-list").stringType().asList().defaultValues((Object[])new String[0]);
    private static final ConfigOption<String> SINK_CHANGELOG_MODE_ENFORCED = ConfigOptions.key((String)"sink-changelog-mode-enforced").stringType().noDefaultValue();
    private static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

    public static String registerData(Collection<Row> data) {
        String id = String.valueOf(idCounter.incrementAndGet());
        registeredData.put(id, data);
        return id;
    }

    public static String registerData(Seq<Row> data) {
        return TestValuesTableFactory.registerData(JavaScalaConversionUtil.toJava(data));
    }

    public static String registerRowData(Collection<RowData> data) {
        String id = String.valueOf(idCounter.incrementAndGet());
        registeredRowData.put(id, data);
        return id;
    }

    public static String registerRowData(Seq<RowData> data) {
        return TestValuesTableFactory.registerRowData(JavaScalaConversionUtil.toJava(data));
    }

    public static List<String> getRawResults(String tableName) {
        return TestValuesRuntimeFunctions.getRawResults(tableName);
    }

    public static List<String> getOnlyRawResults() {
        return TestValuesRuntimeFunctions.getOnlyRawResults();
    }

    public static List<String> getResults(String tableName) {
        return TestValuesRuntimeFunctions.getResults(tableName);
    }

    public static List<Watermark> getWatermarkOutput(String tableName) {
        return TestValuesRuntimeFunctions.getWatermarks(tableName);
    }

    public static void clearAllData() {
        registeredData.clear();
        registeredRowData.clear();
        TestValuesRuntimeFunctions.clearResults();
    }

    public static Row changelogRow(String rowKind, Object ... values) {
        RowKind kind = TestValuesTableFactory.parseRowKind(rowKind);
        return Row.ofKind((RowKind)kind, (Object[])values);
    }

    private static RowKind parseRowKind(String rowKindShortString) {
        switch (rowKindShortString) {
            case "+I": {
                return RowKind.INSERT;
            }
            case "-U": {
                return RowKind.UPDATE_BEFORE;
            }
            case "+U": {
                return RowKind.UPDATE_AFTER;
            }
            case "-D": {
                return RowKind.DELETE;
            }
        }
        throw new IllegalArgumentException("Unsupported RowKind string: " + rowKindShortString);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        ChangelogMode changelogMode = this.parseChangelogMode((String)helper.getOptions().get(CHANGELOG_MODE));
        String runtimeSource = (String)helper.getOptions().get(RUNTIME_SOURCE);
        boolean isBounded = (Boolean)helper.getOptions().get(BOUNDED);
        String dataId = (String)helper.getOptions().get(DATA_ID);
        String sourceClass = (String)helper.getOptions().get(TABLE_SOURCE_CLASS);
        boolean isAsync = (Boolean)helper.getOptions().get(ASYNC_ENABLED);
        String lookupFunctionClass = (String)helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
        boolean disableLookup = (Boolean)helper.getOptions().get(DISABLE_LOOKUP);
        boolean nestedProjectionSupported = (Boolean)helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
        boolean enableWatermarkPushDown = (Boolean)helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN);
        boolean failingSource = (Boolean)helper.getOptions().get(FAILING_SOURCE);
        int numElementToSkip = (Integer)helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
        boolean internalData = (Boolean)helper.getOptions().get(INTERNAL_DATA);
        Optional filterableFields = helper.getOptions().getOptional(FILTERABLE_FIELDS);
        HashSet filterableFieldsSet = new HashSet();
        filterableFields.ifPresent(filterableFieldsSet::addAll);
        Map<String, DataType> readableMetadata = TestValuesTableFactory.convertToMetadataMap((Map)helper.getOptions().get(READABLE_METADATA), context.getClassLoader());
        if (sourceClass.equals("DEFAULT")) {
            Map<Map<String, String>, Collection<Row>> partition2Rows;
            if (internalData) {
                return new TestValuesScanTableSourceWithInternalData(dataId, isBounded);
            }
            Collection data = registeredData.getOrDefault(dataId, Collections.emptyList());
            List<Map<String, String>> partitions = TestValuesTableFactory.parsePartitionList((List)helper.getOptions().get(PARTITION_LIST));
            DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
            if (!partitions.isEmpty()) {
                partition2Rows = TestValuesTableFactory.mapPartitionToRow(producedDataType, data, partitions);
            } else {
                partitions = Collections.emptyList();
                partition2Rows = new HashMap<Map<String, String>, Collection<Row>>();
                partition2Rows.put(Collections.emptyMap(), data);
            }
            if (disableLookup) {
                if (enableWatermarkPushDown) {
                    return new TestValuesScanTableSourceWithWatermarkPushDown(producedDataType, changelogMode, runtimeSource, failingSource, partition2Rows, context.getObjectIdentifier().getObjectName(), nestedProjectionSupported, (int[][])null, Collections.emptyList(), filterableFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null);
                }
                return new TestValuesScanTableSource(producedDataType, changelogMode, isBounded, runtimeSource, failingSource, partition2Rows, nestedProjectionSupported, null, Collections.emptyList(), filterableFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null);
            }
            return new TestValuesScanLookupTableSource(producedDataType, changelogMode, isBounded, runtimeSource, failingSource, partition2Rows, isAsync, lookupFunctionClass, nestedProjectionSupported, null, Collections.emptyList(), filterableFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null);
        }
        try {
            return (DynamicTableSource)InstantiationUtil.instantiate((String)sourceClass, DynamicTableSource.class, (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        catch (FlinkException e) {
            throw new TableException("Can't instantiate class " + sourceClass, (Throwable)e);
        }
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        String sinkClass = (String)helper.getOptions().get(TABLE_SINK_CLASS);
        boolean isInsertOnly = (Boolean)helper.getOptions().get(SINK_INSERT_ONLY);
        String runtimeSink = (String)helper.getOptions().get(RUNTIME_SINK);
        int expectedNum = (Integer)helper.getOptions().get(SINK_EXPECTED_MESSAGES_NUM);
        Integer parallelism = (Integer)helper.getOptions().get(SINK_PARALLELISM);
        boolean dropLateEvent = (Boolean)helper.getOptions().get(SINK_DROP_LATE_EVENT);
        Map<String, DataType> writableMetadata = TestValuesTableFactory.convertToMetadataMap((Map)helper.getOptions().get(WRITABLE_METADATA), context.getClassLoader());
        ChangelogMode changelogMode = Optional.ofNullable(helper.getOptions().get(SINK_CHANGELOG_MODE_ENFORCED)).map(m -> this.parseChangelogMode((String)m)).orElse(null);
        DataType consumedType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        int[] primaryKeyIndices = TableSchemaUtils.getPrimaryKeyIndices((TableSchema)context.getCatalogTable().getSchema());
        if (sinkClass.equals("DEFAULT")) {
            int rowTimeIndex = TestValuesTableFactory.validateAndExtractRowtimeIndex((CatalogTable)context.getCatalogTable(), dropLateEvent, isInsertOnly);
            return new TestValuesTableSink(consumedType, primaryKeyIndices, context.getObjectIdentifier().getObjectName(), isInsertOnly, runtimeSink, expectedNum, writableMetadata, parallelism, changelogMode, rowTimeIndex);
        }
        try {
            return (DynamicTableSink)InstantiationUtil.instantiate((String)sinkClass, DynamicTableSink.class, (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        catch (FlinkException e) {
            throw new TableException("Can't instantiate class " + sinkClass, (Throwable)e);
        }
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet(Arrays.asList(DATA_ID, CHANGELOG_MODE, BOUNDED, RUNTIME_SOURCE, TABLE_SOURCE_CLASS, FAILING_SOURCE, LOOKUP_FUNCTION_CLASS, ASYNC_ENABLED, DISABLE_LOOKUP, TABLE_SOURCE_CLASS, TABLE_SINK_CLASS, SINK_INSERT_ONLY, RUNTIME_SINK, SINK_EXPECTED_MESSAGES_NUM, NESTED_PROJECTION_SUPPORTED, FILTERABLE_FIELDS, PARTITION_LIST, READABLE_METADATA, SINK_PARALLELISM, SINK_CHANGELOG_MODE_ENFORCED, WRITABLE_METADATA, ENABLE_WATERMARK_PUSH_DOWN, SINK_DROP_LATE_EVENT, SOURCE_NUM_ELEMENT_TO_SKIP, INTERNAL_DATA));
    }

    private static int validateAndExtractRowtimeIndex(CatalogTable sinkTable, boolean dropLateEvent, boolean isInsertOnly) {
        if (!dropLateEvent) {
            return -1;
        }
        if (!isInsertOnly) {
            throw new ValidationException("Option 'sink.drop-late-event' only works for insert-only sink now.");
        }
        TableSchema schema = sinkTable.getSchema();
        List watermarkSpecs = schema.getWatermarkSpecs();
        if (watermarkSpecs.size() == 0) {
            throw new ValidationException("Please define the watermark in the schema that is used to indicate the rowtime column. The sink function will compare the rowtime and the current watermark to determine whether the event is late.");
        }
        String rowtimeName = ((WatermarkSpec)watermarkSpecs.get(0)).getRowtimeAttribute();
        return Arrays.asList(schema.getFieldNames()).indexOf(rowtimeName);
    }

    private static List<Map<String, String>> parsePartitionList(List<String> stringPartitions) {
        return stringPartitions.stream().map(partition -> {
            HashMap spec = new HashMap();
            Arrays.stream(partition.split(",")).forEach(pair -> {
                String[] split = pair.split(":");
                spec.put(split[0].trim(), split[1].trim());
            });
            return spec;
        }).collect(Collectors.toList());
    }

    private static Map<Map<String, String>, Collection<Row>> mapPartitionToRow(DataType producedDataType, Collection<Row> rows, List<Map<String, String>> partitions) {
        HashMap<Map<String, String>, Collection<Row>> map2 = new HashMap<Map<String, String>, Collection<Row>>();
        for (Map<String, String> partition : partitions) {
            map2.put(partition, new ArrayList());
        }
        List fieldNames = DataTypeUtils.flattenToNames((DataType)producedDataType);
        block1: for (Row row2 : rows) {
            for (Map<String, String> partition : partitions) {
                boolean match = true;
                for (Map.Entry<String, String> entry : partition.entrySet()) {
                    int index = fieldNames.indexOf(entry.getKey());
                    if (index < 0) {
                        throw new IllegalArgumentException(String.format("Illegal partition list: partition key %s is not found in schema.", entry.getKey()));
                    }
                    if (entry.getValue() != null) {
                        match = row2.getField(index) == null ? false : entry.getValue().equals(Objects.requireNonNull(row2.getField(index)).toString());
                    } else {
                        boolean bl = match = row2.getField(index) == null;
                    }
                    if (match) continue;
                    break;
                }
                if (!match) continue;
                ((Collection)map2.get(partition)).add(row2);
                continue block1;
            }
        }
        return map2;
    }

    private ChangelogMode parseChangelogMode(String string) {
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        block12: for (String split : string.split(",")) {
            switch (split.trim()) {
                case "I": {
                    builder.addContainedKind(RowKind.INSERT);
                    continue block12;
                }
                case "UB": {
                    builder.addContainedKind(RowKind.UPDATE_BEFORE);
                    continue block12;
                }
                case "UA": {
                    builder.addContainedKind(RowKind.UPDATE_AFTER);
                    continue block12;
                }
                case "D": {
                    builder.addContainedKind(RowKind.DELETE);
                    continue block12;
                }
                default: {
                    throw new IllegalArgumentException("Invalid ChangelogMode string: " + string);
                }
            }
        }
        return builder.build();
    }

    private static Map<String, DataType> convertToMetadataMap(Map<String, String> metadataOption, ClassLoader classLoader) {
        return metadataOption.keySet().stream().sorted().collect(Collectors.toMap(Function.identity(), key -> {
            String typeString = (String)metadataOption.get(key);
            LogicalType type = LogicalTypeParser.parse((String)typeString, (ClassLoader)classLoader);
            return TypeConversions.fromLogicalToDataType((LogicalType)type);
        }, (u, v) -> {
            throw new IllegalStateException();
        }, LinkedHashMap::new));
    }

    private static class FromRowDataSourceFunction
    implements SourceFunction<RowData> {
        private final String dataId;
        private volatile boolean isRunning = true;

        public FromRowDataSourceFunction(String dataId) {
            this.dataId = dataId;
        }

        public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
            Collection values = registeredRowData.getOrDefault(this.dataId, Collections.emptyList());
            Iterator valueIter = values.iterator();
            while (this.isRunning && valueIter.hasNext()) {
                ctx.collect(valueIter.next());
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class TestSinkContextTableSink
    implements DynamicTableSink {
        public static final List<Long> ROWTIMES = new ArrayList<Long>();

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            List<Long> list = ROWTIMES;
            synchronized (list) {
                ROWTIMES.clear();
            }
            SinkFunction<RowData> sinkFunction = new SinkFunction<RowData>(){
                private static final long serialVersionUID = -4871941979714977824L;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void invoke(RowData value, SinkFunction.Context context) throws Exception {
                    List<Long> list = ROWTIMES;
                    synchronized (list) {
                        ROWTIMES.add(context.timestamp());
                    }
                }
            };
            return SinkFunctionProvider.of((SinkFunction)sinkFunction);
        }

        public DynamicTableSink copy() {
            return new TestSinkContextTableSink();
        }

        public String asSummaryString() {
            return "TestSinkContextTableSink";
        }
    }

    private static class TestValuesTableSink
    implements DynamicTableSink,
    SupportsWritingMetadata,
    SupportsPartitioning {
        private DataType consumedDataType;
        private int[] primaryKeyIndices;
        private final String tableName;
        private final boolean isInsertOnly;
        private final String runtimeSink;
        private final int expectedNum;
        private final Map<String, DataType> writableMetadata;
        private final Integer parallelism;
        private final ChangelogMode changelogModeEnforced;
        private final int rowtimeIndex;

        private TestValuesTableSink(DataType consumedDataType, int[] primaryKeyIndices, String tableName, boolean isInsertOnly, String runtimeSink, int expectedNum, Map<String, DataType> writableMetadata, @Nullable Integer parallelism, @Nullable ChangelogMode changelogModeEnforced, int rowtimeIndex) {
            this.consumedDataType = consumedDataType;
            this.primaryKeyIndices = primaryKeyIndices;
            this.tableName = tableName;
            this.isInsertOnly = isInsertOnly;
            this.runtimeSink = runtimeSink;
            this.expectedNum = expectedNum;
            this.writableMetadata = writableMetadata;
            this.parallelism = parallelism;
            this.changelogModeEnforced = changelogModeEnforced;
            this.rowtimeIndex = rowtimeIndex;
        }

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            if (this.changelogModeEnforced != null) {
                return this.changelogModeEnforced;
            }
            if (this.isInsertOnly) {
                return ChangelogMode.insertOnly();
            }
            if (this.primaryKeyIndices.length > 0) {
                return ChangelogMode.upsert();
            }
            return requestedMode;
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            TestValuesRuntimeFunctions.AbstractExactlyOnceSink sinkFunction;
            final DynamicTableSink.DataStructureConverter converter = context.createDataStructureConverter(this.consumedDataType);
            final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
            Boolean isEnforcedInsertOnly = Optional.ofNullable(this.changelogModeEnforced).map(changelogMode -> changelogMode.equals((Object)ChangelogMode.insertOnly())).orElse(false);
            Boolean isInsertOnly = isEnforcedInsertOnly != false || this.isInsertOnly;
            if (isInsertOnly.booleanValue()) {
                Preconditions.checkArgument((this.expectedNum == -1 ? 1 : 0) != 0, (Object)("Appending Sink doesn't support '" + SINK_EXPECTED_MESSAGES_NUM.key() + "' yet."));
                switch (this.runtimeSink) {
                    case "SinkFunction": {
                        return new SinkFunctionProvider(){

                            public Optional<Integer> getParallelism() {
                                return parallelismOption;
                            }

                            public SinkFunction<RowData> createSinkFunction() {
                                return new TestValuesRuntimeFunctions.AppendingSinkFunction(tableName, converter, rowtimeIndex);
                            }
                        };
                    }
                    case "OutputFormat": {
                        return new OutputFormatProvider(){

                            public OutputFormat<RowData> createOutputFormat() {
                                return new TestValuesRuntimeFunctions.AppendingOutputFormat(tableName, converter);
                            }

                            public Optional<Integer> getParallelism() {
                                return parallelismOption;
                            }
                        };
                    }
                    case "DataStream": {
                        return new DataStreamSinkProvider(){

                            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                                return dataStream.addSink((SinkFunction)new TestValuesRuntimeFunctions.AppendingSinkFunction(tableName, converter, rowtimeIndex));
                            }

                            public Optional<Integer> getParallelism() {
                                return parallelismOption;
                            }
                        };
                    }
                }
                throw new IllegalArgumentException("Unsupported runtime sink class: " + this.runtimeSink);
            }
            assert (this.runtimeSink.equals("SinkFunction"));
            if (this.primaryKeyIndices.length > 0) {
                sinkFunction = new TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction(this.tableName, converter, this.primaryKeyIndices, this.expectedNum);
            } else {
                Preconditions.checkArgument((this.expectedNum == -1 ? 1 : 0) != 0, (Object)("Retracting Sink doesn't support '" + SINK_EXPECTED_MESSAGES_NUM.key() + "' yet."));
                sinkFunction = new TestValuesRuntimeFunctions.RetractingSinkFunction(this.tableName, converter);
            }
            return SinkFunctionProvider.of((SinkFunction)sinkFunction);
        }

        public DynamicTableSink copy() {
            return new TestValuesTableSink(this.consumedDataType, this.primaryKeyIndices, this.tableName, this.isInsertOnly, this.runtimeSink, this.expectedNum, this.writableMetadata, this.parallelism, this.changelogModeEnforced, this.rowtimeIndex);
        }

        public String asSummaryString() {
            return "TestValues";
        }

        public Map<String, DataType> listWritableMetadata() {
            return this.writableMetadata;
        }

        public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
            this.consumedDataType = consumedDataType;
        }

        public void applyStaticPartition(Map<String, String> partition) {
        }

        public boolean requiresPartitionGrouping(boolean supportsGrouping) {
            return supportsGrouping;
        }
    }

    private static class TestValuesScanTableSourceWithInternalData
    implements ScanTableSource {
        private final String dataId;
        private final boolean bounded;

        public TestValuesScanTableSourceWithInternalData(String dataId, boolean bounded) {
            this.dataId = dataId;
            this.bounded = bounded;
        }

        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            FromRowDataSourceFunction sourceFunction = new FromRowDataSourceFunction(this.dataId);
            return SourceFunctionProvider.of((SourceFunction)sourceFunction, (boolean)this.bounded);
        }

        public DynamicTableSource copy() {
            return new TestValuesScanTableSourceWithInternalData(this.dataId, this.bounded);
        }

        public String asSummaryString() {
            return "TestValuesWithInternalData";
        }
    }

    public static class MockedLookupTableSource
    implements LookupTableSource {
        public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
            return null;
        }

        public DynamicTableSource copy() {
            return null;
        }

        public String asSummaryString() {
            return null;
        }
    }

    private static class TestValuesScanLookupTableSource
    extends TestValuesScanTableSource
    implements LookupTableSource {
        @Nullable
        private final String lookupFunctionClass;
        private final boolean isAsync;

        private TestValuesScanLookupTableSource(DataType producedDataType, ChangelogMode changelogMode, boolean bounded, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, boolean isAsync, @Nullable String lookupFunctionClass, boolean nestedProjectionSupported, int[][] projectedFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields) {
            super(producedDataType, changelogMode, bounded, runtimeSource, failingSource, data, nestedProjectionSupported, projectedFields, filterPredicates, filterableFields, numElementToSkip, limit, allPartitions, readableMetadata, projectedMetadataFields);
            this.lookupFunctionClass = lookupFunctionClass;
            this.isAsync = isAsync;
        }

        public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
            Collection rows;
            if (this.lookupFunctionClass != null) {
                try {
                    Class<?> clazz = Class.forName(this.lookupFunctionClass);
                    Object udtf = InstantiationUtil.instantiate(clazz);
                    if (udtf instanceof TableFunction) {
                        return TableFunctionProvider.of((TableFunction)((TableFunction)udtf));
                    }
                    return AsyncTableFunctionProvider.of((AsyncTableFunction)((AsyncTableFunction)udtf));
                }
                catch (ClassNotFoundException e) {
                    throw new IllegalArgumentException("Could not instantiate class: " + this.lookupFunctionClass);
                }
            }
            int[] lookupIndices = Arrays.stream(context.getKeys()).mapToInt(k -> k[0]).toArray();
            HashMap<Row, List<Row>> mapping = new HashMap<Row, List<Row>>();
            if (this.allPartitions.equals(Collections.EMPTY_LIST)) {
                rows = this.data.getOrDefault(Collections.EMPTY_MAP, Collections.EMPTY_LIST);
            } else {
                rows = new ArrayList();
                this.allPartitions.forEach(key -> rows.addAll(this.data.getOrDefault(key, new ArrayList())));
            }
            List data = new ArrayList(rows);
            if (this.numElementToSkip > 0) {
                data = this.numElementToSkip >= data.size() ? Collections.EMPTY_LIST : data.subList(this.numElementToSkip, data.size());
            }
            data.forEach(record -> {
                Row key = Row.of((Object[])Arrays.stream(lookupIndices).mapToObj(arg_0 -> ((Row)record).getField(arg_0)).toArray());
                ArrayList<Row> list = (ArrayList<Row>)mapping.get(key);
                if (list != null) {
                    list.add((Row)record);
                } else {
                    list = new ArrayList<Row>();
                    list.add((Row)record);
                    mapping.put(key, list);
                }
            });
            if (this.isAsync) {
                return AsyncTableFunctionProvider.of((AsyncTableFunction)new TestValuesRuntimeFunctions.AsyncTestValueLookupFunction(mapping));
            }
            return TableFunctionProvider.of((TableFunction)new TestValuesRuntimeFunctions.TestValuesLookupFunction(mapping));
        }

        @Override
        public DynamicTableSource copy() {
            return new TestValuesScanLookupTableSource(this.producedDataType, this.changelogMode, this.bounded, this.runtimeSource, this.failingSource, this.data, this.isAsync, this.lookupFunctionClass, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields);
        }
    }

    private static class TestValuesScanTableSourceWithWatermarkPushDown
    extends TestValuesScanTableSource
    implements SupportsWatermarkPushDown,
    SupportsSourceWatermark {
        private final String tableName;
        private WatermarkStrategy<RowData> watermarkStrategy;

        private TestValuesScanTableSourceWithWatermarkPushDown(DataType producedDataType, ChangelogMode changelogMode, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, String tableName, boolean nestedProjectionSupported, @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields) {
            super(producedDataType, changelogMode, false, runtimeSource, failingSource, data, nestedProjectionSupported, projectedPhysicalFields, filterPredicates, filterableFields, numElementToSkip, limit, allPartitions, readableMetadata, projectedMetadataFields);
            this.tableName = tableName;
        }

        public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
            this.watermarkStrategy = watermarkStrategy;
        }

        public void applySourceWatermark() {
            this.watermarkStrategy = WatermarkStrategy.noWatermarks();
        }

        @Override
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            TypeInformation type = runtimeProviderContext.createTypeInformation(this.producedDataType);
            TypeSerializer serializer = type.createSerializer(new ExecutionConfig());
            DynamicTableSource.DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(this.producedDataType);
            converter.open(RuntimeConverter.Context.create((ClassLoader)TestValuesTableFactory.class.getClassLoader()));
            Collection<RowData> values = this.convertToRowData(converter);
            try {
                return SourceFunctionProvider.of((SourceFunction)new TestValuesRuntimeFunctions.FromElementSourceFunctionWithWatermark(this.tableName, (TypeSerializer<RowData>)serializer, values, this.watermarkStrategy), (boolean)false);
            }
            catch (IOException e) {
                throw new TableException("Fail to init source function", (Throwable)e);
            }
        }

        @Override
        public DynamicTableSource copy() {
            TestValuesScanTableSourceWithWatermarkPushDown newSource = new TestValuesScanTableSourceWithWatermarkPushDown(this.producedDataType, this.changelogMode, this.runtimeSource, this.failingSource, this.data, this.tableName, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields);
            newSource.watermarkStrategy = this.watermarkStrategy;
            return newSource;
        }
    }

    private static class TestValuesScanTableSource
    implements ScanTableSource,
    SupportsProjectionPushDown,
    SupportsFilterPushDown,
    SupportsLimitPushDown,
    SupportsPartitionPushDown,
    SupportsReadingMetadata {
        protected DataType producedDataType;
        protected final ChangelogMode changelogMode;
        protected final boolean bounded;
        protected final String runtimeSource;
        protected final boolean failingSource;
        protected Map<Map<String, String>, Collection<Row>> data;
        protected final boolean nestedProjectionSupported;
        @Nullable
        protected int[][] projectedPhysicalFields;
        protected List<ResolvedExpression> filterPredicates;
        protected final Set<String> filterableFields;
        protected long limit;
        protected int numElementToSkip;
        protected List<Map<String, String>> allPartitions;
        protected final Map<String, DataType> readableMetadata;
        @Nullable
        protected int[] projectedMetadataFields;

        private TestValuesScanTableSource(DataType producedDataType, ChangelogMode changelogMode, boolean bounded, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, boolean nestedProjectionSupported, @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields) {
            this.producedDataType = producedDataType;
            this.changelogMode = changelogMode;
            this.bounded = bounded;
            this.runtimeSource = runtimeSource;
            this.failingSource = failingSource;
            this.data = data;
            this.nestedProjectionSupported = nestedProjectionSupported;
            this.projectedPhysicalFields = projectedPhysicalFields;
            this.filterPredicates = filterPredicates;
            this.filterableFields = filterableFields;
            this.numElementToSkip = numElementToSkip;
            this.limit = limit;
            this.allPartitions = allPartitions;
            this.readableMetadata = readableMetadata;
            this.projectedMetadataFields = projectedMetadataFields;
        }

        public ChangelogMode getChangelogMode() {
            return this.changelogMode;
        }

        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            TypeInformation type = runtimeProviderContext.createTypeInformation(this.producedDataType);
            TypeSerializer serializer = type.createSerializer(new ExecutionConfig());
            DynamicTableSource.DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(this.producedDataType);
            converter.open(RuntimeConverter.Context.create((ClassLoader)TestValuesTableFactory.class.getClassLoader()));
            Collection<RowData> values = this.convertToRowData(converter);
            switch (this.runtimeSource) {
                case "SourceFunction": {
                    try {
                        Object sourceFunction = this.failingSource ? new FailingCollectionSource<RowData>(serializer, values, values.size() / 2) : new FromElementsFunction(serializer, values);
                        return SourceFunctionProvider.of((SourceFunction)sourceFunction, (boolean)this.bounded);
                    }
                    catch (IOException e) {
                        throw new TableException("Fail to init source function", (Throwable)e);
                    }
                }
                case "InputFormat": {
                    Preconditions.checkArgument((!this.failingSource ? 1 : 0) != 0, (Object)"Values InputFormat Source doesn't support as failing source.");
                    return InputFormatProvider.of((InputFormat)new CollectionInputFormat(values, serializer));
                }
                case "DataStream": {
                    Preconditions.checkArgument((!this.failingSource ? 1 : 0) != 0, (Object)"Values DataStream Source doesn't support as failing source.");
                    try {
                        final FromElementsFunction function = new FromElementsFunction(serializer, values);
                        return new DataStreamScanProvider(){

                            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                                return execEnv.addSource((SourceFunction)function);
                            }

                            public boolean isBounded() {
                                return bounded;
                            }
                        };
                    }
                    catch (IOException e) {
                        throw new TableException("Fail to init data stream source", (Throwable)e);
                    }
                }
            }
            throw new IllegalArgumentException("Unsupported runtime source class: " + this.runtimeSource);
        }

        public boolean supportsNestedProjection() {
            return this.nestedProjectionSupported;
        }

        public void applyProjection(int[][] projectedFields) {
            this.producedDataType = DataTypeUtils.projectRow((DataType)this.producedDataType, (int[][])projectedFields);
            this.projectedPhysicalFields = projectedFields;
        }

        public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
            ArrayList<ResolvedExpression> acceptedFilters = new ArrayList<ResolvedExpression>();
            ArrayList<ResolvedExpression> remainingFilters = new ArrayList<ResolvedExpression>();
            for (ResolvedExpression expr : filters) {
                if (FilterUtils.shouldPushDown(expr, this.filterableFields)) {
                    acceptedFilters.add(expr);
                    continue;
                }
                remainingFilters.add(expr);
            }
            this.filterPredicates = acceptedFilters;
            return SupportsFilterPushDown.Result.of(acceptedFilters, remainingFilters);
        }

        private Function<String, Comparable<?>> getValueGetter(Row row2) {
            List fieldNames = DataTypeUtils.flattenToNames((DataType)this.producedDataType);
            return fieldName -> {
                int idx = fieldNames.indexOf(fieldName);
                return (Comparable)row2.getField(idx);
            };
        }

        public DynamicTableSource copy() {
            return new TestValuesScanTableSource(this.producedDataType, this.changelogMode, this.bounded, this.runtimeSource, this.failingSource, this.data, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields);
        }

        public String asSummaryString() {
            return "TestValues";
        }

        protected Collection<RowData> convertToRowData(DynamicTableSource.DataStructureConverter converter) {
            ArrayList<RowData> result = new ArrayList<RowData>();
            List<Map<Object, Object>> keys = this.allPartitions.isEmpty() ? Collections.singletonList(Collections.emptyMap()) : this.allPartitions;
            int numRetained = 0;
            for (Map<Object, Object> partition : keys) {
                for (Row row2 : this.data.get(partition)) {
                    Row projectedRow;
                    RowData rowData;
                    if ((long)result.size() >= this.limit) {
                        return result;
                    }
                    boolean isRetained = FilterUtils.isRetainedAfterApplyingFilterPredicates(this.filterPredicates, this.getValueGetter(row2));
                    if (!isRetained || (rowData = (RowData)converter.toInternal((Object)(projectedRow = this.projectRow(row2)))) == null) continue;
                    if (numRetained >= this.numElementToSkip) {
                        rowData.setRowKind(row2.getKind());
                        result.add(rowData);
                    }
                    ++numRetained;
                }
            }
            return result;
        }

        private Row projectRow(Row row2) {
            int i;
            if (this.projectedPhysicalFields == null) {
                return row2;
            }
            int originPhysicalSize = row2.getArity() - this.readableMetadata.size();
            int newLength = this.projectedPhysicalFields.length + (this.projectedMetadataFields == null ? 0 : this.projectedMetadataFields.length);
            Object[] newValues = new Object[newLength];
            for (i = 0; i < this.projectedPhysicalFields.length; ++i) {
                Object field = row2;
                for (int dim : this.projectedPhysicalFields[i]) {
                    field = field.getField(dim);
                }
                newValues[i] = field;
            }
            for (i = this.projectedPhysicalFields.length; i < newValues.length; ++i) {
                newValues[i] = row2.getField(this.projectedMetadataFields[i - this.projectedPhysicalFields.length] + originPhysicalSize);
            }
            return Row.of((Object[])newValues);
        }

        public Optional<List<Map<String, String>>> listPartitions() {
            if (this.allPartitions.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(this.allPartitions);
        }

        public void applyPartitions(List<Map<String, String>> remainingPartitions) {
            if (this.allPartitions.isEmpty()) {
                if (!remainingPartitions.isEmpty()) {
                    this.allPartitions = remainingPartitions;
                    this.data = TestValuesTableFactory.mapPartitionToRow(this.producedDataType, this.data.get(Collections.EMPTY_MAP), remainingPartitions);
                } else {
                    this.data.put(Collections.emptyMap(), Collections.emptyList());
                }
            } else {
                this.allPartitions = remainingPartitions;
                if (remainingPartitions.isEmpty()) {
                    this.data.put(Collections.emptyMap(), Collections.emptyList());
                }
            }
        }

        public void applyLimit(long limit) {
            this.limit = limit;
        }

        public Map<String, DataType> listReadableMetadata() {
            return this.readableMetadata;
        }

        public void applyReadableMetadata(List<String> remainingMetadataKeys, DataType newProducedDataType) {
            this.producedDataType = newProducedDataType;
            ArrayList<String> allMetadataKeys = new ArrayList<String>(this.listReadableMetadata().keySet());
            this.projectedMetadataFields = remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
        }
    }
}

