/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.processor;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.LogMinerQueryBuilder;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.ExtendedStringBeginEvent;
import io.debezium.connector.oracle.logminer.events.ExtendedStringWriteEvent;
import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.events.TruncateEvent;
import io.debezium.connector.oracle.logminer.events.XmlBeginEvent;
import io.debezium.connector.oracle.logminer.events.XmlEndEvent;
import io.debezium.connector.oracle.logminer.events.XmlWriteEvent;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.ExtendedStringParser;
import io.debezium.connector.oracle.logminer.parser.LogMinerColumnResolverDmlParser;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
import io.debezium.connector.oracle.logminer.parser.XmlBeginParser;
import io.debezium.connector.oracle.logminer.processor.CacheProvider;
import io.debezium.connector.oracle.logminer.processor.InMemoryPendingTransactionsCache;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.Transaction;
import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.text.ParsingException;
import io.debezium.util.Clock;
import io.debezium.util.Loggings;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import oracle.sql.RAW;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractLogMinerEventProcessor<T extends Transaction>
implements LogMinerEventProcessor,
CacheProvider<T> {
    // Primary logger for this processor.
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
    // Separate logger whose name is this class name suffixed with ".AbandonedDetails".
    private static final Logger ABANDONED_DETAILS_LOGGER = LoggerFactory.getLogger((String)(AbstractLogMinerEventProcessor.class.getName() + ".AbandonedDetails"));
    // NOTE(review): presumably the trailing part of a transaction id that carries no sequence — confirm against isTransactionIdWithNoSequence.
    private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
    // NOTE(review): presumably the leading text of XML_WRITE redo SQL — confirm against the XML write handler.
    private static final String XML_WRITE_PREAMBLE = "XML_REDO := ";
    private static final String XML_WRITE_PREAMBLE_NULL = "XML_REDO := NULL";
    // Collaborators handed in through the constructor.
    private final OracleConnection jdbcConnection;
    private final ChangeEventSource.ChangeEventSourceContext context;
    private final OracleConnectorConfig connectorConfig;
    private final OracleDatabaseSchema schema;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private final LogMinerStreamingChangeEventSourceMetrics metrics;
    // Parsers created once in the constructor.
    private final LogMinerDmlParser dmlParser;
    private final LogMinerColumnResolverDmlParser reconstructColumnDmlParser;
    private final SelectLobParser selectLobParser;
    private final ExtendedStringParser extendedStringParser;
    private final XmlBeginParser xmlBeginParser;
    // Data collection filter taken from the connector configuration.
    private final Tables.TableFilter tableFilter;
    // Per-batch counters; reset at the start of every process(...) call.
    protected final Counters counters;
    // Mining query built once from the connector configuration.
    protected final String sqlQuery;
    // Offset SCN / per-redo-thread commit SCNs captured after the last batch that saw DML, commit or rollback activity.
    private Scn currentOffsetScn = Scn.NULL;
    private Map<Integer, Scn> currentOffsetCommitScns = new HashMap<Integer, Scn>();
    // SCN and change time of the most recent row whose event type was not MISSING_SCN.
    private Instant lastProcessedScnChangeTime = null;
    private Scn lastProcessedScn = Scn.NULL;
    private boolean sequenceUnavailable = false;
    // Ids of transactions that were flagged as abandoned when removed from the transaction cache.
    private final Set<String> abandonedTransactionsCache = new HashSet<String>();
    // Tracks the number of pending events per transaction id.
    private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
    // NOTE(review): never reassigned in the visible part of this class; could probably be declared final — confirm.
    private static Pattern LOB_WRITE_SQL_PATTERN = Pattern.compile("(?s).* := ((?:HEXTORAW\\()?'.*'(?:\\))?);\\s*dbms_lob.write\\([^,]+,\\s*(\\d+)\\s*,\\s*(\\d+)\\s*,[^,]+\\);.*");

    /**
     * Wires the processor to its collaborators and builds the parsers, counters,
     * table filter and mining query that are derived from the connector configuration.
     *
     * @param context        source context used to check whether the connector is still running
     * @param connectorConfig connector configuration
     * @param schema         database schema
     * @param partition      partition the processor works on
     * @param offsetContext  offset context updated while processing
     * @param dispatcher     dispatcher used to emit events
     * @param metrics        streaming metrics
     * @param jdbcConnection connection used to create the mining query statement
     */
    protected AbstractLogMinerEventProcessor(
            ChangeEventSource.ChangeEventSourceContext context,
            OracleConnectorConfig connectorConfig,
            OracleDatabaseSchema schema,
            OraclePartition partition,
            OracleOffsetContext offsetContext,
            EventDispatcher<OraclePartition, TableId> dispatcher,
            LogMinerStreamingChangeEventSourceMetrics metrics,
            OracleConnection jdbcConnection) {
        // Plain references supplied by the caller.
        this.jdbcConnection = jdbcConnection;
        this.metrics = metrics;
        this.dispatcher = dispatcher;
        this.offsetContext = offsetContext;
        this.partition = partition;
        this.schema = schema;
        this.connectorConfig = connectorConfig;
        this.context = context;

        // State derived from the configuration; creation order matches the original.
        this.tableFilter = connectorConfig.getTableFilters().dataCollectionFilter();
        this.counters = new Counters();
        this.dmlParser = new LogMinerDmlParser(connectorConfig);
        this.reconstructColumnDmlParser = new LogMinerColumnResolverDmlParser(connectorConfig);
        this.selectLobParser = new SelectLobParser();
        this.extendedStringParser = new ExtendedStringParser();
        this.xmlBeginParser = new XmlBeginParser();
        this.sqlQuery = LogMinerQueryBuilder.build(connectorConfig);
    }

    /**
     * Rebuilds the in-memory pending event counters: for every transaction id in the
     * transaction cache, counts the event cache keys that start with {@code "<id>-"}
     * and initialises the counter for that transaction with the result.
     */
    protected void reCreateInMemoryCache() {
        this.getTransactionCache().keys(transactionIds -> transactionIds.forEach(transactionId -> {
            final String eventKeyPrefix = transactionId + "-";
            this.getEventCache().keys(eventKeys -> {
                final long matches = eventKeys.filter(eventKey -> eventKey.startsWith(eventKeyPrefix)).count();
                final int eventCount = (int) matches;
                LOGGER.info("Re-creating in memory cache of event count for transaction '" + transactionId + "'. No of events found: " + eventCount);
                this.inMemoryPendingTransactionsCache.initKey(transactionId, eventCount);
            });
        }));
    }

    /**
     * @return the live (not copied) set of transaction ids recorded as abandoned
     */
    protected Set<String> getAbandonedTransactionsCache() {
        return abandonedTransactionsCache;
    }

    /**
     * @return the connector configuration passed to the constructor
     */
    protected OracleConnectorConfig getConfig() {
        return connectorConfig;
    }

    /**
     * @return the database schema passed to the constructor
     */
    protected OracleDatabaseSchema getSchema() {
        return schema;
    }

    /**
     * Checks whether the processed-transactions cache holds an entry for the id.
     *
     * @param transactionId transaction id to look up
     * @return {@code true} if the id is a key of the processed-transactions cache
     */
    protected boolean isRecentlyProcessed(String transactionId) {
        final boolean alreadyProcessed = getProcessedTransactionsCache().containsKey(transactionId);
        return alreadyProcessed;
    }

    /**
     * Checks whether the schema-changes cache holds an entry keyed by the row's SCN.
     *
     * @param row the mined row
     * @return {@code true} if the string form of the row's SCN is a key of the schema-changes cache
     */
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
        final String scnKey = row.getScn().toString();
        return getSchemaChangesCache().containsKey(scnKey);
    }

    /**
     * @return the SCN of the most recent row recorded by {@code processRow}; {@code Scn.NULL} until one is recorded
     */
    protected Scn getLastProcessedScn() {
        return lastProcessedScn;
    }

    /**
     * @return the change time of the most recent row recorded by {@code processRow}; {@code null} until one is recorded
     */
    protected Instant getLastProcessedScnChangeTime() {
        return lastProcessedScnChangeTime;
    }

    /**
     * Creates the transaction object for a mined row. Within this class it is used
     * by {@code handleStart} to populate the transaction cache for a transaction id
     * that is not cached yet.
     *
     * @param var1 the mined row the transaction is created from
     * @return the new transaction instance
     */
    protected abstract T createTransaction(LogMinerEventRow var1);

    /**
     * Undoes a previously buffered change identified by the row's row-id.
     * <p>
     * The transaction is looked up by the row's transaction id. If it is not cached and
     * the id has no explicit sequence, every cached transaction whose id starts with the
     * id's prefix is tried instead. A warning is logged whenever nothing could be undone.
     *
     * @param row the undo row
     */
    protected void removeEventWithRowId(LogMinerEventRow row) {
        final T cachedTransaction = this.getTransactionCache().get(row.getTransactionId());

        // Direct hit: the transaction is in the cache.
        if (cachedTransaction != null) {
            if (!this.removeTransactionEventWithRowId(cachedTransaction, row)) {
                Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), row.getRowId());
            }
            return;
        }

        // No direct hit, but the id carries no sequence: scan transactions sharing the prefix.
        if (this.isTransactionIdWithNoSequence(row.getTransactionId())) {
            final String prefix = this.getTransactionIdPrefix(row.getTransactionId());
            LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
            LOGGER.debug("Checking all transactions with prefix '{}'", prefix);
            final boolean undone = this.getTransactionCache()
                    .streamAndReturn(entries -> entries
                            .filter(entry -> entry.getKey().startsWith(prefix))
                            .map(LogMinerCache.Entry::getValue)
                            .anyMatch(candidate -> this.removeTransactionEventWithRowId(candidate, row)))
                    .booleanValue();
            if (!undone) {
                Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), row.getRowId());
            }
            return;
        }

        // Transaction unknown: the message depends on whether LOB support is enabled.
        if (this.getConfig().isLobEnabled()) {
            Loggings.logWarningAndTraceRecord(LOGGER, row, "Failed to undo change on table '{}' in transaction '{}' with row-id '{}'", row.getTableId(), row.getTransactionId(), row.getRowId());
        } else {
            Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(), row.getTransactionId());
        }
    }

    /**
     * Removes the most recent buffered event of {@code transaction} whose row-id equals
     * the row-id of {@code row}.
     * <p>
     * Events are scanned from newest to oldest. On a match the partial rollback
     * metric/counter are incremented, the event is removed from the event cache and the
     * pending event count of the transaction is decremented.
     *
     * @param transaction the transaction whose events are searched
     * @param row         the undo row carrying the row-id to match
     * @return {@code true} if an event was removed, {@code false} if no event matched
     */
    protected boolean removeTransactionEventWithRowId(T transaction, LogMinerEventRow row) {
        for (int i = transaction.getNumberOfEvents() - 1; i >= 0; --i) {
            final String eventKey = transaction.getEventId(i);
            final LogMinerEvent event = this.getEventCache().get(eventKey);
            if (event == null || !event.getRowId().equals(row.getRowId())) {
                continue;
            }
            this.metrics.increasePartialRollbackCount();
            ++this.counters.partialRollbackCount;
            Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
            this.getEventCache().remove(eventKey);
            // Decrement the counter of the transaction that actually owned the event. When this
            // method is reached through the no-sequence prefix scan in removeEventWithRowId, the
            // row's transaction id differs from the matched transaction's id, so keying on the
            // row would leave the real transaction's pending count too high.
            this.inMemoryPendingTransactionsCache.decrement(transaction.getTransactionId());
            return true;
        }
        return false;
    }

    /**
     * Looks up the pending event count tracked in memory for the transaction.
     *
     * @param transaction the transaction to look up
     * @return the number of pending events recorded under the transaction's id
     */
    protected int getTransactionEventCount(T transaction) {
        final String transactionId = transaction.getTransactionId();
        return inMemoryPendingTransactionsCache.getNumPending(transactionId);
    }

    /**
     * Flag handed to {@code LogMinerEventRow.fromResultSet} by {@code processResults}.
     * This base implementation always answers {@code true}.
     *
     * @return {@code true}
     */
    protected boolean isTrxIdRawValue() {
        return true;
    }

    /**
     * Runs the mining query for the given SCN range, processes every returned row and
     * updates metrics and offset bookkeeping.
     * <p>
     * Both the statement and the result set are managed with try-with-resources so they
     * are closed on every exit path, including when computing the next start SCN throws.
     *
     * @param startScn lower bound bound to the first query parameter
     * @param endScn   upper bound bound to the second query parameter
     * @return {@code startScn} when the query produced no rows, otherwise the value
     *         computed by {@link #calculateNewStartScn(Scn, Scn)}
     * @throws SQLException         if preparing, executing or reading the query fails
     * @throws InterruptedException if processing is interrupted
     */
    @Override
    public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
        this.counters.reset();
        try (PreparedStatement statement = this.createQueryStatement()) {
            LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn);
            statement.setFetchSize(this.getConfig().getQueryFetchSize());
            statement.setFetchDirection(ResultSet.FETCH_FORWARD);
            statement.setString(1, startScn.toString());
            statement.setString(2, endScn.toString());

            final Instant queryStart = Instant.now();
            try (ResultSet resultSet = statement.executeQuery()) {
                this.metrics.setLastDurationOfFetchQuery(Duration.between(queryStart, Instant.now()));

                final Instant startProcessTime = Instant.now();
                this.processResults(this.partition, resultSet);
                final Duration totalTime = Duration.between(startProcessTime, Instant.now());

                this.metrics.setLastCapturedDmlCount(this.counters.dmlCount);
                if (this.counters.dmlCount > 0 || this.counters.commitCount > 0 || this.counters.rollbackCount > 0) {
                    // Warn using the values captured after the previous active batch, then refresh them.
                    this.warnPotentiallyStuckScn(this.currentOffsetScn, this.currentOffsetCommitScns);
                    this.currentOffsetScn = this.offsetContext.getScn();
                    if (this.offsetContext.getCommitScn() != null) {
                        this.currentOffsetCommitScns = this.offsetContext.getCommitScn().getCommitScnForAllRedoThreads();
                    }
                }

                LOGGER.debug("{}.", this.counters);
                LOGGER.debug("Processed in {} ms. Lag: {}. Offset SCN: {}, Offset Commit SCN: {}, Active Transactions: {}, Sleep: {}", new Object[]{totalTime.toMillis(), this.metrics.getLagFromSourceInMilliseconds(), this.offsetContext.getScn(), this.offsetContext.getCommitScn(), this.metrics.getNumberOfActiveTransactions(), this.metrics.getSleepTimeInMilliseconds()});
                if (this.metrics.getNumberOfActiveTransactions() > 0L && LOGGER.isDebugEnabled()) {
                    this.getTransactionCache().values(values -> LOGGER.debug("All active transactions: {}", (Object)values.map(t -> t.getTransactionId() + " (" + String.valueOf(t.getStartScn()) + ")").collect(Collectors.joining(","))));
                }

                this.metrics.setLastProcessedRowsCount(this.counters.rows);
                if (this.counters.rows == 0L) {
                    // Nothing was read: the caller resumes from the same start SCN.
                    return startScn;
                }
                return this.calculateNewStartScn(endScn, this.offsetContext.getCommitScn().getMaxCommittedScn());
            }
        }
    }

    /**
     * Logs the sizes of the transaction, processed-transaction, schema-change and event
     * caches at INFO level. When debug logging is enabled and the event cache is not
     * empty, every event cache key is additionally logged at DEBUG level.
     */
    @Override
    public void displayCacheStatistics() {
        LOGGER.info("Overall Cache Statistics:");
        LOGGER.info("\tTransactions        : {}", getTransactionCache().size());
        LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size());
        LOGGER.info("\tSchema Changes      : {}", getSchemaChangesCache().size());
        LOGGER.info("\tEvents              : {}", getEventCache().size());

        if (getEventCache().isEmpty() || !LOGGER.isDebugEnabled()) {
            return;
        }
        getEventCache().keys(eventKeys -> eventKeys.forEach(key -> LOGGER.debug("\t\tFound Key: {}", key)));
    }

    /**
     * @return the mining query that was built in the constructor
     */
    protected String getQueryString() {
        return sqlQuery;
    }

    /**
     * Prepares the mining query as a forward-only, read-only statement whose cursors
     * are held over commit.
     *
     * @return the prepared statement; the caller is responsible for closing it
     * @throws SQLException if the statement cannot be prepared
     */
    protected PreparedStatement createQueryStatement() throws SQLException {
        final String query = this.getQueryString();
        return this.jdbcConnection.connection().prepareStatement(
                query,
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    /**
     * Determines where the next mining iteration should start, updates the offset SCN
     * accordingly and dispatches a heartbeat.
     * <p>
     * When a transaction is still cached, abandoned transactions are handled and caches
     * older than the oldest cached start SCN are purged; otherwise the schema-changes
     * cache is cleared. With LOB support enabled the offset SCN itself is returned;
     * without it the (possibly lowered) end SCN is returned.
     *
     * @param endScn          upper bound of the range that was just mined
     * @param maxCommittedScn highest committed SCN known to the offset
     * @return the SCN the next iteration should start from
     * @throws InterruptedException if dispatching the heartbeat is interrupted
     */
    protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
        final Optional<T> oldest = this.getOldestTransactionInCache();
        final boolean hasOldest = oldest.isPresent();
        final Scn minCacheScn = hasOldest ? oldest.get().getStartScn() : Scn.NULL;
        final Instant minCacheScnChangeTime = hasOldest ? oldest.get().getChangeTime() : null;

        if (minCacheScn.isNull()) {
            // Nothing buffered: every remembered schema change can go.
            this.getSchemaChangesCache().removeIf(entry -> true);
        } else {
            this.abandonTransactions(this.getConfig().getLogMiningTransactionRetention());
            this.purgeCache(minCacheScn);
        }

        if (!this.getConfig().isLobEnabled()) {
            // Never resume beyond the last row that was actually processed.
            Scn resumeScn = endScn;
            if (!this.getLastProcessedScn().isNull() && this.getLastProcessedScn().compareTo(endScn) < 0) {
                resumeScn = this.getLastProcessedScn();
            }
            final Scn newOffsetScn = minCacheScn.isNull() ? resumeScn : minCacheScn.subtract(Scn.valueOf(1));
            this.offsetContext.setScn(newOffsetScn);
            this.metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime);
            this.metrics.setOffsetScn(this.offsetContext.getScn());
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
            return resumeScn;
        }

        // LOB support enabled.
        if (this.getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
            this.offsetContext.setScn(maxCommittedScn);
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        } else if (!minCacheScn.isNull()) {
            this.getProcessedTransactionsCache().removeIf(processed -> Scn.valueOf(processed.getValue()).compareTo(minCacheScn) < 0);
            this.offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        }
        return this.offsetContext.getScn();
    }

    /**
     * Reads rows from the result set for as long as the connector is running and rows
     * are available, counting each row and handing it to {@code processRow}.
     *
     * @param partition the partition the rows belong to
     * @param resultSet the mining query result
     * @throws SQLException         if reading from the result set fails
     * @throws InterruptedException if processing a row is interrupted
     */
    protected void processResults(OraclePartition partition, ResultSet resultSet) throws SQLException, InterruptedException {
        while (true) {
            if (!this.context.isRunning() || !this.hasNextWithMetricsUpdate(resultSet)) {
                break;
            }
            this.counters.rows++;
            final LogMinerEventRow row = LogMinerEventRow.fromResultSet(resultSet, this.getConfig().getCatalogName(), this.isTrxIdRawValue());
            this.processRow(partition, row);
        }
    }

    /**
     * Filters a mined row and routes it to the handler for its event type.
     * <p>
     * A row is skipped when its transaction was recently processed, when it is already
     * covered by the initial snapshot, when it belongs to the flush table, or when its
     * table does not pass the table filter (DDL rows are exempt from the table filter).
     *
     * @param partition the partition the row belongs to
     * @param row       the mined row
     * @throws SQLException         if a handler fails while accessing the database
     * @throws InterruptedException if a handler is interrupted
     */
    protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
        String transactionId = row.getTransactionId();
        if (this.isRecentlyProcessed(transactionId)) {
            LOGGER.debug("Transaction {} has been seen by connector, skipped.", (Object)transactionId);
            return;
        }
        if (!row.getEventType().equals((Object)EventType.MISSING_SCN)) {
            this.lastProcessedScn = row.getScn();
            this.lastProcessedScnChangeTime = row.getChangeTime();
        }
        // Rows below the snapshot SCN are only relevant when their transaction was still
        // pending while the snapshot was taken.
        if (row.getScn().compareTo(this.offsetContext.getSnapshotScn()) < 0) {
            Map<String, Scn> snapshotPendingTransactions = this.offsetContext.getSnapshotPendingTransactions();
            if (snapshotPendingTransactions == null || !snapshotPendingTransactions.containsKey(row.getTransactionId())) {
                LOGGER.debug("Skipping event {} (SCN {}) because it is already encompassed by the initial snapshot", (Object)row.getEventType(), (Object)row.getScn());
                return;
            }
        }
        if (row.getTableId() != null) {
            if (LogWriterFlushStrategy.isFlushTable(row.getTableId(), this.connectorConfig.getJdbcConfig().getUser(), this.connectorConfig.getLogMiningFlushTableName())) {
                LOGGER.trace("Skipped change associated with flush table '{}'", (Object)row.getTableId());
                return;
            }
            if (!EventType.DDL.equals((Object)row.getEventType()) && !this.tableFilter.isIncluded(row.getTableId())) {
                if (this.isUsingHybridStrategy()) {
                    if (!this.isTableLookupByObjectIdRequired(row)) {
                        LOGGER.trace("Skipping change associated with table '{}' which does not match filters.", (Object)row.getTableId());
                        return;
                    }
                    LOGGER.debug("Found DML for dropped table in history with object-id based table name {}.", (Object)row.getTableId().table());
                    TableId tableId = this.schema.getTableIdByObjectId(row.getObjectId(), null);
                    if (tableId != null) {
                        row.setTableId(tableId);
                    }
                }
                // Re-check: the hybrid lookup above may have replaced the row's table id.
                if (!this.tableFilter.isIncluded(row.getTableId())) {
                    LOGGER.trace("Skipping change associated with table '{}' which does not match filters.", (Object)row.getTableId());
                    return;
                }
            }
        }
        switch (row.getEventType()) {
            case MISSING_SCN: {
                this.handleMissingScn(row);
                // Do not fall through into START handling: a missing-SCN row must not
                // start (or reset) a transaction.
                break;
            }
            case START: {
                this.handleStart(row);
                break;
            }
            case COMMIT: {
                this.handleCommit(partition, row);
                break;
            }
            case ROLLBACK: {
                this.handleRollback(row);
                break;
            }
            case DDL: {
                this.handleSchemaChange(row);
                break;
            }
            case SELECT_LOB_LOCATOR: {
                this.handleSelectLobLocator(row);
                break;
            }
            case LOB_WRITE: {
                this.handleLobWrite(row);
                break;
            }
            case LOB_ERASE: {
                this.handleLobErase(row);
                break;
            }
            case EXTENDED_STRING_BEGIN: {
                this.handleExtendedStringBegin(row);
                break;
            }
            case EXTENDED_STRING_WRITE: {
                this.handleExtendedStringWrite(row);
                break;
            }
            case EXTENDED_STRING_END: {
                this.handleExtendedStringEnd(row);
                break;
            }
            case XML_BEGIN: {
                this.handleXmlBegin(row);
                break;
            }
            case XML_WRITE: {
                this.handleXmlWrite(row);
                break;
            }
            case XML_END: {
                this.handleXmlEnd(row);
                break;
            }
            case INSERT:
            case UPDATE:
            case DELETE: {
                this.handleDataEvent(row);
                break;
            }
            case REPLICATION_MARKER: {
                this.handleReplicationMarker(row);
                break;
            }
            case UNSUPPORTED: {
                this.handleUnsupportedEvent(row);
                break;
            }
            default: {
                // Any other event type is intentionally ignored.
                break;
            }
        }
    }

    /**
     * Logs a warning that includes the row for which a missing SCN was reported.
     *
     * @param row the mined row
     */
    protected void handleMissingScn(LogMinerEventRow row) {
        LOGGER.warn("Missing SCN detected. {}", row);
    }

    /**
     * Handles a START row. Nothing happens for a recently processed transaction.
     * Otherwise an unknown transaction is created and cached (and the active
     * transaction metric refreshed), while an already cached one is reset to its start.
     *
     * @param row the START row
     */
    protected void handleStart(LogMinerEventRow row) {
        final String transactionId = row.getTransactionId();
        final T cached = this.getTransactionCache().get(transactionId);

        if (this.isRecentlyProcessed(transactionId)) {
            return;
        }
        if (cached == null) {
            this.getTransactionCache().put(transactionId, this.createTransaction(row));
            this.metrics.setActiveTransactionCount(this.getTransactionCache().size());
            return;
        }
        LOGGER.trace("Transaction {} is not yet committed and START event detected.", transactionId);
        this.resetTransactionToStart(cached);
    }

    /**
     * Calls {@code start()} on the transaction and writes it back into the
     * transaction cache under its own id.
     *
     * @param transaction the transaction to reset
     */
    protected void resetTransactionToStart(T transaction) {
        transaction.start();
        final String cacheKey = transaction.getTransactionId();
        getTransactionCache().put(cacheKey, transaction);
    }

    /**
     * Handles a COMMIT row: takes the transaction out of the cache, dispatches its
     * buffered events, emits a transaction-committed event or a heartbeat, and updates
     * offsets and metrics.
     *
     * @param partition the partition the commit belongs to
     * @param row       the COMMIT row
     * @throws InterruptedException if dispatching is interrupted
     */
    protected void handleCommit(OraclePartition partition, LogMinerEventRow row) throws InterruptedException {
        String transactionId = row.getTransactionId();
        if (this.isRecentlyProcessed(transactionId)) {
            LOGGER.debug("\tTransaction is already committed, skipped.");
            return;
        }
        T transaction = this.getAndRemoveTransactionFromCache(transactionId);
        if (transaction == null) {
            // Only log the miss when the commit itself has not been recorded before.
            if (!this.offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
                LOGGER.debug("Transaction {} not found in cache with SCN {}, no events to commit.", (Object)transactionId, (Object)row.getScn());
            }
            this.handleCommitNotFoundInBuffer(row);
        }
        // Start SCN of the oldest transaction still buffered (Scn.NULL when none).
        Scn smallestScn = this.calculateSmallestScn();
        Scn commitScn = row.getScn();
        if (this.offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
            // The commit was recorded earlier: discard the buffered transaction without dispatching.
            if (transaction != null) {
                if (transaction.getNumberOfEvents() > 0) {
                    Scn lastCommittedScn = this.offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
                    LOGGER.debug("Transaction {} has already been processed. Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", new Object[]{transactionId, this.offsetContext.getCommitScn(), commitScn, lastCommittedScn});
                }
                this.cleanupAfterTransactionRemovedFromCache(transaction, false);
                this.metrics.setActiveTransactionCount(this.getTransactionCache().size());
            }
            return;
        }
        int numEvents = transaction == null ? 0 : this.getTransactionEventCount(transaction);
        // A commit reported on redo thread 0 with no pending events is not recorded.
        boolean skipCommit = row.getThread() == 0 && numEvents == 0;
        LOGGER.debug("{} transaction {} with {} events (scn: {}, thread: {}, oldest buffer scn: {}): {}", new Object[]{skipCommit ? "Skipping commit for" : "Committing", transactionId, numEvents, row.getScn(), row.getThread(), smallestScn, row});
        if (skipCommit) {
            return;
        }
        ++this.counters.commitCount;
        this.offsetContext.getCommitScn().recordCommit(row);
        Instant start = Instant.now();
        boolean dispatchTransactionCommittedEvent = false;
        if (numEvents > 0) {
            // numEvents > 0 implies transaction != null (see numEvents above).
            boolean skipExcludedUserName = this.isTransactionUserExcluded(transaction);
            dispatchTransactionCommittedEvent = !skipExcludedUserName;
            ZoneOffset databaseOffset = this.metrics.getDatabaseOffset();
            // Invoked by the commit consumer for each event it emits.
            TransactionCommitConsumer.Handler<LogMinerEvent> delegate = (event, eventsProcessed) -> {
                // Move the offset SCN forward only when no buffered transaction starts before this commit.
                if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) {
                    this.offsetContext.setScn(event.getScn());
                    this.metrics.setOldestScnDetails(event.getScn(), event.getChangeTime());
                }
                this.offsetContext.setEventScn(event.getScn());
                this.offsetContext.setTransactionId(transactionId);
                this.offsetContext.setUserName(transaction.getUserName());
                this.offsetContext.setSourceTime(event.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                this.offsetContext.setTableId(event.getTableId());
                this.offsetContext.setRedoThread(row.getThread());
                this.offsetContext.setRsId(event.getRsId());
                this.offsetContext.setRowId(event.getRowId());
                if (event instanceof RedoSqlDmlEvent) {
                    this.offsetContext.setRedoSql(((RedoSqlDmlEvent)event).getRedoSql());
                }
                DmlEvent dmlEvent = (DmlEvent)event;
                // Events of excluded users still update the offset above but are not dispatched.
                if (!skipExcludedUserName) {
                    LogMinerChangeRecordEmitter logMinerChangeRecordEmitter = dmlEvent instanceof TruncateEvent ? new LogMinerChangeRecordEmitter(this.connectorConfig, (Partition)partition, (OffsetContext)this.offsetContext, Envelope.Operation.TRUNCATE, dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), this.getSchema().tableFor(event.getTableId()), this.getSchema(), Clock.system()) : new LogMinerChangeRecordEmitter(this.connectorConfig, (Partition)partition, (OffsetContext)this.offsetContext, dmlEvent.getEventType(), dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), this.getSchema().tableFor(event.getTableId()), this.getSchema(), Clock.system());
                    this.dispatcher.dispatchDataChangeEvent((Partition)partition, (DataCollectionId)event.getTableId(), (ChangeRecordEmitter)logMinerChangeRecordEmitter);
                }
                // The redo SQL applies to a single event only.
                this.offsetContext.setRedoSql(null);
            };
            try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, this.connectorConfig, this.schema);){
                int dispatchedEventCount = 0;
                Iterator<LogMinerEvent> iterator = this.getTransactionEventIterator(transaction);
                while (iterator.hasNext()) {
                    // NOTE(review): returning here skips the cleanup and metrics updates below — confirm this is intended on shutdown.
                    if (!this.context.isRunning()) {
                        return;
                    }
                    LogMinerEvent event2 = iterator.next();
                    LOGGER.trace("Dispatching event {} {}", (Object)(++dispatchedEventCount), (Object)event2.getEventType());
                    commitConsumer.accept(event2);
                }
            }
        }
        // Point the offset at the commit itself before emitting the transaction end / heartbeat.
        this.offsetContext.setEventScn(commitScn);
        this.offsetContext.setRsId(row.getRsId());
        this.offsetContext.setRowId("");
        if (dispatchTransactionCommittedEvent) {
            this.dispatcher.dispatchTransactionCommittedEvent((Partition)partition, (OffsetContext)this.offsetContext, transaction.getChangeTime());
        } else {
            this.dispatcher.dispatchHeartbeatEvent((Partition)partition, (OffsetContext)this.offsetContext);
        }
        this.metrics.calculateLagFromSource(row.getChangeTime());
        if (transaction != null) {
            this.finalizeTransactionCommit(transactionId, commitScn);
            this.cleanupAfterTransactionRemovedFromCache(transaction, false);
            this.metrics.setActiveTransactionCount(this.getTransactionCache().size());
        }
        this.metrics.incrementCommittedTransactionCount();
        this.metrics.setCommitScn(commitScn);
        this.metrics.setOffsetScn(this.offsetContext.getScn());
        this.metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
    }

    /**
     * Finds the start SCN of the oldest cached transaction and publishes it, together
     * with that transaction's change time, to the metrics. When no transaction is
     * cached, the metrics receive {@code Scn.valueOf(-1)} with a {@code null} time.
     *
     * @return the oldest cached start SCN, or {@code Scn.NULL} when the cache holds no transaction
     */
    private Scn calculateSmallestScn() {
        final Optional<T> oldest = this.getOldestTransactionInCache();
        if (!oldest.isPresent()) {
            this.metrics.setOldestScnDetails(Scn.valueOf(-1), null);
            return Scn.NULL;
        }
        final T oldestTransaction = oldest.get();
        final Scn oldestStartScn = oldestTransaction.getStartScn();
        this.metrics.setOldestScnDetails(oldestStartScn, oldestTransaction.getChangeTime());
        return oldestStartScn;
    }

    /**
     * Invoked for a commit row; removes the row's transaction id from the
     * abandoned-transactions cache.
     *
     * @param row the commit row
     */
    protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) {
        final String committedTransactionId = row.getTransactionId();
        this.abandonedTransactionsCache.remove(committedTransactionId);
    }

    /**
     * Invoked for a rollback row whose transaction is not in the transaction cache;
     * removes the row's transaction id from the abandoned-transactions cache.
     *
     * @param row the rollback row
     */
    protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
        final String rolledBackTransactionId = row.getTransactionId();
        this.abandonedTransactionsCache.remove(rolledBackTransactionId);
    }

    /**
     * Evicts entries older than the given SCN from the processed-transactions cache
     * (SCN stored as the entry value) and from the schema-changes cache (SCN stored
     * as the entry key).
     *
     * @param minCacheScn entries whose SCN compares strictly below this value are removed
     */
    protected void purgeCache(Scn minCacheScn) {
        this.getProcessedTransactionsCache().removeIf(processed -> {
            final Scn processedScn = Scn.valueOf((String)processed.getValue());
            return processedScn.compareTo(minCacheScn) < 0;
        });
        this.getSchemaChangesCache().removeIf(schemaChange -> {
            final Scn schemaChangeScn = Scn.valueOf((String)schemaChange.getKey());
            return schemaChangeScn.compareTo(minCacheScn) < 0;
        });
    }

    /**
     * Removes the transaction with the given id from the transaction cache.
     *
     * @param transactionId the transaction id to remove
     * @return whatever the cache's {@code remove} call returned for that id
     */
    protected T getAndRemoveTransactionFromCache(String transactionId) {
        final Transaction removed = (Transaction)this.getTransactionCache().remove(transactionId);
        return (T)removed;
    }

    /**
     * Updates the abandoned-transactions cache for a transaction that left the
     * transaction cache and then removes the transaction's events.
     *
     * @param transaction the transaction that was removed
     * @param isAbandoned {@code true} to record the id as abandoned, {@code false} to
     *                    clear it from the abandoned-transactions cache
     */
    protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean isAbandoned) {
        final String transactionId = transaction.getTransactionId();
        if (!isAbandoned) {
            this.abandonedTransactionsCache.remove(transactionId);
        } else {
            this.abandonedTransactionsCache.add(transactionId);
        }
        this.removeEventsWithTransaction(transaction);
    }

    /**
     * Creates an iterator over the events of a transaction, resolved one by one from
     * the event cache by event id.
     *
     * <p>Event ids with no entry in the event cache are skipped. The iterator honours
     * the {@link Iterator} contract: {@code next()} may be called without a preceding
     * {@code hasNext()}, and throws {@code NoSuchElementException} once exhausted. A
     * resolved event is held until {@code next()} consumes it, so repeated
     * {@code hasNext()} calls do not query the cache again.
     *
     * @param transaction the transaction whose events should be iterated
     * @return an iterator over the transaction's events that are present in the event cache
     */
    protected Iterator<LogMinerEvent> getTransactionEventIterator(final T transaction) {
        return new Iterator<LogMinerEvent>(){
            // The event count is captured once, when the iterator is created.
            private final int count = transaction.getNumberOfEvents();
            // Event resolved by hasNext() and not yet handed out; null when none is pending.
            private LogMinerEvent nextEvent;
            private int index = 0;

            @Override
            public boolean hasNext() {
                while (this.nextEvent == null && this.index < this.count) {
                    this.nextEvent = AbstractLogMinerEventProcessor.this.getEventCache().get(transaction.getEventId(this.index));
                    if (this.nextEvent == null) {
                        LOGGER.debug("Event {} must have been undone, skipped.", (Object)this.index);
                        ++this.index;
                    }
                }
                return this.nextEvent != null;
            }

            @Override
            public LogMinerEvent next() {
                if (!this.hasNext()) {
                    throw new java.util.NoSuchElementException("No more events for transaction " + transaction.getTransactionId());
                }
                final LogMinerEvent current = this.nextEvent;
                this.nextEvent = null;
                ++this.index;
                return current;
            }
        };
    }

    /**
     * Finishes the commit of a transaction: clears it from the abandoned-transactions
     * cache and, only when LOB support is enabled, records its commit SCN in the
     * processed-transactions cache.
     *
     * @param transactionId the committed transaction id
     * @param commitScn the commit SCN, stored in its string form
     */
    protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
        this.getAbandonedTransactionsCache().remove(transactionId);
        if (!this.getConfig().isLobEnabled()) {
            return;
        }
        this.getProcessedTransactionsCache().put(transactionId, commitScn.toString());
    }

    /**
     * Returns the key of the first entry streamed from the transaction cache.
     *
     * @return the first key, or {@code null} when the cache streams no entries
     */
    protected String getFirstActiveTransactionKey() {
        final Optional<String> firstKey = this.getTransactionCache().streamAndReturn(entries -> entries.map(LogMinerCache.Entry::getKey).findFirst());
        return firstKey.orElse(null);
    }

    /**
     * Tells whether the transaction's user name is in the configured user-name exclusions.
     *
     * @param transaction the transaction to check; may be {@code null}
     * @return {@code true} only when the exclusion list contains the transaction's user
     *         name; {@code false} for a {@code null} transaction or for a transaction
     *         with a {@code null} user name that has at least one event
     */
    protected boolean isTransactionUserExcluded(T transaction) {
        if (transaction == null) {
            return false;
        }
        final String userName = transaction.getUserName();
        if (userName == null && this.getTransactionEventCount(transaction) > 0) {
            LOGGER.debug("Detected transaction with null username {}", transaction);
            return false;
        }
        if (!this.connectorConfig.getLogMiningUsernameExcludes().contains(userName)) {
            return false;
        }
        LOGGER.debug("Skipped transaction with excluded username {}", transaction);
        return true;
    }

    /**
     * Handles a rollback row. A transaction present in the cache is finalized as
     * rolled back; otherwise {@code handleRollbackNotFoundInBuffer} is invoked. The
     * rollback metrics and counter are updated in both cases.
     *
     * @param row the rollback row
     */
    protected void handleRollback(LogMinerEventRow row) {
        final String transactionId = row.getTransactionId();
        if (!this.getTransactionCache().containsKey(transactionId)) {
            LOGGER.debug("Transaction {} not found in cache, no events to rollback.", (Object)transactionId);
            this.handleRollbackNotFoundInBuffer(row);
        } else {
            LOGGER.debug("Transaction {} was rolled back.", (Object)transactionId);
            this.finalizeTransactionRollback(transactionId, row.getScn());
            this.metrics.setActiveTransactionCount(this.getTransactionCache().size());
        }
        this.metrics.incrementRolledBackTransactionCount();
        this.metrics.addRolledBackTransactionId(transactionId);
        ++this.counters.rollbackCount;
    }

    /**
     * Finishes the rollback of a transaction: drops its events and its cache entry
     * when it is cached, clears it from the abandoned-transactions cache and, only
     * when LOB support is enabled, records the rollback SCN in the
     * processed-transactions cache.
     *
     * @param transactionId the rolled back transaction id
     * @param rollbackScn the rollback SCN, stored in its string form
     */
    protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
        final Transaction cached = (Transaction)this.getTransactionCache().get(transactionId);
        if (cached != null) {
            this.removeEventsWithTransaction(cached);
            this.getTransactionCache().remove(transactionId);
        }
        this.getAbandonedTransactionsCache().remove(transactionId);
        if (!this.getConfig().isLobEnabled()) {
            return;
        }
        this.getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
    }

    /**
     * Handles a DDL row: filters rows that must be ignored, advances the offsets and
     * dispatches a schema change event.
     *
     * <p>A row is skipped when, for a non-null table id, the user is in the
     * schema-change user exclusions, the info starts with {@code INTERNAL DDL}, or the
     * schema stores only captured tables and the table is not included. It is also
     * skipped when the schema change has been seen already or when the commit SCN
     * tracking reports the row as already handled. Nothing beyond trace logging
     * happens for rows without a table name.
     *
     * @param row the DDL row
     * @throws InterruptedException declared for the schema change dispatch
     */
    protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
        if (row.getTableId() != null) {
            boolean isExcluded = this.connectorConfig.getLogMiningSchemaChangesUsernameExcludes().stream().anyMatch(userName -> userName.equalsIgnoreCase(row.getUserName()));
            if (isExcluded) {
                LOGGER.trace("User '{}' is in schema change exclusions, DDL '{}' skipped.", (Object)row.getUserName(), (Object)row.getRedoSql());
                return;
            }
            if (row.getInfo() != null && row.getInfo().startsWith("INTERNAL DDL")) {
                LOGGER.trace("Internal DDL '{}' skipped", (Object)row.getRedoSql());
                return;
            }
            if (this.schema.storeOnlyCapturedTables() && !this.tableFilter.isIncluded(row.getTableId())) {
                LOGGER.trace("Skipping DDL associated with table '{}', schema history only stores included tables only.", (Object)row.getTableId());
                return;
            }
        }
        if (this.hasSchemaChangeBeenSeen(row)) {
            LOGGER.trace("DDL: Scn {}, SQL '{}' has already been processed, skipped.", (Object)row.getScn(), (Object)row.getRedoSql());
            return;
        }
        if (this.offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
            Scn commitScn = this.offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
            LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN for redo thread {})", new Object[]{row.getRedoSql(), row.getScn(), commitScn, row.getThread()});
            return;
        }
        LOGGER.trace("DDL: '{}' {}", (Object)row.getRedoSql(), (Object)row);
        if (row.getTableName() == null) {
            return;
        }
        ++this.counters.ddlCount;
        TableId tableId = row.getTableId();
        // The offset SCN may only move forward when no transaction is active, or when the
        // single active transaction is the one this DDL belongs to.
        int activeTransactions = this.getTransactionCache().size();
        boolean advanceLowerScnBoundary = false;
        if (activeTransactions == 0) {
            advanceLowerScnBoundary = true;
        } else if (activeTransactions == 1) {
            // getFirstActiveTransactionKey() returns null when the cache streams nothing.
            String firstActiveKey = this.getFirstActiveTransactionKey();
            advanceLowerScnBoundary = firstActiveKey != null && firstActiveKey.equals(row.getTransactionId());
        }
        if (advanceLowerScnBoundary) {
            LOGGER.debug("Schema change advanced offset SCN to {}", (Object)row.getScn());
            this.offsetContext.setScn(row.getScn());
        }
        LOGGER.debug("Schema change advanced offset commit SCN to {} for thread {}", (Object)row.getScn(), (Object)row.getThread());
        this.offsetContext.getCommitScn().recordCommit(row);
        this.offsetContext.setEventScn(row.getScn());
        this.offsetContext.setRedoThread(row.getThread());
        this.offsetContext.setRsId(row.getRsId());
        this.offsetContext.setRowId("");
        if (this.getConfig().isLobEnabled()) {
            this.getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
        }
        // The emitter takes the object id followed by the data object id; the row's data
        // object id is passed rather than repeating the object id.
        this.dispatcher.dispatchSchemaChangeEvent((Partition)this.partition, (OffsetContext)this.offsetContext, (DataCollectionId)tableId, (SchemaChangeEventEmitter)new OracleSchemaChangeEventEmitter(this.getConfig(), this.partition, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), row.getObjectId(), row.getDataObjectId(), row.getRedoSql(), this.getSchema(), row.getChangeTime(), this.metrics, () -> this.processTruncateEvent(row)));
        if (this.isUsingHybridStrategy()) {
            this.reconstructColumnDmlParser.removeTableFromCache(tableId);
        }
    }

    /**
     * Buffers a {@code TruncateEvent} for the row's transaction, provided the table for
     * the row can be resolved.
     *
     * <p>A failure while resolving the table is logged and the event is dropped. When
     * that failure is an interruption, the thread's interrupt status is restored so
     * callers further up can still observe it.
     *
     * @param row the truncate row
     */
    private void processTruncateEvent(LogMinerEventRow row) {
        LOGGER.debug("Handling truncate event");
        try {
            Table table = this.getTableForDataEvent(row);
            if (table == null) {
                return;
            }
        }
        catch (InterruptedException e) {
            // Swallowing the interruption would hide it from the rest of the thread.
            Thread.currentThread().interrupt();
            LOGGER.warn("Failed to process truncate event.", (Throwable)e);
            return;
        }
        catch (SQLException e) {
            LOGGER.warn("Failed to process truncate event.", (Throwable)e);
            return;
        }
        this.addToTransaction(row.getTransactionId(), row, () -> {
            LogMinerDmlEntry dmlEntry = LogMinerDmlEntryImpl.forValuelessDdl();
            dmlEntry.setObjectName(row.getTableName());
            dmlEntry.setObjectOwner(row.getTablespaceName());
            return new TruncateEvent(row, dmlEntry);
        });
    }

    /**
     * Handles a {@code SEL_LOB_LOCATOR} row by buffering a {@code SelectLobLocatorEvent}
     * for its transaction. Skipped when LOB support is disabled or the table is not in
     * the schema.
     *
     * @param row the select-LOB-locator row
     */
    protected void handleSelectLobLocator(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, SEL_LOB_LOCATOR '{}' skipped.", (Object)row.getRedoSql());
            return;
        }
        LOGGER.debug("SEL_LOB_LOCATOR: {}", (Object)row);
        final TableId lobTableId = row.getTableId();
        final Table lobTable = this.getSchema().tableFor(lobTableId);
        if (lobTable == null) {
            LOGGER.warn("SEL_LOB_LOCATOR for table '{}' is not known, skipped.", (Object)lobTableId);
            return;
        }
        // The redo SQL is parsed only when the supplier is invoked.
        final Supplier<LogMinerEvent> eventFactory = () -> {
            final LogMinerDmlEntry parsedEntry = this.selectLobParser.parse(row.getRedoSql(), lobTable);
            parsedEntry.setObjectName(row.getTableName());
            parsedEntry.setObjectOwner(row.getTablespaceName());
            return new SelectLobLocatorEvent(row, parsedEntry, this.selectLobParser.getColumnName(), this.selectLobParser.isBinary());
        };
        this.addToTransaction(row.getTransactionId(), row, eventFactory);
        this.metrics.incrementTotalChangesCount();
    }

    /**
     * Handles a {@code LOB_WRITE} row by buffering a {@code LobWriteEvent} for its
     * transaction. Skipped when LOB support is disabled, the table is not in the
     * schema, or the row carries no redo SQL.
     *
     * @param row the LOB write row
     */
    protected void handleLobWrite(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, LOB_WRITE scn={}, tableId={} skipped", (Object)row.getScn(), (Object)row.getTableId());
            return;
        }
        LOGGER.debug("LOB_WRITE: scn={}, tableId={}, changeTime={}, transactionId={}", new Object[]{row.getScn(), row.getTableId(), row.getChangeTime(), row.getTransactionId()});
        final TableId lobTableId = row.getTableId();
        if (this.getSchema().tableFor(lobTableId) == null) {
            LOGGER.warn("LOB_WRITE for table '{}' is not known, skipped", (Object)lobTableId);
            return;
        }
        if (row.getRedoSql() == null) {
            return;
        }
        this.addToTransaction(row.getTransactionId(), row, () -> {
            final ParsedLobWriteSql write = this.parseLobWriteSql(row.getRedoSql());
            return new LobWriteEvent(row, write.data, write.offset, write.length);
        });
    }

    /**
     * Handles a {@code LOB_ERASE} row by buffering a {@code LobEraseEvent} for its
     * transaction. Skipped when LOB support is disabled or the table is not in the schema.
     *
     * @param row the LOB erase row
     */
    private void handleLobErase(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, LOB_ERASE '{}' skipped", (Object)row);
            return;
        }
        LOGGER.debug("LOB_ERASE: {}", (Object)row);
        final TableId lobTableId = row.getTableId();
        if (this.getSchema().tableFor(lobTableId) == null) {
            LOGGER.warn("LOB_ERASE for table '{}' is not known, skipped", (Object)lobTableId);
            return;
        }
        this.addToTransaction(row.getTransactionId(), row, () -> {
            return new LobEraseEvent(row);
        });
    }

    /**
     * Handles a {@code 32K_BEGIN} row by buffering an {@code ExtendedStringBeginEvent}
     * for its transaction. Skipped when LOB support is disabled or the table is not in
     * the schema.
     *
     * @param row the extended-string begin row
     */
    private void handleExtendedStringBegin(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, 32K_START '{}' skipped.", (Object)row.getRedoSql());
            return;
        }
        LOGGER.debug("32K_BEGIN: {}", (Object)row);
        final TableId targetTableId = row.getTableId();
        final Table targetTable = this.getSchema().tableFor(targetTableId);
        if (targetTable == null) {
            LOGGER.warn("32K_BEGIN for table '{}' is not known, skipped.", (Object)targetTableId);
            return;
        }
        final Supplier<LogMinerEvent> eventFactory = () -> {
            final LogMinerDmlEntry parsedEntry = this.extendedStringParser.parse(row.getRedoSql(), targetTable);
            parsedEntry.setObjectName(row.getTableName());
            parsedEntry.setObjectOwner(row.getTablespaceName());
            return new ExtendedStringBeginEvent(row, parsedEntry, this.extendedStringParser.getColumnName());
        };
        this.addToTransaction(row.getTransactionId(), row, eventFactory);
        this.metrics.incrementTotalChangesCount();
    }

    /**
     * Handles a {@code 32K_WRITE} row by buffering an {@code ExtendedStringWriteEvent}
     * for its transaction. Skipped when LOB support is disabled, the table is not in
     * the schema, or the row carries no redo SQL.
     *
     * @param row the extended-string write row
     */
    private void handleExtendedStringWrite(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, 32K_WRITE '{}' skipped.", (Object)row.getRedoSql());
            return;
        }
        LOGGER.debug("32K_WRITE: scn={}, tableId={}, changeTime={}, transactionId={}", new Object[]{row.getScn(), row.getTableId(), row.getChangeTime(), row.getTransactionId()});
        final TableId targetTableId = row.getTableId();
        if (this.getSchema().tableFor(targetTableId) == null) {
            LOGGER.warn("32K_WRITE for table '{}' is not known, skipped", (Object)targetTableId);
            return;
        }
        if (row.getRedoSql() == null) {
            return;
        }
        this.addToTransaction(row.getTransactionId(), row, () -> new ExtendedStringWriteEvent(row, this.parseExtendedStringWriteSql(row.getRedoSql())));
    }

    /**
     * Handles a {@code 32K_END} row. The only effect is a trace message when LOB
     * support is disabled; no event is buffered.
     *
     * @param row the extended-string end row
     */
    private void handleExtendedStringEnd(LogMinerEventRow row) {
        final boolean lobEnabled = this.getConfig().isLobEnabled();
        if (lobEnabled) {
            return;
        }
        LOGGER.trace("LOB support is disabled, 32K_END '{}' skipped.", (Object)row.getRedoSql());
    }

    /**
     * Handles an {@code XML_BEGIN} row by buffering an {@code XmlBeginEvent} for its
     * transaction. Skipped when LOB support is disabled or the table is not in the schema.
     *
     * @param row the XML begin row
     */
    private void handleXmlBegin(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, XML_BEGIN '{}' skipped.", (Object)row.getRedoSql());
            return;
        }
        LOGGER.trace("XML_BEGIN: {}", (Object)row);
        final TableId xmlTableId = row.getTableId();
        final Table xmlTable = this.getSchema().tableFor(xmlTableId);
        if (xmlTable == null) {
            LOGGER.warn("XML_BEGIN for table '{}' is not known, skipped.", (Object)xmlTableId);
            return;
        }
        final Supplier<LogMinerEvent> eventFactory = () -> {
            final LogMinerDmlEntry parsedEntry = this.xmlBeginParser.parse(row.getRedoSql(), xmlTable);
            parsedEntry.setObjectName(row.getTableName());
            parsedEntry.setObjectOwner(row.getTablespaceName());
            return new XmlBeginEvent(row, parsedEntry, this.xmlBeginParser.getColumnName());
        };
        this.addToTransaction(row.getTransactionId(), row, eventFactory);
    }

    /**
     * Handles an {@code XML_WRITE} row by buffering the event produced by
     * {@code getXmlWriteEventFromRow} for its transaction. Skipped when LOB support is
     * disabled or the table is not in the schema.
     *
     * @param row the XML write row
     */
    private void handleXmlWrite(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, XML_WRITE '{}' skipped.", (Object)row.getRedoSql());
            return;
        }
        LOGGER.trace("XML_WRITE: {}", (Object)row);
        final TableId xmlTableId = row.getTableId();
        if (this.getSchema().tableFor(xmlTableId) == null) {
            LOGGER.warn("XML_WRITE for table '{}' is not known, skipped.", (Object)xmlTableId);
            return;
        }
        this.addToTransaction(row.getTransactionId(), row, () -> {
            return this.getXmlWriteEventFromRow(row);
        });
    }

    /**
     * Builds an {@code XmlWriteEvent} from the redo SQL of an XML write row.
     *
     * <p>After the preamble the payload is either a quoted literal, taken up to the last
     * single quote, or a {@code HEXTORAW('...')} call whose hex content is decoded as
     * UTF-8. The length is the integer following the last colon of the statement. SQL
     * equal to the null preamble yields an event with no data and length 0.
     *
     * @param row the XML write row
     * @return the parsed XML write event
     * @throws ParsingException if the SQL does not start with the preamble or any part
     *         of the statement cannot be parsed
     */
    private XmlWriteEvent getXmlWriteEventFromRow(LogMinerEventRow row) {
        final String redoSql = row.getRedoSql();
        if (!redoSql.startsWith(XML_WRITE_PREAMBLE)) {
            throw new ParsingException(null, "XML write operation does not start with XML_REDO preamble");
        }
        try {
            if (XML_WRITE_PREAMBLE_NULL.equals(redoSql)) {
                return new XmlWriteEvent(row, null, 0);
            }
            final int payloadStart = XML_WRITE_PREAMBLE.length();
            final String xml;
            if (redoSql.charAt(payloadStart) == '\'') {
                final int closingQuote = redoSql.lastIndexOf('\'');
                if (closingQuote == -1) {
                    throw new IllegalStateException("Failed to find end of XML document");
                }
                xml = redoSql.substring(payloadStart + 1, closingQuote);
            } else {
                final int closingParen = redoSql.lastIndexOf(')');
                if (closingParen == -1) {
                    throw new IllegalStateException("Failed to find end of XML document");
                }
                final String hexCall = redoSql.substring(payloadStart, closingParen + 1);
                if (!(hexCall.startsWith("HEXTORAW('") && hexCall.endsWith(")"))) {
                    throw new IllegalStateException("Invalid HEXTORAW XML decoded data");
                }
                // Index 10 skips the "HEXTORAW('" prefix; the tail drops "')" or just ")".
                final int hexEnd;
                if (hexCall.endsWith("')")) {
                    hexEnd = hexCall.length() - 2;
                } else {
                    hexEnd = hexCall.length() - 1;
                }
                final String hex = hexCall.substring(10, hexEnd);
                xml = new String(RAW.hexString2Bytes((String)hex), StandardCharsets.UTF_8);
            }
            final int lengthSeparator = redoSql.lastIndexOf(':');
            if (lengthSeparator == -1) {
                throw new IllegalStateException("Failed to find XML document length");
            }
            final int length = Integer.parseInt(redoSql.substring(lengthSeparator + 1).trim());
            return new XmlWriteEvent(row, xml, length);
        }
        catch (Exception e) {
            throw new ParsingException(null, "Failed to parse XML write data", (Throwable)e);
        }
    }

    /**
     * Handles an {@code XML_END} row by buffering an {@code XmlEndEvent} for its
     * transaction. Skipped when LOB support is disabled or the table is not in the schema.
     *
     * @param row the XML end row
     */
    private void handleXmlEnd(LogMinerEventRow row) {
        if (!this.getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, XML_END skipped.");
            return;
        }
        LOGGER.trace("XML_END: {}", (Object)row);
        TableId tableId = row.getTableId();
        Table table = this.getSchema().tableFor(tableId);
        if (table == null) {
            // Message corrected from "XM_END for table ' {}'" to match the other XML handlers.
            LOGGER.warn("XML_END for table '{}' is not known, skipped.", (Object)tableId);
            return;
        }
        this.addToTransaction(row.getTransactionId(), row, () -> new XmlEndEvent(row));
    }

    /**
     * Handles a DML row by buffering a {@code DmlEvent}, or a {@code RedoSqlDmlEvent}
     * when redo SQL inclusion is configured, for its transaction.
     *
     * <p>Rows without redo SQL are ignored. A row with status 2 and non-blank info is
     * treated according to the configured failure handling mode, unless the hybrid
     * strategy is in use and the table is known. The DML counters are incremented
     * before the table is resolved. A row with the rollback flag set removes the
     * previously buffered event for its row id instead of adding a new one.
     *
     * @param row the DML row
     * @throws SQLException declared for the table lookup
     * @throws InterruptedException declared for the table lookup
     * @throws DebeziumException when the row cannot be handled and the failure mode is FAIL
     */
    protected void handleDataEvent(LogMinerEventRow row) throws SQLException, InterruptedException {
        if (row.getRedoSql() == null) {
            return;
        }
        LOGGER.debug("DML: {}", (Object)row);
        LOGGER.trace("\t{}", (Object)row.getRedoSql());
        final boolean flaggedWithInfo = row.getStatus() == 2 && !Strings.isNullOrBlank((String)row.getInfo());
        if (flaggedWithInfo && (!this.isUsingHybridStrategy() || !this.isTableKnown(row))) {
            switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
                case FAIL: {
                    LOGGER.error("Oracle LogMiner is unable to re-construct the SQL for '{}'", (Object)row);
                    throw new DebeziumException("Oracle failed to re-construct redo SQL '" + row.getRedoSql() + "'");
                }
                case WARN: {
                    LOGGER.warn("Oracle LogMiner event '{}' cannot be parsed. This event will be ignored and skipped.", (Object)row);
                    return;
                }
                default: {
                    LOGGER.debug("Oracle LogMiner event '{}' cannot be parsed. This event will be ignored and skipped.", (Object)row);
                    return;
                }
            }
        }
        ++this.counters.dmlCount;
        switch (row.getEventType()) {
            case INSERT: {
                ++this.counters.insertCount;
                break;
            }
            case UPDATE: {
                ++this.counters.updateCount;
                break;
            }
            case DELETE: {
                ++this.counters.deleteCount;
                break;
            }
            default: {
                break;
            }
        }
        final Table dmlTable = this.getTableForDataEvent(row);
        if (dmlTable == null) {
            return;
        }
        if (row.isRollbackFlag()) {
            this.removeEventWithRowId(row);
            return;
        }
        final Supplier<LogMinerEvent> eventFactory = () -> {
            final LogMinerDmlEntry parsedEntry = this.parseDmlStatement(row, dmlTable);
            parsedEntry.setObjectName(row.getTableName());
            parsedEntry.setObjectOwner(row.getTablespaceName());
            if (!this.connectorConfig.isLogMiningIncludeRedoSql()) {
                return new DmlEvent(row, parsedEntry);
            }
            return new RedoSqlDmlEvent(row, parsedEntry, row.getRedoSql());
        };
        this.addToTransaction(row.getTransactionId(), row, eventFactory);
        this.metrics.incrementTotalChangesCount();
    }

    /**
     * Handles a replication marker row: when the row's transaction is cached, its
     * events and cache entry are dropped. The transaction id is always cleared from
     * the abandoned-transactions cache.
     *
     * @param row the replication marker row
     */
    protected void handleReplicationMarker(LogMinerEventRow row) {
        final String markerTransactionId = row.getTransactionId();
        final Transaction cached = (Transaction)this.getTransactionCache().get(markerTransactionId);
        if (cached != null) {
            LOGGER.debug("Skipping GoldenGate replication marker for transaction {} with SCN {}", (Object)markerTransactionId, (Object)row.getScn());
            this.removeEventsWithTransaction(cached);
            this.getTransactionCache().remove(markerTransactionId);
        }
        this.getAbandonedTransactionsCache().remove(markerTransactionId);
    }

    /**
     * Logs a warning, plus the full row at debug level, for an unsupported operation.
     * Rows with a null or empty table name are ignored silently.
     *
     * @param row the unsupported row
     */
    protected void handleUnsupportedEvent(LogMinerEventRow row) {
        if (Strings.isNullOrEmpty((String)row.getTableName())) {
            return;
        }
        LOGGER.warn("An unsupported operation detected for table '{}' in transaction {} with SCN {} on redo thread {}.", new Object[]{row.getTableId(), row.getTransactionId(), row.getScn(), row.getThread()});
        LOGGER.debug("\t{}", (Object)row);
    }

    /**
     * Tracks iterations in which the offset SCN equals the previous one while the
     * per-thread commit SCNs differ from the previous ones. On the 25th such
     * consecutive iteration a warning is logged, the SCN freeze metric is incremented
     * and the counter restarts. Any other iteration resets both the freeze metric and
     * the counter. Nothing happens without an offset context or commit SCN.
     *
     * @param previousOffsetScn the offset SCN seen before the current iteration
     * @param previousOffsetCommitScns the per-redo-thread commit SCNs seen before the
     *                                 current iteration
     */
    protected void warnPotentiallyStuckScn(Scn previousOffsetScn, Map<Integer, Scn> previousOffsetCommitScns) {
        if (this.offsetContext == null || this.offsetContext.getCommitScn() == null) {
            return;
        }
        final Scn currentScn = this.offsetContext.getScn();
        final Map<Integer, Scn> currentCommitScns = this.offsetContext.getCommitScn().getCommitScnForAllRedoThreads();
        final boolean offsetUnchanged = previousOffsetScn.equals(currentScn);
        if (!offsetUnchanged || previousOffsetCommitScns.equals(currentCommitScns)) {
            this.metrics.setScnFreezeCount(0L);
            this.counters.stuckCount = 0;
            return;
        }
        ++this.counters.stuckCount;
        if (this.counters.stuckCount == 25) {
            LOGGER.warn("Offset SCN {} has not changed in 25 mining session iterations. This indicates long running transaction(s) are active.  Commit SCNs {}.", (Object)previousOffsetScn, previousOffsetCommitScns);
            this.metrics.incrementScnFreezeCount();
            this.counters.stuckCount = 0;
        }
    }

    /**
     * Resolves the table for a data row. A table missing from the schema is loaded
     * through {@code dispatchSchemaChangeEventAndGetTableForNewCapturedTable} when the
     * data collection filter includes it.
     *
     * @param row the data row
     * @return the table, or {@code null} when it is unknown and not included by the filter
     * @throws SQLException declared for the table id and table lookups
     * @throws InterruptedException declared for the schema change dispatch
     */
    private Table getTableForDataEvent(LogMinerEventRow row) throws SQLException, InterruptedException {
        final TableId resolvedId = this.getTableIdForDataEvent(row);
        final Table known = this.getSchema().tableFor(resolvedId);
        if (known != null) {
            return known;
        }
        if (!this.getConfig().getTableFilters().dataCollectionFilter().isIncluded(resolvedId)) {
            return null;
        }
        return this.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(resolvedId, this.offsetContext, this.dispatcher);
    }

    /**
     * Determines the table id for a data row. Outside the hybrid strategy the row's own
     * table id is returned. With the hybrid strategy, a {@code BIN$} table name is
     * looked up in {@code DBA_RECYCLEBIN} and an {@code UNKNOWN} table name is resolved
     * through the schema by object id.
     *
     * @param row the data row
     * @return the resolved table id; for a {@code BIN$} name without a recycle-bin
     *         match, the row's own table id
     * @throws SQLException declared for the recycle-bin query
     * @throws DebeziumException when an {@code UNKNOWN} table cannot be resolved
     */
    private TableId getTableIdForDataEvent(LogMinerEventRow row) throws SQLException {
        final TableId rowTableId = row.getTableId();
        if (!this.isUsingHybridStrategy()) {
            return rowTableId;
        }
        if (rowTableId.table().startsWith("BIN$")) {
            try (OracleConnection connection = new OracleConnection(this.connectorConfig.getJdbcConfig())) {
                return (TableId)connection.prepareQueryAndMap("SELECT OWNER, ORIGINAL_NAME FROM DBA_RECYCLEBIN WHERE OBJECT_NAME=?", ps -> ps.setString(1, row.getTableId().table()), rs -> {
                    if (!rs.next()) {
                        return row.getTableId();
                    }
                    return new TableId(row.getTableId().catalog(), rs.getString(1), rs.getString(2));
                });
            }
        }
        if (rowTableId.table().equalsIgnoreCase("UNKNOWN")) {
            final TableId byObjectId = this.schema.getTableIdByObjectId(row.getObjectId(), row.getDataObjectId());
            if (byObjectId == null) {
                throw new DebeziumException("Failed to resolve UNKNOWN table name by object id lookup");
            }
            return byObjectId;
        }
        return rowTableId;
    }

    /**
     * Tells whether the row's table name is anything other than {@code UNKNOWN},
     * compared case-insensitively.
     *
     * @param row the row to inspect
     * @return {@code false} when the table name is {@code UNKNOWN}, {@code true} otherwise
     */
    private boolean isTableKnown(LogMinerEventRow row) {
        final String tableName = row.getTableId().table();
        final boolean unknown = tableName.equalsIgnoreCase("UNKNOWN");
        return !unknown;
    }

    /**
     * Tells whether the row's table name has one of two shapes: it starts with
     * {@code "OBJ# "}, or it is 30 characters long, starts with {@code "BIN$"} and ends
     * with {@code "==$0"}.
     *
     * @param row the row to inspect
     * @return {@code true} when the table name matches either shape
     */
    private boolean isTableLookupByObjectIdRequired(LogMinerEventRow row) {
        final String name = row.getTableId().table();
        final boolean objectNumberName = name.startsWith("OBJ# ");
        return objectNumberName || (name.startsWith("BIN$") && name.endsWith("==$0") && name.length() == 30);
    }

    /**
     * Advances the result set and records how long the call took when a row is available.
     *
     * <p>A {@code SQLException} whose message starts with {@code ORA-00310} is tolerated
     * once: the {@code sequenceUnavailable} flag is set and {@code false} is returned.
     * The flag is cleared by the next call that does not throw. If the same error
     * occurs while the flag is still set, or the exception has a different message or
     * none at all, the exception is rethrown.
     *
     * @param resultSet the result set to advance
     * @return {@code true} when the result set moved to a row, {@code false} otherwise
     * @throws SQLException when advancing fails for a reason that is not tolerated
     */
    private boolean hasNextWithMetricsUpdate(ResultSet resultSet) throws SQLException {
        Instant start = Instant.now();
        boolean result = false;
        try {
            if (resultSet.next()) {
                this.metrics.setLastResultSetNextDuration(Duration.between(start, Instant.now()));
                result = true;
            }
            if (this.sequenceUnavailable) {
                LOGGER.debug("The previous batch's unavailable log problem has been cleared.");
                this.sequenceUnavailable = false;
            }
        }
        catch (SQLException e) {
            // getMessage() may be null; such an exception is rethrown as-is rather than
            // being replaced by a NullPointerException.
            final String message = e.getMessage();
            if (message == null || !message.startsWith("ORA-00310")) {
                throw e;
            }
            if (this.sequenceUnavailable) {
                LOGGER.error("The log availability error '{}' wasn't cleared, stop requested.", (Object)message);
                throw e;
            }
            LOGGER.debug("A mined log is no longer available: {}", (Object)message);
            LOGGER.warn("Restarting mining session after a log became unavailable.");
            this.sequenceUnavailable = true;
        }
        return result;
    }

    /**
     * Adds an event to the transaction identified by {@code transactionId}, creating
     * the transaction when it is not cached yet.
     *
     * <p>The event is dropped when the transaction is abandoned, was recently processed,
     * or is over the event threshold (in which case it is abandoned). A
     * {@code DmlParserException} from the supplier is rethrown in FAIL mode and
     * otherwise logged, with the event skipped. The event is stored only when its key
     * is not yet in the event cache; the transaction is always written back to the
     * transaction cache afterwards.
     *
     * @param transactionId the transaction id
     * @param row the row the event originates from
     * @param eventSupplier creates the event; invoked only after the skip checks pass
     */
    protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
        if (this.getAbandonedTransactionsCache().contains(transactionId)) {
            LOGGER.warn("Event for abandoned transaction {}, skipped.", (Object)transactionId);
            return;
        }
        if (this.isRecentlyProcessed(transactionId)) {
            LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", (Object)transactionId);
            return;
        }
        Transaction transaction = (Transaction)this.getTransactionCache().get(transactionId);
        if (transaction == null) {
            LOGGER.trace("Transaction {} is not in cache, creating.", (Object)transactionId);
            transaction = this.createTransaction(row);
        }
        if (this.isTransactionOverEventThreshold(transaction)) {
            this.abandonTransactionOverEventThreshold(transaction);
            return;
        }
        final LogMinerEvent pendingEvent;
        try {
            pendingEvent = eventSupplier.get();
        }
        catch (DmlParserException e) {
            switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
                case FAIL: {
                    Loggings.logErrorAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Failed to parse SQL for event", (Object[])new Object[0]);
                    throw e;
                }
                case WARN: {
                    Loggings.logWarningAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Failed to parse redo SQL, event is being ignored and skipped.", (Object[])new Object[0]);
                    return;
                }
                default: {
                    Loggings.logDebugAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Failed to parse redo SQL, event is being ignored and skipped.", (Object[])new Object[0]);
                    return;
                }
            }
        }
        final String cacheKey = transaction.getEventId(transaction.getNextEventId());
        final boolean alreadyCached = this.getEventCache().containsKey(cacheKey);
        if (!alreadyCached) {
            LOGGER.trace("Transaction {}, adding event reference at key {}", (Object)transactionId, (Object)cacheKey);
            this.getEventCache().put(cacheKey, pendingEvent);
            this.metrics.calculateLagFromSource(row.getChangeTime());
            this.inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
        }
        this.getTransactionCache().put(transactionId, transaction);
        this.metrics.setActiveTransactionCount(this.getTransactionCache().size());
    }

    /**
     * Loads the metadata of a table that is missing from the schema, dispatches a schema
     * change event for it and returns the table as then known to the schema.
     *
     * <p>A dedicated connection is opened for the lookup, with auto-commit disabled and,
     * when a PDB name is configured, the session switched to that PDB. The connection is
     * closed on every exit path.
     *
     * @param tableId the table to load
     * @param offsetContext the offset context passed to the dispatched event
     * @param dispatcher the dispatcher used for the schema change event
     * @return the table from the schema after the dispatch, or {@code null} when the
     *         table is reported as non-relational
     * @throws SQLException declared for the connection and metadata calls
     * @throws InterruptedException declared for the schema change dispatch
     */
    public Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId tableId, OracleOffsetContext offsetContext, EventDispatcher<OraclePartition, TableId> dispatcher) throws SQLException, InterruptedException {
        LOGGER.warn("Obtaining schema for table {}, which should be already loaded, this may signal potential bug in fetching table schemas.", (Object)tableId);
        try (OracleConnection connection = new OracleConnection(this.connectorConfig.getJdbcConfig(), false)) {
            connection.setAutoCommit(false);
            if (!Strings.isNullOrBlank((String)this.connectorConfig.getPdbName())) {
                connection.setSessionToPdb(this.connectorConfig.getPdbName());
            }
            String tableDdl = this.getTableMetadataDdl(connection, tableId);
            Long objectId = connection.getTableObjectId(tableId);
            Long dataObjectId = connection.getTableDataObjectId(tableId);
            dispatcher.dispatchSchemaChangeEvent((Partition)this.partition, (OffsetContext)offsetContext, (DataCollectionId)tableId, (SchemaChangeEventEmitter)new OracleSchemaChangeEventEmitter(this.connectorConfig, this.partition, offsetContext, tableId, tableId.catalog(), tableId.schema(), objectId, dataObjectId, tableDdl, this.getSchema(), Instant.now(), this.metrics, null));
            return this.getSchema().tableFor(tableId);
        }
        catch (OracleConnection.NonRelationalTableException e) {
            LOGGER.warn("Table {} is not a relational table and will be skipped.", (Object)tableId);
            this.metrics.incrementWarningCount();
            return null;
        }
    }

    /**
     * Fetches the metadata DDL of a table through the given connection, counting the
     * lookup in {@code counters.tableMetadataCount}.
     *
     * @param connection the connection used for the lookup
     * @param tableId the table whose DDL is requested
     * @return the DDL returned by the connection
     * @throws SQLException declared for the metadata lookup
     * @throws OracleConnection.NonRelationalTableException declared for the metadata lookup
     */
    public String getTableMetadataDdl(OracleConnection connection, TableId tableId) throws SQLException, OracleConnection.NonRelationalTableException {
        this.counters.tableMetadataCount++;
        LOGGER.info("Getting database metadata for table '{}'", (Object)tableId);
        final String metadataDdl = connection.getTableMetadataDdl(tableId);
        return metadataDdl;
    }

    /**
     * Parses the row's redo SQL with the parser chosen by {@code resolveParser} and
     * records the parse duration in the metrics.
     *
     * <p>An UPDATE or DELETE entry without old values is logged as a warning and
     * counted in the warning metric.
     *
     * @param row the DML row
     * @param table the table the statement applies to
     * @return the parsed DML entry
     * @throws DmlParserException wrapping the parser failure, with the statement in the message
     */
    private LogMinerDmlEntry parseDmlStatement(LogMinerEventRow row, Table table) {
        final String statement = row.getRedoSql();
        final LogMinerDmlEntry parsed;
        try {
            final Instant startedAt = Instant.now();
            parsed = this.resolveParser(row).parse(statement, table);
            this.metrics.setLastParseTimeDuration(Duration.between(startedAt, Instant.now()));
        }
        catch (DmlParserException e) {
            throw new DmlParserException("DML statement couldn't be parsed. Please open a Jira issue with the statement '" + statement + "'.", (Throwable)((Object)e));
        }
        final boolean noBeforeState = parsed.getOldValues().length == 0;
        if (noBeforeState && (EventType.UPDATE == parsed.getEventType() || EventType.DELETE == parsed.getEventType())) {
            LOGGER.warn("The DML event '{}' contained no before state.", (Object)statement);
            this.metrics.incrementWarningCount();
        }
        return parsed;
    }

    /**
     * Chooses the DML parser for a LogMiner row.
     *
     * <p>{@code reconstructColumnDmlParser} is returned only when all three conditions hold: the
     * row status equals 2, the row info is neither null nor blank, and the hybrid mining strategy
     * is configured. In every other case {@code dmlParser} is returned.
     *
     * <p>NOTE(review): the meaning of status value 2 is not visible in this block - confirm
     * against the LogMiner status definitions.
     *
     * @param row the row about to be parsed
     * @return the parser to use for {@code row}
     */
    private LogMinerDmlParser resolveParser(LogMinerEventRow row) {
        final boolean useColumnReconstruction = row.getStatus() == 2
                && !Strings.isNullOrBlank((String) row.getInfo())
                && this.isUsingHybridStrategy();
        return useColumnReconstruction ? this.reconstructColumnDmlParser : this.dmlParser;
    }

    /**
     * Tells whether the connector is configured with the {@code HYBRID} log mining strategy.
     *
     * @return {@code true} only when the configured strategy is {@code HYBRID}
     */
    private boolean isUsingHybridStrategy() {
        final OracleConnectorConfig.LogMiningStrategy configuredStrategy = this.connectorConfig.getLogMiningStrategy();
        // Enum constants are singletons, so an identity check matches equals() and is null-safe.
        return OracleConnectorConfig.LogMiningStrategy.HYBRID == configuredStrategy;
    }

    /**
     * Extracts the payload, length and offset from a LOB_WRITE redo SQL statement.
     *
     * <p>The trimmed statement must match {@code LOB_WRITE_SQL_PATTERN} in full. Capture group 1
     * is the data, group 2 the length and group 3 the offset. When the data starts with a single
     * quote, its first and last characters are removed. Doubled single quotes in the data are
     * collapsed to one.
     *
     * @param sql the redo SQL, may be {@code null}
     * @return the parsed pieces, or {@code null} when {@code sql} is {@code null}
     * @throws DebeziumException when the statement does not match the expected pattern
     * @throws NumberFormatException when the length or offset group is not a valid {@code int}
     */
    private ParsedLobWriteSql parseLobWriteSql(String sql) {
        if (sql == null) {
            return null;
        }
        Matcher m = LOB_WRITE_SQL_PATTERN.matcher(sql.trim());
        if (!m.matches()) {
            throw new DebeziumException("Unable to parse unsupported LOB_WRITE SQL: " + sql);
        }
        String data = m.group(1);
        if (data.startsWith("'")) {
            // Drop the first and last character (presumably the enclosing quotes - confirm against the pattern).
            data = data.substring(1, data.length() - 1);
        }
        int length = Integer.parseInt(m.group(2));
        // The captured offset is reduced by one (presumably 1-based in the SQL, 0-based here - confirm).
        int offset = Integer.parseInt(m.group(3)) - 1;
        // Literal replacement: no regex is needed to collapse doubled quotes, and
        // String.replace leaves the text unchanged when there is nothing to collapse.
        data = data.replace("''", "'");
        return new ParsedLobWriteSql(offset, length, data);
    }

    /**
     * Cuts the payload out of a 32K_WRITE redo SQL statement.
     *
     * <p>The statement must contain at least two semicolons. The result starts at character
     * index 12 and stops one character before the second-to-last semicolon.
     *
     * <p>NOTE(review): the fixed start index 12 and the dropped trailing character presumably
     * correspond to a constant statement prefix and a closing quote - confirm against the SQL
     * that LogMiner emits.
     *
     * @param sql the redo SQL of the write operation
     * @return the extracted substring
     * @throws DebeziumException when fewer than two semicolons are found
     */
    private String parseExtendedStringWriteSql(String sql) {
        final int lastSemicolon = sql.lastIndexOf(';');
        if (lastSemicolon == -1) {
            throw new DebeziumException("Failed to find end index on 32K_WRITE operation");
        }
        // The payload ends before the semicolon that precedes the final one.
        final int payloadTerminator = sql.lastIndexOf(';', lastSemicolon - 1);
        if (payloadTerminator == -1) {
            throw new DebeziumException("Failed to find end index on 32K_WRITE operation");
        }
        return sql.substring(12, payloadTerminator - 1);
    }

    /**
     * Finds the smallest start SCN among all transactions currently held in the transaction cache.
     *
     * @return the minimum start SCN, or {@code Scn.NULL} when the cache yields no entries
     */
    protected Scn getTransactionCacheMinimumScn() {
        return this.getTransactionCache().streamAndReturn(entries -> entries
                .map(LogMinerCache.Entry::getValue)
                .map(Transaction::getStartScn)
                .min((left, right) -> left.compareTo(right))
                .orElse(Scn.NULL));
    }

    /**
     * Looks up the oldest transaction in the transaction cache, as ordered by
     * {@code oldestTransactionComparison}.
     *
     * @return the minimum transaction under that ordering, or an empty optional when the cache
     *         yields no entries
     */
    protected Optional<T> getOldestTransactionInCache() {
        return this.getTransactionCache().streamAndReturn(entries -> entries
                .map(LogMinerCache.Entry::getValue)
                .min((left, right) -> this.oldestTransactionComparison(left, right)));
    }

    /**
     * Checks whether a transaction identifier ends with {@code NO_SEQUENCE_TRX_ID_SUFFIX}.
     *
     * @param transactionId the transaction identifier to inspect; must not be {@code null}
     * @return {@code true} when the identifier ends with the no-sequence suffix
     */
    protected boolean isTransactionIdWithNoSequence(String transactionId) {
        final boolean endsWithNoSequenceSuffix = transactionId.endsWith(NO_SEQUENCE_TRX_ID_SUFFIX);
        return endsWithNoSequenceSuffix;
    }

    /**
     * Returns the first eight characters of a transaction identifier.
     *
     * @param transactionId the transaction identifier; must contain at least eight characters
     * @return the leading eight characters of {@code transactionId}
     * @throws StringIndexOutOfBoundsException when the identifier is shorter than eight characters
     */
    protected String getTransactionIdPrefix(String transactionId) {
        final int prefixLength = 8;
        return transactionId.substring(0, prefixLength);
    }

    /**
     * Decides whether a transaction has reached the configured per-transaction event threshold.
     *
     * <p>A configured threshold of {@code 0} turns the check off, so the result is always
     * {@code false} in that case.
     *
     * @param transaction the transaction whose event count is examined
     * @return {@code true} when the threshold is non-zero and the transaction's event count is
     *         greater than or equal to it
     */
    protected boolean isTransactionOverEventThreshold(T transaction) {
        final boolean thresholdEnabled = this.getConfig().getLogMiningBufferTransactionEventsThreshold() != 0L;
        return thresholdEnabled
                && (long) this.getTransactionEventCount(transaction) >= this.getConfig().getLogMiningBufferTransactionEventsThreshold();
    }

    /**
     * Abandons a transaction that exceeded the allowed number of events.
     *
     * <p>In order: a warning is logged and counted, the transaction is removed from the cache
     * via {@code getAndRemoveTransactionFromCache}, its identifier is added to
     * {@code abandonedTransactionsCache}, and the oversized-transaction metric is incremented.
     *
     * @param transaction the transaction to abandon
     */
    protected void abandonTransactionOverEventThreshold(T transaction) {
        final String transactionId = transaction.getTransactionId();
        LOGGER.warn("Transaction {} exceeds maximum allowed number of events, transaction will be abandoned.", transactionId);
        this.metrics.incrementWarningCount();
        this.getAndRemoveTransactionFromCache(transactionId);
        this.abandonedTransactionsCache.add(transactionId);
        this.metrics.incrementOversizedTransactionCount();
    }

    /**
     * Abandons cached transactions whose start SCN is at or below a retention-derived threshold.
     *
     * <p>Nothing happens when {@code retention} equals {@link Duration#ZERO} or when
     * {@code getLastScnToAbandon} yields no threshold SCN. Otherwise, if the smallest start SCN
     * in the cache is non-null and not greater than the threshold, every cached transaction with
     * a start SCN less than or equal to the threshold is logged, cleaned up, removed from the
     * cache and recorded in the metrics. Afterwards the oldest-SCN metric is refreshed and the
     * offset context SCN is set to the threshold. A heartbeat event is dispatched whenever a
     * threshold SCN was obtained, whether or not any transaction was abandoned.
     *
     * @param retention the retention duration used to derive the threshold SCN
     * @throws InterruptedException declared by this method; presumably raised by the heartbeat dispatch
     */
    @Override
    public void abandonTransactions(Duration retention) throws InterruptedException {
        Optional<Scn> lastScnToAbandonTransactions;
        // A zero retention short-circuits before the threshold lookup is even attempted.
        if (!Duration.ZERO.equals(retention) && (lastScnToAbandonTransactions = this.getLastScnToAbandon(this.jdbcConnection, retention)).isPresent()) {
            Scn thresholdScn = lastScnToAbandonTransactions.get();
            Scn smallestScn = this.getTransactionCacheMinimumScn();
            // Only scan the cache when at least one transaction can fall at or below the threshold.
            if (!smallestScn.isNull() && thresholdScn.compareTo(smallestScn) >= 0) {
                Optional<T> oldestTransaction;
                LogMinerCache<String, Map> transactionCache = this.getTransactionCache();
                // Snapshot the matching entries first so the cache is not modified while it is streamed.
                Map abandoned = transactionCache.streamAndReturn(stream -> stream.filter(e -> ((Transaction)e.getValue()).getStartScn().compareTo(thresholdScn) <= 0).collect(Collectors.toMap(LogMinerCache.Entry::getKey, LogMinerCache.Entry::getValue)));
                boolean first = true;
                for (Map.Entry entry : abandoned.entrySet()) {
                    // The summary warning is emitted once, and only if there is something to abandon.
                    if (first) {
                        LOGGER.warn("All transactions with SCN <= {} will be abandoned.", (Object)thresholdScn);
                        first = false;
                    }
                    String key = (String)entry.getKey();
                    Transaction value = (Transaction)entry.getValue();
                    LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events{}) is being abandoned.", new Object[]{key, value.getStartScn(), value.getChangeTime(), value.getRedoThreadId(), value.getNumberOfEvents(), this.getLoggedAbandonedTransactionTableNames(value)});
                    // Cleanup runs before the entry is removed from the cache.
                    this.cleanupAfterTransactionRemovedFromCache(value, true);
                    transactionCache.remove(key);
                    this.metrics.addAbandonedTransactionId(key);
                    this.metrics.setActiveTransactionCount(transactionCache.size());
                }
                if (LOGGER.isDebugEnabled()) {
                    // The "before" message lists the keys of the abandoned snapshot; the "after" message lists the keys still cached.
                    LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]", (Object)String.join((CharSequence)",", abandoned.keySet()));
                    transactionCache.keys(keys -> LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]", (Object)keys.collect(Collectors.joining(","))));
                }
                // Refresh the oldest-SCN metric from whatever remains in the cache.
                if ((oldestTransaction = this.getOldestTransactionInCache()).isPresent()) {
                    Transaction transaction = (Transaction)oldestTransaction.get();
                    this.metrics.setOldestScnDetails(transaction.getStartScn(), transaction.getChangeTime());
                } else {
                    this.metrics.setOldestScnDetails(Scn.NULL, null);
                }
                // Move the offset to the threshold SCN.
                this.offsetContext.setScn(thresholdScn);
            }
            // Dispatched whenever a threshold SCN was available, even if nothing was abandoned.
            this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
        }
    }

    /**
     * Builds the optional table-name suffix appended to the "transaction abandoned" log message.
     *
     * <p>The transaction's events are only walked when {@code ABANDONED_DETAILS_LOGGER} has DEBUG
     * enabled; otherwise an empty string is returned immediately.
     *
     * @param transaction the transaction being abandoned
     * @return {@code ", <n> tables [<comma separated identifiers>]"} listing the distinct table
     *         identifiers of the transaction's events, or an empty string when detail logging is off
     */
    protected String getLoggedAbandonedTransactionTableNames(T transaction) {
        if (!ABANDONED_DETAILS_LOGGER.isDebugEnabled()) {
            return "";
        }
        final HashSet<String> distinctTables = new HashSet<>();
        for (Iterator<LogMinerEvent> events = this.getTransactionEventIterator(transaction); events.hasNext(); ) {
            distinctTables.add(events.next().getTableId().identifier());
        }
        return String.format(", %d tables [%s]", distinctTables.size(), String.join(",", distinctTables));
    }

    /**
     * Determines the SCN at or below which transactions are to be abandoned.
     *
     * <p>The SCN is obtained by running the query built by
     * {@code SqlUtils.getScnByTimeDeltaQuery} for the last processed SCN and the retention. When
     * that query fails with an {@link SQLException} and a last-processed change time is known,
     * the value derived from cached transaction change times is used instead, provided it is
     * non-null. If no fallback value is available the failure is logged, the error metric is
     * incremented and an empty optional is returned.
     *
     * @param connection connection used to run the SCN lookup query
     * @param retention the retention duration
     * @return the abandonment threshold SCN; empty when the last processed SCN is null or when
     *         the lookup fails without a usable fallback
     */
    protected Optional<Scn> getLastScnToAbandon(OracleConnection connection, Duration retention) {
        try {
            if (this.getLastProcessedScn().isNull()) {
                return Optional.empty();
            }
            final String scnLookupQuery = SqlUtils.getScnByTimeDeltaQuery(this.getLastProcessedScn(), retention);
            final BigInteger thresholdValue = (BigInteger) connection.singleOptionalValue(scnLookupQuery, rs -> rs.getBigDecimal(1).toBigInteger());
            return Optional.of(new Scn(thresholdValue));
        }
        catch (SQLException e) {
            // The fallback is attempted only when a last-processed change time is known.
            if (this.getLastProcessedScnChangeTime() != null) {
                final Scn fallbackScn = this.getLastScnToAbandonFallbackByTransactionChangeTime(retention);
                if (!fallbackScn.isNull()) {
                    return Optional.of(fallbackScn);
                }
            }
            final String failureMessage = String.format("Cannot fetch SCN %s by given duration to calculate SCN to abandon", this.getLastProcessedScn());
            LOGGER.error(failureMessage, (Throwable) e);
            this.metrics.incrementErrorCount();
            return Optional.empty();
        }
    }

    /**
     * Derives an abandonment SCN from the change times of the cached transactions.
     *
     * <p>A transaction qualifies when the absolute difference, in whole minutes, between its
     * change time and the last processed SCN change time is greater than zero and greater than
     * the retention in whole minutes. Among the qualifying transactions, the start SCN of the
     * one with the highest start SCN is returned.
     *
     * @param retention the retention duration; compared at minute granularity
     * @return the highest start SCN among qualifying transactions, or {@code Scn.NULL} when none qualifies
     */
    private Scn getLastScnToAbandonFallbackByTransactionChangeTime(Duration retention) {
        LOGGER.debug("Getting abandon SCN breakpoint based on change time {} (retention {} minutes).", (Object) this.getLastProcessedScnChangeTime(), (Object) retention.toMinutes());
        final long retentionMinutes = retention.toMinutes();
        return this.getTransactionCache().streamAndReturn(entries -> entries
                .map(LogMinerCache.Entry::getValue)
                .filter(candidate -> {
                    final Instant changedAt = candidate.getChangeTime();
                    final long ageMinutes = Duration.between(this.getLastProcessedScnChangeTime(), changedAt).abs().toMinutes();
                    LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.", new Object[]{candidate.getTransactionId(), candidate.getStartScn(), changedAt, ageMinutes});
                    // Same as: ageMinutes > 0 && ageMinutes > retentionMinutes.
                    return ageMinutes > Math.max(0L, retentionMinutes);
                })
                .max(this::compareStartScn)
                .map(Transaction::getStartScn)
                .orElse(Scn.NULL));
    }

    /**
     * Removes every event of a transaction from the event cache and then drops the transaction's
     * identifier from {@code inMemoryPendingTransactionsCache}.
     *
     * <p>Event ids are obtained from {@code transaction.getEventId(i)} for each index from
     * {@code 0} up to, but excluding, {@code transaction.getNumberOfEvents()}.
     *
     * @param transaction the transaction whose events are purged
     */
    private void removeEventsWithTransaction(T transaction) {
        int eventIndex = 0;
        while (eventIndex < transaction.getNumberOfEvents()) {
            this.getEventCache().remove(transaction.getEventId(eventIndex));
            eventIndex++;
        }
        this.inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
    }

    /**
     * Orders two transactions by their start SCN.
     *
     * @param first the left-hand transaction
     * @param second the right-hand transaction
     * @return the result of comparing the start SCN of {@code first} with that of {@code second}
     */
    protected int compareStartScn(T first, T second) {
        final Scn firstStart = first.getStartScn();
        final Scn secondStart = second.getStartScn();
        return firstStart.compareTo(secondStart);
    }

    /**
     * Orders two transactions by start SCN and, when those are equal, by change time.
     *
     * @param first the left-hand transaction
     * @param second the right-hand transaction
     * @return the start SCN comparison when it is non-zero, otherwise the change time comparison
     */
    protected int oldestTransactionComparison(T first, T second) {
        final int byStartScn = this.compareStartScn(first, second);
        if (byStartScn != 0) {
            return byStartScn;
        }
        // Equal start SCNs: fall back to the change time as tie-breaker.
        return first.getChangeTime().compareTo(second.getChangeTime());
    }

    /**
     * Plain mutable holder for the processor's counters.
     *
     * <p>All fields are public and updated directly by the owning processor. {@link #reset()}
     * zeroes every counter except {@code stuckCount}, which it leaves untouched.
     */
    protected static class Counters {
        // Not cleared by reset(); presumably accumulated across resets by the caller - confirm.
        public int stuckCount;
        public int dmlCount;
        public int ddlCount;
        public int insertCount;
        public int updateCount;
        public int deleteCount;
        public int commitCount;
        public int rollbackCount;
        public int tableMetadataCount;
        public long rows;
        public long partialRollbackCount;

        protected Counters() {
        }

        /**
         * Sets every counter except {@code stuckCount} back to zero.
         */
        public void reset() {
            this.rows = 0L;
            this.partialRollbackCount = 0L;
            this.dmlCount = 0;
            this.ddlCount = 0;
            this.insertCount = 0;
            this.updateCount = 0;
            this.deleteCount = 0;
            this.commitCount = 0;
            this.rollbackCount = 0;
            this.tableMetadataCount = 0;
        }

        /**
         * Renders all counters, including {@code stuckCount}, as {@code Counters{name=value, ...}}.
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("Counters{");
            text.append("rows=").append(this.rows);
            text.append(", stuckCount=").append(this.stuckCount);
            text.append(", dmlCount=").append(this.dmlCount);
            text.append(", ddlCount=").append(this.ddlCount);
            text.append(", insertCount=").append(this.insertCount);
            text.append(", updateCount=").append(this.updateCount);
            text.append(", deleteCount=").append(this.deleteCount);
            text.append(", commitCount=").append(this.commitCount);
            text.append(", rollbackCount=").append(this.rollbackCount);
            text.append(", tableMetadataCount=").append(this.tableMetadataCount);
            text.append(", partialRollbackCount=").append(this.partialRollbackCount);
            return text.append("}").toString();
        }
    }

    /**
     * Immutable holder for the three values extracted from a LOB_WRITE statement: the write
     * offset, the write length and the data.
     */
    private static class ParsedLobWriteSql {
        final int offset;
        final int length;
        final String data;

        /**
         * @param offset the offset to store
         * @param length the length to store
         * @param data the data to store
         */
        ParsedLobWriteSql(int offset, int length, String data) {
            this.data = data;
            this.length = length;
            this.offset = offset;
        }
    }

    /**
     * Orders event keys of the form {@code <transaction>-<event>}.
     *
     * <p>Keys are split on {@code "-"}. The first segments are compared as strings; when they are
     * equal, the second segments are parsed as {@code long} values and compared numerically, so
     * {@code "tx-2"} sorts before {@code "tx-10"}. Any segments after the second are ignored.
     */
    private static class EventKeySortComparator implements Comparator<String> {
        public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();

        private EventKeySortComparator() {
        }

        /**
         * @throws IllegalStateException when either key is {@code null} or contains no {@code "-"}
         * @throws NumberFormatException when the transaction parts are equal and an event part is
         *         not a valid {@code long}
         */
        @Override
        public int compare(String o1, String o2) {
            requireEventKeyFormat(o1);
            requireEventKeyFormat(o2);
            final String[] leftParts = o1.split("-");
            final String[] rightParts = o2.split("-");
            final int byTransaction = leftParts[0].compareTo(rightParts[0]);
            if (byTransaction != 0) {
                return byTransaction;
            }
            // Same transaction part: order by the numeric event part.
            return Long.compare(Long.parseLong(leftParts[1]), Long.parseLong(rightParts[1]));
        }

        /** Rejects keys that are null or lack the "-" separator. */
        private static void requireEventKeyFormat(String eventKey) {
            if (eventKey == null || !eventKey.contains("-")) {
                throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
            }
        }
    }
}

