/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalCreateSlotBuilder;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresReplicationConnection
extends JdbcConnection
implements ReplicationConnection {
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    private final String slotName;
    private final PostgresConnectorConfig.LogicalDecoder plugin;
    private final boolean dropSlotOnClose;
    private final Configuration originalConfig;
    private final Duration statusUpdateInterval;
    private final MessageDecoder messageDecoder;
    private final TypeRegistry typeRegistry;
    private final Properties streamParams;
    private long defaultStartingPos;

    private PostgresReplicationConnection(Configuration config, String slotName, PostgresConnectorConfig.LogicalDecoder plugin, boolean dropSlotOnClose, Duration statusUpdateInterval, TypeRegistry typeRegistry, Properties streamParams) {
        super(config, PostgresConnection.FACTORY, null, PostgresReplicationConnection::defaultSettings);
        this.originalConfig = config;
        this.slotName = slotName;
        this.plugin = plugin;
        this.dropSlotOnClose = dropSlotOnClose;
        this.statusUpdateInterval = statusUpdateInterval;
        this.messageDecoder = plugin.messageDecoder();
        this.typeRegistry = typeRegistry;
        this.streamParams = streamParams;
        try {
            this.initReplicationSlot();
        }
        catch (ConnectException e) {
            this.close();
            throw e;
        }
        catch (Throwable t) {
            this.close();
            throw new ConnectException("Cannot create replication connection", t);
        }
    }

    protected void initReplicationSlot() throws SQLException, InterruptedException {
        ServerInfo.ReplicationSlot slotInfo;
        String postgresPluginName = this.plugin.getPostgresPluginName();
        try (PostgresConnection connection = new PostgresConnection(this.originalConfig);){
            slotInfo = connection.readReplicationSlotInfo(this.slotName, postgresPluginName);
        }
        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
        try {
            if (shouldCreateSlot) {
                LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", (Object)this.slotName, (Object)this.plugin);
                if (this.useTemporarySlot()) {
                    try (Statement stmt = this.pgConnection().createStatement();){
                        stmt.execute(String.format("CREATE_REPLICATION_SLOT %s TEMPORARY LOGICAL %s", this.slotName, postgresPluginName));
                    }
                } else {
                    ((ChainedLogicalCreateSlotBuilder)this.pgConnection().getReplicationAPI().createReplicationSlot().logical().withSlotName(this.slotName)).withOutputPlugin(postgresPluginName).make();
                }
            } else if (slotInfo.active()) {
                LOGGER.error("A logical replication slot named '{}' for plugin '{}' and database '{}' is already active on the server.You cannot have multiple slots with the same name active for the same database", new Object[]{this.slotName, postgresPluginName, this.database()});
                throw new IllegalStateException();
            }
            AtomicLong xlogStart = new AtomicLong();
            this.pgConnection();
            this.execute(statement -> {
                String identifySystemStatement = "IDENTIFY_SYSTEM";
                LOGGER.debug("running '{}' to validate replication connection", (Object)identifySystemStatement);
                try (ResultSet rs = statement.executeQuery(identifySystemStatement);){
                    if (!rs.next()) {
                        throw new IllegalStateException("The DB connection is not a valid replication connection");
                    }
                    String xlogpos = rs.getString("xlogpos");
                    LOGGER.debug("received latest xlogpos '{}'", (Object)xlogpos);
                    xlogStart.compareAndSet(0L, LogSequenceNumber.valueOf((String)xlogpos).asLong());
                }
            });
            if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
                this.defaultStartingPos = xlogStart.get();
            } else {
                Long latestFlushedLsn = slotInfo.latestFlushedLsn();
                long l = this.defaultStartingPos = latestFlushedLsn < xlogStart.get() ? latestFlushedLsn.longValue() : xlogStart.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found previous flushed LSN '{}'", (Object)ReplicationConnection.format(latestFlushedLsn));
                }
            }
        }
        catch (SQLException e) {
            throw new JdbcConnectionException(e);
        }
    }

    private boolean useTemporarySlot() throws SQLException {
        return this.dropSlotOnClose && this.pgConnection().getServerMajorVersion() >= 10;
    }

    @Override
    public ReplicationStream startStreaming() throws SQLException, InterruptedException {
        return this.startStreaming(this.defaultStartingPos);
    }

    @Override
    public ReplicationStream startStreaming(Long offset) throws SQLException, InterruptedException {
        this.connect();
        if (offset == null || offset <= 0L) {
            offset = this.defaultStartingPos;
        }
        LogSequenceNumber lsn = LogSequenceNumber.valueOf((long)offset);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("starting streaming from LSN '{}'", (Object)lsn.asString());
        }
        return this.createReplicationStream(lsn);
    }

    protected PgConnection pgConnection() throws SQLException {
        return (PgConnection)this.connection(false);
    }

    private ReplicationStream createReplicationStream(LogSequenceNumber lsn) throws SQLException, InterruptedException {
        PGReplicationStream s;
        try {
            try {
                s = this.startPgReplicationStream(lsn, this.plugin.forceRds() ? x -> this.messageDecoder.optionsWithoutMetadata(this.messageDecoder.tryOnceOptions((ChainedLogicalStreamBuilder)x)) : x -> this.messageDecoder.optionsWithMetadata(this.messageDecoder.tryOnceOptions((ChainedLogicalStreamBuilder)x)));
                this.messageDecoder.setContainsMetadata(!this.plugin.forceRds());
            }
            catch (PSQLException e) {
                LOGGER.debug("Could not register for streaming, retrying without optional options", (Throwable)e);
                if (this.useTemporarySlot()) {
                    this.initReplicationSlot();
                }
                s = this.startPgReplicationStream(lsn, this.plugin.forceRds() ? this.messageDecoder::optionsWithoutMetadata : this.messageDecoder::optionsWithMetadata);
                this.messageDecoder.setContainsMetadata(!this.plugin.forceRds());
            }
        }
        catch (PSQLException e) {
            if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
                LOGGER.warn("Could not register for streaming with metadata in messages, falling back to messages without metadata");
                if (this.useTemporarySlot()) {
                    this.initReplicationSlot();
                }
                s = this.startPgReplicationStream(lsn, this.messageDecoder::optionsWithoutMetadata);
                this.messageDecoder.setContainsMetadata(false);
            }
            if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
                LOGGER.error("Cannot rewind to last processed WAL position", (Throwable)e);
                throw new ConnectException("The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
            }
            throw e;
        }
        final PGReplicationStream stream = s;
        final long lsnLong = lsn.asLong();
        return new ReplicationStream(){
            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
            private int warningCheckCounter = 100;
            private ExecutorService keepAliveExecutor = null;
            private AtomicBoolean keepAliveRunning;
            private final Metronome metronome = Metronome.sleeper((Duration)PostgresReplicationConnection.access$000(PostgresReplicationConnection.this), (Clock)Clock.SYSTEM);
            private volatile LogSequenceNumber lastReceivedLsn;

            @Override
            public void read(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                ByteBuffer read = stream.read();
                if (lsnLong >= stream.getLastReceiveLSN().asLong()) {
                    return;
                }
                this.deserializeMessages(read, processor);
            }

            private void deserializeMessages(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.lastReceivedLsn = stream.getLastReceiveLSN();
                PostgresReplicationConnection.this.messageDecoder.processMessage(buffer, processor, PostgresReplicationConnection.this.typeRegistry);
            }

            @Override
            public void close() throws SQLException {
                this.processWarnings(true);
                stream.close();
            }

            @Override
            public void flushLsn(long lsn) throws SQLException {
                this.doFlushLsn(LogSequenceNumber.valueOf((long)lsn));
            }

            private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
                stream.setFlushedLSN(lsn);
                stream.setAppliedLSN(lsn);
                stream.forceUpdateStatus();
            }

            @Override
            public Long lastReceivedLsn() {
                return this.lastReceivedLsn != null ? Long.valueOf(this.lastReceivedLsn.asLong()) : null;
            }

            @Override
            public void startKeepAlive(ExecutorService service) {
                if (this.keepAliveExecutor == null) {
                    this.keepAliveExecutor = service;
                    this.keepAliveRunning = new AtomicBoolean(true);
                    this.keepAliveExecutor.submit(() -> {
                        while (this.keepAliveRunning.get()) {
                            try {
                                LOGGER.trace("Forcing status update with replication stream");
                                stream.forceUpdateStatus();
                                this.metronome.pause();
                            }
                            catch (Exception exp) {
                                throw new RuntimeException("received unexpected exception will perform keep alive", exp);
                            }
                        }
                    });
                }
            }

            @Override
            public void stopKeepAlive() {
                if (this.keepAliveExecutor != null) {
                    this.keepAliveRunning.set(false);
                    this.keepAliveExecutor.shutdownNow();
                    this.keepAliveExecutor = null;
                }
            }

            private void processWarnings(boolean forced) throws SQLException {
                if (--this.warningCheckCounter == 0 || forced) {
                    this.warningCheckCounter = 100;
                    for (SQLWarning w = PostgresReplicationConnection.this.connection().getWarnings(); w != null; w = w.getNextWarning()) {
                        LOGGER.debug("Server-side message: '{}', state = {}, code = {}", new Object[]{w.getMessage(), w.getSQLState(), w.getErrorCode()});
                    }
                }
            }
        };
    }

    private PGReplicationStream startPgReplicationStream(LogSequenceNumber lsn, Function<ChainedLogicalStreamBuilder, ChainedLogicalStreamBuilder> configurator) throws SQLException {
        assert (lsn != null);
        ChainedLogicalStreamBuilder streamBuilder = ((ChainedLogicalStreamBuilder)((ChainedLogicalStreamBuilder)this.pgConnection().getReplicationAPI().replicationStream().logical().withSlotName(this.slotName)).withStartPosition(lsn)).withSlotOptions(this.streamParams);
        streamBuilder = configurator.apply(streamBuilder);
        if (this.statusUpdateInterval != null && this.statusUpdateInterval.toMillis() > 0L) {
            streamBuilder.withStatusInterval(Math.toIntExact(this.statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
        }
        PGReplicationStream stream = streamBuilder.start();
        try {
            Thread.sleep(10L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        stream.forceUpdateStatus();
        return stream;
    }

    @Override
    public synchronized void close() {
        try {
            super.close();
        }
        catch (Throwable e) {
            LOGGER.error("Unexpected error while closing Postgres connection", e);
        }
        if (this.dropSlotOnClose) {
            try (PostgresConnection connection = new PostgresConnection(this.originalConfig);){
                connection.dropReplicationSlot(this.slotName);
            }
            catch (Throwable e) {
                LOGGER.error("Unexpected error while dropping replication slot", e);
            }
        }
    }

    protected static void defaultSettings(Configuration.Builder builder) {
        PostgresConnection.defaultSettings(builder);
        builder.with("replication", "database").with("preferQueryMode", "simple");
    }

    static /* synthetic */ Duration access$000(PostgresReplicationConnection x0) {
        return x0.statusUpdateInterval;
    }

    protected static class ReplicationConnectionBuilder
    implements ReplicationConnection.Builder {
        private final Configuration config;
        private String slotName = "debezium";
        private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
        private boolean dropSlotOnClose = true;
        private Duration statusUpdateIntervalVal;
        private TypeRegistry typeRegistry;
        private Properties slotStreamParams = new Properties();

        protected ReplicationConnectionBuilder(Configuration config) {
            assert (config != null);
            this.config = config;
        }

        @Override
        public ReplicationConnectionBuilder withSlot(String slotName) {
            assert (slotName != null);
            this.slotName = slotName;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder withPlugin(PostgresConnectorConfig.LogicalDecoder plugin) {
            assert (plugin != null);
            this.plugin = plugin;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder dropSlotOnClose(boolean dropSlotOnClose) {
            this.dropSlotOnClose = dropSlotOnClose;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder streamParams(String slotStreamParams) {
            if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
                String[] paramsWithValues;
                this.slotStreamParams = new Properties();
                for (String paramsWithValue : paramsWithValues = slotStreamParams.split(";")) {
                    String[] paramAndValue = paramsWithValue.split("=");
                    if (paramAndValue.length == 2) {
                        this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
                        continue;
                    }
                    LOGGER.warn("The following STREAM_PARAMS value is invalid: {}", (Object)paramsWithValue);
                }
            }
            return this;
        }

        @Override
        public ReplicationConnectionBuilder statusUpdateInterval(Duration statusUpdateInterval) {
            this.statusUpdateIntervalVal = statusUpdateInterval;
            return this;
        }

        @Override
        public ReplicationConnection build() {
            assert (this.plugin != null) : "Decoding plugin name is not set";
            return new PostgresReplicationConnection(this.config, this.slotName, this.plugin, this.dropSlotOnClose, this.statusUpdateIntervalVal, this.typeRegistry, this.slotStreamParams);
        }

        @Override
        public ReplicationConnection.Builder withTypeRegistry(TypeRegistry typeRegistry) {
            this.typeRegistry = typeRegistry;
            return this;
        }
    }
}

