/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.MaxLsnResult;
import io.debezium.connector.sqlserver.SqlServerChangeRecordEmitter;
import io.debezium.connector.sqlserver.SqlServerChangeTable;
import io.debezium.connector.sqlserver.SqlServerChangeTablePointer;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter;
import io.debezium.connector.sqlserver.TxLogPosition;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqlServerStreamingChangeEventSource
implements StreamingChangeEventSource {
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    private final SqlServerConnection dataConnection;
    private final SqlServerConnection metadataConnection;
    private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final SqlServerOffsetContext offsetContext;
    private final Duration pollInterval;
    private final SqlServerConnectorConfig connectorConfig;

    public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext offsetContext, SqlServerConnection dataConnection, SqlServerConnection metadataConnection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema schema) {
        this.connectorConfig = connectorConfig;
        this.dataConnection = dataConnection;
        this.metadataConnection = metadataConnection;
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = schema;
        this.offsetContext = offsetContext;
        this.pollInterval = connectorConfig.getPollInterval();
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        if (this.connectorConfig.getSnapshotMode().equals((Object)SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY)) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return;
        }
        Metronome metronome = Metronome.sleeper((Duration)this.pollInterval, (Clock)this.clock);
        PriorityQueue<SqlServerChangeTable> schemaChangeCheckpoints = new PriorityQueue<SqlServerChangeTable>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn()));
        try {
            AtomicReference<SqlServerChangeTable[]> tablesSlot = new AtomicReference<SqlServerChangeTable[]>(this.getCdcTablesToQuery());
            TxLogPosition lastProcessedPositionOnStart = this.offsetContext.getChangePosition();
            long lastProcessedEventSerialNoOnStart = this.offsetContext.getEventSerialNo();
            LOGGER.info("Last position recorded in offsets is {}[{}]", (Object)lastProcessedPositionOnStart, (Object)lastProcessedEventSerialNoOnStart);
            AtomicBoolean changesStoppedBeingMonotonic = new AtomicBoolean(false);
            TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart;
            boolean shouldIncreaseFromLsn = this.offsetContext.isSnapshotCompleted();
            while (context.isRunning()) {
                MaxLsnResult maxLsnResult;
                if (this.connectorConfig.isReadOnlyDatabaseConnection()) {
                    this.dataConnection.commit();
                }
                if (!(maxLsnResult = this.dataConnection.getMaxLsnResult(this.connectorConfig.isSkipLowActivityLsnsEnabled())).getMaxLsn().isAvailable() || !maxLsnResult.getMaxTransactionalLsn().isAvailable()) {
                    LOGGER.warn("No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running");
                    metronome.pause();
                    continue;
                }
                if (maxLsnResult.getMaxTransactionalLsn().compareTo(lastProcessedPosition.getCommitLsn()) <= 0 && shouldIncreaseFromLsn) {
                    LOGGER.debug("No change in the database");
                    metronome.pause();
                    continue;
                }
                Lsn fromLsn = lastProcessedPosition.getCommitLsn().isAvailable() && shouldIncreaseFromLsn ? this.dataConnection.incrementLsn(lastProcessedPosition.getCommitLsn()) : lastProcessedPosition.getCommitLsn();
                shouldIncreaseFromLsn = true;
                while (!schemaChangeCheckpoints.isEmpty()) {
                    this.migrateTable(schemaChangeCheckpoints);
                }
                if (!this.dataConnection.listOfNewChangeTables(fromLsn, maxLsnResult.getMaxLsn()).isEmpty()) {
                    SqlServerChangeTable[] tables = this.getCdcTablesToQuery();
                    tablesSlot.set(tables);
                    for (SqlServerChangeTable table : tables) {
                        if (!table.getStartLsn().isBetween(fromLsn, maxLsnResult.getMaxLsn())) continue;
                        LOGGER.info("Schema will be changed for {}", (Object)table);
                        schemaChangeCheckpoints.add(table);
                    }
                }
                try {
                    this.dataConnection.getChangesForTables(tablesSlot.get(), fromLsn, maxLsnResult.getMaxLsn(), resultSets -> {
                        long eventSerialNoInInitialTx = 1L;
                        int tableCount = resultSets.length;
                        SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount];
                        SqlServerChangeTable[] tables = (SqlServerChangeTable[])tablesSlot.get();
                        for (int i = 0; i < tableCount; ++i) {
                            changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]);
                            changeTables[i].next();
                        }
                        while (true) {
                            SqlServerChangeTablePointer tableWithSmallestLsn = null;
                            for (SqlServerChangeTablePointer changeTable : changeTables) {
                                if (changeTable.isCompleted() || tableWithSmallestLsn != null && changeTable.compareTo(tableWithSmallestLsn) >= 0) continue;
                                tableWithSmallestLsn = changeTable;
                            }
                            if (tableWithSmallestLsn == null) break;
                            if (!((TxLogPosition)tableWithSmallestLsn.getChangePosition()).isAvailable() || !((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getInTxLsn().isAvailable()) {
                                LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", (Object)tableWithSmallestLsn);
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) {
                                LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes");
                                changesStoppedBeingMonotonic.set(false);
                            }
                            if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) {
                                LOGGER.info("Disabling skipping changes due to not monotonic order of changes");
                                changesStoppedBeingMonotonic.set(true);
                            }
                            if (!changesStoppedBeingMonotonic.get() && ((TxLogPosition)tableWithSmallestLsn.getChangePosition()).compareTo(lastProcessedPositionOnStart) < 0) {
                                LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", (Object)tableWithSmallestLsn, (Object)lastProcessedPositionOnStart);
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            if (!changesStoppedBeingMonotonic.get() && ((TxLogPosition)tableWithSmallestLsn.getChangePosition()).compareTo(lastProcessedPositionOnStart) == 0 && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) {
                                LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", new Object[]{tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart});
                                ++eventSerialNoInInitialTx;
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            if (((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getStopLsn().isAvailable() && ((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getStopLsn().compareTo(((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getCommitLsn()) <= 0) {
                                LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", (Object)tableWithSmallestLsn, (Object)tableWithSmallestLsn.getChangePosition());
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            LOGGER.trace("Processing change {}", (Object)tableWithSmallestLsn);
                            LOGGER.trace("Schema change checkpoints {}", (Object)schemaChangeCheckpoints);
                            if (!schemaChangeCheckpoints.isEmpty() && ((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getCommitLsn().compareTo(((SqlServerChangeTable)((Object)((Object)schemaChangeCheckpoints.peek()))).getStartLsn()) >= 0) {
                                this.migrateTable(schemaChangeCheckpoints);
                            }
                            TableId tableId = ((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getSourceTableId();
                            int operation = tableWithSmallestLsn.getOperation();
                            Object[] data = tableWithSmallestLsn.getData();
                            int eventCount = 1;
                            if (operation == 3) {
                                if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != 4) {
                                    throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                                }
                                eventCount = 2;
                            }
                            Object[] dataNext = operation == 3 ? tableWithSmallestLsn.getData() : null;
                            this.offsetContext.setChangePosition((TxLogPosition)tableWithSmallestLsn.getChangePosition(), eventCount);
                            this.offsetContext.event((DataCollectionId)((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getSourceTableId(), this.metadataConnection.timestampOfLsn(((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getCommitLsn()));
                            this.dispatcher.dispatchDataChangeEvent((DataCollectionId)tableId, (ChangeRecordEmitter)new SqlServerChangeRecordEmitter(this.offsetContext, operation, data, dataNext, this.clock));
                            tableWithSmallestLsn.next();
                        }
                    });
                    lastProcessedPosition = TxLogPosition.valueOf(maxLsnResult.getMaxLsn());
                    this.dataConnection.rollback();
                }
                catch (SQLException e) {
                    tablesSlot.set(this.processErrorFromChangeTableQuery(e, tablesSlot.get()));
                }
            }
        }
        catch (Exception e) {
            this.errorHandler.setProducerThrowable((Throwable)e);
        }
    }

    private void migrateTable(Queue<SqlServerChangeTable> schemaChangeCheckpoints) throws InterruptedException, SQLException {
        SqlServerChangeTable newTable = schemaChangeCheckpoints.poll();
        LOGGER.info("Migrating schema to {}", (Object)newTable);
        Table tableSchema = this.metadataConnection.getTableSchemaFromTable(newTable);
        this.dispatcher.dispatchSchemaChangeEvent((DataCollectionId)newTable.getSourceTableId(), (SchemaChangeEventEmitter)new SqlServerSchemaChangeEventEmitter(this.offsetContext, newTable, tableSchema, SchemaChangeEvent.SchemaChangeEventType.ALTER));
        newTable.setSourceTable(tableSchema);
    }

    private SqlServerChangeTable[] processErrorFromChangeTableQuery(SQLException exception, SqlServerChangeTable[] currentChangeTables) throws Exception {
        Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(exception.getMessage());
        if (m.matches()) {
            String captureName = m.group(1);
            LOGGER.info("Table is no longer captured with capture instance {}", (Object)captureName);
            return Arrays.asList(currentChangeTables).stream().filter(x -> !x.getCaptureInstance().equals(captureName)).collect(Collectors.toList()).toArray(new SqlServerChangeTable[0]);
        }
        throw exception;
    }

    private SqlServerChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException {
        Map<TableId, List<SqlServerChangeTable>> includeListCdcEnabledTables;
        Set<SqlServerChangeTable> cdcEnabledTables = this.dataConnection.listOfChangeTables();
        if (cdcEnabledTables.isEmpty()) {
            LOGGER.warn("No table has enabled CDC or security constraints prevents getting the list of change tables");
        }
        if ((includeListCdcEnabledTables = cdcEnabledTables.stream().filter(changeTable -> {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(changeTable.getSourceTableId())) {
                return true;
            }
            LOGGER.info("CDC is enabled for table {} but the table is not whitelisted by connector", (Object)changeTable);
            return false;
        }).collect(Collectors.groupingBy(x -> x.getSourceTableId()))).isEmpty()) {
            LOGGER.warn("No whitelisted table has enabled CDC, whitelisted table list does not contain any table with CDC enabled or no table match the white/blacklist filter(s)");
        }
        ArrayList<SqlServerChangeTable> tables = new ArrayList<SqlServerChangeTable>();
        for (List<SqlServerChangeTable> captures : includeListCdcEnabledTables.values()) {
            SqlServerChangeTable currentTable = captures.get(0);
            if (captures.size() > 1) {
                SqlServerChangeTable futureTable;
                if (captures.get(0).getStartLsn().compareTo(captures.get(1).getStartLsn()) < 0) {
                    futureTable = captures.get(1);
                } else {
                    currentTable = captures.get(1);
                    futureTable = captures.get(0);
                }
                currentTable.setStopLsn(futureTable.getStartLsn());
                futureTable.setSourceTable(this.dataConnection.getTableSchemaFromTable(futureTable));
                tables.add(futureTable);
                LOGGER.info("Multiple capture instances present for the same table: {} and {}", (Object)currentTable, (Object)futureTable);
            }
            if (this.schema.tableFor(currentTable.getSourceTableId()) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", (Object)currentTable.getSourceTableId(), (Object)currentTable.getCaptureInstance());
                this.offsetContext.event((DataCollectionId)currentTable.getSourceTableId(), Instant.now());
                this.dispatcher.dispatchSchemaChangeEvent((DataCollectionId)currentTable.getSourceTableId(), (SchemaChangeEventEmitter)new SqlServerSchemaChangeEventEmitter(this.offsetContext, currentTable, this.dataConnection.getTableSchemaFromTable(currentTable), SchemaChangeEvent.SchemaChangeEventType.CREATE));
            }
            currentTable.setSourceTable(this.schema.tableFor(currentTable.getSourceTableId()));
            tables.add(currentTable);
        }
        return tables.toArray(new SqlServerChangeTable[tables.size()]);
    }
}

