/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.DeleteWindowCloser;
import io.debezium.pipeline.source.snapshot.incremental.InsertWindowCloser;
import io.debezium.pipeline.source.snapshot.incremental.SignalMetadata;
import io.debezium.pipeline.source.snapshot.incremental.WatermarkWindowCloser;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class SignalBasedIncrementalSnapshotChangeEventSource<P extends Partition, T extends DataCollectionId>
extends AbstractIncrementalSnapshotChangeEventSource<P, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SignalBasedIncrementalSnapshotChangeEventSource.class);
    private final String signalWindowStatement;
    private final String signalWindowDeleteStatement;
    private SignalMetadata signalMetadata;

    public SignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config, JdbcConnection jdbcConnection, EventDispatcher<P, T> dispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<P> progressListener, DataChangeEventListener<P> dataChangeEventListener, NotificationService<P, ? extends OffsetContext> notificationService) {
        super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
        this.signalWindowStatement = "INSERT INTO " + this.getSignalTableName(config.getSignalingDataCollectionId()) + " VALUES (?, ?, ?)";
        this.signalWindowDeleteStatement = "DELETE FROM " + this.getSignalTableName(config.getSignalingDataCollectionId()) + " WHERE id = ?";
    }

    @Override
    public void processMessage(Partition partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) {
        this.context = offsetContext.getIncrementalSnapshotContext();
        if (this.context == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", new Object[]{dataCollectionId, key, this.window});
        if (!this.window.isEmpty() && this.context.deduplicationNeeded()) {
            this.deduplicateWindow(dataCollectionId, key);
        }
    }

    @Override
    protected void emitWindowOpen() throws SQLException {
        this.signalMetadata = new SignalMetadata(Instant.now(), null);
        this.jdbcConnection.prepareUpdate(this.signalWindowStatement, x -> {
            LOGGER.trace("Emitting open window for chunk = '{}'", (Object)this.context.currentChunkId());
            x.setString(1, this.context.currentChunkId() + "-open");
            x.setString(2, "snapshot-window-open");
            x.setString(3, this.signalMetadata.metadataString());
        });
        this.jdbcConnection.commit();
    }

    @Override
    protected void emitWindowClose(Partition partition, OffsetContext offsetContext) throws Exception {
        String signalTableName = this.getSignalTableName(this.connectorConfig.getSignalingDataCollectionId());
        WatermarkWindowCloser watermarkWindowCloser = this.getWatermarkWindowCloser(this.connectorConfig, this.jdbcConnection, signalTableName);
        watermarkWindowCloser.closeWindow(partition, offsetContext, this.context.currentChunkId());
    }

    private WatermarkWindowCloser getWatermarkWindowCloser(CommonConnectorConfig connectorConfig, JdbcConnection jdbcConnection, String signalTable) {
        if (Objects.requireNonNull(connectorConfig.getIncrementalSnapshotWatermarkingStrategy()) == CommonConnectorConfig.WatermarkStrategy.INSERT_DELETE) {
            return new DeleteWindowCloser(jdbcConnection, signalTable, this);
        }
        return new InsertWindowCloser(jdbcConnection, signalTable, new SignalMetadata(this.signalMetadata.getOpenWindowTimestamp(), Instant.now()));
    }
}

