/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalCreateSlotBuilder;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresReplicationConnection
extends JdbcConnection
implements ReplicationConnection {
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    private final String slotName;
    private final PostgresConnectorConfig.LogicalDecoder plugin;
    private final boolean dropSlotOnClose;
    private final Configuration originalConfig;
    private final Integer statusUpdateIntervalMillis;
    private final MessageDecoder messageDecoder;
    private final TypeRegistry typeRegistry;
    private long defaultStartingPos;

    private PostgresReplicationConnection(Configuration config, String slotName, PostgresConnectorConfig.LogicalDecoder plugin, boolean dropSlotOnClose, Integer statusUpdateIntervalMillis, TypeRegistry typeRegistry) {
        super(config, PostgresConnection.FACTORY, null, PostgresReplicationConnection::defaultSettings);
        this.originalConfig = config;
        this.slotName = slotName;
        this.plugin = plugin;
        this.dropSlotOnClose = dropSlotOnClose;
        this.statusUpdateIntervalMillis = statusUpdateIntervalMillis;
        this.messageDecoder = plugin.messageDecoder();
        this.typeRegistry = typeRegistry;
        try {
            this.initReplicationSlot();
        }
        catch (ConnectException e) {
            this.close();
            throw e;
        }
        catch (Throwable t) {
            this.close();
            throw new ConnectException("Cannot create replication connection", t);
        }
    }

    protected void initReplicationSlot() throws SQLException, InterruptedException {
        ServerInfo.ReplicationSlot slotInfo;
        String postgresPluginName = this.plugin.getPostgresPluginName();
        try (PostgresConnection connection = new PostgresConnection(this.originalConfig);){
            slotInfo = connection.readReplicationSlotInfo(this.slotName, postgresPluginName);
        }
        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
        try {
            if (shouldCreateSlot) {
                LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", (Object)this.slotName, (Object)this.plugin);
                if (this.dropSlotOnClose && this.pgConnection().getServerMajorVersion() >= 10) {
                    try (Statement stmt = this.pgConnection().createStatement();){
                        stmt.execute(String.format("CREATE_REPLICATION_SLOT %s TEMPORARY LOGICAL %s", this.slotName, postgresPluginName));
                    }
                } else {
                    ((ChainedLogicalCreateSlotBuilder)this.pgConnection().getReplicationAPI().createReplicationSlot().logical().withSlotName(this.slotName)).withOutputPlugin(postgresPluginName).make();
                }
            } else if (slotInfo.active()) {
                LOGGER.error("A logical replication slot named '{}' for plugin '{}' and database '{}' is already active on the server.You cannot have multiple slots with the same name active for the same database", new Object[]{this.slotName, postgresPluginName, this.database()});
                throw new IllegalStateException();
            }
            AtomicLong xlogStart = new AtomicLong();
            this.pgConnection();
            this.execute(statement -> {
                String identifySystemStatement = "IDENTIFY_SYSTEM";
                LOGGER.debug("running '{}' to validate replication connection", (Object)identifySystemStatement);
                try (ResultSet rs = statement.executeQuery(identifySystemStatement);){
                    if (!rs.next()) {
                        throw new IllegalStateException("The DB connection is not a valid replication connection");
                    }
                    String xlogpos = rs.getString("xlogpos");
                    LOGGER.debug("received latest xlogpos '{}'", (Object)xlogpos);
                    xlogStart.compareAndSet(0L, LogSequenceNumber.valueOf((String)xlogpos).asLong());
                }
            });
            if (shouldCreateSlot || !slotInfo.hasValidFlushedLSN()) {
                this.defaultStartingPos = xlogStart.get();
            } else {
                Long latestFlushedLSN = slotInfo.latestFlushedLSN();
                long l = this.defaultStartingPos = latestFlushedLSN < xlogStart.get() ? latestFlushedLSN.longValue() : xlogStart.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found previous flushed LSN '{}'", (Object)ReplicationConnection.format(latestFlushedLSN));
                }
            }
        }
        catch (SQLException e) {
            throw new JdbcConnectionException(e);
        }
    }

    @Override
    public ReplicationStream startStreaming() throws SQLException {
        return this.startStreaming(this.defaultStartingPos);
    }

    @Override
    public ReplicationStream startStreaming(Long offset) throws SQLException {
        this.connect();
        if (offset == null || offset <= 0L) {
            offset = this.defaultStartingPos;
        }
        LogSequenceNumber lsn = LogSequenceNumber.valueOf((long)offset);
        LOGGER.debug("starting streaming from LSN '{}'", (Object)lsn.asString());
        return this.createReplicationStream(lsn);
    }

    protected PgConnection pgConnection() throws SQLException {
        return (PgConnection)this.connection(false);
    }

    private ReplicationStream createReplicationStream(LogSequenceNumber lsn) throws SQLException {
        PGReplicationStream s;
        try {
            s = this.startPgReplicationStream(lsn, this.plugin.forceRds() ? this.messageDecoder::optionsWithoutMetadata : this.messageDecoder::optionsWithMetadata);
            this.messageDecoder.setContainsMetadata(!this.plugin.forceRds());
        }
        catch (PSQLException e) {
            if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
                LOGGER.warn("Could not register for streaming with metadata in messages, falling back to messages without metadata");
                s = this.startPgReplicationStream(lsn, this.messageDecoder::optionsWithoutMetadata);
                this.messageDecoder.setContainsMetadata(false);
            }
            if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
                LOGGER.error("Cannot rewind to last processed WAL position", (Throwable)e);
                throw new ConnectException("The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
            }
            throw e;
        }
        final PGReplicationStream stream = s;
        final long lsnLong = lsn.asLong();
        return new ReplicationStream(){
            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
            private int warningCheckCounter = 100;
            private volatile LogSequenceNumber lastReceivedLSN;

            @Override
            public void read(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                ByteBuffer read = stream.read();
                if (lsnLong >= stream.getLastReceiveLSN().asLong()) {
                    return;
                }
                this.deserializeMessages(read, processor);
            }

            @Override
            public void readPending(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                ByteBuffer read = stream.readPending();
                if (read == null || lsnLong >= stream.getLastReceiveLSN().asLong()) {
                    return;
                }
                this.deserializeMessages(read, processor);
            }

            private void deserializeMessages(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.lastReceivedLSN = stream.getLastReceiveLSN();
                PostgresReplicationConnection.this.messageDecoder.processMessage(buffer, processor, PostgresReplicationConnection.this.typeRegistry);
            }

            @Override
            public void close() throws SQLException {
                this.processWarnings(true);
                stream.close();
            }

            @Override
            public void flushLastReceivedLsn() throws SQLException {
                if (this.lastReceivedLSN == null) {
                    return;
                }
                this.doFlushLsn(this.lastReceivedLSN);
            }

            @Override
            public void flushLsn(long lsn) throws SQLException {
                this.doFlushLsn(LogSequenceNumber.valueOf((long)lsn));
            }

            private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
                stream.setFlushedLSN(lsn);
                stream.setAppliedLSN(lsn);
                stream.forceUpdateStatus();
            }

            @Override
            public Long lastReceivedLsn() {
                return this.lastReceivedLSN != null ? Long.valueOf(this.lastReceivedLSN.asLong()) : null;
            }

            private void processWarnings(boolean forced) throws SQLException {
                if (--this.warningCheckCounter == 0 || forced) {
                    this.warningCheckCounter = 100;
                    for (SQLWarning w = PostgresReplicationConnection.this.connection().getWarnings(); w != null; w = w.getNextWarning()) {
                        LOGGER.debug("Server-side message: '{}', state = {}, code = {}", new Object[]{w.getMessage(), w.getSQLState(), w.getErrorCode()});
                    }
                }
            }
        };
    }

    private PGReplicationStream startPgReplicationStream(LogSequenceNumber lsn, Function<ChainedLogicalStreamBuilder, ChainedLogicalStreamBuilder> configurator) throws SQLException {
        assert (lsn != null);
        ChainedLogicalStreamBuilder streamBuilder = (ChainedLogicalStreamBuilder)((ChainedLogicalStreamBuilder)this.pgConnection().getReplicationAPI().replicationStream().logical().withSlotName(this.slotName)).withStartPosition(lsn);
        streamBuilder = configurator.apply(streamBuilder);
        if (this.statusUpdateIntervalMillis != null && this.statusUpdateIntervalMillis > 0) {
            streamBuilder.withStatusInterval(this.statusUpdateIntervalMillis.intValue(), TimeUnit.MILLISECONDS);
        }
        PGReplicationStream stream = streamBuilder.start();
        try {
            Thread.sleep(10L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        stream.forceUpdateStatus();
        return stream;
    }

    @Override
    public synchronized void close() {
        try {
            super.close();
        }
        catch (Throwable e) {
            LOGGER.error("Unexpected error while closing Postgres connection", e);
        }
        if (this.dropSlotOnClose) {
            try (PostgresConnection connection = new PostgresConnection(this.originalConfig);){
                connection.dropReplicationSlot(this.slotName);
            }
            catch (Throwable e) {
                LOGGER.error("Unexpected error while dropping replication slot", e);
            }
        }
    }

    protected static void defaultSettings(Configuration.Builder builder) {
        PostgresConnection.defaultSettings(builder);
        builder.with("replication", "database").with("preferQueryMode", "simple");
    }

    protected static class ReplicationConnectionBuilder
    implements ReplicationConnection.Builder {
        private final Configuration config;
        private String slotName = "debezium";
        private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
        private boolean dropSlotOnClose = true;
        private Integer statusUpdateIntervalMillis;
        private TypeRegistry typeRegistry;

        protected ReplicationConnectionBuilder(Configuration config) {
            assert (config != null);
            this.config = config;
        }

        @Override
        public ReplicationConnectionBuilder withSlot(String slotName) {
            assert (slotName != null);
            this.slotName = slotName;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder withPlugin(PostgresConnectorConfig.LogicalDecoder plugin) {
            assert (plugin != null);
            this.plugin = plugin;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder dropSlotOnClose(boolean dropSlotOnClose) {
            this.dropSlotOnClose = dropSlotOnClose;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder statusUpdateIntervalMillis(Integer statusUpdateIntervalMillis) {
            this.statusUpdateIntervalMillis = statusUpdateIntervalMillis;
            return this;
        }

        @Override
        public ReplicationConnection build() {
            assert (this.plugin != null) : "Decoding plugin name is not set";
            return new PostgresReplicationConnection(this.config, this.slotName, this.plugin, this.dropSlotOnClose, this.statusUpdateIntervalMillis, this.typeRegistry);
        }

        @Override
        public ReplicationConnection.Builder withTypeRegistry(TypeRegistry typeRegistry) {
            this.typeRegistry = typeRegistry;
            return this;
        }
    }
}

