/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PgSnapshot;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresReadOnlyIncrementalSnapshotContext;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresReadOnlyIncrementalSnapshotChangeEventSource<P extends PostgresPartition>
extends AbstractIncrementalSnapshotChangeEventSource<P, TableId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotChangeEventSource.class);
    private static final String FORCE_NEW_TRANSACTION = "SELECT * FROM pg_current_xact_id();";
    private static final String CURRENT_SNAPSHOT = "SELECT * FROM pg_current_snapshot();";
    private final PostgresConnection jdbcConnection;
    private final PostgresSchema schema;

    public PostgresReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config, JdbcConnection jdbcConnection, EventDispatcher<P, TableId> dispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<P> progressListener, DataChangeEventListener<P> dataChangeEventListener, NotificationService<P, ? extends OffsetContext> notificationService) {
        super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
        this.jdbcConnection = (PostgresConnection)jdbcConnection;
        this.schema = (PostgresSchema)databaseSchema;
    }

    protected void preIncrementalSnapshotStart() {
        super.preIncrementalSnapshotStart();
        this.forceNewTransactionId();
    }

    private PostgresReadOnlyIncrementalSnapshotContext<TableId> getContext() {
        return (PostgresReadOnlyIncrementalSnapshotContext)this.context;
    }

    protected void emitWindowOpen() {
        this.getCurrentSnapshot(this.getContext()::setLowWatermark);
    }

    protected void emitWindowClose(P partition, OffsetContext offsetContext) {
        this.getCurrentSnapshot(this.getContext()::setHighWatermark);
    }

    public void processMessage(P partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException {
        if (this.getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.debug("Checking window for table '{}', key '{}', window contains '{}'", new Object[]{dataCollectionId, key, this.window});
        this.getContext().updateWindowState(offsetContext);
        boolean windowClosed = this.getContext().isWindowClosed();
        if (windowClosed) {
            this.sendWindowEvents((Partition)partition, offsetContext);
            this.readChunk((Partition)partition, offsetContext);
        } else if (!this.window.isEmpty() && this.getContext().deduplicationNeeded()) {
            LOGGER.trace("Deduplicating");
            this.deduplicateWindow(dataCollectionId, key);
        }
    }

    public void processTransactionCommittedEvent(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (this.getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Processing transaction event");
        this.readUntilNewTransactionChange(partition, offsetContext);
        LOGGER.trace("Finished processing transaction event");
    }

    public void processHeartbeat(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (this.getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Processing heartbeat event");
        this.readUntilNewTransactionChange(partition, offsetContext);
        LOGGER.trace("Finished processing heartbeat event");
    }

    protected Table refreshTableSchema(Table table) throws SQLException {
        LOGGER.debug("Refreshing table '{}' schema for incremental snapshot.", (Object)table.id());
        this.schema.refresh(this.jdbcConnection, table.id(), true);
        return this.schema.tableFor(table.id());
    }

    private void readUntilNewTransactionChange(P partition, OffsetContext offsetContext) throws InterruptedException {
        Long eventTxId = offsetContext.getSourceInfo().getInt64("txId");
        LOGGER.debug("Event txId {}, snapshot is running {}, reachedHighWatermark {}", new Object[]{eventTxId, this.getContext().snapshotRunning(), this.getContext().isTransactionVisible(eventTxId)});
        LOGGER.trace("Incremental snapshot context {}", this.getContext());
        if (this.getContext().snapshotRunning() && this.maxInProgressTransactionCommitted(eventTxId)) {
            this.getContext().closeWindow();
            this.sendWindowEvents((Partition)partition, offsetContext);
            this.readChunk((Partition)partition, offsetContext);
            return;
        }
        while (this.getContext().snapshotRunning() && this.getContext().isTransactionVisible(eventTxId)) {
            LOGGER.debug("Finishing snapshot, snapshot is running {}, reachedHighWatermark {}", (Object)this.getContext().snapshotRunning(), (Object)this.getContext().isTransactionVisible(eventTxId));
            this.getContext().closeWindow();
            this.sendWindowEvents((Partition)partition, offsetContext);
            this.readChunk((Partition)partition, offsetContext);
            if (this.getContext().watermarksChanged()) {
                LOGGER.trace("Watermarks changed");
                return;
            }
            LOGGER.trace("Re read chunk finished, snapshot is running {}, reachedHighWatermark {}", (Object)this.getContext().snapshotRunning(), (Object)this.getContext().isTransactionVisible(eventTxId));
        }
    }

    private void getCurrentSnapshot(Consumer<PgSnapshot> watermark) {
        try {
            PgSnapshot pgSnapshot = (PgSnapshot)this.jdbcConnection.queryAndMap(CURRENT_SNAPSHOT, this.jdbcConnection.singleResultMapper(rs -> {
                String currentSnapshot = rs.getString(1);
                LOGGER.trace("Current snapshot {}", (Object)currentSnapshot);
                return PgSnapshot.valueOf(currentSnapshot);
            }, "Unable to get current snapshot"));
            watermark.accept(pgSnapshot);
        }
        catch (SQLException e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    private boolean maxInProgressTransactionCommitted(Long eventTxId) {
        if (this.getContext().getHighWatermark() == null) {
            return false;
        }
        return this.getContext().getHighWatermark().getXMax().equals(eventTxId);
    }

    private void forceNewTransactionId() {
        try {
            this.jdbcConnection.query(FORCE_NEW_TRANSACTION, rs -> {
                if (rs.next()) {
                    LOGGER.trace("Created new transaction ID {}", (Object)rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new DebeziumException((Throwable)e);
        }
    }
}

