/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.Utils;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.TopicSelector;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter;
import org.apache.flink.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.util.FlinkRuntimeException;

/**
 * {@link JdbcDataSourceDialect} implementation for PostgreSQL.
 *
 * <p>The dialect opens JDBC and replication connections from a {@link PostgresSourceConfig},
 * discovers captured tables and their schemas, and creates the fetch tasks used to read
 * snapshot and stream splits.
 */
public class PostgresDialect implements JdbcDataSourceDialect {
    private static final long serialVersionUID = 1L;
    private static final String CONNECTION_NAME = "postgres-cdc-connector";

    /** Configuration this dialect was created with. */
    private final PostgresSourceConfig sourceConfig;

    /** Table filter, created on the first call to {@link #isIncludeDataCollection}. */
    private transient Tables.TableFilter filters;

    /** Schema helper, created on the first call to {@link #queryTableSchema}. */
    private transient CustomPostgresSchema schema;

    /**
     * Stream fetch task most recently created by {@link #createFetchTask}; {@code null} until a
     * non-snapshot split has been handed to that method.
     */
    @Nullable
    private PostgresStreamFetchTask streamFetchTask;

    /**
     * Creates a dialect bound to the given source configuration.
     *
     * @param sourceConfig configuration used by the no-argument connection helpers and by
     *     schema lookups
     */
    public PostgresDialect(PostgresSourceConfig sourceConfig) {
        this.sourceConfig = sourceConfig;
    }

