/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.binlog;

import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.connector.binlog.BinlogPartition;
import io.debezium.connector.binlog.BinlogReadOnlyIncrementalSnapshotContext;
import io.debezium.connector.binlog.BinlogSourceInfo;
import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental snapshot change event source for binlog-based connectors that brackets
 * each snapshot window with executed GTID sets: the low watermark is recorded when a
 * window opens and the high watermark when it closes.
 *
 * <p>Concrete connectors supply the way the executed GTID set is obtained by
 * implementing {@link #getExecutedGtidSet(Consumer)}.
 *
 * <p>NOTE(review): the "read-only" in the name presumably means no watermark rows are
 * written to the source database; confirm against the superclass and context classes.
 *
 * @param <P> the partition type
 * @param <O> the offset context type
 */
public abstract class BinlogReadOnlyIncrementalSnapshotChangeEventSource<P extends BinlogPartition, O extends BinlogOffsetContext>
extends AbstractIncrementalSnapshotChangeEventSource<P, TableId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReadOnlyIncrementalSnapshotChangeEventSource.class);
    private final GtidSetFactory gtidSetFactory;

    /**
     * Creates the event source, forwarding all collaborators to the superclass and
     * keeping the GTID set factory exposed by the connector configuration.
     */
    public BinlogReadOnlyIncrementalSnapshotChangeEventSource(BinlogConnectorConfig connectorConfig, JdbcConnection jdbcConnection, EventDispatcher<P, TableId> dispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<P> progressListener, DataChangeEventListener<P> dataChangeEventListener, NotificationService<P, O> notificationService) {
        super((RelationalDatabaseConnectorConfig)connectorConfig, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
        this.gtidSetFactory = connectorConfig.getGtidSetFactory();
    }

    /**
     * Handles a data change event seen while streaming.
     *
     * <p>If updating the window state reports that the window closed, the buffered
     * window events are sent and the next chunk is read. Otherwise, when the window is
     * non-empty and the context asks for deduplication, the event's key is
     * deduplicated against the window.
     *
     * @param partition the partition the event belongs to
     * @param dataCollectionId the collection the event belongs to
     * @param key the key of the changed row
     * @param offsetContext the current streaming offset
     * @throws InterruptedException if sending window events or reading a chunk is interrupted
     */
    public void processMessage(P partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException {
        if (this.skipBecauseContextMissing()) {
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, this.window);
        if (this.drainWindowIfClosed(partition, offsetContext)) {
            return;
        }
        if (!this.window.isEmpty() && this.getContext().deduplicationNeeded()) {
            this.deduplicateWindow(dataCollectionId, key);
        }
    }

    /**
     * Handles a heartbeat by closing and draining windows for as long as the current
     * GTID has reached the high watermark.
     *
     * @throws InterruptedException if sending window events or reading a chunk is interrupted
     */
    public void processHeartbeat(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (this.skipBecauseContextMissing()) {
            return;
        }
        this.readUntilGtidChange(partition, offsetContext);
    }

    /**
     * Handles an event that was filtered out: only the window state is advanced, and
     * the window is drained if that closed it.
     *
     * @throws InterruptedException if sending window events or reading a chunk is interrupted
     */
    public void processFilteredEvent(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (this.skipBecauseContextMissing()) {
            return;
        }
        this.drainWindowIfClosed(partition, offsetContext);
    }

    /**
     * Handles a transaction-start event: the window state is advanced, and the window
     * is drained if that closed it.
     *
     * @throws InterruptedException if sending window events or reading a chunk is interrupted
     */
    public void processTransactionStartedEvent(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (this.skipBecauseContextMissing()) {
            return;
        }
        this.drainWindowIfClosed(partition, offsetContext);
    }

    /**
     * Handles a transaction-commit event by closing and draining windows for as long
     * as the current GTID has reached the high watermark.
     *
     * @throws InterruptedException if sending window events or reading a chunk is interrupted
     */
    public void processTransactionCommittedEvent(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (this.skipBecauseContextMissing()) {
            return;
        }
        this.readUntilGtidChange(partition, offsetContext);
    }

    /** Opens a window by recording the executed GTID set as the low watermark. */
    protected void emitWindowOpen() {
        this.updateLowWatermark();
    }

    /**
     * Closes a window by recording the executed GTID set as the high watermark, and
     * re-reads the chunk when the context reports that the server identifier changed.
     *
     * @throws Exception declared for subclasses and the superclass contract
     */
    protected void emitWindowClose(P partition, OffsetContext offsetContext) throws Exception {
        this.updateHighWatermark();
        if (this.getContext().hasServerIdentifierChanged()) {
            this.rereadChunk(partition, offsetContext);
        }
    }

    /**
     * Sends a snapshot row with the source info's query temporarily cleared.
     *
     * <p>The query is restored in a {@code finally} block so that an interrupted or
     * failing dispatch cannot leave the shared source info with a null query.
     *
     * @throws InterruptedException if the superclass dispatch is interrupted
     */
    protected void sendEvent(P partition, EventDispatcher<P, TableId> dispatcher, OffsetContext offsetContext, Object[] row) throws InterruptedException {
        BinlogSourceInfo sourceInfo = (BinlogSourceInfo)((BinlogOffsetContext)offsetContext).getSource();
        String savedQuery = sourceInfo.getQuery();
        sourceInfo.setQuery(null);
        try {
            super.sendEvent(partition, dispatcher, offsetContext, row);
        }
        finally {
            sourceInfo.setQuery(savedQuery);
        }
    }

    /**
     * Delegates to the superclass and then stores the signal's {@code "channelOffset"}
     * entry on the context. The entry may be absent, in which case {@code null} is stored.
     *
     * @throws InterruptedException if the superclass call is interrupted
     */
    public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, SnapshotConfiguration snapshotConfiguration) throws InterruptedException {
        Map<String, Object> additionalData = signalPayload.additionalData;
        super.addDataCollectionNamesToSnapshot(signalPayload, snapshotConfiguration);
        this.getContext().setSignalOffset((Long)additionalData.get("channelOffset"));
    }

    /**
     * Delegates to the superclass and then stores the {@code "channelOffset"} entry of
     * {@code additionalData} on the context.
     */
    public void stopSnapshot(P partition, OffsetContext offsetContext, Map<String, Object> additionalData, List<String> dataCollectionIds) {
        super.stopSnapshot(partition, offsetContext, additionalData, dataCollectionIds);
        this.getContext().setSignalOffset((Long)additionalData.get("channelOffset"));
    }

    /** Returns the inherited snapshot context viewed as the binlog read-only variant; may be {@code null}. */
    private BinlogReadOnlyIncrementalSnapshotContext<TableId> getContext() {
        return (BinlogReadOnlyIncrementalSnapshotContext<TableId>)this.context;
    }

    /** Logs a warning and returns {@code true} when there is no snapshot context to work with. */
    private boolean skipBecauseContextMissing() {
        if (this.getContext() != null) {
            return false;
        }
        LOGGER.warn("Context is null, skipping message processing");
        return true;
    }

    /**
     * Advances the window state and, if that closed the window, sends the window
     * events and reads the next chunk.
     *
     * @return {@code true} if the window was closed and drained
     */
    private boolean drainWindowIfClosed(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (!this.getContext().updateWindowState(offsetContext)) {
            return false;
        }
        this.sendWindowEvents(partition, offsetContext);
        this.readChunk(partition, offsetContext);
        return true;
    }

    /**
     * Repeatedly closes the window, sends its events and reads the next chunk while a
     * snapshot is running and the current GTID has reached the high watermark.
     */
    private void readUntilGtidChange(P partition, OffsetContext offsetContext) throws InterruptedException {
        String currentGtid = this.getContext().getCurrentGtid(offsetContext);
        while (this.getContext().snapshotRunning() && this.getContext().reachedHighWatermark(currentGtid)) {
            this.getContext().closeWindow();
            this.sendWindowEvents(partition, offsetContext);
            this.readChunk(partition, offsetContext);
            // Without a current GTID, stop as soon as the watermarks have moved.
            if (currentGtid == null && this.getContext().watermarksChanged()) {
                return;
            }
        }
    }

    /** Feeds the executed GTID set into the context as the low watermark. */
    private void updateLowWatermark() {
        this.getExecutedGtidSet(this.getContext()::setLowWatermark);
    }

    /** Feeds the executed GTID set into the context as the high watermark. */
    private void updateHighWatermark() {
        this.getExecutedGtidSet(this.getContext()::setHighWatermark);
    }

    /**
     * Obtains the server's executed GTID set and hands it to the given consumer.
     *
     * @param var1 receives the executed GTID set
     */
    protected abstract void getExecutedGtidSet(Consumer<GtidSet> var1);
}

