/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.factory;

import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PostgresDataSourceFactory
implements DataSourceFactory {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresDataSourceFactory.class);
    public static final String IDENTIFIER = "postgres";
    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
    private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_COMMITTED_OFFSET = "committed-offset";

    public DataSource createDataSource(Factory.Context context) {
        FactoryHelper.createFactoryHelper((Factory)this, (Factory.Context)context).validateExcept(new String[]{"jdbc.properties.", "debezium."});
        Configuration config = context.getFactoryConfiguration();
        String hostname = (String)config.get(PostgresDataSourceOptions.HOSTNAME);
        int port = (Integer)config.get(PostgresDataSourceOptions.PG_PORT);
        String pluginName = (String)config.get(PostgresDataSourceOptions.DECODING_PLUGIN_NAME);
        String slotName = (String)config.get(PostgresDataSourceOptions.SLOT_NAME);
        String username = (String)config.get(PostgresDataSourceOptions.USERNAME);
        String password = (String)config.get(PostgresDataSourceOptions.PASSWORD);
        String chunkKeyColumn = (String)config.get(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
        String tables = (String)config.get(PostgresDataSourceOptions.TABLES);
        ZoneId serverTimeZone = PostgresDataSourceFactory.getServerTimeZone(config);
        String tablesExclude = (String)config.get(PostgresDataSourceOptions.TABLES_EXCLUDE);
        Duration heartbeatInterval = (Duration)config.get(PostgresDataSourceOptions.HEARTBEAT_INTERVAL);
        StartupOptions startupOptions = PostgresDataSourceFactory.getStartupOptions(config);
        int fetchSize = (Integer)config.get(PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);
        int splitSize = (Integer)config.get(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        int splitMetaGroupSize = (Integer)config.get(PostgresDataSourceOptions.CHUNK_META_GROUP_SIZE);
        double distributionFactorUpper = (Double)config.get(PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        double distributionFactorLower = (Double)config.get(PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
        boolean closeIdleReaders = (Boolean)config.get(PostgresDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        boolean isAssignUnboundedChunkFirst = (Boolean)config.get(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
        Duration connectTimeout = (Duration)config.get(PostgresDataSourceOptions.CONNECT_TIMEOUT);
        int connectMaxRetries = (Integer)config.get(PostgresDataSourceOptions.CONNECT_MAX_RETRIES);
        int connectionPoolSize = (Integer)config.get(PostgresDataSourceOptions.CONNECTION_POOL_SIZE);
        boolean skipSnapshotBackfill = (Boolean)config.get(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
        int lsnCommitCheckpointsDelay = (Integer)config.get(PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
        this.validateIntegerOption(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
        this.validateIntegerOption(PostgresDataSourceOptions.CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
        this.validateIntegerOption(PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
        this.validateIntegerOption(PostgresDataSourceOptions.CONNECTION_POOL_SIZE, connectionPoolSize, 1);
        this.validateIntegerOption(PostgresDataSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
        this.validateDistributionFactorUpper(distributionFactorUpper);
        this.validateDistributionFactorLower(distributionFactorLower);
        Map configMap = config.toMap();
        Optional<String> databaseName = this.getValidateDatabaseName(tables);
        PostgresSourceConfigFactory configFactory = PostgresSourceBuilder.PostgresIncrementalSource.builder().hostname(hostname).port(port).database(databaseName.get()).schemaList(".*").tableList(".*").username(username).password(password).decodingPluginName(pluginName).slotName(slotName).serverTimeZone(serverTimeZone.getId()).debeziumProperties(DebeziumOptions.getDebeziumProperties(configMap)).splitSize(splitSize).splitMetaGroupSize(splitMetaGroupSize).distributionFactorUpper(distributionFactorUpper).distributionFactorLower(distributionFactorLower).fetchSize(fetchSize).connectTimeout(connectTimeout).connectMaxRetries(connectMaxRetries).connectionPoolSize(connectionPoolSize).startupOptions(startupOptions).chunkKeyColumn(chunkKeyColumn).heartbeatInterval(heartbeatInterval).closeIdleReaders(closeIdleReaders).skipSnapshotBackfill(skipSnapshotBackfill).lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay).assignUnboundedChunkFirst(isAssignUnboundedChunkFirst).getConfigFactory();
        List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
        Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
        List<String> capturedTables = PostgresDataSourceFactory.getTableList(tableIds, selectors);
        if (capturedTables.isEmpty()) {
            throw new IllegalArgumentException("Cannot find any table by the option 'tables' = " + tables);
        }
        if (tablesExclude != null) {
            Selectors selectExclude = new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
            List<String> excludeTables = PostgresDataSourceFactory.getTableList(tableIds, selectExclude);
            if (!excludeTables.isEmpty()) {
                capturedTables.removeAll(excludeTables);
            }
            if (capturedTables.isEmpty()) {
                throw new IllegalArgumentException("Cannot find any table with by the option 'tables.exclude'  = " + tablesExclude);
            }
        }
        configFactory.tableList(capturedTables.toArray(new String[0]));
        String metadataList = (String)config.get(PostgresDataSourceOptions.METADATA_LIST);
        List<PostgreSQLReadableMetadata> readableMetadataList = this.listReadableMetadata(metadataList);
        return new PostgresDataSource(configFactory, readableMetadataList);
    }

    private List<PostgreSQLReadableMetadata> listReadableMetadata(String metadataList) {
        if (StringUtils.isNullOrWhitespaceOnly((String)metadataList)) {
            return new ArrayList<PostgreSQLReadableMetadata>();
        }
        Set readableMetadataList = Arrays.stream(metadataList.split(",")).map(String::trim).collect(Collectors.toSet());
        ArrayList<PostgreSQLReadableMetadata> foundMetadata = new ArrayList<PostgreSQLReadableMetadata>();
        for (PostgreSQLReadableMetadata metadata : PostgreSQLReadableMetadata.values()) {
            if (!readableMetadataList.contains(metadata.getKey())) continue;
            foundMetadata.add(metadata);
            readableMetadataList.remove(metadata.getKey());
        }
        if (readableMetadataList.isEmpty()) {
            return foundMetadata;
        }
        throw new IllegalArgumentException(String.format("[%s] cannot be found in postgresSQL metadata.", String.join((CharSequence)", ", readableMetadataList)));
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(PostgresDataSourceOptions.HOSTNAME);
        options.add(PostgresDataSourceOptions.USERNAME);
        options.add(PostgresDataSourceOptions.PASSWORD);
        options.add(PostgresDataSourceOptions.TABLES);
        options.add(PostgresDataSourceOptions.SLOT_NAME);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet();
        options.add(PostgresDataSourceOptions.PG_PORT);
        options.add(PostgresDataSourceOptions.TABLES_EXCLUDE);
        options.add(PostgresDataSourceOptions.DECODING_PLUGIN_NAME);
        options.add(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        options.add(PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);
        options.add(PostgresDataSourceOptions.SCAN_STARTUP_MODE);
        options.add(PostgresDataSourceOptions.SERVER_TIME_ZONE);
        options.add(PostgresDataSourceOptions.CONNECT_TIMEOUT);
        options.add(PostgresDataSourceOptions.CONNECT_MAX_RETRIES);
        options.add(PostgresDataSourceOptions.CONNECTION_POOL_SIZE);
        options.add(PostgresDataSourceOptions.HEARTBEAT_INTERVAL);
        options.add(PostgresDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        options.add(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
        options.add(PostgresDataSourceOptions.CHUNK_META_GROUP_SIZE);
        options.add(PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        options.add(PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
        options.add(PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
        options.add(PostgresDataSourceOptions.METADATA_LIST);
        options.add(PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
        return options;
    }

    public String identifier() {
        return IDENTIFIER;
    }

    private static List<String> getTableList(@Nullable List<TableId> tableIdList, Selectors selectors) {
        return tableIdList.stream().filter(arg_0 -> ((Selectors)selectors).isMatch(arg_0)).map(TableId::toString).collect(Collectors.toList());
    }

    private void validateIntegerOption(ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
        Preconditions.checkState((optionValue > exclusiveMin ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must larger than %d, but is %d", option.key(), exclusiveMin, optionValue));
    }

    private static StartupOptions getStartupOptions(Configuration config) {
        String modeString = (String)config.get(PostgresDataSourceOptions.SCAN_STARTUP_MODE);
        switch (modeString.toLowerCase()) {
            case "initial": {
                return StartupOptions.initial();
            }
            case "snapshot": {
                return StartupOptions.snapshot();
            }
            case "latest-offset": {
                return StartupOptions.latest();
            }
            case "committed-offset": {
                return StartupOptions.committed();
            }
        }
        throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s", SourceOptions.SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, SCAN_STARTUP_MODE_VALUE_SNAPSHOT, SCAN_STARTUP_MODE_VALUE_LATEST, modeString));
    }

    private void validateDistributionFactorUpper(double distributionFactorUpper) {
        Preconditions.checkState((ObjectUtils.doubleCompare(distributionFactorUpper, 1.0) >= 0 ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must larger than or equals %s, but is %s", PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), 1.0, distributionFactorUpper));
    }

    private void validateDistributionFactorLower(double distributionFactorLower) {
        Preconditions.checkState((ObjectUtils.doubleCompare(distributionFactorLower, 0.0) >= 0 && ObjectUtils.doubleCompare(distributionFactorLower, 1.0) <= 0 ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must between %s and %s inclusively, but is %s", PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), 0.0, 1.0, distributionFactorLower));
    }

    private Optional<String> getValidateDatabaseName(String tables) {
        if (tables == null || tables.trim().isEmpty()) {
            throw new IllegalArgumentException("Parameter tables cannot be null or empty");
        }
        CharSequence[] tableNames = tables.split(",");
        String dbName = null;
        for (String string : tableNames) {
            String trimmedTableName = string.trim();
            if (!trimmedTableName.contains(".")) continue;
            String[] tableNameParts = trimmedTableName.split("(?<!\\\\)\\.", -1);
            Preconditions.checkState((tableNameParts.length == 3 ? 1 : 0) != 0, (Object)String.format("Tables format must db.schema.table, can not 'tables' = %s", PostgresDataSourceOptions.TABLES.key()));
            String currentDbName = tableNameParts[0];
            Preconditions.checkState((boolean)this.isValidPostgresDbName(currentDbName), (Object)String.format("%s is not a valid PostgreSQL database name", currentDbName));
            if (dbName == null) {
                dbName = currentDbName;
                continue;
            }
            Preconditions.checkState((boolean)dbName.equals(currentDbName), (String)"The value of option `%s` is `%s`, but not all table names have the same database name", (Object[])new Object[]{PostgresDataSourceOptions.TABLES.key(), String.join((CharSequence)",", tableNames)});
        }
        return Optional.ofNullable(dbName);
    }

    private boolean isValidPostgresDbName(String dbName) {
        if (dbName == null || dbName.length() > 63) {
            return false;
        }
        return dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$]*");
    }

    private static ZoneId getServerTimeZone(Configuration config) {
        String serverTimeZone = (String)config.get(PostgresDataSourceOptions.SERVER_TIME_ZONE);
        if (serverTimeZone != null) {
            return ZoneId.of(serverTimeZone);
        }
        LOG.warn("{} is not set, which might cause data inconsistencies for time-related fields.", (Object)PostgresDataSourceOptions.SERVER_TIME_ZONE.key());
        return ZoneId.systemDefault();
    }
}

