/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerChangeRecordEmitter;
import io.debezium.connector.sqlserver.SqlServerChangeTable;
import io.debezium.connector.sqlserver.SqlServerChangeTablePointer;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.connector.sqlserver.SqlServerPartition;
import io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter;
import io.debezium.connector.sqlserver.SqlServerStreamingExecutionContext;
import io.debezium.connector.sqlserver.TxLogPosition;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.ChangeTable;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming change event source of the SQL Server connector.
 *
 * <p>Each call to {@link #executeIteration} handles one LSN window for one partition (database):
 * it determines the upper LSN bound, reads the change tables for that window, merges their rows in
 * position order and hands every change to the {@link EventDispatcher}. Per-partition progress is
 * kept in a {@link SqlServerStreamingExecutionContext} that is created on the first iteration for
 * that partition and reused afterwards.
 */
public class SqlServerStreamingChangeEventSource
implements StreamingChangeEventSource<SqlServerPartition, SqlServerOffsetContext> {
    /** Error message reported when the CDC table-valued function of a capture instance does not exist. */
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name '(.*)\\.cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    /** Lower bound for the pause between two commits of the database connections. */
    private static final Duration DEFAULT_INTERVAL_BETWEEN_COMMITS = Duration.ofMinutes(1L);
    /** The pause between commits is at least this many poll intervals. */
    private static final int INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR = 3;

    private final SqlServerConnection dataConnection;
    private final SqlServerConnection metadataConnection;
    private final EventDispatcher<SqlServerPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final Duration pollInterval;
    private final SqlServerConnectorConfig connectorConfig;
    private final ElapsedTimeStrategy pauseBetweenCommits;
    /** Streaming state per partition; filled lazily by {@link #executeIteration}. */
    private final Map<SqlServerPartition, SqlServerStreamingExecutionContext> streamingExecutionContexts;
    /** Whether the agent status should be queried the next time no maximum LSN is available. */
    private boolean checkAgent;
    /** Offset context most recently supplied through {@link #init} or {@link #executeIteration}. */
    private SqlServerOffsetContext effectiveOffset;

    /**
     * Creates the event source.
     *
     * <p>The pause between connection commits is the larger of
     * {@link #DEFAULT_INTERVAL_BETWEEN_COMMITS} and
     * {@link #INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR} times the configured poll interval.
     *
     * @param connectorConfig connector configuration (poll interval, snapshot mode, table filters, ...)
     * @param dataConnection connection used for LSN look-ups and for reading the change tables
     * @param metadataConnection connection used to read table schemas during a schema migration
     * @param dispatcher receiver of data change and schema change events
     * @param errorHandler receiver of any exception raised while an iteration is executed
     * @param clock clock handed to the commit-pause strategy and to the emitted change records
     * @param schema schema registry consulted for the currently known table definitions
     */
    public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerConnection dataConnection, SqlServerConnection metadataConnection, EventDispatcher<SqlServerPartition, TableId> dispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema schema) {
        this.connectorConfig = connectorConfig;
        this.dataConnection = dataConnection;
        this.metadataConnection = metadataConnection;
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = schema;
        this.pollInterval = connectorConfig.getPollInterval();
        Duration intervalBetweenCommitsBasedOnPoll = this.pollInterval.multipliedBy(INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR);
        Duration intervalBetweenCommits = DEFAULT_INTERVAL_BETWEEN_COMMITS.compareTo(intervalBetweenCommitsBasedOnPoll) > 0
                ? DEFAULT_INTERVAL_BETWEEN_COMMITS
                : intervalBetweenCommitsBasedOnPoll;
        this.pauseBetweenCommits = ElapsedTimeStrategy.constant(clock, intervalBetweenCommits.toMillis());
        // NOTE(review): the result is intentionally discarded; presumably this first call arms the
        // strategy so that the next check measures from construction time - confirm.
        this.pauseBetweenCommits.hasElapsed();
        this.streamingExecutionContexts = new HashMap<>();
        this.checkAgent = true;
    }

    /**
     * Stores the offset to report from {@link #getOffsetContext()}.
     *
     * @param offsetContext previously recorded offset, or {@code null}, in which case a fresh offset
     *        context at {@link TxLogPosition#NULL} is created
     */
    public void init(SqlServerOffsetContext offsetContext) throws InterruptedException {
        this.effectiveOffset = offsetContext == null ? new SqlServerOffsetContext(this.connectorConfig, TxLogPosition.NULL, false, false) : offsetContext;
    }

    /**
     * Not supported; streaming is driven through {@link #executeIteration} instead.
     *
     * @throws UnsupportedOperationException always
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext context, SqlServerPartition partition, SqlServerOffsetContext offsetContext) throws InterruptedException {
        throw new UnsupportedOperationException("Currently unsupported by the SQL Server connector");
    }

    /**
     * Streams one LSN window of changes for the given partition.
     *
     * <p>Any exception raised while the iteration runs is passed to the {@link ErrorHandler} rather
     * than propagated; the method then still returns {@code true}.
     *
     * @param context source context; nothing is processed once it reports that it is not running
     * @param partition partition (database) to stream
     * @param offsetContext offset context that is advanced as changes are dispatched
     * @return {@code false} when streaming is disabled by the snapshot mode, when no maximum LSN is
     *         available, or when there is no change beyond the last processed position;
     *         {@code true} otherwise
     */
    public boolean executeIteration(ChangeEventSource.ChangeEventSourceContext context, SqlServerPartition partition, SqlServerOffsetContext offsetContext) throws InterruptedException {
        if (this.connectorConfig.getSnapshotMode().equals(SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY)) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return false;
        }
        String databaseName = partition.getDatabaseName();
        this.effectiveOffset = offsetContext;
        try {
            SqlServerStreamingExecutionContext streamingExecutionContext = this.getOrCreateStreamingExecutionContext(partition, offsetContext);
            Queue<SqlServerChangeTable> schemaChangeCheckpoints = streamingExecutionContext.getSchemaChangeCheckpoints();
            AtomicReference<SqlServerChangeTable[]> tablesSlot = streamingExecutionContext.getTablesSlot();
            // Position and serial number as recorded when this iteration starts; changes at or before
            // them are skipped below.
            TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition();
            long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo();
            AtomicBoolean changesStoppedBeingMonotonic = streamingExecutionContext.getChangesStoppedBeingMonotonic();
            int maxTransactionsPerIteration = this.connectorConfig.getMaxTransactionsPerIteration();
            TxLogPosition lastProcessedPosition = streamingExecutionContext.getLastProcessedPosition();
            if (!context.isRunning()) {
                return true;
            }
            this.commitTransaction();
            Lsn toLsn = this.getToLsn(this.dataConnection, databaseName, lastProcessedPosition, maxTransactionsPerIteration);
            if (!toLsn.isAvailable()) {
                // Query the agent status only once per run of consecutive iterations without a maximum LSN.
                if (this.checkAgent) {
                    try {
                        if (!this.dataConnection.isAgentRunning(databaseName)) {
                            LOGGER.error("No maximum LSN recorded in the database; SQL Server Agent is not running");
                        }
                    }
                    catch (SQLException e) {
                        LOGGER.warn("No maximum LSN recorded in the database; this may happen if there are no changes recorded in the change table yet or low activity database where the cdc clean up job periodically clears entries from the cdc tables. Otherwise, this may be an indication that the SQL Server Agent is not running. You should follow the documentation on how to configure SQL Server Agent running status query.");
                        LOGGER.warn("Cannot query the status of the SQL Server Agent", e);
                    }
                    this.checkAgent = false;
                }
                return false;
            }
            // A maximum LSN is available again, so re-enable the agent check for the next outage.
            this.checkAgent = true;
            if (toLsn.compareTo(lastProcessedPosition.getCommitLsn()) <= 0 && streamingExecutionContext.getShouldIncreaseFromLsn()) {
                LOGGER.debug("No change in the database");
                return false;
            }
            // Start right after the last processed commit LSN unless the context asks to re-read it.
            Lsn fromLsn = lastProcessedPosition.getCommitLsn().isAvailable() && streamingExecutionContext.getShouldIncreaseFromLsn()
                    ? this.dataConnection.incrementLsn(databaseName, lastProcessedPosition.getCommitLsn())
                    : lastProcessedPosition.getCommitLsn();
            streamingExecutionContext.setShouldIncreaseFromLsn(true);
            // Apply every schema change still pending from an earlier iteration.
            while (!schemaChangeCheckpoints.isEmpty()) {
                this.migrateTable(partition, schemaChangeCheckpoints, offsetContext);
            }
            if (!this.dataConnection.getNewChangeTables(databaseName, fromLsn, toLsn).isEmpty()) {
                SqlServerChangeTable[] tables = this.getChangeTablesToQuery(partition, offsetContext, toLsn);
                tablesSlot.set(tables);
                for (SqlServerChangeTable table : tables) {
                    if (!table.getStartLsn().isBetween(fromLsn, toLsn)) {
                        continue;
                    }
                    LOGGER.info("Schema will be changed for {}", table);
                    schemaChangeCheckpoints.add(table);
                }
            }
            if (tablesSlot.get() == null) {
                tablesSlot.set(this.getChangeTablesToQuery(partition, offsetContext, toLsn));
            }
            try {
                this.dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> {
                    long eventSerialNoInInitialTx = 1L;
                    int tableCount = resultSets.length;
                    SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount];
                    SqlServerChangeTable[] tables = tablesSlot.get();
                    for (int i = 0; i < tableCount; ++i) {
                        changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]);
                        changeTables[i].next();
                    }
                    // Merge loop: repeatedly take the not yet completed pointer with the smallest position.
                    while (true) {
                        SqlServerChangeTablePointer tableWithSmallestLsn = null;
                        for (SqlServerChangeTablePointer changeTable : changeTables) {
                            if (changeTable.isCompleted() || tableWithSmallestLsn != null && changeTable.compareTo(tableWithSmallestLsn) >= 0) {
                                continue;
                            }
                            tableWithSmallestLsn = changeTable;
                        }
                        if (tableWithSmallestLsn == null) {
                            // Every pointer is completed.
                            break;
                        }
                        if (!((TxLogPosition)tableWithSmallestLsn.getChangePosition()).isAvailable() || !((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getInTxLsn().isAvailable()) {
                            LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn);
                            tableWithSmallestLsn.next();
                            continue;
                        }
                        if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) {
                            LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes");
                            changesStoppedBeingMonotonic.set(false);
                        }
                        if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) {
                            LOGGER.info("Disabling skipping changes due to not monotonic order of changes");
                            changesStoppedBeingMonotonic.set(true);
                        }
                        // Skip changes positioned before the offset recorded at the start of the iteration.
                        if (!changesStoppedBeingMonotonic.get() && ((TxLogPosition)tableWithSmallestLsn.getChangePosition()).compareTo(lastProcessedPositionOnStart) < 0) {
                            LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, lastProcessedPositionOnStart);
                            tableWithSmallestLsn.next();
                            continue;
                        }
                        // Same position as the recorded offset: skip as many events as were already counted there.
                        if (!changesStoppedBeingMonotonic.get() && ((TxLogPosition)tableWithSmallestLsn.getChangePosition()).compareTo(lastProcessedPositionOnStart) == 0 && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) {
                            LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart);
                            ++eventSerialNoInInitialTx;
                            tableWithSmallestLsn.next();
                            continue;
                        }
                        // A capture instance with a stop LSN must not deliver changes committed at or after it.
                        if (((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getStopLsn().isAvailable() && ((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getStopLsn().compareTo(((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getCommitLsn()) <= 0) {
                            LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, tableWithSmallestLsn.getChangePosition());
                            tableWithSmallestLsn.next();
                            continue;
                        }
                        LOGGER.trace("Processing change {}", tableWithSmallestLsn);
                        LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints);
                        // Switch schema once the stream reaches the start LSN of the next pending capture instance.
                        if (!schemaChangeCheckpoints.isEmpty() && ((TxLogPosition)tableWithSmallestLsn.getChangePosition()).getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) {
                            this.migrateTable(partition, schemaChangeCheckpoints, offsetContext);
                        }
                        TableId tableId = ((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getSourceTableId();
                        int operation = tableWithSmallestLsn.getOperation();
                        Object[] data = tableWithSmallestLsn.getData();
                        int eventCount = 1;
                        // Operation 3 (update "before" row) must be directly followed by operation 4
                        // (its "after" row); both rows form a single change and count as two events.
                        if (operation == 3) {
                            if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != 4) {
                                throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                            }
                            eventCount = 2;
                        }
                        Object[] dataNext = operation == 3 ? tableWithSmallestLsn.getData() : null;
                        ResultSet resultSet = tableWithSmallestLsn.getResultSet();
                        offsetContext.setChangePosition((TxLogPosition)tableWithSmallestLsn.getChangePosition(), eventCount);
                        // The event timestamp is read from the last column of the current result set row.
                        offsetContext.event(((SqlServerChangeTable)tableWithSmallestLsn.getChangeTable()).getSourceTableId(), resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant());
                        this.dispatcher.dispatchDataChangeEvent(partition, tableId, new SqlServerChangeRecordEmitter(partition, offsetContext, operation, data, dataNext, this.clock, this.connectorConfig));
                        tableWithSmallestLsn.next();
                    }
                });
                streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn));
                this.dataConnection.rollback();
            }
            catch (SQLException e) {
                // A dropped capture instance is removed from the table list; anything else is rethrown.
                tablesSlot.set(this.processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
            }
        }
        catch (Exception e) {
            this.errorHandler.setProducerThrowable(e);
        }
        return true;
    }

    /**
     * Returns the offset context most recently passed to {@link #init} or {@link #executeIteration}.
     */
    public SqlServerOffsetContext getOffsetContext() {
        return this.effectiveOffset;
    }

    /**
     * Returns the streaming state of the partition, creating and registering it on first use.
     *
     * <p>The state is created only when it is missing, so no throw-away context is allocated on
     * iterations for an already known partition.
     */
    private SqlServerStreamingExecutionContext getOrCreateStreamingExecutionContext(SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
        SqlServerStreamingExecutionContext existing = this.streamingExecutionContexts.get(partition);
        if (existing != null) {
            return existing;
        }
        SqlServerStreamingExecutionContext created = new SqlServerStreamingExecutionContext(
                new PriorityQueue<SqlServerChangeTable>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn())),
                new AtomicReference<SqlServerChangeTable[]>(),
                offsetContext.getChangePosition(),
                new AtomicBoolean(false),
                offsetContext.isSnapshotCompleted());
        this.streamingExecutionContexts.put(partition, created);
        LOGGER.info("Last position recorded in offsets is {}[{}]", offsetContext.getChangePosition(), offsetContext.getEventSerialNo());
        return created;
    }

    /**
     * Commits both connections when the connection is configured as read-only or when the pause
     * between commits has elapsed.
     */
    private void commitTransaction() throws SQLException {
        if (this.connectorConfig.isReadOnlyDatabaseConnection() || this.pauseBetweenCommits.hasElapsed()) {
            this.dataConnection.commit();
            this.metadataConnection.commit();
        }
    }

    /**
     * Takes the next capture instance off the checkpoint queue and, if its table schema differs from
     * the currently registered one, dispatches an ALTER schema change event and binds the new schema
     * to the capture instance.
     *
     * @param schemaChangeCheckpoints pending capture instances; callers ensure it is not empty
     */
    private void migrateTable(SqlServerPartition partition, Queue<SqlServerChangeTable> schemaChangeCheckpoints, SqlServerOffsetContext offsetContext) throws InterruptedException, SQLException {
        SqlServerChangeTable newTable = schemaChangeCheckpoints.poll();
        LOGGER.info("Migrating schema to {}", newTable);
        Table oldTableSchema = this.schema.tableFor(newTable.getSourceTableId());
        Table tableSchema = this.metadataConnection.getTableSchemaFromTable(partition.getDatabaseName(), newTable);
        if (oldTableSchema.equals(tableSchema)) {
            LOGGER.info("Migration skipped, no table schema changes detected.");
            return;
        }
        this.dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, newTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(partition, offsetContext, newTable, tableSchema, this.schema, SchemaChangeEvent.SchemaChangeEventType.ALTER));
        newTable.setSourceTable(tableSchema);
    }

    /**
     * Handles a failure of the change table query.
     *
     * <p>When the error reports a missing {@code fn_cdc_get_all_changes_*} function in the given
     * database, the matching capture instance is dropped from the list of tables to query.
     *
     * @param databaseName database the query was run against
     * @param exception failure raised by the query
     * @param currentChangeTables tables that were queried
     * @return {@code currentChangeTables} without the capture instance named in the error
     * @throws SQLException the given exception, when it is not such an error (including when it
     *         carries no message)
     */
    private SqlServerChangeTable[] processErrorFromChangeTableQuery(String databaseName, SQLException exception, SqlServerChangeTable[] currentChangeTables) throws SQLException {
        String message = exception.getMessage();
        // A message-less exception cannot be classified; rethrow it instead of failing inside the matcher.
        if (message != null) {
            Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(message);
            if (m.matches() && m.group(1).equals(databaseName)) {
                String captureName = m.group(2);
                LOGGER.info("Table is no longer captured with capture instance {}", captureName);
                return Arrays.stream(currentChangeTables).filter(x -> !x.getCaptureInstance().equals(captureName)).toArray(SqlServerChangeTable[]::new);
            }
        }
        throw exception;
    }

    /**
     * Builds the list of change tables to read up to {@code toLsn}.
     *
     * <p>Change tables are filtered by the connector's table filter and grouped by source table.
     * When a source table has more than one capture instance, the one with the smaller start LSN is
     * treated as current and stops where the other one starts; both are returned. A CREATE schema
     * change event is dispatched for every source table the schema does not know yet.
     */
    private SqlServerChangeTable[] getChangeTablesToQuery(SqlServerPartition partition, SqlServerOffsetContext offsetContext, Lsn toLsn) throws SQLException, InterruptedException {
        String databaseName = partition.getDatabaseName();
        List<SqlServerChangeTable> changeTables = this.dataConnection.getChangeTables(databaseName, toLsn);
        if (changeTables.isEmpty()) {
            LOGGER.warn("No table has enabled CDC or security constraints prevents getting the list of change tables");
        }
        Map<TableId, List<SqlServerChangeTable>> includeListChangeTables = changeTables.stream().filter(changeTable -> {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(changeTable.getSourceTableId())) {
                return true;
            }
            LOGGER.info("CDC is enabled for table {} but the table is not on connector's table include list", changeTable);
            return false;
        }).collect(Collectors.groupingBy(ChangeTable::getSourceTableId));
        if (includeListChangeTables.isEmpty()) {
            LOGGER.warn("No table on connector's include list has enabled CDC, tables on include list do not contain any table with CDC enabled or no table match the include/exclude filter(s)");
        }
        List<SqlServerChangeTable> tables = new ArrayList<>();
        for (List<SqlServerChangeTable> captures : includeListChangeTables.values()) {
            SqlServerChangeTable currentTable = captures.get(0);
            if (captures.size() > 1) {
                // Only the first two capture instances of a source table are considered.
                SqlServerChangeTable futureTable;
                if (captures.get(0).getStartLsn().compareTo(captures.get(1).getStartLsn()) < 0) {
                    futureTable = captures.get(1);
                } else {
                    currentTable = captures.get(1);
                    futureTable = captures.get(0);
                }
                currentTable.setStopLsn(futureTable.getStartLsn());
                futureTable.setSourceTable(this.dataConnection.getTableSchemaFromTable(databaseName, futureTable));
                tables.add(futureTable);
                LOGGER.info("Multiple capture instances present for the same table: {} and {}", currentTable, futureTable);
            }
            if (this.schema.tableFor(currentTable.getSourceTableId()) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", currentTable.getSourceTableId(), currentTable.getCaptureInstance());
                offsetContext.event(currentTable.getSourceTableId(), Instant.now());
                this.dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, currentTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(partition, offsetContext, currentTable, this.dataConnection.getTableSchemaFromTable(databaseName, currentTable), this.schema, SchemaChangeEvent.SchemaChangeEventType.CREATE));
            }
            currentTable.setSourceTable(this.schema.tableFor(currentTable.getSourceTableId()));
            tables.add(currentTable);
        }
        return tables.toArray(new SqlServerChangeTable[0]);
    }

    /**
     * Determines the upper LSN bound of the next iteration.
     *
     * @param maxTransactionsPerIteration {@code 0} to read up to the maximum transaction LSN;
     *        otherwise the number of transactions to advance, counted from the last processed commit
     *        LSN or from the beginning when that LSN is not available
     */
    private Lsn getToLsn(SqlServerConnection connection, String databaseName, TxLogPosition lastProcessedPosition, int maxTransactionsPerIteration) throws SQLException {
        if (maxTransactionsPerIteration == 0) {
            return connection.getMaxTransactionLsn(databaseName);
        }
        Lsn fromLsn = lastProcessedPosition.getCommitLsn();
        if (!fromLsn.isAvailable()) {
            return connection.getNthTransactionLsnFromBeginning(databaseName, maxTransactionsPerIteration);
        }
        return connection.getNthTransactionLsnFromLast(databaseName, fromLsn, maxTransactionsPerIteration);
    }
}

