/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.ChangeTable;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerChangeRecordEmitter;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter;
import io.debezium.connector.sqlserver.TxLogPosition;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqlServerStreamingChangeEventSource
implements StreamingChangeEventSource {
    // 1-based column positions in the result sets read by ChangeTablePointer.
    // NOTE(review): in this decompiled source the values appear as inlined literals
    // (1, 2, 3, 5) inside ChangeTablePointer rather than as references to these constants.
    private static final int COL_COMMIT_LSN = 1;
    private static final int COL_ROW_LSN = 2;
    private static final int COL_OPERATION = 3;
    // First column that carries row data; every column from here to the end is treated as data.
    private static final int COL_DATA = 5;
    // Matches the error message for a missing cdc.fn_cdc_get_all_changes_<suffix> function;
    // group 1 captures the suffix, which processErrorFromChangeTableQuery treats as the
    // capture instance name.
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    // Connection used for LSN lookups, change-table queries and table schema retrieval.
    private final SqlServerConnection connection;
    // Receives data change events and schema change events produced by this source.
    private final EventDispatcher<TableId> dispatcher;
    // Receives any exception that terminates execute().
    private final ErrorHandler errorHandler;
    // Drives the polling metronome and is handed to every change record emitter.
    private final Clock clock;
    // Consulted to decide whether a captured table is already known (see getCdcTablesToQuery).
    private final SqlServerDatabaseSchema schema;
    // Offset state: read once at start-up and updated for every dispatched change.
    private final SqlServerOffsetContext offsetContext;
    // Pause between polling iterations, taken from the connector configuration.
    private final Duration pollInterval;
    private final SqlServerConnectorConfig connectorConfig;

    /**
     * Creates a streaming source that reads changes through the given connection
     * and publishes them via the given dispatcher.
     *
     * @param connectorConfig connector configuration; also supplies the poll interval
     * @param offsetContext offset state read at start-up and updated per dispatched change
     * @param connection connection used to query LSNs, change tables and table schemas
     * @param dispatcher sink for data change and schema change events
     * @param errorHandler receives any exception that terminates {@code execute}
     * @param clock clock used for the polling metronome and for emitted records
     * @param schema schema used to detect tables that are not yet known
     */
    public SqlServerStreamingChangeEventSource(
            SqlServerConnectorConfig connectorConfig,
            SqlServerOffsetContext offsetContext,
            SqlServerConnection connection,
            EventDispatcher<TableId> dispatcher,
            ErrorHandler errorHandler,
            Clock clock,
            SqlServerDatabaseSchema schema) {
        // Configuration and the value derived from it.
        this.connectorConfig = connectorConfig;
        this.pollInterval = connectorConfig.getPollInterval();

        // Database-facing collaborators.
        this.connection = connection;
        this.schema = schema;
        this.offsetContext = offsetContext;

        // Pipeline-facing collaborators.
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
    }

    /**
     * Polls the database for changes for as long as {@code context.isRunning()} is true.
     *
     * <p>Each iteration reads the current maximum LSN, queries all tracked change tables
     * for the window {@code [fromLsn, currentMaxLsn]}, merges the per-table result sets in
     * order of change position and dispatches one data change event per change (an update
     * consumes two consecutive rows). Pending schema migrations are dispatched before the
     * query and, when due, in between changes.
     *
     * <p>NOTE(review): every exception thrown inside the outer {@code try}, including any
     * {@code InterruptedException}, is passed to the error handler instead of being
     * propagated, despite the {@code throws} clause — confirm this is intended.
     *
     * @param context tells the loop whether it should keep running
     * @throws InterruptedException declared by the signature; see the note above
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        Metronome metronome = Metronome.sleeper((Duration)this.pollInterval, (Clock)this.clock);
        // Tables with a pending schema migration, ordered by their stop LSN.
        PriorityQueue<ChangeTable> schemaChangeCheckpoints = new PriorityQueue<ChangeTable>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn()));
        try {
            // Held in an AtomicReference so the lambda below can read the table list even
            // though it is replaced between iterations.
            AtomicReference<ChangeTable[]> tablesSlot = new AtomicReference<ChangeTable[]>(this.getCdcTablesToQuery());
            TxLogPosition lastProcessedPositionOnStart = this.offsetContext.getChangePosition();
            LOGGER.info("Last position recorded in offsets is {}", (Object)lastProcessedPositionOnStart);
            TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart;
            // When the snapshot has not completed, the first iteration re-reads from the
            // recorded commit LSN itself instead of the LSN following it.
            boolean shouldIncreaseFromLsn = this.offsetContext.isSnapshotCompleted();
            while (context.isRunning()) {
                Lsn currentMaxLsn = this.connection.getMaxLsn();
                if (!currentMaxLsn.isAvailable()) {
                    LOGGER.warn("No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running");
                    metronome.pause();
                    continue;
                }
                // Nothing new since the last processed commit: wait for the next poll.
                if (currentMaxLsn.equals(lastProcessedPosition.getCommitLsn()) && shouldIncreaseFromLsn) {
                    LOGGER.debug("No change in the database");
                    metronome.pause();
                    continue;
                }
                Lsn fromLsn = lastProcessedPosition.getCommitLsn().isAvailable() && shouldIncreaseFromLsn ? this.connection.incrementLsn(lastProcessedPosition.getCommitLsn()) : lastProcessedPosition.getCommitLsn();
                shouldIncreaseFromLsn = true;
                // Flush migrations left over from the previous window before querying again.
                while (!schemaChangeCheckpoints.isEmpty()) {
                    this.migrateTable(schemaChangeCheckpoints);
                }
                // New change tables appeared inside this window: reload the table list and
                // queue a schema migration for every table whose start LSN lies in the window.
                if (!this.connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
                    ChangeTable[] tables = this.getCdcTablesToQuery();
                    tablesSlot.set(tables);
                    for (ChangeTable table : tables) {
                        if (!table.getStartLsn().isBetween(fromLsn, currentMaxLsn)) continue;
                        LOGGER.info("Schema will be changed for {}", (Object)table);
                        schemaChangeCheckpoints.add(table);
                    }
                }
                try {
                    this.connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> {
                        int tableCount = resultSets.length;
                        ChangeTablePointer[] changeTables = new ChangeTablePointer[tableCount];
                        ChangeTable[] tables = (ChangeTable[])tablesSlot.get();
                        // Pair each result set with its table by index and position every
                        // pointer on its first row.
                        // NOTE(review): assumes result sets arrive in the same order as the
                        // tables passed to getChangesForTables — confirm.
                        for (int i = 0; i < tableCount; ++i) {
                            changeTables[i] = new ChangeTablePointer(tables[i], resultSets[i]);
                            changeTables[i].next();
                        }
                        while (true) {
                            // Pick the unfinished pointer with the smallest change position;
                            // on a tie the pointer earlier in the array wins.
                            ChangeTablePointer tableWithSmallestLsn = null;
                            for (ChangeTablePointer changeTable : changeTables) {
                                if (changeTable.isCompleted() || tableWithSmallestLsn != null && changeTable.compareTo(tableWithSmallestLsn) >= 0) continue;
                                tableWithSmallestLsn = changeTable;
                            }
                            // Every result set is exhausted.
                            if (tableWithSmallestLsn == null) break;
                            if (!tableWithSmallestLsn.getChangePosition().isAvailable() || !tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable()) {
                                LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", (Object)tableWithSmallestLsn);
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            // Compared against the position recorded at start-up, not the
                            // running lastProcessedPosition.
                            if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) <= 0) {
                                LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", (Object)tableWithSmallestLsn, (Object)lastProcessedPositionOnStart);
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            // A table with a stop LSN only contributes changes committed
                            // strictly before that LSN.
                            if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) {
                                LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", (Object)tableWithSmallestLsn, (Object)tableWithSmallestLsn.getChangePosition());
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            LOGGER.trace("Processing change {}", (Object)tableWithSmallestLsn);
                            // Dispatch at most one pending migration once the change stream
                            // reaches the head checkpoint's stop LSN.
                            if (!schemaChangeCheckpoints.isEmpty() && tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(((ChangeTable)schemaChangeCheckpoints.peek()).getStopLsn()) >= 0) {
                                this.migrateTable(schemaChangeCheckpoints);
                            }
                            TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId();
                            int operation = tableWithSmallestLsn.getOperation();
                            Object[] data = tableWithSmallestLsn.getData();
                            // Operation code 3 is handled as the "before" row of an update and
                            // must be directly followed by a row with code 4 (its "after" row);
                            // note that next() advances the pointer as part of this check.
                            if (!(operation != 3 || tableWithSmallestLsn.next() && tableWithSmallestLsn.getOperation() == 4)) {
                                throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                            }
                            // For updates the pointer now sits on the "after" row.
                            Object[] dataNext = operation == 3 ? tableWithSmallestLsn.getData() : null;
                            this.offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition());
                            this.offsetContext.setSourceTime(this.connection.timestampOfLsn(tableWithSmallestLsn.getChangePosition().getCommitLsn()));
                            this.offsetContext.setTableId(tableWithSmallestLsn.getChangeTable().getSourceTableId());
                            this.dispatcher.dispatchDataChangeEvent((DataCollectionId)tableId, (ChangeRecordEmitter)new SqlServerChangeRecordEmitter(this.offsetContext, operation, data, dataNext, this.clock));
                            tableWithSmallestLsn.next();
                        }
                    });
                    // The whole window up to currentMaxLsn has been consumed.
                    lastProcessedPosition = TxLogPosition.valueOf(currentMaxLsn);
                    // NOTE(review): presumably ends the read transaction opened by the
                    // query — confirm against SqlServerConnection.
                    this.connection.rollback();
                }
                catch (SQLException e) {
                    // Either drops the table whose capture instance no longer exists and
                    // retries on the next iteration, or rethrows the exception.
                    tablesSlot.set(this.processErrorFromChangeTableQuery(e, tablesSlot.get()));
                }
            }
        }
        catch (Exception e) {
            this.errorHandler.setProducerThrowable((Throwable)e);
        }
    }

    /**
     * Removes the head of the checkpoint queue and dispatches an {@code ALTER} schema
     * change event for it, using the table schema read from that change table.
     *
     * @param schemaChangeCheckpoints queue of tables awaiting migration; callers only
     *        invoke this method when the queue is non-empty
     * @throws InterruptedException if dispatching the event is interrupted
     * @throws SQLException if the table schema cannot be read
     */
    private void migrateTable(Queue<ChangeTable> schemaChangeCheckpoints) throws InterruptedException, SQLException {
        ChangeTable migrated = schemaChangeCheckpoints.poll();
        LOGGER.info("Migrating schema to {}", (Object)migrated);

        TableId sourceTableId = migrated.getSourceTableId();
        SchemaChangeEventEmitter alterEmitter = new SqlServerSchemaChangeEventEmitter(
                this.offsetContext,
                migrated,
                this.connection.getTableSchemaFromTable(migrated),
                SchemaChangeEvent.SchemaChangeEventType.ALTER);
        this.dispatcher.dispatchSchemaChangeEvent(sourceTableId, alterEmitter);
    }

    /**
     * Handles a failure of the change-table query.
     *
     * <p>If the error message reports a missing {@code cdc.fn_cdc_get_all_changes_<name>}
     * function, the table whose capture instance equals {@code <name>} is dropped from the
     * list so that the next polling iteration proceeds without it. Any other error,
     * including one without a message, is rethrown unchanged.
     *
     * @param exception the error raised by the change-table query
     * @param currentChangeTables the tables that were being queried
     * @return the tables that should still be queried
     * @throws Exception the original {@code exception} when it is not the recognised error
     */
    private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, ChangeTable[] currentChangeTables) throws Exception {
        String message = exception.getMessage();
        // A SQLException may carry no message; Pattern.matcher(null) would then throw a
        // NullPointerException and hide the real failure.
        if (message == null) {
            throw exception;
        }
        Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(message);
        if (!m.matches()) {
            throw exception;
        }
        String captureName = m.group(1);
        LOGGER.info("Table is no longer captured with capture instance {}", (Object)captureName);
        return Arrays.stream(currentChangeTables)
                .filter(x -> !x.getCaptureInstance().equals(captureName))
                .toArray(ChangeTable[]::new);
    }

    /**
     * Builds the list of change tables to query.
     *
     * <p>All change tables reported by the connection are filtered through the connector's
     * table filter and grouped by source table. When a source table has more than one
     * capture instance, the one with the smaller start LSN is the current table; its stop
     * LSN is set to the start LSN of the other one, which is also added to the result.
     * A {@code CREATE} schema change event is dispatched for every current table that the
     * schema does not know yet.
     *
     * @return the change tables to query; for a table with two capture instances the
     *         future one precedes the current one
     * @throws SQLException if change tables or a table schema cannot be read
     * @throws InterruptedException if dispatching a schema change event is interrupted
     */
    private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException {
        Set<ChangeTable> allChangeTables = this.connection.listOfChangeTables();
        if (allChangeTables.isEmpty()) {
            LOGGER.warn("No table has enabled CDC or security constraints prevents getting the list of change tables");
        }

        Map<TableId, List<ChangeTable>> capturesBySourceTable = allChangeTables.stream()
                .filter(candidate -> {
                    boolean included = this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(candidate.getSourceTableId());
                    if (!included) {
                        LOGGER.info("CDC is enabled for table {} but the table is not whitelisted by connector", candidate);
                    }
                    return included;
                })
                .collect(Collectors.groupingBy(ChangeTable::getSourceTableId));
        if (capturesBySourceTable.isEmpty()) {
            LOGGER.warn("No whitelisted table has enabled CDC, whitelisted table list does not contain any table with CDC enabled or no table match the white/blacklist filter(s)");
        }

        List<ChangeTable> result = new ArrayList<>();
        for (List<ChangeTable> instances : capturesBySourceTable.values()) {
            ChangeTable active = instances.get(0);
            if (instances.size() > 1) {
                // Only the first two capture instances are considered.
                ChangeTable first = instances.get(0);
                ChangeTable second = instances.get(1);
                boolean firstStartedEarlier = first.getStartLsn().compareTo(second.getStartLsn()) < 0;
                active = firstStartedEarlier ? first : second;
                ChangeTable upcoming = firstStartedEarlier ? second : first;

                // The current instance stops where the future one begins.
                active.setStopLsn(upcoming.getStartLsn());
                result.add(upcoming);
                LOGGER.info("Multiple capture instances present for the same table: {} and {}", (Object)active, (Object)upcoming);
            }

            TableId sourceTableId = active.getSourceTableId();
            if (this.schema.tableFor(sourceTableId) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", (Object)sourceTableId, (Object)active.getCaptureInstance());
                SchemaChangeEventEmitter createEmitter = new SqlServerSchemaChangeEventEmitter(
                        this.offsetContext,
                        active,
                        this.connection.getTableSchemaFromTable(active),
                        SchemaChangeEvent.SchemaChangeEventType.CREATE);
                this.dispatcher.dispatchSchemaChangeEvent(sourceTableId, createEmitter);
            }
            result.add(active);
        }
        return result.toArray(new ChangeTable[0]);
    }

    /**
     * Does nothing: this source takes no action when an offset is committed.
     *
     * @param offset the committed offset; ignored
     */
    public void commitOffset(Map<String, ?> offset) {
        // Intentionally empty.
    }

    /**
     * Cursor over the result set of one change table.
     *
     * <p>Tracks the change position of the current row and whether the result set has been
     * exhausted. The result set is closed as soon as {@link #next()} runs past the last
     * row. Columns are read by fixed 1-based position: 1 and 2 form the change position,
     * 3 is the operation code, and 5 onwards are row data (the values of the
     * {@code COL_*} constants of the enclosing class).
     */
    private static class ChangeTablePointer {
        private final ChangeTable changeTable;
        private final ResultSet resultSet;
        private boolean completed = false;
        private TxLogPosition currentChangePosition;

        /**
         * @param changeTable the change table the rows belong to
         * @param resultSet the rows to iterate; positioned before the first row until
         *        {@link #next()} is called
         */
        public ChangeTablePointer(ChangeTable changeTable, ResultSet resultSet) {
            this.changeTable = changeTable;
            this.resultSet = resultSet;
        }

        /** Returns the change table this pointer iterates over. */
        public ChangeTable getChangeTable() {
            return this.changeTable;
        }

        /**
         * Returns the position of the current row as set by the last {@link #next()} call:
         * {@code TxLogPosition.NULL} once the result set is exhausted, and {@code null}
         * if {@link #next()} has never been called.
         */
        public TxLogPosition getChangePosition() throws SQLException {
            return this.currentChangePosition;
        }

        /** Returns the operation code of the current row (column 3). */
        public int getOperation() throws SQLException {
            return this.resultSet.getInt(3);
        }

        /**
         * Returns the data columns of the current row, i.e. every column from the fifth
         * one to the last, in result-set order.
         */
        public Object[] getData() throws SQLException {
            // The first four columns are not row data and are skipped.
            int dataColumnCount = this.resultSet.getMetaData().getColumnCount() - 4;
            Object[] data = new Object[dataColumnCount];
            for (int i = 0; i < dataColumnCount; ++i) {
                data[i] = this.resultSet.getObject(5 + i);
            }
            return data;
        }

        /**
         * Advances to the next row and refreshes the current change position.
         *
         * <p>When no row is left, the pointer is marked completed, its position becomes
         * {@code TxLogPosition.NULL} and the result set is closed.
         *
         * @return {@code true} if the pointer now sits on a row, {@code false} if the
         *         result set is exhausted
         * @throws SQLException if the result set cannot be advanced, read or closed
         */
        public boolean next() throws SQLException {
            this.completed = !this.resultSet.next();
            if (this.completed) {
                this.currentChangePosition = TxLogPosition.NULL;
                LOGGER.trace("Closing result set of change tables for table {}", (Object)this.changeTable);
                this.resultSet.close();
            } else {
                // Columns 1 and 2 together form the change position of the row.
                this.currentChangePosition = TxLogPosition.valueOf(Lsn.valueOf(this.resultSet.getBytes(1)), Lsn.valueOf(this.resultSet.getBytes(2)));
            }
            return !this.completed;
        }

        /** Returns whether the result set has been exhausted. */
        public boolean isCompleted() {
            return this.completed;
        }

        /** Orders pointers by the change position of their current rows. */
        public int compareTo(ChangeTablePointer o) throws SQLException {
            return this.getChangePosition().compareTo(o.getChangePosition());
        }

        public String toString() {
            return "ChangeTablePointer [changeTable=" + this.changeTable + ", resultSet=" + this.resultSet + ", completed=" + this.completed + ", currentChangePosition=" + this.currentChangePosition + "]";
        }
    }
}

