/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.ReplicaIdentityMapper;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.MessageDecoderContext;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.nio.ByteBuffer;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.core.Version;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresReplicationConnection
extends JdbcConnection
implements ReplicationConnection {
    // SQLSTATE checked in validateSlotIsInExpectedState when the slot-advance call is rejected.
    private static final String SQL_STATE_INSUFFICIENT_PRIVILEGE = "42501";
    // SQLSTATE that, together with QUERY_CANCELED, is reported as a timed-out publication statement.
    private static final String SQL_LOCK_NOT_AVAILABLE = "55P03";
    // Message of the DebeziumException raised when a publication create/update statement times out.
    private static final String PUBLICATION_QUERY_FAILURE_MESSAGE = "Creation of publication failed: query to create/update publication timed out, please make sure that there are no maintenance activities going on the database end.";
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    // Replication slot name; used when reading slot info and when advancing the slot.
    private final String slotName;
    // Publication name; used in the publication lookup, create and alter statements.
    private final String publicationName;
    // Filter deciding which tables count as captured (see determineCapturedTables).
    private final RelationalTableFilters tableFilter;
    // Selects the branch taken by initPublication when a publication is missing or already present.
    private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
    // Logical decoding plugin; publication handling only runs when this is PGOUTPUT.
    private final PostgresConnectorConfig.LogicalDecoder plugin;
    private final boolean dropSlotOnClose;
    // When false, useFailOverSlot returns false without checking the server.
    private final boolean createFailOverSlot;
    private final PostgresConnectorConfig connectorConfig;
    private final Duration statusUpdateInterval;
    // Obtained from the plugin in the constructor.
    private final MessageDecoder messageDecoder;
    // Separate connection used for table metadata and replica identity reads/writes.
    private final PostgresConnection jdbcConnection;
    private final TypeRegistry typeRegistry;
    private final Properties streamParams;
    // Set by initReplicationSlot; startStreaming falls back to it when the requested offset is null or invalid.
    private Lsn defaultStartingPos;
    // Null after construction; when non-null, initReplicationSlot takes the default start position from it.
    private SlotCreationResult slotCreationInfo;
    // Set to true at the end of initReplicationSlot; initConnection skips slot initialisation once it is true.
    private boolean hasInitedSlot;
    // Taken from the connector config; initReplicaIdentity does nothing when it is empty.
    private final Optional<ReplicaIdentityMapper> replicaIdentityMapper;

    /**
     * Builds the replication connection from the connector configuration.
     *
     * <p>The JDBC configuration passed to the superclass is the connector's JDBC configuration
     * with the additions made by {@link #addDefaultSettings(JdbcConfiguration)}. The message
     * decoder is requested from the plugin, the replica identity mapper is read from the
     * configuration, and the slot is marked as not yet initialised.
     */
    private PostgresReplicationConnection(
            PostgresConnectorConfig config,
            String slotName,
            String publicationName,
            RelationalTableFilters tableFilter,
            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
            PostgresConnectorConfig.LogicalDecoder plugin,
            boolean dropSlotOnClose,
            boolean createFailOverSlot,
            Duration statusUpdateInterval,
            PostgresConnection jdbcConnection,
            TypeRegistry typeRegistry,
            Properties streamParams,
            PostgresSchema schema) {
        super(PostgresReplicationConnection.addDefaultSettings(config.getJdbcConfig()), PostgresConnection.FACTORY, "\"", "\"");

        // Slot and publication identity.
        this.slotName = slotName;
        this.publicationName = publicationName;
        this.publicationAutocreateMode = publicationAutocreateMode;
        this.tableFilter = tableFilter;

        // Behaviour switches.
        this.dropSlotOnClose = dropSlotOnClose;
        this.createFailOverSlot = createFailOverSlot;
        this.statusUpdateInterval = statusUpdateInterval;
        this.streamParams = streamParams;

        // Collaborators.
        this.connectorConfig = config;
        this.plugin = plugin;
        this.jdbcConnection = jdbcConnection;
        this.typeRegistry = typeRegistry;
        this.messageDecoder = plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
        this.replicaIdentityMapper = config.replicaIdentityMapper();

        // Slot state starts out unknown.
        this.slotCreationInfo = null;
        this.hasInitedSlot = false;
    }

    /**
     * Returns the given JDBC configuration with the defaults applied by
     * {@code PostgresConnection.addDefaultSettings} (connection usage "Debezium Streaming")
     * plus {@code replication=database} and {@code preferQueryMode=simple}.
     */
    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
        Configuration streamingConfig = PostgresConnection.addDefaultSettings(configuration, "Debezium Streaming")
                .edit()
                .with("replication", "database")
                .with("preferQueryMode", "simple")
                .build();
        return JdbcConfiguration.adapt(streamingConfig);
    }

    /**
     * Reads the replication slot information for this connection's slot and plugin over a
     * short-lived connection named "Debezium Slot Info", which is closed before returning.
     */
    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
        try (PostgresConnection slotInfoConnection = new PostgresConnection(this.connectorConfig.getJdbcConfig(), "Debezium Slot Info")) {
            return slotInfoConnection.readReplicationSlotInfo(this.slotName, this.plugin.getPostgresPluginName());
        }
    }

    /**
     * Reads the session's current {@code statement_timeout} via {@code SHOW statement_timeout;}.
     *
     * @param stmt statement used to run the query
     * @return the reported value, or empty when the query returns no row or fails with an
     *         {@link SQLException} (the failure is logged at WARN and not propagated)
     */
    private Optional<String> captureCurrentStatementTimeout(Statement stmt) {
        try (ResultSet rs = stmt.executeQuery("SHOW statement_timeout;")) {
            if (rs.next()) {
                String currentTimeout = rs.getString(1);
                LOGGER.debug("Captured original statement_timeout: {}", currentTimeout);
                return Optional.of(currentTimeout);
            }
        }
        catch (SQLException ex) {
            LOGGER.warn("Failed to capture current statement_timeout", ex);
        }
        return Optional.empty();
    }

    /**
     * Restores {@code statement_timeout} to a previously captured value.
     *
     * <p>Does nothing when no value was captured. A failure to restore is logged at WARN and
     * not propagated.
     *
     * @param stmt statement used to run the {@code SET} command
     * @param originalTimeout value captured earlier, possibly empty
     */
    private void resetStatementTimeout(Statement stmt, Optional<String> originalTimeout) {
        if (!originalTimeout.isPresent()) {
            return;
        }
        String previous = originalTimeout.get();
        try {
            stmt.execute("SET statement_timeout = '" + previous + "';");
            LOGGER.debug("Reset statement_timeout to: {}", previous);
        }
        catch (SQLException ex) {
            LOGGER.warn("Failed to reset statement_timeout to original value: {}", previous, ex);
        }
    }

    /**
     * Executes a statement with {@code statement_timeout} set to the configured slot-creation
     * command timeout, then restores the timeout that was in effect before.
     *
     * <p>The {@code SET statement_timeout} command and the statement are sent together in one
     * {@code execute} call. The restore runs in a {@code finally} block, so it is attempted
     * whether or not execution fails.
     *
     * @param stmt statement used for execution
     * @param statementToExecute SQL text to run under the timeout
     * @throws SQLException if execution fails
     */
    private void executeWithTimeout(Statement stmt, String statementToExecute) throws SQLException {
        Optional<String> previousTimeout = this.captureCurrentStatementTimeout(stmt);
        try {
            long timeoutMillis = TimeUnit.SECONDS.toMillis(this.connectorConfig.createSlotCommandTimeout());
            String batch = "SET statement_timeout = " + timeoutMillis + "; " + statementToExecute;
            stmt.execute(batch);
        }
        finally {
            this.resetStatementTimeout(stmt, previousTimeout);
        }
    }

    /**
     * Ensures the publication named {@code publicationName} is usable; only acts when the
     * plugin is PGOUTPUT.
     *
     * <p>Looks the publication up in {@code pg_publication}. When it does not exist, the action
     * depends on {@code publicationAutocreateMode}: one mode throws a ConnectException, the
     * others create the publication (for all tables, for the filtered tables, or with no tables),
     * with read-only servers skipping or replacing the create step as shown in the branches below.
     * When it already exists, one mode rejects an all-tables publication and then either validates
     * or updates the table list; every other mode only logs that the existing publication is used.
     * Afterwards the transaction is committed and auto-commit is switched back on.
     *
     * <p>Any SQLException is rethrown as JdbcConnectionException.
     *
     * <p>NOTE(review): this body is decompiler output that could not be fully structured
     * ("Unable to fully structure code"). It is not valid Java as written: locals are undeclared,
     * {@code ** break;} and the {@code lblNN:} labels are decompiler residue, and the switch goes
     * through a synthetic ordinal map. It should be restored from the original source.
     */
    protected void initPublication() {
        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals((Object)this.plugin)) {
            PostgresReplicationConnection.LOGGER.info("Initializing PgOutput logical decoder publication");
            try {
                conn = this.pgConnection();
                conn.setAutoCommit(false);
                selectPublication = String.format("SELECT puballtables FROM pg_publication WHERE pubname = '%s'", new Object[]{this.publicationName});
                try {
                    stmt = conn.createStatement();
                    try {
                        block39: {
                            isOnlyRead = this.isReadOnlyDb();
                            rs = stmt.executeQuery(selectPublication);
                            try {
                                publicationExists = rs.next();
                                if (!publicationExists) {
                                    PostgresReplicationConnection.LOGGER.info("Creating new publication '{}' for plugin '{}'", (Object)this.publicationName, (Object)this.plugin);
                                    // Dispatch on publicationAutocreateMode; the ordinal-to-constant mapping is not visible here.
                                    switch (2.$SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[this.publicationAutocreateMode.ordinal()]) {
                                        // Presumably the "disabled" mode, judging by the message — TODO confirm.
                                        case 1: {
                                            throw new ConnectException("Publication autocreation is disabled, please create one and restart the connector.");
                                        }
                                        // Creates a FOR ALL TABLES publication unless the server is read-only.
                                        case 2: {
                                            createPublicationStmt = this.connectorConfig.isPublishViaPartitionRoot() != false ? String.format("CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);", new Object[]{this.publicationName}) : String.format("CREATE PUBLICATION %s FOR ALL TABLES;", new Object[]{this.publicationName});
                                            PostgresReplicationConnection.LOGGER.info("Creating Publication with statement '{}'", (Object)createPublicationStmt);
                                            if (!isOnlyRead) {
                                                try {
                                                    this.executeWithTimeout(stmt, createPublicationStmt);
                                                    break;
                                                }
                                                catch (SQLException ex) {
                                                    if (PSQLState.QUERY_CANCELED.getState().equals(ex.getSQLState()) || "55P03".equals(ex.getSQLState())) {
                                                        throw new DebeziumException("Creation of publication failed: query to create/update publication timed out, please make sure that there are no maintenance activities going on the database end.", (Throwable)ex);
                                                    }
                                                    throw ex;
                                                }
                                            }
                                            PostgresReplicationConnection.LOGGER.info("The Postgres server in stand by mode, skip create statement execution");
                                            break;
                                        }
                                        // Filtered publication: validate only on a read-only server, otherwise create it.
                                        case 3: {
                                            if (isOnlyRead) {
                                                this.validatePublications(stmt);
                                                break;
                                            }
                                            this.createOrUpdatePublicationModeFiltered(stmt, false);
                                            break;
                                        }
                                        // Creates a publication without any tables; no read-only check is made in this branch.
                                        case 4: {
                                            createPublicationWithNoTablesStmt = this.connectorConfig.isPublishViaPartitionRoot() != false ? String.format("CREATE PUBLICATION %s WITH (publish_via_partition_root = true);", new Object[]{this.publicationName}) : String.format("CREATE PUBLICATION %s;", new Object[]{this.publicationName});
                                            PostgresReplicationConnection.LOGGER.info("Creating publication with statement '{}'", (Object)createPublicationWithNoTablesStmt);
                                            try {
                                                this.executeWithTimeout(stmt, createPublicationWithNoTablesStmt);
                                                break;
                                            }
                                            catch (SQLException ex) {
                                                if (PSQLState.QUERY_CANCELED.getState().equals(ex.getSQLState()) || "55P03".equals(ex.getSQLState())) {
                                                    throw new DebeziumException("Creation of publication failed: query to create/update publication timed out, please make sure that there are no maintenance activities going on the database end.", (Throwable)ex);
                                                }
                                                throw ex;
                                            }
                                        }
                                    }
                                    break block39;
                                }
                                // The publication already exists.
                                switch (2.$SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[this.publicationAutocreateMode.ordinal()]) {
                                    case 3: {
                                        // Column 1 of the lookup query is puballtables.
                                        allTables = rs.getBoolean(1);
                                        if (allTables) {
                                            throw new DebeziumException(String.format("A logical publication for all tables named '%s' for plugin '%s' and database '%s' is already active on the server and can not be altered. If you need to exclude some tables or include only specific subset, please recreate the publication with necessary configuration or let plugin recreate it by dropping existing publication. Otherwise please change the 'publication.autocreate.mode' property to 'all_tables'.", new Object[]{this.publicationName, this.plugin, this.database()}));
                                        }
                                        if (isOnlyRead) {
                                            this.validatePublications(stmt);
                                            ** break;
lbl57:
                                            // 1 sources

                                        } else if (this.isPublicationUpdateRequired(stmt)) {
                                            this.createOrUpdatePublicationModeFiltered(stmt, true);
                                            ** break;
                                        }
lbl61:
                                        // 3 sources

                                        break;
                                    }
                                    default: {
                                        PostgresReplicationConnection.LOGGER.trace("A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server and will be used by the plugin", new Object[]{this.publicationName, this.plugin, this.database()});
                                        break;
                                    }
                                }
                            }
                            finally {
                                if (rs != null) {
                                    rs.close();
                                }
                            }
                        }
                        conn.commit();
                        conn.setAutoCommit(true);
                    }
                    finally {
                        if (stmt != null) {
                            stmt.close();
                        }
                    }
                }
                catch (SQLException e) {
                    throw new JdbcConnectionException(e);
                }
            }
            catch (SQLException e) {
                throw new JdbcConnectionException(e);
            }
        }
    }

    /**
     * Reports the result of {@code select pg_is_in_recovery()} on this connection.
     *
     * @return the value of the first column of the first row, or {@code false} when the query
     *         returns no row
     * @throws SQLException if the query fails
     */
    private boolean isReadOnlyDb() throws SQLException {
        AtomicBoolean inRecovery = new AtomicBoolean();
        this.query("select pg_is_in_recovery()", rs -> inRecovery.set(rs.next() && rs.getBoolean(1)));
        return inRecovery.get();
    }

    /**
     * Reads the tables currently attached to the publication from {@code pg_publication_tables}.
     *
     * <p>The publication name is bound as a statement parameter rather than formatted into the
     * SQL text, so a name containing a quote character cannot break or alter the query. This is
     * the same approach {@code validatePublications} uses on the same connection.
     *
     * @param stmt statement whose connection is used to run the lookup
     * @return the published tables, or empty when the lookup fails with an {@link SQLException}
     *         (the failure is logged at WARN and not propagated)
     */
    private Optional<Set<TableId>> getCurrentPublicationTables(Statement stmt) {
        String getPublicationTablesQuery = "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?";
        Set<TableId> publicationTables = new HashSet<>();
        try (PreparedStatement prepStmt = stmt.getConnection().prepareStatement(getPublicationTablesQuery)) {
            prepStmt.setString(1, this.publicationName);
            try (ResultSet rs = prepStmt.executeQuery()) {
                while (rs.next()) {
                    String schemaName = rs.getString("schemaname");
                    String tableName = rs.getString("tablename");
                    publicationTables.add(this.jdbcConnection.createTableId(this.connectorConfig.databaseName(), schemaName, tableName));
                }
            }
        }
        catch (SQLException e) {
            LOGGER.warn("Unable to query pg_publication_tables for publication '{}'. This may be due to insufficient privileges. Publication will be updated to ensure synchronization. Error: {}", (Object)this.publicationName, (Object)e.getMessage());
            return Optional.empty();
        }
        return Optional.of(publicationTables);
    }

    /**
     * Decides whether the publication's table list has to be altered to match the captured tables.
     *
     * @param stmt statement used to look up the tables currently in the publication
     * @return {@code true} when the current tables cannot be determined or differ from the
     *         captured tables; {@code false} when no tables are captured or both sets are equal
     * @throws SQLException if the captured tables cannot be determined
     */
    public boolean isPublicationUpdateRequired(Statement stmt) throws SQLException {
        Optional<Set<TableId>> published = this.getCurrentPublicationTables(stmt);
        if (!published.isPresent()) {
            LOGGER.info("Unable to determine current publication tables for '{}', will update publication to ensure synchronization", this.publicationName);
            return true;
        }

        final Set<TableId> wanted;
        try {
            wanted = this.determineCapturedTables();
        }
        catch (Exception e) {
            throw new SQLException("Failed to determine captured tables", e);
        }
        if (wanted.isEmpty()) {
            LOGGER.warn("No table filters found for filtered publication {}", this.publicationName);
            return false;
        }

        Set<TableId> existing = published.get();
        if (!existing.equals(wanted)) {
            // Compute the difference in both directions purely for the log message.
            Set<TableId> missing = new HashSet<>(wanted);
            missing.removeAll(existing);
            Set<TableId> surplus = new HashSet<>(existing);
            surplus.removeAll(wanted);
            LOGGER.info("Publication '{}' has to be updated. Tables to add: {}, Tables to remove: {}", this.publicationName, missing, surplus);
            return true;
        }
        LOGGER.info("Publication '{}' is already up to date with desired tables", this.publicationName);
        return false;
    }

    /**
     * Verifies that the tables attached to the publication are exactly the captured tables.
     *
     * <p>Both sides are compared as {@code schema.table} strings. The method only reads; it
     * never alters the publication.
     *
     * @param stmt statement whose connection is used to query {@code pg_publication_tables}
     * @throws SQLException if the captured tables cannot be determined or are empty, if the
     *         lookup fails, or if the published tables differ from the captured tables
     */
    private void validatePublications(Statement stmt) throws SQLException {
        String validatePublication = "SELECT schemaname, tablename FROM pg_catalog.pg_publication_tables WHERE pubname=? ";
        Set<TableId> tablesToCapture;
        try {
            tablesToCapture = this.determineCapturedTables();
            if (tablesToCapture.isEmpty()) {
                // Thrown inside the try on purpose to keep the existing behaviour: the catch
                // below re-wraps it, so callers see it as the cause of the outer exception.
                throw new SQLException(String.format("No table filters found for filtered publication %s", this.publicationName));
            }
        }
        catch (Exception e) {
            throw new SQLException("Failed to determine captured tables", e);
        }
        Set<String> tableNames = tablesToCapture.stream().map(entity -> entity.schema() + "." + entity.table()).collect(Collectors.toSet());
        Set<String> dbTableNamesHashSet = new HashSet<>();
        try (PreparedStatement prepStmt = stmt.getConnection().prepareStatement(validatePublication)) {
            prepStmt.setString(1, this.publicationName);
            try (ResultSet rs = prepStmt.executeQuery()) {
                while (rs.next()) {
                    dbTableNamesHashSet.add(String.format("%s.%s", rs.getString(1), rs.getString(2)));
                }
            }
        }
        // The publication is out of date only when the two sets differ.
        if (!dbTableNamesHashSet.equals(tableNames)) {
            throw new SQLException("Database replication is not up to date");
        }
    }

    /**
     * Creates the publication for the captured tables, or alters an existing publication so
     * that its table list is exactly the captured tables.
     *
     * <p>Every failure inside, including the "no table filters" and timeout DebeziumExceptions,
     * is rethrown wrapped in a ConnectException.
     *
     * @param stmt statement used to execute the publication command
     * @param isUpdate {@code true} to issue {@code ALTER PUBLICATION ... SET TABLE},
     *        {@code false} to issue {@code CREATE PUBLICATION ... FOR TABLE}
     */
    private void createOrUpdatePublicationModeFiltered(Statement stmt, boolean isUpdate) {
        // Declared outside the try so the failure message below can include it (null if never set).
        String tableFilterString = null;
        try {
            tableFilterString = this.determineCapturedTables().stream()
                    .map(TableId::toDoubleQuotedString)
                    .collect(Collectors.joining(", "));
            if (tableFilterString.isEmpty()) {
                throw new DebeziumException(String.format("No table filters found for filtered publication %s", this.publicationName));
            }

            final String publicationSql;
            final String logTemplate;
            if (isUpdate) {
                publicationSql = String.format("ALTER PUBLICATION %s SET TABLE %s;", this.publicationName, tableFilterString);
                logTemplate = "Updating Publication with statement '{}'";
            }
            else {
                if (this.connectorConfig.isPublishViaPartitionRoot()) {
                    publicationSql = String.format("CREATE PUBLICATION %s FOR TABLE %s WITH (publish_via_partition_root = true);", this.publicationName, tableFilterString);
                }
                else {
                    publicationSql = String.format("CREATE PUBLICATION %s FOR TABLE %s;", this.publicationName, tableFilterString);
                }
                logTemplate = "Creating Publication with statement '{}'";
            }
            LOGGER.info(logTemplate, publicationSql);

            try {
                this.executeWithTimeout(stmt, publicationSql);
            }
            catch (SQLException ex) {
                boolean timedOut = PSQLState.QUERY_CANCELED.getState().equals(ex.getSQLState())
                        || SQL_LOCK_NOT_AVAILABLE.equals(ex.getSQLState());
                if (!timedOut) {
                    throw ex;
                }
                throw new DebeziumException(PUBLICATION_QUERY_FAILURE_MESSAGE, ex);
            }
        }
        catch (Exception e) {
            String action = isUpdate ? "update" : "create";
            throw new ConnectException(String.format("Unable to %s filtered publication %s for %s", action, this.publicationName, tableFilterString), e);
        }
    }

    /**
     * Applies the configured replica identity to each captured table.
     *
     * <p>Does nothing when no replica identity mapper is configured. For each captured table
     * that the mapper has an identity for, the current identity is read and, when its string
     * form differs from the configured one, the configured identity is set. Failures for one
     * table are logged and do not stop the remaining tables.
     *
     * @throws DebeziumException if the captured tables cannot be determined
     */
    private void initReplicaIdentity() {
        if (!this.replicaIdentityMapper.isPresent()) {
            return;
        }
        LOGGER.info("Updating Replica Identity");

        final Set<TableId> capturedTables;
        try {
            capturedTables = this.determineCapturedTables();
        }
        catch (Exception e) {
            throw new DebeziumException("Unable to get Captured tables", e);
        }

        for (TableId tableId : capturedTables) {
            try {
                Optional<ReplicaIdentityInfo> configured = this.replicaIdentityMapper.get().findReplicaIdentity(tableId);
                if (!configured.isPresent()) {
                    LOGGER.debug("Replica identity for table '{}' will not be updated because Replica Identity is not defined on REPLICA_IDENTITY_AUTOSET_VALUES property", tableId);
                    continue;
                }

                // Stays null when the current identity cannot be read.
                ReplicaIdentityInfo current = null;
                try {
                    current = this.jdbcConnection.readReplicaIdentityInfo(tableId);
                    if (current.getReplicaIdentity() == ReplicaIdentityInfo.ReplicaIdentity.INDEX) {
                        current.setIndexName(this.jdbcConnection.readIndexOfReplicaIdentity(tableId));
                    }
                }
                catch (SQLException e) {
                    LOGGER.error("Cannot determine REPLICA IDENTITY information for table {}", tableId);
                }

                boolean differs = current != null && !current.toString().equals(configured.get().toString());
                if (differs) {
                    this.jdbcConnection.setReplicaIdentityForTable(tableId, configured.get());
                    LOGGER.info("Replica identity set to {} for table '{}'", configured.get(), tableId);
                }
                else {
                    LOGGER.info("Replica identity for table '{}' is already {}", tableId, current);
                }
            }
            catch (Exception e) {
                LOGGER.error("Unable to update Replica Identity for table {}", tableId, e);
            }
        }
    }

    /**
     * Returns the tables of the configured database that pass the table filter, in sorted order.
     *
     * @return a {@link LinkedHashSet} of the included tables, iterating in their natural order
     * @throws Exception if listing the tables fails
     */
    private Set<TableId> determineCapturedTables() throws Exception {
        Set<TableId> candidates = this.jdbcConnection.getAllTableIds(this.connectorConfig.databaseName());
        Set<TableId> included = new HashSet<>();
        for (TableId candidate : candidates) {
            if (!this.tableFilter.dataCollectionFilter().isIncluded(candidate)) {
                LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", candidate);
            }
            else {
                LOGGER.trace("Adding table {} to the list of captured tables", candidate);
                included.add(candidate);
            }
        }
        // Sort so that callers see a stable table order.
        return included.stream()
                .sorted()
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Makes sure the replication slot exists and works out the default position to stream from.
     *
     * <p>The slot is created when the slot lookup reports it as invalid. {@code IDENTIFY_SYSTEM}
     * is then run to read the server's current {@code xlogpos}. The default start position is,
     * in order of preference: the start LSN of the slot creation result when one is recorded;
     * the current xlog position for a new slot or a slot without a valid flushed LSN; otherwise
     * the smaller of the slot's flushed LSN and the current xlog position.
     *
     * @throws SQLException if reading the slot information fails
     * @throws InterruptedException if reading the slot information is interrupted
     * @throws JdbcConnectionException wrapping any SQLException raised after the slot lookup
     */
    protected void initReplicationSlot() throws SQLException, InterruptedException {
        ServerInfo.ReplicationSlot slotInfo = this.getSlotInfo();
        boolean shouldCreateSlot = slotInfo == ServerInfo.ReplicationSlot.INVALID;
        try {
            if (shouldCreateSlot) {
                this.createReplicationSlot();
            }
            this.pgConnection();

            LOGGER.debug("running '{}' to validate replication connection", "IDENTIFY_SYSTEM");
            Lsn xlogStart = (Lsn)this.queryAndMap("IDENTIFY_SYSTEM", rs -> {
                if (!rs.next()) {
                    throw new IllegalStateException("The DB connection is not a valid replication connection");
                }
                String reportedPos = rs.getString("xlogpos");
                LOGGER.debug("received latest xlogpos '{}'", reportedPos);
                return Lsn.valueOf(reportedPos);
            });

            if (this.slotCreationInfo != null) {
                this.defaultStartingPos = this.slotCreationInfo.startLsn();
            }
            else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
                this.defaultStartingPos = xlogStart;
            }
            else {
                Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
                // Never start beyond the position the server currently reports.
                if (latestFlushedLsn.compareTo(xlogStart) < 0) {
                    this.defaultStartingPos = latestFlushedLsn;
                }
                else {
                    this.defaultStartingPos = xlogStart;
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
                }
            }
            this.hasInitedSlot = true;
        }
        catch (SQLException e) {
            throw new JdbcConnectionException(e);
        }
    }

    /**
     * Reports whether a temporary replication slot should be used.
     *
     * @return always {@code false}; this body makes no decision based on configuration or server state
     * @throws SQLException declared in the signature, but never thrown by this body
     */
    private boolean useTemporarySlot() throws SQLException {
        return false;
    }

    /**
     * Decides whether the replication slot should be created as a failover slot.
     *
     * @return {@code true} only when failover slots are requested, the server version is at
     *         least 17, and the server is not in recovery
     * @throws SQLException if the server version or recovery state cannot be read
     */
    private boolean useFailOverSlot() throws SQLException {
        if (!this.createFailOverSlot) {
            return false;
        }
        boolean versionSupportsFailover = this.pgConnection().haveMinimumServerVersion(ServerVersion.from("17"));
        if (!versionSupportsFailover) {
            LOGGER.debug("Can't create a failover slot on Postgres before version 17. Continuing to create a non-failover slot");
            return false;
        }
        if (this.isReadOnlyDb()) {
            LOGGER.debug("Can't create a failover on a replica server. Continuing to create a non-failover slot");
            return false;
        }
        return true;
    }

    /**
     * Starts streaming without an explicit offset; delegates to
     * {@link #startStreaming(Lsn, WalPositionLocator)} with a {@code null} offset.
     */
    @Override
    public ReplicationStream startStreaming(WalPositionLocator walPosition) throws SQLException, InterruptedException {
        final Lsn noExplicitOffset = null;
        return this.startStreaming(noExplicitOffset, walPosition);
    }

    /**
     * Initialises the connection and opens a replication stream, retrying on failure.
     *
     * <p>When {@code offset} is {@code null} or not valid, streaming starts from the default
     * position. Each failed attempt is followed by a pause of the configured retry delay. Once
     * the configured number of retries is exceeded, the last failure is rethrown wrapped in a
     * DebeziumException.
     *
     * @param offset position to start from; may be {@code null}
     * @param walPosition locator passed to slot validation and stream creation
     * @return the opened replication stream
     * @throws SQLException if initialising the connection fails
     * @throws InterruptedException if initialisation or the pause between retries is interrupted
     */
    @Override
    public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition) throws SQLException, InterruptedException {
        this.initConnection();
        this.connect();
        Lsn lsn = (offset == null || !offset.isValid()) ? this.defaultStartingPos : offset;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("starting streaming from LSN '{}'", (Object)lsn);
        }
        int maxRetries = this.connectorConfig.maxRetries();
        Duration delay = this.connectorConfig.retryDelay();
        int tryCount = 0;
        while (true) {
            try {
                if (this.connectorConfig.slotSeekToKnownOffsetOnStart()) {
                    this.validateSlotIsInExpectedState(walPosition);
                }
                return this.createReplicationStream(lsn, walPosition);
            }
            catch (Exception e) {
                String message = "Failed to start replication stream at " + lsn;
                if (++tryCount > maxRetries) {
                    // An exception may carry no message; a null here must not hide the real failure behind an NPE.
                    String failureText = e.getMessage();
                    if (failureText != null && failureText.matches(".*replication slot .* is active.*")) {
                        message = message + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                    }
                    throw new DebeziumException(message, (Throwable)e);
                }
                // Log the delay in milliseconds so the value matches the "ms" unit in the message.
                LOGGER.warn(message + ", waiting for {} ms and retrying, attempt number {} over {}", new Object[]{delay.toMillis(), tryCount, maxRetries, e});
                Metronome metronome = Metronome.sleeper((Duration)delay, (Clock)Clock.SYSTEM);
                metronome.pause();
            }
        }
    }

    /**
     * Advances the replication slot to the last stored LSN using
     * {@code pg_replication_slot_advance}.
     *
     * <p>The last stored commit LSN is preferred, falling back to the last stored event LSN.
     * Nothing happens when neither is available or when flushing LSNs on the source is turned off.
     *
     * <p>A PSQLException from the seek is classified into exactly one of four cases:
     * <ul>
     *   <li>the function does not exist: logged at INFO, seek skipped;</li>
     *   <li>insufficient privilege: logged at WARN, seek skipped;</li>
     *   <li>the slot cannot be advanced to the LSN: handled per the event processing failure
     *       handling mode (fail, warn, or debug log);</li>
     *   <li>anything else: handled per the same mode as an unexpected error.</li>
     * </ul>
     *
     * @param walPosition source of the last stored commit/event LSN
     * @throws SQLException if creating or closing the statement fails with a non-PSQLException
     * @throws DebeziumException in FAIL mode when the slot cannot be advanced or the error is unexpected
     */
    protected void validateSlotIsInExpectedState(WalPositionLocator walPosition) throws SQLException {
        Lsn lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : walPosition.getLastEventStoredLsn();
        if (lsn == null || !this.connectorConfig.isFlushLsnOnSource()) {
            return;
        }
        try (Statement stmt = this.pgConnection().createStatement()) {
            String seekCommand = String.format("SELECT pg_replication_slot_advance('%s', '%s')", this.slotName, lsn.asString());
            LOGGER.info("Seeking to {} on the replication slot with command {}", (Object)lsn, (Object)seekCommand);
            stmt.execute(seekCommand);
        }
        catch (PSQLException e) {
            // The branches are mutually exclusive: an error handled by one of the first three
            // must not also be reported (or thrown) as an unexpected error.
            if (e.getMessage().matches("ERROR: function pg_replication_slot_advance.*does not exist(.|\\n)*") || PSQLState.UNDEFINED_FUNCTION.getState().equals(e.getSQLState())) {
                LOGGER.info("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset.");
            }
            else if (e.getMessage().matches("ERROR: must be superuser or replication role to use replication slots(.|\\n)*") || SQL_STATE_INSUFFICIENT_PRIVILEGE.equals(e.getSQLState())) {
                LOGGER.warn("Unable to use pg_replication_slot_advance() function. The Postgres server is likely on an old RDS version or privileges are not correctly set", (Throwable)e);
            }
            else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*") || PSQLState.OBJECT_NOT_IN_STATE.getState().equals(e.getSQLState())) {
                switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
                    case FAIL: {
                        // Keep the server exception as the cause so the stack trace is not lost.
                        throw new DebeziumException(String.format("Cannot seek to the last known offset '%s' on replication slot '%s'. Error from server: %s", lsn.asString(), this.slotName, e.getMessage()), (Throwable)e);
                    }
                    case WARN: {
                        LOGGER.warn("Cannot seek to the last known offset '{}' on replication slot '{}'. Error from server: '{}'", new Object[]{lsn.asString(), this.slotName, e.getMessage(), e});
                        break;
                    }
                    case SKIP: 
                    case IGNORE: {
                        LOGGER.debug("Cannot seek to the last known offset '{}' on replication slot '{}'. Error from server: '{}'", new Object[]{lsn.asString(), this.slotName, e.getMessage(), e});
                    }
                }
            }
            else {
                switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
                    case FAIL: {
                        throw new DebeziumException((Throwable)e);
                    }
                    case WARN: {
                        LOGGER.warn("Unexpected error while trying to seek LSN", (Throwable)e);
                        break;
                    }
                    case SKIP: 
                    case IGNORE: {
                        LOGGER.debug("Unexpected error while trying to seek LSN", (Throwable)e);
                    }
                }
            }
        }
    }

    /**
     * Prepares this replication connection: initializes the publication, then the
     * replica identity, and finally the replication slot unless the slot has already
     * been initialized by this instance.
     *
     * @throws SQLException if one of the initialization steps fails with a database error
     * @throws InterruptedException if one of the initialization steps is interrupted
     */
    @Override
    public void initConnection() throws SQLException, InterruptedException {
        this.initPublication();
        this.initReplicaIdentity();
        if (this.hasInitedSlot) {
            // Slot was set up earlier; nothing more to do.
            return;
        }
        this.initReplicationSlot();
    }

    /**
     * Creates the logical replication slot by issuing a {@code CREATE_REPLICATION_SLOT}
     * command, retrying when the command is cancelled (SQL state {@code 57014}) or a lock
     * is not available.
     * <p>
     * The publication is initialized before the slot is created. The command is retried up
     * to {@code connectorConfig.maxRetries()} times with {@code connectorConfig.retryDelay()}
     * between attempts. Any other SQL error, including one that carries no SQL state,
     * is rethrown unchanged.
     *
     * @return the parsed slot creation result; empty if no creation info is available
     * @throws SQLException if the slot cannot be created for a non-retriable reason
     * @throws DebeziumException if the retriable failure persists beyond the maximum number of retries
     */
    @Override
    public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", (Object)this.slotName, (Object)this.plugin);
        String tempPart = "";
        boolean canExportSnapshot = this.pgConnection().haveMinimumServerVersion((Version)ServerVersion.v9_4);
        if (this.dropSlotOnClose && !canExportSnapshot) {
            LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, but not on a supported version of Postgres, ignoring!");
        }
        if (this.useTemporarySlot()) {
            tempPart = "TEMPORARY";
        }
        String failOverPart = "";
        if (this.useFailOverSlot()) {
            failOverPart = "( FAILOVER )";
        }
        // The publication is set up before the slot is created.
        this.initPublication();
        try (Statement stmt = this.pgConnection().createStatement()) {
            stmt.setQueryTimeout(Math.toIntExact(this.connectorConfig.createSlotCommandTimeout()));
            String createCommand = String.format("CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s", this.slotName, tempPart, this.plugin.getPostgresPluginName(), failOverPart);
            LOGGER.info("Creating replication slot with command {}", (Object)createCommand);
            int maxRetries = this.connectorConfig.maxRetries();
            Duration delay = this.connectorConfig.retryDelay();
            int tryCount = 0;
            while (true) {
                try {
                    stmt.execute(createCommand);
                    break;
                }
                catch (SQLException ex) {
                    // getSQLState() may legitimately be null; comparing null-safely keeps the
                    // original exception from being masked by a NullPointerException.
                    String sqlState = ex.getSQLState();
                    boolean retriable = sqlState != null && (sqlState.equals("57014") || sqlState.equals(SQL_LOCK_NOT_AVAILABLE));
                    if (!retriable) {
                        throw ex;
                    }
                    String message = "Creation of replication slot failed; query to create replication slot timed out, please make sure that there are no long running queries on the database.";
                    if (++tryCount > maxRetries) {
                        throw new DebeziumException(message, (Throwable)ex);
                    }
                    LOGGER.warn("{} Waiting for {} and retrying, attempt number {} over {}", new Object[]{message, delay, tryCount, maxRetries, ex});
                    Metronome metronome = Metronome.parker((Duration)delay, (Clock)Clock.SYSTEM);
                    try {
                        metronome.pause();
                    }
                    catch (InterruptedException e) {
                        LOGGER.warn("Slot creation retry sleep interrupted by exception: {}", (Object)e.getMessage());
                        // Preserve the interrupt status for code further up the stack.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            // The command's result set is only parsed when the server version check passed.
            if (canExportSnapshot) {
                this.slotCreationInfo = this.parseSlotCreation(stmt.getResultSet());
            }
            return Optional.ofNullable(this.slotCreationInfo);
        }
    }

    /**
     * Returns the underlying connection unwrapped as the PostgreSQL driver's
     * {@code BaseConnection}.
     *
     * @return the driver-level connection
     * @throws SQLException if the connection cannot be obtained or unwrapped
     */
    protected BaseConnection pgConnection() throws SQLException {
        final java.sql.Connection jdbcConnection = this.connection(false);
        return jdbcConnection.unwrap(BaseConnection.class);
    }

    /**
     * Reads the first row of the slot creation response and converts it into a
     * {@link SlotCreationResult}.
     *
     * @param rs result set returned by the slot creation command
     * @return the slot name, consistent point, snapshot name and output plugin of the created slot
     * @throws ConnectException if the result set has no row or cannot be read
     */
    private SlotCreationResult parseSlotCreation(ResultSet rs) {
        try {
            if (!rs.next()) {
                throw new ConnectException("No replication slot found");
            }
            final String createdSlot = rs.getString("slot_name");
            final String consistentPoint = rs.getString("consistent_point");
            final String snapshotName = rs.getString("snapshot_name");
            final String outputPlugin = rs.getString("output_plugin");
            return new SlotCreationResult(createdSlot, consistentPoint, snapshotName, outputPlugin);
        }
        catch (SQLException ex) {
            throw new ConnectException("Unable to parse create_replication_slot response", (Throwable)ex);
        }
    }

    /**
     * Starts a PostgreSQL replication stream at the given LSN and wraps it in a
     * {@link ReplicationStream} that decodes messages, flushes LSNs, drains server
     * warnings and optionally runs a keep-alive loop.
     *
     * @param startLsn the LSN from which streaming is requested
     * @param walPosition locator passed to the message decoder when deciding whether a message is skipped
     * @return a stream wrapper bound to the started replication stream
     * @throws SQLException if the stream cannot be started
     * @throws InterruptedException if slot re-initialization is interrupted
     * @throws ConnectException if the server reports that the requested WAL segment has already been removed
     */
    private ReplicationStream createReplicationStream(final Lsn startLsn, final WalPositionLocator walPosition) throws SQLException, InterruptedException {
        PGReplicationStream s;
        try {
            try {
                // First attempt to start the stream.
                s = this.startPgReplicationStream(startLsn, this.messageDecoder::defaultOptions);
            }
            catch (PSQLException e) {
                LOGGER.debug("Could not register for streaming, retrying without optional options", (Throwable)e);
                // For a temporary slot the slot is initialized again before the second attempt.
                if (this.useTemporarySlot()) {
                    this.initReplicationSlot();
                }
                // Second (and last) attempt; it uses the same defaultOptions configurator as the first.
                s = this.startPgReplicationStream(startLsn, this.messageDecoder::defaultOptions);
            }
        }
        catch (PSQLException e) {
            // A failure of the second attempt lands here. The "WAL segment removed" error is
            // translated into a ConnectException; the original exception is logged but not
            // attached as the cause.
            if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
                LOGGER.error("Cannot rewind to last processed WAL position", (Throwable)e);
                throw new ConnectException("The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
            }
            throw e;
        }
        final PGReplicationStream stream = s;
        return new ReplicationStream(){
            // NOTE(review): this constant is declared but the literal 100 is used below,
            // presumably because the decompiler inlined it.
            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
            // Counts down on every processWarnings() call; warnings are drained when it reaches 0.
            private int warningCheckCounter = 100;
            // Executor handed in via startKeepAlive(); null while no keep-alive loop is active.
            private ExecutorService keepAliveExecutor = null;
            // Loop flag for the keep-alive task; created in startKeepAlive().
            private AtomicBoolean keepAliveRunning;
            // Paces the keep-alive loop using the connection's status update interval.
            private final Metronome metronome;
            // LSN of the last message handed to the decoder; volatile, so reads from other threads see the latest write.
            private volatile Lsn lastReceivedLsn;
            {
                this.metronome = Metronome.sleeper((Duration)PostgresReplicationConnection.this.statusUpdateInterval, (Clock)Clock.SYSTEM);
            }

            /**
             * Reads the next message from the stream via {@code stream.read()} and passes it to
             * the decoder unless the decoder decides it should be skipped.
             */
            @Override
            public void read(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.processWarnings(false);
                ByteBuffer read = stream.read();
                Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Streaming requested from LSN {}, received LSN {}", (Object)startLsn, (Object)lastReceiveLsn);
                if (PostgresReplicationConnection.this.messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startLsn, walPosition)) {
                    return;
                }
                this.deserializeMessages(read, processor);
            }

            /**
             * Reads a message via {@code stream.readPending()}.
             *
             * @return {@code false} if no message was available ({@code readPending()} returned
             *         {@code null}); {@code true} if a message was received, whether it was
             *         processed or skipped
             */
            @Override
            public boolean readPending(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.processWarnings(false);
                ByteBuffer read = stream.readPending();
                Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Streaming requested from LSN {}, received LSN {}", (Object)startLsn, (Object)lastReceiveLsn);
                if (read == null) {
                    return false;
                }
                if (PostgresReplicationConnection.this.messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startLsn, walPosition)) {
                    return true;
                }
                this.deserializeMessages(read, processor);
                return true;
            }

            // Records the stream's last received LSN, then hands the buffer to the message decoder.
            private void deserializeMessages(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Received message at LSN {}", (Object)this.lastReceivedLsn);
                PostgresReplicationConnection.this.messageDecoder.processMessage(buffer, processor, PostgresReplicationConnection.this.typeRegistry);
            }

            /** Drains pending server warnings unconditionally, then closes the underlying stream. */
            @Override
            public void close() throws SQLException {
                this.processWarnings(true);
                stream.close();
            }

            /** Flushes the given LSN to the server, but only when flushing on the source is enabled. */
            @Override
            public void flushLsn(Lsn lsn) throws SQLException {
                if (PostgresReplicationConnection.this.connectorConfig.isFlushLsnOnSource()) {
                    this.doFlushLsn(lsn);
                }
            }

            // Sets both the flushed and the applied LSN on the stream and forces a status update.
            private void doFlushLsn(Lsn lsn) throws SQLException {
                stream.setFlushedLSN(lsn.asLogSequenceNumber());
                stream.setAppliedLSN(lsn.asLogSequenceNumber());
                stream.forceUpdateStatus();
            }

            /** Returns the LSN recorded for the last message passed to the decoder; null until one was processed. */
            @Override
            public Lsn lastReceivedLsn() {
                return this.lastReceivedLsn;
            }

            /**
             * Submits a keep-alive task to the given executor that repeatedly forces a status
             * update and then pauses on the metronome. Does nothing if a keep-alive executor is
             * already set.
             */
            @Override
            public void startKeepAlive(ExecutorService service) {
                if (this.keepAliveExecutor == null) {
                    this.keepAliveExecutor = service;
                    this.keepAliveRunning = new AtomicBoolean(true);
                    this.keepAliveExecutor.submit(() -> {
                        while (this.keepAliveRunning.get()) {
                            try {
                                LOGGER.trace("Forcing status update with replication stream");
                                stream.forceUpdateStatus();
                                this.metronome.pause();
                            }
                            catch (Exception exp) {
                                // Any exception ends the keep-alive task after being logged.
                                // NOTE(review): this presumably also covers the interruption caused by
                                // shutdownNow() in stopKeepAlive() — confirm whether ERROR is intended there.
                                LOGGER.error("Unexpected exception while performing keepalive status update on the replication stream", (Throwable)exp);
                                return;
                            }
                        }
                    });
                }
            }

            /**
             * Stops the keep-alive loop, if one was started: clears the running flag, calls
             * {@code shutdownNow()} on the executor that was passed to {@code startKeepAlive}
             * and drops the reference to it.
             */
            @Override
            public void stopKeepAlive() {
                if (this.keepAliveExecutor != null) {
                    this.keepAliveRunning.set(false);
                    this.keepAliveExecutor.shutdownNow();
                    this.keepAliveExecutor = null;
                }
            }

            // Logs and clears the connection's SQL warnings when the countdown reaches zero or
            // when forced. The counter is decremented on every call, including forced ones.
            private void processWarnings(boolean forced) throws SQLException {
                if (--this.warningCheckCounter == 0 || forced) {
                    this.warningCheckCounter = 100;
                    for (SQLWarning w = PostgresReplicationConnection.this.connection().getWarnings(); w != null; w = w.getNextWarning()) {
                        LOGGER.debug("Server-side message: '{}', state = {}, code = {}", new Object[]{w.getMessage(), w.getSQLState(), w.getErrorCode()});
                    }
                    PostgresReplicationConnection.this.connection().clearWarnings();
                }
            }

            /** Returns the LSN this stream was requested to start from. */
            @Override
            public Lsn startLsn() {
                return startLsn;
            }
        };
    }

    /**
     * Builds and starts a logical replication stream on this connection's slot.
     * <p>
     * The stream is configured with the quoted slot name, the start position, the
     * automatic-flush setting derived from the LSN flush mode, and the slot stream
     * parameters. The supplied configurator is then applied, and the status interval is
     * set when a positive status update interval is configured. After start, a status
     * update is forced once.
     *
     * @param lsn position to start streaming from; must not be {@code null}
     * @param configurator callback that adds decoder-specific options to the stream builder; it
     *            receives the builder and a server version check function
     * @return the started replication stream
     * @throws SQLException if the stream cannot be built or started
     */
    private PGReplicationStream startPgReplicationStream(Lsn lsn, BiFunction<ChainedLogicalStreamBuilder, Function<Integer, Boolean>, ChainedLogicalStreamBuilder> configurator) throws SQLException {
        assert (lsn != null);
        boolean enableDriverKeepaliveFlush = this.connectorConfig.getLsnFlushMode() == PostgresConnectorConfig.LsnFlushMode.CONNECTOR_AND_DRIVER;
        LOGGER.info("Starting replication stream from LSN {} with automaticFlush={} (mode={})", new Object[]{lsn, enableDriverKeepaliveFlush, this.connectorConfig.getLsnFlushMode()});
        ChainedLogicalStreamBuilder streamBuilder = ((ChainedLogicalStreamBuilder)((ChainedLogicalStreamBuilder)((ChainedLogicalStreamBuilder)this.pgConnection().getReplicationAPI().replicationStream().logical().withSlotName("\"" + this.slotName + "\"")).withStartPosition(lsn.asLogSequenceNumber())).withAutomaticFlush(enableDriverKeepaliveFlush)).withSlotOptions(this.streamParams);
        streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
        if (this.statusUpdateInterval != null && this.statusUpdateInterval.toMillis() > 0L) {
            streamBuilder.withStatusInterval(Math.toIntExact(this.statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
        }
        PGReplicationStream stream = streamBuilder.start();
        // Short pause after starting the stream.
        // NOTE(review): the reason for the 10 ms delay is not visible here — confirm before removing.
        try {
            Thread.sleep(10L);
        }
        catch (InterruptedException e) {
            // This method cannot propagate InterruptedException; restore the interrupt
            // status instead of silently discarding it so callers can still react.
            Thread.currentThread().interrupt();
        }
        stream.forceUpdateStatus();
        return stream;
    }

    /**
     * Checks whether the connected server has at least the given version.
     *
     * @param version minimum server version, in the numeric form expected by the driver
     * @return whether the server meets the minimum version
     * @throws DebeziumException wrapping any {@link SQLException} raised by the check
     */
    private Boolean hasMinimumVersion(int version) {
        boolean atLeast;
        try {
            atLeast = this.pgConnection().haveMinimumServerVersion(version);
        }
        catch (SQLException sqlFailure) {
            throw new DebeziumException((Throwable)sqlFailure);
        }
        return atLeast;
    }

    /**
     * Closes this replication connection, allowing the replication slot to be dropped
     * when the connection is configured to drop it on close.
     */
    @Override
    public synchronized void close() {
        final boolean dropSlot = true;
        this.close(dropSlot);
    }

    /**
     * Closes the message decoder and the replication connection, and optionally drops
     * the replication slot.
     * <p>
     * Each step is best-effort: any {@link Throwable} is logged and the next step still
     * runs. The slot is dropped, via a separate {@link PostgresConnection}, only when
     * both {@code dropSlot} is {@code true} and this connection is configured to drop the
     * slot on close.
     *
     * @param dropSlot whether the slot may be dropped as part of closing
     */
    public synchronized void close(boolean dropSlot) {
        try {
            LOGGER.debug("Closing message decoder");
            this.messageDecoder.close();
        }
        catch (Throwable decoderFailure) {
            LOGGER.error("Unexpected error while closing message decoder", decoderFailure);
        }

        try {
            LOGGER.debug("Closing replication connection");
            super.close();
        }
        catch (Throwable connectionFailure) {
            LOGGER.error("Unexpected error while closing Postgres connection", connectionFailure);
        }

        if (!this.dropSlotOnClose || !dropSlot) {
            return;
        }
        // Use a separate connection to drop the slot.
        try (PostgresConnection dropConnection = new PostgresConnection(this.connectorConfig.getJdbcConfig(), "Debezium Drop Slot")) {
            dropConnection.dropReplicationSlot(this.slotName);
        }
        catch (Throwable dropFailure) {
            LOGGER.error("Unexpected error while dropping replication slot", dropFailure);
        }
    }

    /**
     * Re-establishes the connection: closes it without dropping the replication slot,
     * then requests the connection again.
     *
     * @throws SQLException if the connection cannot be obtained again
     */
    @Override
    public void reconnect() throws SQLException {
        final boolean dropSlot = false;
        this.close(dropSlot);
        // NOTE(review): the 'false' argument presumably suppresses on-connect statements — confirm.
        this.connection(false);
    }

    /**
     * Fluent builder collecting the settings passed to the
     * {@link PostgresReplicationConnection} constructor. Every setter returns the builder
     * itself so calls can be chained; {@link #build()} creates the connection.
     */
    protected static class ReplicationConnectionBuilder
    implements ReplicationConnection.Builder {
        private final PostgresConnectorConfig config;
        private String slotName = "debezium";
        private String publicationName = "dbz_publication";
        private RelationalTableFilters tableFilter;
        private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
        private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
        private boolean dropSlotOnClose = true;
        private boolean createFailOverSlot = false;
        private Duration statusUpdateIntervalVal;
        private TypeRegistry typeRegistry;
        private PostgresSchema schema;
        private Properties slotStreamParams = new Properties();
        private PostgresConnection jdbcConnection;

        /**
         * @param config connector configuration handed to the built connection; must not be {@code null}
         */
        protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
            assert config != null;
            this.config = config;
        }

        /** Sets the replication slot name (default {@code "debezium"}). */
        @Override
        public ReplicationConnectionBuilder withSlot(String slotName) {
            assert slotName != null;
            this.slotName = slotName;
            return this;
        }

        /** Sets the publication name (default {@code "dbz_publication"}). */
        @Override
        public ReplicationConnection.Builder withPublication(String publicationName) {
            assert publicationName != null;
            this.publicationName = publicationName;
            return this;
        }

        /** Sets the table filter passed to the connection. */
        @Override
        public ReplicationConnection.Builder withTableFilter(RelationalTableFilters tableFilter) {
            assert tableFilter != null;
            this.tableFilter = tableFilter;
            return this;
        }

        /** Sets the publication auto-create mode (default {@code ALL_TABLES}). */
        @Override
        public ReplicationConnection.Builder withPublicationAutocreateMode(PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
            // NOTE(review): the assertion checks the builder's publicationName field, not the
            // argument — confirm whether that is intended.
            assert this.publicationName != null;
            this.publicationAutocreateMode = publicationAutocreateMode;
            return this;
        }

        /** Sets the logical decoding plugin (default {@code DECODERBUFS}). */
        @Override
        public ReplicationConnectionBuilder withPlugin(PostgresConnectorConfig.LogicalDecoder plugin) {
            assert plugin != null;
            this.plugin = plugin;
            return this;
        }

        /** Sets whether the slot is dropped when the connection is closed (default {@code true}). */
        @Override
        public ReplicationConnectionBuilder dropSlotOnClose(boolean dropSlotOnClose) {
            this.dropSlotOnClose = dropSlotOnClose;
            return this;
        }

        /** Sets whether a failover slot is created (default {@code false}). */
        @Override
        public ReplicationConnectionBuilder createFailOverSlot(boolean createFailOverSlot) {
            this.createFailOverSlot = createFailOverSlot;
            return this;
        }

        /**
         * Parses slot stream parameters given as {@code key=value} pairs separated by
         * {@code ;} and replaces the current parameters with them. Entries that do not split
         * into exactly two parts on {@code =} are logged as invalid and ignored. A
         * {@code null} or empty argument leaves the current parameters untouched.
         */
        @Override
        public ReplicationConnectionBuilder streamParams(String slotStreamParams) {
            if (slotStreamParams == null || slotStreamParams.isEmpty()) {
                return this;
            }
            this.slotStreamParams = new Properties();
            for (String entry : slotStreamParams.split(";")) {
                final String[] keyAndValue = entry.split("=");
                if (keyAndValue.length != 2) {
                    LOGGER.warn("The following STREAM_PARAMS value is invalid: {}", (Object)entry);
                }
                else {
                    this.slotStreamParams.setProperty(keyAndValue[0], keyAndValue[1]);
                }
            }
            return this;
        }

        /** Sets the interval passed to the connection as its status update interval. */
        @Override
        public ReplicationConnectionBuilder statusUpdateInterval(Duration statusUpdateInterval) {
            this.statusUpdateIntervalVal = statusUpdateInterval;
            return this;
        }

        /** Sets the regular JDBC connection passed to the replication connection. */
        @Override
        public ReplicationConnection.Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
            this.jdbcConnection = jdbcConnection;
            return this;
        }

        /** Creates the replication connection from the collected settings. */
        @Override
        public ReplicationConnection build() {
            assert this.plugin != null : "Decoding plugin name is not set";
            return new PostgresReplicationConnection(
                    this.config,
                    this.slotName,
                    this.publicationName,
                    this.tableFilter,
                    this.publicationAutocreateMode,
                    this.plugin,
                    this.dropSlotOnClose,
                    this.createFailOverSlot,
                    this.statusUpdateIntervalVal,
                    this.jdbcConnection,
                    this.typeRegistry,
                    this.slotStreamParams,
                    this.schema);
        }

        /** Sets the type registry passed to the connection. */
        @Override
        public ReplicationConnection.Builder withTypeRegistry(TypeRegistry typeRegistry) {
            this.typeRegistry = typeRegistry;
            return this;
        }

        /** Sets the schema passed to the connection. */
        @Override
        public ReplicationConnection.Builder withSchema(PostgresSchema schema) {
            this.schema = schema;
            return this;
        }
    }
}