    /**
     * Opens and connects a {@link PostgresConnection} for the given configuration.
     *
     * @param sourceConfig source configuration; must be a {@link PostgresSourceConfig}
     * @return a connection on which {@code connect()} has already succeeded
     * @throws FlinkRuntimeException if connecting fails; the original failure is the cause
     */
    public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
        PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) sourceConfig;
        PostgresConnectorConfig dbzConfig = postgresSourceConfig.getDbzConnectorConfig();
        PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
                PostgresObjectUtils.newPostgresValueConverterBuilder(dbzConfig);
        PostgresConnection jdbc =
                new PostgresConnection(
                        dbzConfig.getJdbcConfig(),
                        valueConverterBuilder,
                        CONNECTION_NAME,
                        (JdbcConnection.ConnectionFactory)
                                new JdbcConnectionFactory(
                                        sourceConfig, this.getPooledDataSourceFactory()));
        try {
            jdbc.connect();
        } catch (Exception e) {
            // The connection object is never handed to the caller on this path, so release
            // whatever the failed connect attempt may have acquired before rethrowing.
            try {
                jdbc.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new FlinkRuntimeException(e);
        }
        return jdbc;
    }

    /**
     * Opens a connection using the configuration this dialect was constructed with.
     *
     * @return a connected {@link PostgresConnection}
     */
    public PostgresConnection openJdbcConnection() {
        return (PostgresConnection) this.openJdbcConnection(this.sourceConfig);
    }

    /**
     * Builds a replication connection on top of an existing JDBC connection.
     *
     * @param jdbcConnection connection whose type registry is used to build the schema and
     *     value converter
     * @return the replication connection created by {@link PostgresObjectUtils}
     * @throws RuntimeException if a {@link SQLException} occurs while building the connection
     */
    public PostgresReplicationConnection openPostgresReplicationConnection(
            PostgresConnection jdbcConnection) {
        try {
            PostgresConnectorConfig pgConnectorConfig = this.sourceConfig.getDbzConnectorConfig();
            TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(pgConnectorConfig);
            PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
                    PostgresObjectUtils.newPostgresValueConverterBuilder(pgConnectorConfig);
            // Named pgSchema so it does not shadow the CustomPostgresSchema field.
            PostgresSchema pgSchema =
                    PostgresObjectUtils.newSchema(
                            jdbcConnection,
                            pgConnectorConfig,
                            jdbcConnection.getTypeRegistry(),
                            topicSelector,
                            valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
            PostgresTaskContext taskContext =
                    PostgresObjectUtils.newTaskContext(pgConnectorConfig, pgSchema, topicSelector);
            return (PostgresReplicationConnection)
                    PostgresObjectUtils.createReplicationConnection(
                            taskContext, jdbcConnection, false, pgConnectorConfig);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e);
        }
    }

    /** Returns the display name of this dialect, {@code "PostgreSQL"}. */
    public String getName() {
        return "PostgreSQL";
    }

    /**
     * Reads the current offset using a connection that is opened for this call and closed
     * before returning.
     *
     * @param sourceConfig configuration used to open the connection
     * @return the offset reported by {@link Utils#currentOffset}
     * @throws FlinkRuntimeException if a {@link SQLException} is raised
     */
    public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
        try (JdbcConnection jdbc = this.openJdbcConnection(sourceConfig)) {
            return Utils.currentOffset((PostgresConnection) jdbc);
        } catch (SQLException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /** Always returns {@code true}: data collection ids are treated as case sensitive. */
    public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
        return true;
    }

    /**
     * Creates the chunk splitter for this dialect.
     *
     * @param sourceConfig configuration passed to the splitter
     * @return a new {@link PostgresChunkSplitter} bound to this dialect
     */
    public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
        return new PostgresChunkSplitter(sourceConfig, this);
    }

    /**
     * Lists the tables selected by the configured table filters.
     *
     * <p>Only the first entry of the configured database list is used. The connection is
     * opened for this call and closed before returning.
     *
     * @param sourceConfig configuration supplying the database list and table filters
     * @return the table ids returned by {@link TableDiscoveryUtils#listTables}
     * @throws FlinkRuntimeException if a {@link SQLException} is raised
     */
    public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
        try (JdbcConnection jdbc = this.openJdbcConnection(sourceConfig)) {
            return TableDiscoveryUtils.listTables(
                    (String) sourceConfig.getDatabaseList().get(0),
                    jdbc,
                    sourceConfig.getTableFilters());
        } catch (SQLException e) {
            throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
        }
    }

    /**
     * Discovers the captured tables and looks up the schema of each one.
     *
     * <p>Table discovery runs first through {@link #discoverDataCollections}, which uses its
     * own connection; a second connection is then opened for the schema lookups.
     *
     * @param sourceConfig configuration used for discovery and for opening connections
     * @return a map from table id to the schema returned by {@link #queryTableSchema}
     * @throws FlinkRuntimeException if discovery or any schema lookup fails
     */
    public Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(
            JdbcSourceConfig sourceConfig) {
        List<TableId> capturedTableIds = this.discoverDataCollections(sourceConfig);
        try (JdbcConnection jdbc = this.openJdbcConnection(sourceConfig)) {
            Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
            for (TableId tableId : capturedTableIds) {
                tableSchemas.put(tableId, this.queryTableSchema(jdbc, tableId));
            }
            return tableSchemas;
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    "Error to discover table schemas: " + e.getMessage(), e);
        }
    }

    /** Returns a new {@link PostgresConnectionPoolFactory} on every call. */
    public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
        return new PostgresConnectionPoolFactory();
    }

    /**
     * Returns the schema of a single table.
     *
     * <p>The {@link CustomPostgresSchema} helper is created on the first call, using the
     * connection supplied to that call, and is reused afterwards; on later calls the
     * {@code jdbc} argument is not consulted by this method.
     *
     * @param jdbc connection used to create the schema helper on first use; must be a
     *     {@link PostgresConnection}
     * @param tableId table to look up
     * @return the table change returned by the schema helper
     */
    public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
        if (this.schema == null) {
            this.schema = new CustomPostgresSchema((PostgresConnection) jdbc, this.sourceConfig);
        }
        return this.schema.getTableSchema(tableId);
    }

    /**
     * Creates the fetch task matching the kind of split.
     *
     * @param sourceSplitBase the split to read
     * @return a {@link PostgresScanFetchTask} for a snapshot split; otherwise a new
     *     {@link PostgresStreamFetchTask}, which is also remembered for
     *     {@link #notifyCheckpointComplete}
     */
    public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
        if (sourceSplitBase.isSnapshotSplit()) {
            return new PostgresScanFetchTask(sourceSplitBase.asSnapshotSplit());
        }
        this.streamFetchTask = new PostgresStreamFetchTask(sourceSplitBase.asStreamSplit());
        return this.streamFetchTask;
    }

    /**
     * Creates the fetch task context for the given configuration.
     *
     * @param taskSourceConfig configuration passed to the context
     * @return a new {@link PostgresSourceFetchTaskContext} bound to this dialect
     */
    public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSourceConfig) {
        return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
    }

    /**
     * Forwards the checkpointed offset to the remembered stream fetch task, if there is one.
     *
     * @param checkpointId id of the completed checkpoint; not used by this method
     * @param offset offset passed to {@code commitCurrentOffset}
     * @throws Exception if committing the offset fails
     */
    public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
        if (this.streamFetchTask != null) {
            this.streamFetchTask.commitCurrentOffset(offset);
        }
    }

    /**
     * Tests whether a table passes the data collection filter.
     *
     * <p>The filter is taken from the configuration supplied to the first call and reused
     * afterwards; on later calls the {@code sourceConfig} argument is not consulted.
     *
     * @param sourceConfig configuration supplying the table filters on first use
     * @param tableId table to test
     * @return {@code true} if the filter includes the table
     */
    public boolean isIncludeDataCollection(JdbcSourceConfig sourceConfig, TableId tableId) {
        if (this.filters == null) {
            this.filters = sourceConfig.getTableFilters().dataCollectionFilter();
        }
        return this.filters.isIncluded(tableId);
    }

    /**
     * Returns the value of the Debezium property named by
     * {@link PostgresConnectorConfig#SLOT_NAME}, as stored in the source configuration.
     */
    public String getSlotName() {
        return this.sourceConfig
                .getDbzProperties()
                .getProperty(PostgresConnectorConfig.SLOT_NAME.name());
    }

    /**
     * Returns the value of the Debezium property named by
     * {@link PostgresConnectorConfig#PLUGIN_NAME}, as stored in the source configuration.
     */
    public String getPluginName() {
        return this.sourceConfig
                .getDbzProperties()
                .getProperty(PostgresConnectorConfig.PLUGIN_NAME.name());
    }
}

