/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.DeleteWindowCloser;
import io.debezium.pipeline.source.snapshot.incremental.InsertWindowCloser;
import io.debezium.pipeline.source.snapshot.incremental.SignalMetadata;
import io.debezium.pipeline.source.snapshot.incremental.WatermarkWindowCloser;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.AbstractPartition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Loggings;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class SignalBasedIncrementalSnapshotChangeEventSource<P extends Partition, T extends DataCollectionId>
extends AbstractIncrementalSnapshotChangeEventSource<P, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SignalBasedIncrementalSnapshotChangeEventSource.class);
    private SignalMetadata signalMetadata;

    public SignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config, JdbcConnection jdbcConnection, EventDispatcher<P, T> dispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<P> progressListener, DataChangeEventListener<P> dataChangeEventListener, NotificationService<P, ? extends OffsetContext> notificationService) {
        super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
    }

    protected String getSignalTableNameForPartition(Partition partition) {
        if (this.connectorConfig.getSignalingDataCollectionIds().size() == 1) {
            return this.getSignalTableName(this.connectorConfig.getSignalingDataCollectionIds().get(0));
        }
        String partitionDatabase = this.getDatabaseNameFromPartition(partition);
        if (partitionDatabase == null) {
            return null;
        }
        return this.connectorConfig.getSignalingDataCollectionIds().stream().filter(signalCollection -> partitionDatabase.equals(this.getDatabaseName(signalCollection).get())).map(this::getSignalTableName).findFirst().orElse(null);
    }

    protected String getDatabaseNameFromPartition(Partition partition) {
        if (partition instanceof AbstractPartition) {
            Map<String, String> loggingContext = partition.getLoggingContext();
            return loggingContext.get("dbz.databaseName");
        }
        return null;
    }

    @Override
    public void processMessage(Partition partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) {
        this.context = offsetContext.getIncrementalSnapshotContext();
        if (this.context == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", new Object[]{dataCollectionId, Loggings.maybeRedactSensitiveData(key), this.window});
        if (!this.window.isEmpty() && this.context.deduplicationNeeded()) {
            this.deduplicateWindow(dataCollectionId, key);
        }
    }

    @Override
    protected void emitWindowOpen(P partition, OffsetContext offsetContext) throws SQLException {
        String signalTableName = this.getSignalTableNameForPartition((Partition)partition);
        if (signalTableName == null) {
            LOGGER.warn("Not able to determine signal table, cannot emit window open signal");
            return;
        }
        String signalWindowStatement = "INSERT INTO " + signalTableName + " VALUES (?, ?, ?)";
        this.signalMetadata = new SignalMetadata(Instant.now(), null);
        this.jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
            LOGGER.trace("Emitting open window for chunk = '{}' to signal table '{}'", (Object)this.context.currentChunkId(), (Object)signalTableName);
            x.setString(1, this.context.currentChunkId() + "-open");
            x.setString(2, "snapshot-window-open");
            x.setString(3, this.signalMetadata.metadataString());
        });
        this.jdbcConnection.commit();
    }

    @Override
    protected void emitWindowClose(P partition, OffsetContext offsetContext) throws Exception {
        String signalTableName = this.getSignalTableNameForPartition((Partition)partition);
        if (signalTableName == null) {
            LOGGER.warn("Not able to detect signal table, cannot emit window close signal");
            return;
        }
        LOGGER.trace("Emitting close window for chunk = '{}' to signal table '{}'", (Object)this.context.currentChunkId(), (Object)signalTableName);
        WatermarkWindowCloser watermarkWindowCloser = this.getWatermarkWindowCloser(this.connectorConfig, this.jdbcConnection, signalTableName);
        watermarkWindowCloser.closeWindow((Partition)partition, offsetContext, this.context.currentChunkId());
    }

    private WatermarkWindowCloser getWatermarkWindowCloser(CommonConnectorConfig connectorConfig, JdbcConnection jdbcConnection, String signalTable) {
        if (Objects.requireNonNull(connectorConfig.getIncrementalSnapshotWatermarkingStrategy()) == CommonConnectorConfig.WatermarkStrategy.INSERT_DELETE) {
            return new DeleteWindowCloser(jdbcConnection, signalTable, this);
        }
        return new InsertWindowCloser(jdbcConnection, signalTable, new SignalMetadata(this.signalMetadata.getOpenWindowTimestamp(), Instant.now()));
    }
}

