package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.parser.DmlParser;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.class */
/**
 * Iterates over the rows of a LogMiner query result set and routes each row, based on its
 * operation code, to the {@link TransactionalBuffer}, the schema change dispatcher and the
 * {@link HistoryRecorder}.
 *
 * <p>The processor also keeps per-batch counters that are published to the streaming metrics and
 * tracks whether the offset SCN has stopped advancing.
 */
class LogMinerQueryResultProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class);

    /** Number of consecutive stuck-SCN observations after which a single warning is emitted. */
    private static final int STUCK_SCN_WARNING_THRESHOLD = 25;

    private final ChangeEventSource.ChangeEventSourceContext context;
    private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
    private final TransactionalBuffer transactionalBuffer;
    private final DmlParser dmlParser;
    private final OracleOffsetContext offsetContext;
    private final OracleDatabaseSchema schema;
    private final EventDispatcher<TableId> dispatcher;
    private final OracleConnectorConfig connectorConfig;
    private final HistoryRecorder historyRecorder;
    private Scn currentOffsetScn = Scn.NULL;
    private Scn currentOffsetCommitScn = Scn.NULL;
    private Scn lastProcessedScn = Scn.NULL;
    private long stuckScnCounter = 0;
    private final SelectLobParser selectLobParser = new SelectLobParser();

    /**
     * Creates a processor bound to the given collaborators.
     *
     * <p>The DML parser implementation is chosen from the connector configuration, see
     * {@link #resolveParser(OracleConnectorConfig, OracleValueConverters)}.
     */
    public LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleConnectorConfig oracleConnectorConfig, OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics, TransactionalBuffer transactionalBuffer, OracleOffsetContext oracleOffsetContext, OracleDatabaseSchema oracleDatabaseSchema, EventDispatcher<TableId> eventDispatcher, HistoryRecorder historyRecorder) {
        this.context = changeEventSourceContext;
        this.streamingMetrics = oracleStreamingChangeEventSourceMetrics;
        this.transactionalBuffer = transactionalBuffer;
        this.offsetContext = oracleOffsetContext;
        this.schema = oracleDatabaseSchema;
        this.dispatcher = eventDispatcher;
        this.historyRecorder = historyRecorder;
        this.connectorConfig = oracleConnectorConfig;
        this.dmlParser = resolveParser(oracleConnectorConfig, oracleDatabaseSchema.getValueConverters());
    }

    /**
     * Returns a {@link SimpleDmlParser} when the legacy parser is configured, otherwise a
     * {@link LogMinerDmlParser}.
     */
    private static DmlParser resolveParser(OracleConnectorConfig oracleConnectorConfig, OracleValueConverters oracleValueConverters) {
        if (oracleConnectorConfig.getLogMiningDmlParser().equals(OracleConnectorConfig.LogMiningDmlParser.LEGACY)) {
            return new SimpleDmlParser(oracleConnectorConfig.getCatalogName(), oracleValueConverters);
        }
        return new LogMinerDmlParser();
    }

    /**
     * Consumes the result set row by row while the source context is running.
     *
     * <p>Each row is handled according to its operation code and exactly one handler runs per
     * row. After the loop, batch metrics are updated, a summary is logged at debug level and the
     * history recorder is flushed.
     *
     * @param resultSet the LogMiner query result set to read
     * @throws SQLException if reading from the result set fails
     * @throws DebeziumException if a row has a null SCN or a schema change dispatch is interrupted
     */
    public void processResult(ResultSet resultSet) throws SQLException {
        int dmlCounter = 0;
        int insertCounter = 0;
        int updateCounter = 0;
        int deleteCounter = 0;
        int commitCounter = 0;
        int rollbackCounter = 0;
        long rows = 0;
        final Instant startProcessTime = Instant.now();

        while (this.context.isRunning() && hasNext(resultSet)) {
            rows++;
            final Scn scn = RowMapper.getScn(resultSet);
            if (scn.isNull()) {
                throw new DebeziumException("Unexpected null SCN detected in LogMiner results");
            }
            final String tableName = RowMapper.getTableName(resultSet);
            final String segOwner = RowMapper.getSegOwner(resultSet);
            final int operationCode = RowMapper.getOperationCode(resultSet);
            final Timestamp changeTime = RowMapper.getChangeTime(resultSet);
            final String transactionId = RowMapper.getTransactionId(resultSet);
            final String operation = RowMapper.getOperation(resultSet);
            final String username = RowMapper.getUsername(resultSet);
            final String rowId = RowMapper.getRowId(resultSet);
            final int rollbackFlag = RowMapper.getRollbackFlag(resultSet);
            final Object rsId = RowMapper.getRsId(resultSet);
            final String sqlRedo = RowMapper.getSqlRedo(resultSet, isDmlOperation(operationCode), this.historyRecorder, scn, tableName, segOwner, operationCode, changeTime, transactionId);

            LOGGER.trace("scn={}, operationCode={}, operation={}, table={}, segOwner={}, userName={}, rowId={}, rollbackFlag={}", scn, operationCode, operation, tableName, segOwner, username, rowId, rollbackFlag);

            final String logMessage = String.format("transactionId=%s, SCN=%s, table_name=%s, segOwner=%s, operationCode=%s, offsetSCN=%s,  commitOffsetSCN=%s", transactionId, scn, tableName, segOwner, operationCode, this.offsetContext.getScn(), this.offsetContext.getCommitScn());

            // Rows flagged as MISSING_SCN do not advance the last processed SCN.
            if (operationCode != RowMapper.MISSING_SCN) {
                this.lastProcessedScn = scn;
            }

            switch (operationCode) {
                case RowMapper.INSERT:
                case RowMapper.DELETE:
                case RowMapper.UPDATE: {
                    LOGGER.trace("DML, {}, sql {}", logMessage, sqlRedo);
                    if (sqlRedo == null) {
                        LOGGER.trace("Redo SQL was empty, DML operation skipped.");
                        break;
                    }
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    dmlCounter++;
                    switch (operationCode) {
                        case RowMapper.INSERT:
                            insertCounter++;
                            break;
                        case RowMapper.DELETE:
                            deleteCounter++;
                            break;
                        case RowMapper.UPDATE:
                            updateCounter++;
                            break;
                        default:
                            break;
                    }
                    final Table table = getTableForDmlEvent(tableId);
                    if (rollbackFlag == 1) {
                        // The row is flagged as a rollback: undo the DML registered earlier for this row.
                        this.transactionalBuffer.undoDmlOperation(transactionId, rowId, tableId);
                    } else {
                        // Parsing is deferred: the buffer receives a supplier and decides when to invoke it.
                        this.transactionalBuffer.registerDmlOperation(operationCode, transactionId, scn, tableId, () -> {
                            final LogMinerDmlEntry dmlEntry = parse(sqlRedo, table, transactionId);
                            dmlEntry.setObjectOwner(segOwner);
                            dmlEntry.setObjectName(tableName);
                            return dmlEntry;
                        }, changeTime.toInstant(), rowId, rsId);
                    }
                    break;
                }
                case RowMapper.DDL: {
                    if (this.transactionalBuffer.isDdlOperationRegistered(scn)) {
                        LOGGER.trace("DDL: {} has already been seen, skipped.", sqlRedo);
                        break;
                    }
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, transactionId, 0, sqlRedo);
                    LOGGER.info("DDL: {}, REDO_SQL: {}", logMessage, sqlRedo);
                    if (tableName != null) {
                        try {
                            final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                            this.transactionalBuffer.registerDdlOperation(scn);
                            this.dispatcher.dispatchSchemaChangeEvent(tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), sqlRedo, this.schema, changeTime.toInstant(), this.streamingMetrics));
                        } catch (InterruptedException e) {
                            throw new DebeziumException("Failed to dispatch DDL event", e);
                        }
                    }
                    // Must not fall through: a DDL row is neither recorded twice nor reported as a missing SCN.
                    break;
                }
                case RowMapper.START: {
                    this.transactionalBuffer.registerTransaction(transactionId, scn);
                    break;
                }
                case RowMapper.COMMIT: {
                    if (this.transactionalBuffer.isTransactionRegistered(transactionId)) {
                        this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, transactionId, 0, sqlRedo);
                        if (this.transactionalBuffer.commit(transactionId, scn, this.offsetContext, changeTime, this.context, logMessage, this.dispatcher)) {
                            LOGGER.trace("COMMIT, {}", logMessage);
                            commitCounter++;
                        }
                    }
                    break;
                }
                case RowMapper.SELECT_LOB_LOCATOR: {
                    if (!this.connectorConfig.isLobEnabled()) {
                        LOGGER.trace("SEL_LOB_LOCATOR operation skipped for '{}', LOB not enabled.", sqlRedo);
                        break;
                    }
                    LOGGER.trace("SEL_LOB_LOCATOR: {}, REDO_SQL: {}", logMessage, sqlRedo);
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    final Table table = this.schema.tableFor(tableId);
                    if (table == null) {
                        LogMinerHelper.logWarn(this.streamingMetrics, "SEL_LOB_LOCATOR for table '{}' is not known to the connector, skipped.", tableId);
                    } else {
                        this.transactionalBuffer.registerSelectLobOperation(operationCode, transactionId, scn, tableId, changeTime.toInstant(), rowId, rsId, segOwner, tableName, sqlRedo, table, this.selectLobParser);
                    }
                    break;
                }
                case RowMapper.LOB_WRITE: {
                    if (!this.connectorConfig.isLobEnabled()) {
                        LOGGER.trace("LOB_WRITE operation skipped, LOB not enabled.");
                        break;
                    }
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    if (this.schema.tableFor(tableId) == null) {
                        LogMinerHelper.logWarn(this.streamingMetrics, "LOB_WRITE for table '{}' is not known to the connector, skipped.", tableId);
                    } else {
                        this.transactionalBuffer.registerLobWriteOperation(operationCode, transactionId, scn, tableId, sqlRedo, changeTime.toInstant(), rowId, rsId);
                    }
                    break;
                }
                case RowMapper.LOB_ERASE: {
                    if (!this.connectorConfig.isLobEnabled()) {
                        LOGGER.trace("LOB_ERASE operation skipped, LOB not enabled.");
                        break;
                    }
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    if (this.schema.tableFor(tableId) == null) {
                        LogMinerHelper.logWarn(this.streamingMetrics, "LOB_ERASE for table '{}' is not known to the connector, skipped.", tableId);
                    } else {
                        this.transactionalBuffer.registerLobEraseOperation(operationCode, transactionId, scn, tableId, changeTime.toInstant(), rowId, rsId);
                    }
                    break;
                }
                case RowMapper.MISSING_SCN: {
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, transactionId, 0, sqlRedo);
                    LogMinerHelper.logWarn(this.streamingMetrics, "Missing SCN, {}", logMessage);
                    break;
                }
                case RowMapper.ROLLBACK: {
                    if (this.transactionalBuffer.isTransactionRegistered(transactionId)) {
                        this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, transactionId, 0, sqlRedo);
                        if (this.transactionalBuffer.rollback(transactionId, logMessage)) {
                            LOGGER.trace("ROLLBACK, {}", logMessage);
                            rollbackCounter++;
                        }
                    }
                    break;
                }
                default:
                    // Any other operation code is ignored.
                    break;
            }
        }

        final Duration totalTime = Duration.between(startProcessTime, Instant.now());
        if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) {
            this.streamingMetrics.setLastCapturedDmlCount(dmlCounter);
            this.streamingMetrics.setLastDurationOfBatchProcessing(totalTime);
            // Compare against the offsets remembered from the previous batch before refreshing them.
            warnStuckScn();
            this.currentOffsetScn = this.offsetContext.getScn();
            if (this.offsetContext.getCommitScn() != null) {
                this.currentOffsetCommitScn = this.offsetContext.getCommitScn();
            }
        }

        LOGGER.debug("{} Rows, {} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. Lag:{}. Offset scn:{}. Offset commit scn:{}. Active transactions:{}. Sleep time:{}", rows, dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(), this.streamingMetrics.getLagFromSourceInMilliseconds(), this.offsetContext.getScn(), this.offsetContext.getCommitScn(), this.streamingMetrics.getNumberOfActiveTransactions(), this.streamingMetrics.getMillisecondToSleepBetweenMiningQuery());
        this.streamingMetrics.addProcessedRows(rows);
        this.historyRecorder.flush();
    }

    /**
     * Returns the SCN of the most recently processed row whose operation was not
     * {@code MISSING_SCN}, or {@link Scn#NULL} if no such row has been processed yet.
     */
    public Scn getLastProcessedScn() {
        return this.lastProcessedScn;
    }

    /**
     * Advances the result set by one row.
     *
     * <p>The time spent in {@link ResultSet#next()} is reported to the metrics only when a row
     * was actually available.
     *
     * @return {@code true} if the cursor is now positioned on a row
     */
    private boolean hasNext(ResultSet resultSet) throws SQLException {
        final Instant start = Instant.now();
        if (resultSet.next()) {
            this.streamingMetrics.addCurrentResultSetNext(Duration.between(start, Instant.now()));
            return true;
        }
        return false;
    }

    /**
     * Looks up the relational table for a DML event.
     *
     * <p>If the table is not in the schema but passes the data collection filter, a schema change
     * event is dispatched for it and the table is looked up again. If it does not pass the filter,
     * a warning is logged.
     *
     * @return the table, or {@code null} if it is still unknown to the schema
     */
    private Table getTableForDmlEvent(TableId tableId) throws SQLException {
        final Table known = this.schema.tableFor(tableId);
        if (known != null) {
            return known;
        }
        if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            return dispatchSchemaChangeEventAndGetTableForNewCapturedTable(tableId);
        }
        LogMinerHelper.logWarn(this.streamingMetrics, "DML for table '{}' that is not known to this connector, skipping.", tableId);
        return null;
    }

    /**
     * Dispatches a schema change event built from the table's metadata DDL and returns whatever
     * the schema then reports for the table.
     *
     * @throws DebeziumException if the dispatch is interrupted
     */
    private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId tableId) throws SQLException {
        try {
            LOGGER.info("Table {} is new and will be captured.", tableId);
            this.offsetContext.event(tableId, Instant.now());
            this.dispatcher.dispatchSchemaChangeEvent(tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), getTableMetadataDdl(tableId), this.schema, Instant.now(), this.streamingMetrics));
            return this.schema.tableFor(tableId);
        } catch (InterruptedException e) {
            throw new DebeziumException("Failed to dispatch schema change event", e);
        }
    }

    /**
     * Fetches the metadata DDL of a table over a dedicated, short-lived connection.
     *
     * <p>The connection is switched to the configured PDB when one is set, and it is always
     * closed, including when the DDL lookup itself fails.
     */
    private String getTableMetadataDdl(TableId tableId) throws SQLException {
        final String pdbName = this.connectorConfig.getPdbName();
        try (OracleConnection connection = new OracleConnection(this.connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) {
            if (pdbName != null) {
                connection.setSessionToPdb(pdbName);
            }
            return connection.getTableMetadataDdl(tableId);
        }
    }

    /**
     * Tracks whether the offset SCN has stopped advancing while the commit SCN keeps changing.
     *
     * <p>The counter grows on each such observation and resets otherwise. A warning is logged and
     * the SCN freeze metric incremented only when the counter reaches
     * {@link #STUCK_SCN_WARNING_THRESHOLD}.
     */
    private void warnStuckScn() {
        if (this.offsetContext == null || this.offsetContext.getCommitScn() == null) {
            return;
        }
        final Scn scn = this.offsetContext.getScn();
        final Scn commitScn = this.offsetContext.getCommitScn();
        if (this.currentOffsetScn.equals(scn) && !this.currentOffsetCommitScn.equals(commitScn)) {
            this.stuckScnCounter++;
            // Warn only when the threshold is hit, not on every later observation.
            if (this.stuckScnCounter == STUCK_SCN_WARNING_THRESHOLD) {
                LogMinerHelper.logWarn(this.streamingMetrics, "Offset SCN {} is not changing. It indicates long transaction(s). Offset commit SCN: {}", this.currentOffsetScn, commitScn);
                this.streamingMetrics.incrementScnFreezeCount();
            }
        } else {
            this.stuckScnCounter = 0L;
        }
    }

    /**
     * Parses a redo SQL statement into a DML entry and records the parse time.
     *
     * <p>An UPDATE or DELETE entry without any old values is logged as a warning and counted.
     *
     * @param str the redo SQL statement
     * @param table the table the statement applies to
     * @param str2 the transaction id
     * @throws DmlParserException if the statement cannot be parsed; the original failure is the cause
     */
    private LogMinerDmlEntry parse(String str, Table table, String str2) {
        try {
            final Instant parseStart = Instant.now();
            final LogMinerDmlEntry dmlEntry = this.dmlParser.parse(str, table, str2);
            this.streamingMetrics.addCurrentParseTime(Duration.between(parseStart, Instant.now()));
            if (dmlEntry.getOldValues().length == 0 && (RowMapper.UPDATE == dmlEntry.getOperation() || RowMapper.DELETE == dmlEntry.getOperation())) {
                LOGGER.warn("The DML event '{}' contained no before state.", str);
                this.streamingMetrics.incrementWarningCount();
            }
            return dmlEntry;
        } catch (DmlParserException e) {
            final StringBuilder message = new StringBuilder();
            message.append("DML statement couldn't be parsed.");
            message.append(" Please open a Jira issue with the statement '").append(str).append("'.");
            if (OracleConnectorConfig.LogMiningDmlParser.FAST.equals(this.connectorConfig.getLogMiningDmlParser())) {
                message.append(" You can set internal.log.mining.dml.parser='legacy' as a workaround until the parse error is fixed.");
            }
            throw new DmlParserException(message.toString(), e);
        }
    }

    /** Returns {@code true} for the INSERT, DELETE and UPDATE operation codes. */
    private static boolean isDmlOperation(int i) {
        switch (i) {
            case RowMapper.INSERT:
            case RowMapper.DELETE:
            case RowMapper.UPDATE:
                return true;
            default:
                return false;
        }
    }
}
