/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.ChunkQueryBuilder;
import io.debezium.pipeline.source.snapshot.incremental.RowValueConstructorChunkQueryBuilder;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.nio.charset.Charset;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.Driver;
import org.postgresql.core.BaseConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.util.PGmoney;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresConnection
extends JdbcConnection {
    // Connection-usage labels.
    // NOTE(review): presumably these are passed as the connectionUsage constructor argument, which
    // addDefaultSettings() stores under the "ApplicationName" setting — confirm against callers.
    public static final String CONNECTION_STREAMING = "Debezium Streaming";
    public static final String CONNECTION_SLOT_INFO = "Debezium Slot Info";
    public static final String CONNECTION_DROP_SLOT = "Debezium Drop Slot";
    public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection";
    public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
    // The same text appears literally in the query issued by hasIdleTransactions().
    public static final String CONNECTION_GENERAL = "Debezium General";
    // Both patterns are consulted by isTableUniqueIndexIncluded(): a column name matching either one is excluded.
    private static final Pattern FUNCTION_DEFAULT_PATTERN = Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
    private static final Pattern EXPRESSION_DEFAULT_PATTERN = Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)");
    // NOTE(review): not declared final although it is never reassigned in this file.
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);
    // JDBC URL template with hostname, port and database placeholders; also used by connectionString().
    private static final String URL_PATTERN = "jdbc:postgresql://${" + String.valueOf(JdbcConfiguration.HOSTNAME) + "}:${" + String.valueOf(JdbcConfiguration.PORT) + "}/${" + String.valueOf(JdbcConfiguration.DATABASE) + "}";
    // Connection factory built from URL_PATTERN and the PostgreSQL driver; the port defaults to the connector's default port.
    protected static final JdbcConnection.ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory((String)URL_PATTERN, (String)Driver.class.getName(), (ClassLoader)PostgresConnection.class.getClassLoader(), (Field[])new Field[]{JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString())});
    // Retry budget for readReplicationSlotInfo() (that method currently spells the value as the literal 900).
    private static final int MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT = 900;
    // Pause between two consecutive slot lookups in readReplicationSlotInfo().
    private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS = Duration.ofSeconds(2L);
    // Both fields are null when the connection was created without a value converter builder / type registry;
    // the getters below reject that state with a NullPointerException.
    private final TypeRegistry typeRegistry;
    private final PostgresDefaultValueConverter defaultValueConverter;

    /**
     * Creates a connection from a JDBC configuration, optionally equipping it with its own type registry
     * and default value converter.
     *
     * @param config the JDBC configuration; the defaults from {@link #addDefaultSettings} are layered on top
     * @param valueConverterBuilder builds the value converter from the newly created type registry; when
     *            {@code null} the connection has neither a type registry nor a default value converter
     * @param connectionUsage label stored under the {@code ApplicationName} connection setting
     */
    public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilder valueConverterBuilder, String connectionUsage) {
        super(addDefaultSettings(config, connectionUsage), FACTORY, PostgresConnection::validateServerVersion, "\"", "\"");
        if (valueConverterBuilder != null) {
            final TypeRegistry registry = new TypeRegistry(this);
            this.typeRegistry = registry;
            this.defaultValueConverter = new PostgresDefaultValueConverter(valueConverterBuilder.build(registry), getTimestampUtils(), registry);
        } else {
            this.typeRegistry = null;
            this.defaultValueConverter = null;
        }
    }

    /**
     * Creates a connection from the connector configuration, reusing an existing type registry.
     *
     * @param config the connector configuration whose JDBC settings are used for the connection
     * @param typeRegistry the registry to reuse; when {@code null} the connection has neither a type
     *            registry nor a default value converter
     * @param connectionUsage label stored under the {@code ApplicationName} connection setting
     */
    public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegistry, String connectionUsage) {
        super(addDefaultSettings(config.getJdbcConfig(), connectionUsage), FACTORY, PostgresConnection::validateServerVersion, "\"", "\"");
        if (typeRegistry != null) {
            this.typeRegistry = typeRegistry;
            this.defaultValueConverter = new PostgresDefaultValueConverter(
                    PostgresValueConverter.of(config, getDatabaseCharset(), typeRegistry),
                    getTimestampUtils(),
                    typeRegistry);
        } else {
            this.typeRegistry = null;
            this.defaultValueConverter = null;
        }
    }

    /**
     * Creates a connection that has no type registry and no default value converter.
     *
     * @param config the JDBC configuration
     * @param connectionUsage label stored under the {@code ApplicationName} connection setting
     */
    public PostgresConnection(JdbcConfiguration config, String connectionUsage) {
        this(config, (PostgresValueConverterBuilder) null, connectionUsage);
    }

    /**
     * Returns a copy of the given configuration with the settings every connection of this class uses:
     * {@code assumeMinServerVersion=9.4} and {@code ApplicationName=<connectionUsage>}.
     *
     * @param configuration the base JDBC configuration
     * @param connectionUsage value for the {@code ApplicationName} setting
     * @return the adapted configuration
     */
    static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration, String connectionUsage) {
        Configuration withDefaults = configuration.edit()
                .with("assumeMinServerVersion", "9.4")
                .with("ApplicationName", connectionUsage)
                .build();
        return JdbcConfiguration.adapt(withDefaults);
    }

    /**
     * Returns the connection string obtained by resolving {@code URL_PATTERN} for this connection.
     *
     * @return the connection string
     */
    public String connectionString() {
        return connectionString(URL_PATTERN);
    }

    /**
     * Reads the {@code relreplident} flag of a table from {@code pg_class}.
     *
     * <p>A missing or empty schema on the table id is treated as {@code public}. When the table is not
     * found a warning is logged and an empty string is handed to
     * {@code ReplicaIdentityInfo.ReplicaIdentity.parseFromDB}.
     *
     * @param tableId the table to inspect
     * @return the replica identity information for the table
     * @throws SQLException if the catalog query fails
     */
    public ReplicaIdentityInfo readReplicaIdentityInfo(TableId tableId) throws SQLException {
        final String declaredSchema = tableId.schema();
        final String schemaName = (declaredSchema == null || declaredSchema.isEmpty()) ? "public" : declaredSchema;
        final StringBuilder identityCode = new StringBuilder();
        prepareQuery(
                "SELECT relreplident FROM pg_catalog.pg_class c "
                        + "LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid "
                        + "WHERE n.nspname=? and c.relname=?",
                ps -> {
                    ps.setString(1, schemaName);
                    ps.setString(2, tableId.table());
                },
                resultSet -> {
                    if (!resultSet.next()) {
                        LOGGER.warn("Cannot determine REPLICA IDENTITY information for table '{}'", tableId);
                        return;
                    }
                    identityCode.append(resultSet.getString(1));
                });
        return new ReplicaIdentityInfo(ReplicaIdentityInfo.ReplicaIdentity.parseFromDB(identityCode.toString()));
    }

    /**
     * Looks up the name of the index flagged {@code indisreplident} for the schema of the given table.
     *
     * <p>A missing or empty schema on the table id is treated as {@code public}. Only the first row of the
     * result is used. When no row is returned a warning is logged and an empty string is returned; a SQL
     * {@code NULL} index name is appended to the builder as-is, which yields the text {@code "null"}.
     *
     * @param tableId the table to inspect
     * @return the index name taken from the first result row, or an empty string when there is no row
     * @throws SQLException if the catalog query fails
     */
    public String readIndexOfReplicaIdentity(TableId tableId) throws SQLException {
        final String declaredSchema = tableId.schema();
        final String schemaName = (declaredSchema == null || declaredSchema.isEmpty()) ? "public" : declaredSchema;
        final StringBuilder foundIndex = new StringBuilder();
        prepareQuery(
                "with rel_index as ("
                        + "select split_part(indexrelid::regclass::text, '.', 1) as index_schema, split_part(indexrelid::regclass::text, '.', 2) as index_name "
                        + "from pg_catalog.pg_index "
                        + "where indisreplident "
                        + ") "
                        + "SELECT i.index_name "
                        + "FROM pg_catalog.pg_class c "
                        + "    LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid "
                        + "    LEFT join rel_index i on n.nspname = i.index_schema "
                        + "WHERE n.nspname=? and c.relname=?",
                ps -> {
                    ps.setString(1, schemaName);
                    ps.setString(2, tableId.table());
                },
                resultSet -> {
                    if (!resultSet.next()) {
                        LOGGER.warn("Cannot determine index linked to REPLICA IDENTITY for table '{}'", tableId);
                        return;
                    }
                    foundIndex.append(resultSet.getString(1));
                });
        return foundIndex.toString();
    }

    /**
     * Issues {@code ALTER TABLE ... REPLICA IDENTITY ...} for the given table.
     *
     * <p>Failures are logged and never propagated. SQL state {@code 42501} is reported as a
     * missing-privilege problem; every other failure is reported as unexpected. Exactly one of the two
     * messages is logged per failure.
     *
     * @param tableId the table to alter
     * @param replicaIdentityValue the replica identity to set; its string form is placed into the statement
     */
    public void setReplicaIdentityForTable(TableId tableId, ReplicaIdentityInfo replicaIdentityValue) {
        try {
            LOGGER.debug("Updating Replica Identity '{}'", tableId.table());
            this.execute(String.format("ALTER TABLE %s REPLICA IDENTITY %s;", tableId, replicaIdentityValue));
        }
        catch (SQLException e) {
            // Constant-first comparison: the SQL state of an SQLException may be null.
            if ("42501".equals(e.getSQLState())) {
                LOGGER.error("Replica identity could not be updated because of lack of privileges", e);
            } else {
                LOGGER.error("Unexpected error while attempting to alter Replica Identity", e);
            }
        }
    }

    /**
     * Returns the state of the given replication slot.
     *
     * @param slotName the replication slot name
     * @param pluginName the logical decoding plugin name
     * @return the slot state, or {@code null} when the slot lookup yields {@code ServerInfo.ReplicationSlot.INVALID}
     * @throws SQLException if the slot lookup fails
     * @throws ConnectException if the thread is interrupted while waiting for slot information; the
     *             interrupt flag is restored before throwing
     */
    public SlotState getReplicationSlotState(String slotName, String pluginName) throws SQLException {
        final ServerInfo.ReplicationSlot slotInfo;
        try {
            slotInfo = readReplicationSlotInfo(slotName, pluginName);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting for valid replication slot info", e);
        }
        return slotInfo.equals(ServerInfo.ReplicationSlot.INVALID) ? null : slotInfo.asSlotState();
    }

    /**
     * Performs a single lookup of the replication slot in {@code pg_replication_slots}.
     *
     * @param slotName the replication slot name
     * @param pluginName the logical decoding plugin name
     * @return the slot description; {@code ServerInfo.ReplicationSlot.INVALID} when no matching row exists;
     *         {@code null} when the row exists but its confirmed-flush or restart LSN could not be obtained
     * @throws SQLException if the query fails
     */
    private ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String slotName, String pluginName) throws SQLException {
        final String database = database();
        return queryForSlot(slotName, database, pluginName, row -> {
            if (!row.next()) {
                LOGGER.debug("No replication slot '{}' is present for plugin '{}' and database '{}'", slotName, pluginName, database);
                return ServerInfo.ReplicationSlot.INVALID;
            }
            final boolean active = row.getBoolean("active");
            final Lsn confirmedFlushLsn = parseConfirmedFlushLsn(slotName, pluginName, database, row);
            if (confirmedFlushLsn == null) {
                return null;
            }
            final Lsn restartLsn = parseRestartLsn(slotName, pluginName, database, row);
            if (restartLsn == null) {
                return null;
            }
            final Long catalogXmin = row.getLong("catalog_xmin");
            return new ServerInfo.ReplicationSlot(active, confirmedFlushLsn, restartLsn, catalogXmin);
        });
    }

    /**
     * Polls for the replication slot until a non-null result is obtained, pausing between attempts.
     *
     * <p>Up to {@code MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT} lookups are made, separated by
     * {@code PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS}.
     *
     * @param slotName the replication slot name
     * @param pluginName the logical decoding plugin name
     * @return the first non-null slot description (which may be {@code ServerInfo.ReplicationSlot.INVALID})
     * @throws SQLException if a lookup fails
     * @throws InterruptedException if the thread is interrupted while pausing
     * @throws ConnectException if every attempt returned {@code null}
     */
    ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, String pluginName) throws SQLException, InterruptedException {
        final String database = database();
        final Metronome pacer = Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM);
        int attempt = 1;
        while (attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT) {
            final ServerInfo.ReplicationSlot candidate = fetchReplicationSlotInfo(slotName, pluginName);
            if (candidate != null) {
                LOGGER.info("Obtained valid replication slot {}", candidate);
                return candidate;
            }
            LOGGER.warn("Cannot obtain valid replication slot '{}' for plugin '{}' and database '{}' [during attempt {} out of {}, concurrent tx probably blocks taking snapshot.",
                    slotName, pluginName, database, attempt, MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT);
            pacer.pause();
            attempt++;
        }
        throw new ConnectException("Unable to obtain valid replication slot. Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector");
    }

    /**
     * Queries {@code pg_replication_slots} for the row matching slot name, database and plugin, and hands
     * the result set to the given mapper.
     *
     * @param slotName the replication slot name
     * @param database the database name
     * @param pluginName the logical decoding plugin name
     * @param map converts the result set into a slot description
     * @return whatever the mapper returns
     * @throws SQLException if the query fails
     */
    protected ServerInfo.ReplicationSlot queryForSlot(String slotName, String database, String pluginName, JdbcConnection.ResultSetMapper<ServerInfo.ReplicationSlot> map) throws SQLException {
        final String slotQuery = "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?";
        return prepareQueryAndMap(slotQuery, ps -> {
            ps.setString(1, slotName);
            ps.setString(2, database);
            ps.setString(3, pluginName);
        }, map);
    }

    /**
     * Reads {@code confirmed_flush_lsn} from the current result-set row, falling back to
     * {@code restart_lsn} where appropriate.
     *
     * <p>When the column is {@code NULL}, the fallback is used only if no idle transactions are found;
     * otherwise {@code null} is returned. When reading the column or checking for idle transactions
     * raises an {@link SQLException}, the fallback is used as well.
     *
     * @return the LSN, or {@code null} when none could be obtained
     */
    private Lsn parseConfirmedFlushLsn(String slotName, String pluginName, String database, ResultSet rs) {
        try {
            final Lsn parsed = tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
            if (parsed != null) {
                return parsed;
            }
            LOGGER.info("Failed to obtain valid replication slot, confirmed flush lsn is null");
            if (hasIdleTransactions()) {
                return null;
            }
            return tryFallbackToRestartLsn(slotName, pluginName, database, rs);
        }
        catch (SQLException e) {
            return tryFallbackToRestartLsn(slotName, pluginName, database, rs);
        }
    }

    /**
     * Checks {@code pg_stat_activity} for a session in state {@code idle in transaction} that is neither
     * this backend nor a connection whose application name equals {@link #CONNECTION_GENERAL}.
     *
     * @return {@code true} if at least one such session exists
     * @throws SQLException if the query fails
     */
    private boolean hasIdleTransactions() throws SQLException {
        return queryAndMap(
                "select * from pg_stat_activity where state like 'idle in transaction' AND application_name != '" + CONNECTION_GENERAL + "' AND pid <> pg_backend_pid()",
                rs -> {
                    if (!rs.next()) {
                        return false;
                    }
                    // Parameterized message: no string building in this method, and the application
                    // name is separated from the preceding word by a space.
                    LOGGER.debug("Found at least one idle transaction with pid {} for application {}", rs.getInt("pid"), rs.getString("application_name"));
                    return true;
                });
    }

    /**
     * Reads {@code restart_lsn} from the current result-set row as a substitute for the confirmed flush LSN.
     *
     * @return the restart LSN, or {@code null} when the column is {@code NULL}
     * @throws DebeziumException if reading the column raises an {@link SQLException}
     */
    private Lsn tryFallbackToRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
        LOGGER.info("Unable to find confirmed_flushed_lsn, falling back to restart_lsn");
        try {
            return tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
        }
        catch (SQLException e) {
            throw new DebeziumException("Neither confirmed_flush_lsn nor restart_lsn could be found", e);
        }
    }

    /**
     * Reads {@code restart_lsn} from the current result-set row.
     *
     * @return the restart LSN, or {@code null} when the column is {@code NULL}
     * @throws DebeziumException if reading the column raises an {@link SQLException}; the SQL exception is
     *             attached as the cause
     */
    private Lsn parseRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
        try {
            return this.tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
        }
        catch (SQLException e) {
            // The message states the failure (the column could NOT be read) and keeps the root cause.
            throw new DebeziumException("restart_lsn could not be found", e);
        }
    }

    /**
     * Reads the given column of the current result-set row and converts it to an {@link Lsn}.
     *
     * @param slotName slot name, used only in the error message
     * @param pluginName plugin name, used only in the error message
     * @param database database name, used only in the error message
     * @param rs result set positioned on a {@code pg_replication_slots} row
     * @param column name of the LSN column to read
     * @return the parsed LSN, or {@code null} when the column value is {@code NULL}
     * @throws SQLException if the column cannot be read
     * @throws DebeziumException if the text cannot be parsed (the parse failure is attached as the cause)
     *             or the parsed LSN reports itself as not valid
     */
    private Lsn tryParseLsn(String slotName, String pluginName, String database, ResultSet rs, String column) throws ConnectException, SQLException {
        final String lsnStr = rs.getString(column);
        if (lsnStr == null) {
            return null;
        }
        final Lsn lsn;
        try {
            lsn = Lsn.valueOf(lsnStr);
        }
        catch (Exception e) {
            // Keep the parse failure as the cause so the offending value can be diagnosed.
            throw new DebeziumException("Value " + column + " in the pg_replication_slots table for slot = '" + slotName + "', plugin = '" + pluginName + "', database = '" + database + "' is not valid. This is an abnormal situation and the database status should be checked.", e);
        }
        if (!lsn.isValid()) {
            throw new DebeziumException("Invalid LSN returned from database");
        }
        return lsn;
    }

    /**
     * Attempts to drop the given replication slot, retrying while the slot is reported as in use.
     *
     * <p>Up to three attempts are made with a one-second pause between them. If the pause is interrupted,
     * the thread's interrupt flag is restored and the remaining attempts continue.
     *
     * @param slotName name of the slot to drop; it is embedded verbatim in the SQL statement
     * @return {@code true} if the slot was dropped by this call; {@code false} if it does not exist, is
     *         still in use after the last attempt, or an unexpected error occurred
     */
    public boolean dropReplicationSlot(String slotName) {
        final int maxAttempts = 3;
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            try {
                this.execute("select pg_drop_replication_slot('" + slotName + "')");
                return true;
            }
            catch (SQLException e) {
                final String sqlState = e.getSQLState();
                if (PSQLState.UNDEFINED_OBJECT.getState().equals(sqlState)) {
                    LOGGER.debug("Replication slot {} has already been dropped", slotName);
                    return false;
                }
                if (!PSQLState.OBJECT_IN_USE.getState().equals(sqlState)) {
                    LOGGER.error("Unexpected error while attempting to drop replication slot", e);
                    return false;
                }
                // The slot is still in use: give up after the last attempt, otherwise wait and retry.
                if (attempt == maxAttempts) {
                    LOGGER.warn("Cannot drop replication slot '{}' because it's still in use", slotName);
                    return false;
                }
                LOGGER.debug("Cannot drop replication slot '{}' because it's still in use", slotName);
                try {
                    Metronome.parker(Duration.ofSeconds(1L), Clock.system()).pause();
                }
                catch (InterruptedException interrupted) {
                    // Do not lose the interruption: restore the flag so callers can observe it.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return false;
    }

    /**
     * Drops the given publication.
     *
     * <p>Failures are logged, not propagated: at debug level when the server reports the publication as
     * undefined, at error level otherwise.
     *
     * @param publicationName name of the publication; it is appended verbatim to {@code DROP PUBLICATION}
     * @return {@code true} if the statement succeeded, {@code false} on any SQL failure
     */
    public boolean dropPublication(String publicationName) {
        LOGGER.debug("Dropping publication '{}'", publicationName);
        try {
            execute("DROP PUBLICATION " + publicationName);
            return true;
        }
        catch (SQLException e) {
            final boolean alreadyDropped = PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState());
            if (!alreadyDropped) {
                LOGGER.error("Unexpected error while attempting to drop publication", e);
            } else {
                LOGGER.debug("Publication {} has already been dropped", publicationName);
            }
            return false;
        }
    }

    /**
     * Closes the connection via the superclass; an {@link SQLException} raised while closing is logged at
     * error level and not propagated.
     */
    public synchronized void close() {
        try {
            super.close();
        }
        catch (SQLException closeFailure) {
            LOGGER.error("Unexpected error while closing Postgres connection", closeFailure);
        }
    }

    /**
     * Returns the current transaction id of the server.
     *
     * <p>Servers with major version 13 or later are queried with {@code pg_current_xact_id()}, older ones
     * with {@code txid_current()}. Both queries yield 0 while the server is in recovery.
     *
     * @return the transaction id, or {@code null} when the value read is not greater than zero
     * @throws SQLException if the metadata lookup or the query fails
     */
    public Long currentTransactionId() throws SQLException {
        final int serverMajor = connection().getMetaData().getDatabaseMajorVersion();
        final String txIdQuery;
        if (serverMajor >= 13) {
            txIdQuery = "select (case pg_is_in_recovery() when 't' then '0'::xid8 else pg_current_xact_id() end) AS pg_current_txid";
        } else {
            txIdQuery = "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid";
        }
        final AtomicLong holder = new AtomicLong(0L);
        query(txIdQuery, row -> {
            if (row.next()) {
                holder.compareAndSet(0L, row.getLong(1));
            }
        });
        final long txId = holder.get();
        if (txId > 0L) {
            return Long.valueOf(txId);
        }
        return null;
    }

    /**
     * Returns the current write-ahead-log position of the server as a number.
     *
     * <p>Servers with major version 10 or later are asked for {@code pg_last_wal_receive_lsn()} when in
     * recovery and {@code pg_current_wal_lsn()} otherwise; older servers are asked for
     * {@code pg_current_xlog_location()}.
     *
     * @return the position converted with {@code LogSequenceNumber.asLong()}
     * @throws SQLException if the metadata lookup or the query fails
     * @throws IllegalStateException if the query returns no row
     */
    public long currentXLogLocation() throws SQLException {
        final int serverMajor = connection().getMetaData().getDatabaseMajorVersion();
        final String positionQuery;
        if (serverMajor >= 10) {
            positionQuery = "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn";
        } else {
            positionQuery = "select * from pg_current_xlog_location()";
        }
        final AtomicLong position = new AtomicLong(0L);
        query(positionQuery, row -> {
            if (row.next()) {
                position.compareAndSet(0L, LogSequenceNumber.valueOf(row.getString(1)).asLong());
                return;
            }
            throw new IllegalStateException("there should always be a valid xlog position");
        });
        return position.get();
    }

    /**
     * Collects server version, current user, current database, the roles the user is a member of, and
     * the server's major version.
     *
     * <p>The role lookup binds the user name as a statement parameter instead of embedding it in the SQL
     * text, so names containing quote characters do not break the query.
     *
     * @return the populated server information
     * @throws SQLException if any of the queries or the metadata lookup fails
     */
    public ServerInfo serverInfo() throws SQLException {
        final ServerInfo serverInfo = new ServerInfo();
        this.query("SELECT version(), current_user, current_database()", rs -> {
            if (rs.next()) {
                serverInfo.withServer(rs.getString(1)).withUsername(rs.getString(2)).withDatabase(rs.getString(3));
            }
        });
        final String username = serverInfo.username();
        if (username != null) {
            // The explicit cast keeps resolution on the pg_has_role(name, oid, text) overload.
            this.prepareQuery(
                    "SELECT oid, rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin, rolreplication FROM pg_roles WHERE pg_has_role(?::name, oid, 'member')",
                    stmt -> stmt.setString(1, username),
                    rs -> {
                        while (rs.next()) {
                            String roleInfo = "superuser: " + rs.getBoolean(3) + ", replication: " + rs.getBoolean(8) + ", inherit: " + rs.getBoolean(4) + ", create role: " + rs.getBoolean(5) + ", create db: " + rs.getBoolean(6) + ", can log in: " + rs.getBoolean(7);
                            String roleName = rs.getString(2);
                            serverInfo.addRole(roleName, roleInfo);
                        }
                    });
        }
        serverInfo.withMajorVersion(this.connection().getMetaData().getDatabaseMajorVersion());
        return serverInfo;
    }

    /**
     * Returns the charset corresponding to the encoding reported by the underlying driver connection.
     *
     * @return the database charset
     * @throws DebeziumException if the connection cannot be obtained
     */
    public Charset getDatabaseCharset() {
        try {
            final BaseConnection driverConnection = (BaseConnection) connection();
            final String encodingName = driverConnection.getEncoding().name();
            return Charset.forName(encodingName);
        }
        catch (SQLException e) {
            throw new DebeziumException("Couldn't obtain encoding for database " + database(), e);
        }
    }

    /**
     * Returns the {@link TimestampUtils} of the underlying driver connection.
     *
     * @return the driver's timestamp utilities
     * @throws DebeziumException if the connection cannot be obtained
     */
    public TimestampUtils getTimestampUtils() {
        try {
            final PgConnection driverConnection = (PgConnection) connection();
            return driverConnection.getTimestampUtils();
        }
        catch (SQLException e) {
            throw new DebeziumException("Couldn't get timestamp utils from underlying connection", e);
        }
    }

    /**
     * Rejects servers older than version 9.4.
     *
     * @param statement a statement whose connection metadata supplies the server version
     * @throws SQLException if the server version is lower than 9.4 or the metadata cannot be read
     */
    private static void validateServerVersion(Statement statement) throws SQLException {
        final DatabaseMetaData serverMetadata = statement.getConnection().getMetaData();
        final int major = serverMetadata.getDatabaseMajorVersion();
        final int minor = serverMetadata.getDatabaseMinorVersion();
        final boolean supported = major > 9 || (major == 9 && minor >= 4);
        if (!supported) {
            throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");
        }
    }

    /**
     * Resolves a type name to the OID of its root type in the type registry.
     *
     * @param typeName the type name to look up
     * @return the OID of the root type
     */
    protected int resolveNativeType(String typeName) {
        final PostgresType rootType = getTypeRegistry().get(typeName).getRootType();
        return rootType.getOid();
    }

    /**
     * Resolves a native type OID to the JDBC type id of its root type in the type registry.
     *
     * @param metadataJdbcType the JDBC type reported by metadata; not used by this implementation
     * @param nativeType the native type OID to look up
     * @return the JDBC type id of the root type
     */
    protected int resolveJdbcType(int metadataJdbcType, int nativeType) {
        final PostgresType rootType = getTypeRegistry().get(nativeType).getRootType();
        return rootType.getJdbcId();
    }

    /**
     * Builds a column editor from the current row of a column-metadata result set.
     *
     * @param columnMetadata result set positioned on a column-metadata row
     * @param tableId the table the column belongs to
     * @param columnFilter optional filter; {@code null} accepts every column
     * @return the column editor, or an empty optional when the filter rejects the column
     * @throws SQLException if the metadata cannot be read
     */
    protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter) throws SQLException {
        return doReadTableColumn(columnMetadata, tableId, columnFilter);
    }

    /**
     * Builds an immutable column from the current row of a column-metadata result set.
     *
     * @param columnMetadata result set positioned on a column-metadata row
     * @param tableId the table the column belongs to
     * @param columnNameFilter optional filter; {@code null} accepts every column
     * @return the column, or an empty optional when the filter rejects it
     * @throws SQLException if the metadata cannot be read
     */
    public Optional<Column> readColumnForDecoder(ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnNameFilter) throws SQLException {
        final Optional<ColumnEditor> editor = doReadTableColumn(columnMetadata, tableId, columnNameFilter);
        return editor.map(columnEditor -> columnEditor.create());
    }

    /**
     * Populates a column editor from the current row of a column-metadata result set.
     *
     * <p>Values are read by position: 4 name, 6 type name, 7 length, 9 scale, 11 nullability,
     * 13 default expression, 17 position, 23 auto-increment flag, 24 generated flag. Native and JDBC type
     * are taken from the root type registered for the column's type name. For types whose JDBC id is
     * {@link java.sql.Types#DISTINCT}, length and scale come from the registered type's defaults.
     *
     * @param columnMetadata result set positioned on a column-metadata row
     * @param tableId the table the column belongs to
     * @param columnFilter optional filter; {@code null} accepts every column
     * @return the populated editor, or an empty optional when the filter rejects the column
     * @throws SQLException if a metadata value other than the generated flag cannot be read
     */
    private Optional<ColumnEditor> doReadTableColumn(ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter) throws SQLException {
        final String columnName = columnMetadata.getString(4);
        final boolean accepted = columnFilter == null
                || columnFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName);
        if (!accepted) {
            return Optional.empty();
        }

        final ColumnEditor editor = Column.editor().name(columnName);
        editor.type(columnMetadata.getString(6));
        editor.length(columnMetadata.getInt(7));
        if (columnMetadata.getObject(9) != null) {
            editor.scale(Integer.valueOf(columnMetadata.getInt(9)));
        }
        editor.optional(isNullable(columnMetadata.getInt(11)));
        editor.position(columnMetadata.getInt(17));
        editor.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23)));

        String generatedFlag = null;
        try {
            generatedFlag = columnMetadata.getString(24);
        }
        catch (SQLException ignored) {
            // Deliberately ignored: the column is then treated as not generated.
            // NOTE(review): presumably position 24 is absent in some driver result sets — confirm.
        }
        editor.generated("YES".equalsIgnoreCase(generatedFlag));

        final PostgresType registeredType = getTypeRegistry().get(editor.typeName());
        final PostgresType rootType = registeredType.getRootType();
        editor.nativeType(rootType.getOid());
        editor.jdbcType(rootType.getJdbcId());
        if (registeredType.getJdbcId() == java.sql.Types.DISTINCT) {
            editor.length(registeredType.getDefaultLength());
            editor.scale(Integer.valueOf(registeredType.getDefaultScale()));
        }

        final String defaultValueExpression = columnMetadata.getString(13);
        if (defaultValueExpression != null && getDefaultValueConverter().supportConversion(editor.typeName())) {
            editor.defaultValueExpression(defaultValueExpression);
        }
        return Optional.of(editor);
    }

    /**
     * Returns the default value converter of this connection.
     *
     * @return the default value converter, never {@code null}
     * @throws NullPointerException if the connection was created without one
     */
    public PostgresDefaultValueConverter getDefaultValueConverter() {
        return Objects.requireNonNull(this.defaultValueConverter, "Connection does not provide default value converter");
    }

    /**
     * Returns the type registry of this connection.
     *
     * @return the type registry, never {@code null}
     * @throws NullPointerException if the connection was created without one
     */
    public TypeRegistry getTypeRegistry() {
        return Objects.requireNonNull(this.typeRegistry, "Connection does not provide type registry");
    }

    /**
     * Reads a column value from the result set, with special handling selected by the registered type of
     * the column.
     *
     * <ul>
     * <li>array types are returned through {@code getArray}</li>
     * <li>OID 790: the text is parsed with {@link PGmoney}; a leading {@code -} is first rewritten into
     * the parenthesised form</li>
     * <li>OIDs 1560, 1083 and 1266: the text form is returned</li>
     * <li>OID 1700: a special value if {@code PostgresValueConverter.toSpecialValue} recognises the text,
     * otherwise a {@link SpecialValueDecimal} wrapping the decimal</li>
     * <li>everything else: {@code getObject}</li>
     * </ul>
     * NOTE(review): presumably the OIDs denote money, bit, time, timetz and numeric — confirm against the
     * PostgreSQL catalog.
     *
     * <p>If any of this raises an {@link SQLException}, the superclass implementation is used instead.
     *
     * @param rs the result set positioned on the row to read
     * @param columnIndex index of the column in the result set
     * @param column the column definition, only forwarded to the superclass fallback
     * @param table the table definition, only forwarded to the superclass fallback
     * @return the column value, possibly {@code null}
     * @throws SQLException if the superclass fallback fails
     */
    public Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
        try {
            final String columnTypeName = rs.getMetaData().getColumnTypeName(columnIndex);
            final PostgresType type = getTypeRegistry().get(columnTypeName);
            LOGGER.trace("Type of incoming data is: {}", type.getOid());
            LOGGER.trace("ColumnTypeName is: {}", columnTypeName);
            LOGGER.trace("Type is: {}", type);
            if (type.isArrayType()) {
                return rs.getArray(columnIndex);
            }
            switch (type.getOid()) {
                case 790: {
                    final String moneyText = rs.getString(columnIndex);
                    if (moneyText == null) {
                        return null;
                    }
                    final String parseableMoney = moneyText.startsWith("-") ? "(" + moneyText.substring(1) + ")" : moneyText;
                    return new PGmoney(parseableMoney).val;
                }
                case 1700: {
                    final String numericText = rs.getString(columnIndex);
                    if (numericText == null) {
                        return null;
                    }
                    final Optional<SpecialValueDecimal> special = PostgresValueConverter.toSpecialValue(numericText);
                    if (special.isPresent()) {
                        return special.get();
                    }
                    return new SpecialValueDecimal(rs.getBigDecimal(columnIndex));
                }
                case 1560:
                case 1083:
                case 1266: {
                    return rs.getString(columnIndex);
                }
                default: {
                    final Object raw = rs.getObject(columnIndex);
                    if (raw != null) {
                        LOGGER.trace("rs getobject returns class: {}; rs getObject value is: {}", raw.getClass(), raw);
                    }
                    return raw;
                }
            }
        }
        catch (SQLException e) {
            return super.getColumnValue(rs, columnIndex, column, table);
        }
    }

    /**
     * Returns the table types this connection supports.
     *
     * @return a new array containing {@code VIEW}, {@code MATERIALIZED VIEW}, {@code TABLE} and
     *         {@code PARTITIONED TABLE}
     */
    protected String[] supportedTableTypes() {
        final String[] tableTypes = { "VIEW", "MATERIALIZED VIEW", "TABLE", "PARTITIONED TABLE" };
        return tableTypes;
    }

    /**
     * Tells whether the given table type denotes a table.
     *
     * @param tableType the table type; may be {@code null}
     * @return {@code true} for {@code TABLE} and {@code PARTITIONED TABLE}, {@code false} otherwise
     */
    protected boolean isTableType(String tableType) {
        if ("TABLE".equals(tableType)) {
            return true;
        }
        return "PARTITIONED TABLE".equals(tableType);
    }

    /**
     * Decides whether a unique-index column is taken into account.
     *
     * @param indexName the index name; not used by this implementation
     * @param columnName the index column name; may be {@code null}
     * @return {@code false} when the column name is {@code null} or fully matches either
     *         {@code FUNCTION_DEFAULT_PATTERN} or {@code EXPRESSION_DEFAULT_PATTERN}; {@code true} otherwise
     */
    protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
        if (columnName == null) {
            return false;
        }
        if (FUNCTION_DEFAULT_PATTERN.matcher(columnName).matches()) {
            return false;
        }
        return !EXPRESSION_DEFAULT_PATTERN.matcher(columnName).matches();
    }

    /**
     * Returns the ids of all tables of type {@code TABLE} or {@code PARTITIONED TABLE} in the catalog,
     * without restricting schema or table name.
     *
     * @param catalogName the catalog to read
     * @return the table ids
     * @throws SQLException if the metadata cannot be read
     */
    public Set<TableId> getAllTableIds(String catalogName) throws SQLException {
        final String[] tableTypes = { "TABLE", "PARTITIONED TABLE" };
        return readTableNames(catalogName, null, null, tableTypes);
    }

    /**
     * Creates the chunk query builder for this connection.
     *
     * @param connectorConfig the connector configuration handed to the builder
     * @return a {@link RowValueConstructorChunkQueryBuilder} bound to this connection
     */
    public <T extends DataCollectionId> ChunkQueryBuilder<T> chunkQueryBuilder(RelationalDatabaseConnectorConfig connectorConfig) {
        return new RowValueConstructorChunkQueryBuilder<>(connectorConfig, this);
    }

    /**
     * Reports the null ordering for this connection.
     *
     * @return an optional always containing {@code true}
     */
    public Optional<Boolean> nullsSortLast() {
        return Optional.of(Boolean.TRUE);
    }

    /**
     * Re-selects the given columns of one row identified by its key values.
     *
     * <p>The generated statement has the form {@code SELECT <quoted columns> FROM <quoted table> WHERE
     * <key>=?::<type> AND ...}, where each key parameter is cast to the name of the type registered for
     * the key column's native type. Key column names are used as given, without quoting.
     *
     * @param table the table to read from
     * @param columns the columns to select
     * @param keyColumns the key column names used in the predicate
     * @param keyValues the key values to bind
     * @param source the source struct; not used by this implementation
     * @return the result of the superclass re-select for the generated query
     * @throws SQLException if the query fails
     */
    public Map<String, Object> reselectColumns(Table table, List<String> columns, List<String> keyColumns, List<Object> keyValues, Struct source) throws SQLException {
        final String selectList = columns.stream()
                .map(name -> quoteIdentifier(name))
                .collect(Collectors.joining(","));
        final String fromTable = quotedTableIdString(table.id());
        final String predicate = keyColumns.stream()
                .map(key -> {
                    final Column keyColumn = table.columnWithName(key);
                    final String castType = this.typeRegistry.get(keyColumn.nativeType()).getName();
                    return key + "=?::" + castType;
                })
                .collect(Collectors.joining(" AND "));
        final String query = String.format("SELECT %s FROM %s WHERE %s", selectList, fromTable, predicate);
        return reselectColumns(query, table.id(), columns, keyValues);
    }

    /**
     * Binds a value to a statement parameter; values of enum-typed columns are bound with
     * {@link java.sql.Types#OTHER}, everything else is delegated to the superclass.
     *
     * @param statement the statement to bind to
     * @param column the column the value belongs to
     * @param pos the parameter position
     * @param value the value to bind
     * @throws SQLException if binding fails
     */
    public void setQueryColumnValue(PreparedStatement statement, Column column, int pos, Object value) throws SQLException {
        final PostgresType registeredType = this.typeRegistry.get(column.nativeType());
        final boolean enumTyped = registeredType != null && registeredType.isEnumType();
        if (!enumTyped) {
            super.setQueryColumnValue(statement, column, pos, value);
            return;
        }
        statement.setObject(pos, value, java.sql.Types.OTHER);
    }

    /**
     * Creates a table id from schema and table name; the catalog part is always {@code null}.
     *
     * @param databaseName the database name; not used by this implementation
     * @param schemaName the schema name
     * @param tableName the table name
     * @return the table id
     */
    public TableId createTableId(String databaseName, String schemaName, String tableName) {
        final String catalogName = null;
        return new TableId(catalogName, schemaName, tableName);
    }

    /**
     * Checks whether the stored offset can still be served by the replication slot.
     *
     * @param partition the partition; not used by this implementation
     * @param offset the offset context; must be a {@link PostgresOffsetContext}
     * @param config the connector configuration; must be a {@link PostgresConnectorConfig}
     * @return {@code false} when no slot state is available; otherwise {@code true} when no commit LSN is
     *         stored or the slot's restart LSN is not greater than the stored LSN
     * @throws DebeziumException if reading the slot state raises an {@link SQLException}
     */
    public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
        final Lsn storedLsn = ((PostgresOffsetContext) offset).lastCommitLsn();
        final PostgresConnectorConfig postgresConfig = (PostgresConnectorConfig) config;
        final String slotName = postgresConfig.slotName();
        final String postgresPluginName = postgresConfig.plugin().getPostgresPluginName();

        final SlotState slotState;
        try {
            slotState = getReplicationSlotState(slotName, postgresPluginName);
        }
        catch (SQLException e) {
            throw new DebeziumException("Unable to get last available log position", e);
        }
        if (slotState == null) {
            return false;
        }
        LOGGER.info("Slot '{}' has restart LSN '{}'", slotName, slotState.slotRestartLsn());
        if (storedLsn == null) {
            return true;
        }
        return slotState.slotRestartLsn().compareTo(storedLsn) <= 0;
    }

    /**
     * Reads the columns of a table from the JDBC metadata.
     *
     * <p>Schema and table name are turned into search patterns using the metadata's search-string escape.
     *
     * @param tableId the table whose columns are read
     * @param columnFilter optional filter; {@code null} accepts every column
     * @return the accepted columns in the order the metadata returns them
     * @throws SQLException if the metadata cannot be read; an error naming the table is logged first
     */
    public List<Column> getTableColumnsForDecoder(TableId tableId, Tables.ColumnNameFilter columnFilter) throws SQLException {
        final List<Column> columns = new ArrayList<>();
        try {
            final DatabaseMetaData metadata = connection().getMetaData();
            final String schemaPattern = createPatternFromName(tableId.schema(), metadata.getSearchStringEscape());
            final String tablePattern = createPatternFromName(tableId.table(), metadata.getSearchStringEscape());
            try (ResultSet columnRows = metadata.getColumns(null, schemaPattern, tablePattern, null)) {
                while (columnRows.next()) {
                    final Optional<Column> column = readColumnForDecoder(columnRows, tableId, columnFilter);
                    if (column.isPresent()) {
                        columns.add(column.get());
                    }
                }
            }
        }
        catch (SQLException e) {
            LOGGER.error("Failed to read column metadata for '{}.{}'", tableId.schema(), tableId.table());
            throw e;
        }
        return columns;
    }

    /**
     * Factory for the {@link PostgresValueConverter} of a connection.
     *
     * <p>The constructor taking a builder invokes it once, passing the type registry it has just created.
     */
    @FunctionalInterface
    public static interface PostgresValueConverterBuilder {
        /**
         * Builds the value converter.
         *
         * @param var1 the type registry the converter should use
         * @return the value converter
         */
        public PostgresValueConverter build(TypeRegistry var1);
    }
}

