/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.vitess.table;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.flink.cdc.connectors.vitess.config.SchemaAdjustmentMode;
import org.apache.flink.cdc.connectors.vitess.config.TabletType;
import org.apache.flink.cdc.connectors.vitess.table.VitessTableSource;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class VitessTableFactory
implements DynamicTableSourceFactory {
    private static final String IDENTIFIER = "vitess-cdc";
    private static final ConfigOption<String> HOSTNAME = ConfigOptions.key((String)"hostname").stringType().noDefaultValue().withDescription("Hostname of the VTGate\u2019s VStream server.");
    private static final ConfigOption<Integer> PORT = ConfigOptions.key((String)"port").intType().defaultValue((Object)15991).withDescription("Integer port number of the VTGate\u2019s VStream server.");
    private static final ConfigOption<String> KEYSPACE = ConfigOptions.key((String)"keyspace").stringType().noDefaultValue().withDescription("The name of the keyspace (a.k.a database). If no shard is specified, it reads change events from all shards in the keyspace.");
    private static final ConfigOption<String> USERNAME = ConfigOptions.key((String)"username").stringType().noDefaultValue().withDescription("The username of the Vitess database server (VTGate gRPC).");
    private static final ConfigOption<String> PASSWORD = ConfigOptions.key((String)"password").stringType().noDefaultValue().withDescription("The password of the Vitess database server (VTGate gRPC).");
    private static final ConfigOption<String> SHARD = ConfigOptions.key((String)"vitess.shard").stringType().noDefaultValue().withDescription("An optional name of the shard from which to stream the changes.");
    private static final ConfigOption<String> GTID = ConfigOptions.key((String)"vitess.gtid").stringType().defaultValue((Object)"current").withDescription("An optional GTID position for a shard to stream from.");
    private static final ConfigOption<Boolean> STOP_ON_RESHARD = ConfigOptions.key((String)"vitess.stop_on_reshard").booleanType().defaultValue((Object)false).withDescription("Controls Vitess flag stop_on_reshard.");
    private static final ConfigOption<Boolean> TOMBSTONES_ON_DELETE = ConfigOptions.key((String)"tombstones.on.delete").booleanType().defaultValue((Object)true).withDescription("Controls whether a delete event is followed by a tombstone event.");
    private static final ConfigOption<String> SCHEMA_NAME_ADJUSTMENT_MODE = ConfigOptions.key((String)"schema.name.adjustment.mode").stringType().defaultValue((Object)"avro").withDescription("Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.");
    private static final ConfigOption<String> TABLET_TYPE = ConfigOptions.key((String)"tablet-type").stringType().defaultValue((Object)TabletType.RDONLY.name()).withDescription("The type of Tablet (hence MySQL) from which to stream the changes:");
    private static final ConfigOption<String> TABLE_NAME = ConfigOptions.key((String)"table-name").stringType().noDefaultValue().withDescription("Table name of the MYSQL database to monitor.");
    private static final ConfigOption<String> DECODING_PLUGIN_NAME = ConfigOptions.key((String)"decoding.plugin.name").stringType().defaultValue((Object)"decoderbufs").withDescription("The name of the Vitess logical decoding plug-in installed on the server.");
    private static final ConfigOption<String> NAME = ConfigOptions.key((String)"name").stringType().defaultValue((Object)"flink").withDescription("Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors. Default is flink.");

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validateExcept(new String[]{"debezium."});
        ReadableConfig config = helper.getOptions();
        String hostname = (String)config.get(HOSTNAME);
        int port = (Integer)config.get(PORT);
        String keyspace = (String)config.get(KEYSPACE);
        String tableName = (String)config.get(TABLE_NAME);
        String username = config.getOptional(USERNAME).orElse(null);
        String password = config.getOptional(PASSWORD).orElse(null);
        String shard = config.getOptional(SHARD).orElse(null);
        String gtid = (String)config.get(GTID);
        Boolean stopOnReshard = (Boolean)config.get(STOP_ON_RESHARD);
        Boolean tombstonesOnDelete = (Boolean)config.get(TOMBSTONES_ON_DELETE);
        SchemaAdjustmentMode schemaNameAdjustmentMode = SchemaAdjustmentMode.valueOf(((String)config.get(SCHEMA_NAME_ADJUSTMENT_MODE)).toUpperCase());
        TabletType tabletType = TabletType.valueOf((String)config.get(TABLET_TYPE));
        String pluginName = (String)config.get(DECODING_PLUGIN_NAME);
        String name = (String)config.get(NAME);
        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
        return new VitessTableSource(physicalSchema, port, hostname, keyspace, tableName, username, password, shard, gtid, stopOnReshard, tombstonesOnDelete, schemaNameAdjustmentMode, tabletType, pluginName, name, DebeziumOptions.getDebeziumProperties((Map)context.getCatalogTable().getOptions()));
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(HOSTNAME);
        options.add(KEYSPACE);
        options.add(TABLE_NAME);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet();
        options.add(PORT);
        options.add(SHARD);
        options.add(GTID);
        options.add(STOP_ON_RESHARD);
        options.add(TOMBSTONES_ON_DELETE);
        options.add(SCHEMA_NAME_ADJUSTMENT_MODE);
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(TABLET_TYPE);
        options.add(DECODING_PLUGIN_NAME);
        options.add(NAME);
        return options;
    }
}

