/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerChangeRecordEmitter;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqlServerStreamingChangeEventSource
implements StreamingChangeEventSource {
    private static final int COL_COMMIT_LSN = 1;
    private static final int COL_ROW_LSN = 2;
    private static final int COL_OPERATION = 3;
    private static final int COL_DATA = 5;
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    private final SqlServerConnection connection;
    private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final SqlServerOffsetContext offsetContext;
    private final Duration pollInterval;

    public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext offsetContext, SqlServerConnection connection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema schema) {
        this.connection = connection;
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = schema;
        this.offsetContext = offsetContext;
        this.pollInterval = Duration.ofSeconds(1L);
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        Metronome metronome = Metronome.sleeper((Duration)this.pollInterval, (Clock)this.clock);
        try {
            TableId[] tables = this.schema.getCapturedTables().toArray(new TableId[this.schema.getCapturedTables().size()]);
            Lsn lastProcessedLsn = this.offsetContext.getChangeLsn();
            while (context.isRunning()) {
                Lsn currentMaxLsn = this.connection.getMaxLsn();
                if (!currentMaxLsn.isAvailable()) {
                    LOGGER.debug("No maximum LSN recorded in the database");
                    metronome.pause();
                    continue;
                }
                if (currentMaxLsn.equals(lastProcessedLsn)) {
                    LOGGER.debug("No change in the database");
                    metronome.pause();
                    continue;
                }
                Lsn fromLsn = lastProcessedLsn.isAvailable() ? this.connection.incrementLsn(lastProcessedLsn) : lastProcessedLsn;
                this.connection.getChangesForTables(tables, fromLsn, currentMaxLsn, resultSets -> {
                    int tableCount = resultSets.length;
                    ChangeTable[] changeTables = new ChangeTable[tableCount];
                    for (int i = 0; i < tableCount; ++i) {
                        changeTables[i] = new ChangeTable(tables[i], resultSets[i]);
                        changeTables[i].next();
                    }
                    while (true) {
                        ChangeTable tableSmallestLsn = null;
                        for (int i = 0; i < tableCount; ++i) {
                            ChangeTable changeTable = changeTables[i];
                            if (changeTable.isCompleted() || tableSmallestLsn != null && changeTable.compareTo(tableSmallestLsn) >= 0) continue;
                            tableSmallestLsn = changeTable;
                        }
                        if (tableSmallestLsn == null) break;
                        LOGGER.trace("Processing change {}", tableSmallestLsn);
                        TableId tableId = tableSmallestLsn.getTableId();
                        Lsn commitLsn = tableSmallestLsn.getCommitLsn();
                        Lsn rowLsn = tableSmallestLsn.getRowLsn();
                        int operation = tableSmallestLsn.getOperation();
                        Object[] data = tableSmallestLsn.getData();
                        if (!(operation != 3 || tableSmallestLsn.next() && tableSmallestLsn.getOperation() == 4)) {
                            throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                        }
                        Object[] dataNext = operation == 3 ? tableSmallestLsn.getData() : null;
                        this.offsetContext.setChangeLsn(rowLsn);
                        this.offsetContext.setCommitLsn(commitLsn);
                        this.offsetContext.setSourceTime(this.connection.timestampOfLsn(commitLsn));
                        this.dispatcher.dispatchDataChangeEvent((DataCollectionId)tableId, (ChangeRecordEmitter)new SqlServerChangeRecordEmitter(this.offsetContext, operation, data, dataNext, this.schema.tableFor(tableId), this.clock));
                        tableSmallestLsn.next();
                    }
                });
                lastProcessedLsn = currentMaxLsn;
            }
        }
        catch (Exception e) {
            throw new ConnectException((Throwable)e);
        }
    }

    public void commitOffset(Map<String, ?> offset) {
    }

    private static class ChangeTable {
        private final TableId tableId;
        private final ResultSet resultSet;
        private boolean completed = false;
        private Lsn currentChangeLsn;

        public ChangeTable(TableId tableId, ResultSet resultSet) {
            this.tableId = tableId;
            this.resultSet = resultSet;
        }

        public TableId getTableId() {
            return this.tableId;
        }

        public Lsn getCommitLsn() throws SQLException {
            return Lsn.valueOf(this.resultSet.getBytes(1));
        }

        public Lsn getRowLsn() throws SQLException {
            return this.currentChangeLsn;
        }

        public int getOperation() throws SQLException {
            return this.resultSet.getInt(3);
        }

        public Object[] getData() throws SQLException {
            int dataColumnCount = this.resultSet.getMetaData().getColumnCount() - 4;
            Object[] data = new Object[dataColumnCount];
            for (int i = 0; i < dataColumnCount; ++i) {
                data[i] = this.resultSet.getObject(5 + i);
            }
            return data;
        }

        public boolean next() throws SQLException {
            this.completed = !this.resultSet.next();
            this.currentChangeLsn = this.completed ? Lsn.NULL : Lsn.valueOf(this.resultSet.getBytes(2));
            return !this.completed;
        }

        public boolean isCompleted() {
            return this.completed;
        }

        public int compareTo(ChangeTable o) throws SQLException {
            return this.getRowLsn().compareTo(o.getRowLsn());
        }

        public String toString() {
            return "ChangeTable [tableId=" + this.tableId + ", resultSet=" + this.resultSet + ", completed=" + this.completed + "]";
        }
    }
}

