/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.stream;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.ParentTable;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
import org.opensearch.dataprepper.plugins.source.rds.stream.ChangeEventStatus;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogEventListener
implements BinaryLogClient.EventListener {
    private static final Logger LOG = LoggerFactory.getLogger(BinlogEventListener.class);
    static final int DEFAULT_NUM_WORKERS = 1;
    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60L);
    static final int DEFAULT_BUFFER_BATCH_SIZE = 1000;
    static final String DATA_PREPPER_EVENT_TYPE = "event";
    static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed";
    static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors";
    static final String BYTES_RECEIVED = "bytesReceived";
    static final String BYTES_PROCESSED = "bytesProcessed";
    static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime";
    static final String REPLICATION_LOG_PROCESSING_ERROR_COUNT = "replicationLogEntryProcessingErrors";
    static final String SEPARATOR = ".";
    private final Map<Long, TableMetadata> tableMetadataMap;
    private final Map<String, ParentTable> parentTableMap;
    private final StreamPartition streamPartition;
    private final StreamRecordConverter recordConverter;
    private final BinaryLogClient binaryLogClient;
    private final Buffer<Record<Event>> buffer;
    private final Set<String> tableNames;
    private final String s3Prefix;
    private final boolean isAcknowledgmentsEnabled;
    private final PluginMetrics pluginMetrics;
    private final List<Event> pipelineEvents;
    private final StreamCheckpointManager streamCheckpointManager;
    private final DbTableMetadata dbTableMetadata;
    private final ExecutorService binlogEventExecutorService;
    private final CascadingActionDetector cascadeActionDetector;
    private final Counter changeEventSuccessCounter;
    private final Counter changeEventErrorCounter;
    private final DistributionSummary bytesReceivedSummary;
    private final DistributionSummary bytesProcessedSummary;
    private final Timer eventProcessingTimer;
    private final Counter eventProcessingErrorCounter;
    private BinlogCoordinate currentBinlogCoordinate;

    public BinlogEventListener(StreamPartition streamPartition, Buffer<Record<Event>> buffer, RdsSourceConfig sourceConfig, String s3Prefix, PluginMetrics pluginMetrics, BinaryLogClient binaryLogClient, StreamCheckpointer streamCheckpointer, AcknowledgementSetManager acknowledgementSetManager, DbTableMetadata dbTableMetadata, CascadingActionDetector cascadeActionDetector) {
        this.streamPartition = streamPartition;
        this.buffer = buffer;
        this.binaryLogClient = binaryLogClient;
        this.tableMetadataMap = new HashMap<Long, TableMetadata>();
        this.recordConverter = new StreamRecordConverter(s3Prefix, sourceConfig.getPartitionCount());
        this.s3Prefix = s3Prefix;
        this.tableNames = dbTableMetadata.getTableColumnDataTypeMap().keySet();
        this.isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled();
        this.pluginMetrics = pluginMetrics;
        this.pipelineEvents = new ArrayList<Event>();
        this.binlogEventExecutorService = Executors.newFixedThreadPool(1, (ThreadFactory)BackgroundThreadFactory.defaultExecutorThreadFactory((String)"rds-source-binlog-processor"));
        this.dbTableMetadata = dbTableMetadata;
        this.streamCheckpointManager = new StreamCheckpointManager(streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(), acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), sourceConfig.getEngine(), pluginMetrics);
        this.streamCheckpointManager.start();
        this.cascadeActionDetector = cascadeActionDetector;
        this.parentTableMap = cascadeActionDetector.getParentTableMap(streamPartition);
        this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
        this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
        this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
        this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
        this.eventProcessingTimer = pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME);
        this.eventProcessingErrorCounter = pluginMetrics.counter(REPLICATION_LOG_PROCESSING_ERROR_COUNT);
    }

    public static BinlogEventListener create(StreamPartition streamPartition, Buffer<Record<Event>> buffer, RdsSourceConfig sourceConfig, String s3Prefix, PluginMetrics pluginMetrics, BinaryLogClient binaryLogClient, StreamCheckpointer streamCheckpointer, AcknowledgementSetManager acknowledgementSetManager, DbTableMetadata dbTableMetadata, CascadingActionDetector cascadeActionDetector) {
        return new BinlogEventListener(streamPartition, buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager, dbTableMetadata, cascadeActionDetector);
    }

    public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        EventType eventType = event.getHeader().getEventType();
        switch (eventType) {
            case ROTATE: {
                this.processEvent(event, this::handleRotateEvent);
                break;
            }
            case TABLE_MAP: {
                this.processEvent(event, this::handleTableMapEvent);
                break;
            }
            case WRITE_ROWS: 
            case EXT_WRITE_ROWS: {
                this.processEvent(event, this::handleInsertEvent);
                break;
            }
            case UPDATE_ROWS: 
            case EXT_UPDATE_ROWS: {
                this.processEvent(event, this::handleUpdateEvent);
                break;
            }
            case DELETE_ROWS: 
            case EXT_DELETE_ROWS: {
                this.processEvent(event, this::handleDeleteEvent);
            }
        }
    }

    public void stopClient() {
        try {
            this.binaryLogClient.disconnect();
            this.binaryLogClient.unregisterEventListener((BinaryLogClient.EventListener)this);
            this.binlogEventExecutorService.shutdownNow();
            LOG.info("Binary log client disconnected.");
        }
        catch (Exception e) {
            LOG.error("Binary log client failed to disconnect.", (Throwable)e);
        }
    }

    public void stopCheckpointManager() {
        this.streamCheckpointManager.stop();
    }

    void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        RotateEventData data = (RotateEventData)event.getData();
        this.currentBinlogCoordinate = new BinlogCoordinate(data.getBinlogFilename(), data.getBinlogPosition());
        if (this.streamCheckpointManager.getChangeEventStatuses().isEmpty()) {
            ChangeEventStatus changeEventStatus = this.streamCheckpointManager.saveChangeEventsStatus(this.currentBinlogCoordinate, 0L);
            if (this.isAcknowledgmentsEnabled) {
                changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK);
            }
        }
    }

    void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        String tableName;
        TableMapEventData eventData = (TableMapEventData)event.getData();
        String databaseName = eventData.getDatabase();
        String fullTableName = databaseName + SEPARATOR + (tableName = eventData.getTable());
        if (!this.isTableOfInterest(fullTableName)) {
            return;
        }
        TableMapEventMetadata tableMapEventMetadata = eventData.getEventMetadata();
        List columnNames = tableMapEventMetadata.getColumnNames();
        List<String> primaryKeys = tableMapEventMetadata.getSimplePrimaryKeys().stream().map(columnNames::get).collect(Collectors.toList());
        TableMetadata tableMetadata = TableMetadata.builder().withTableName(tableName).withDatabaseName(databaseName).withColumnNames(columnNames).withPrimaryKeys(primaryKeys).withSetStrValues(this.getSetStrValues(eventData)).withEnumStrValues(this.getEnumStrValues(eventData)).build();
        this.tableMetadataMap.put(eventData.getTableId(), tableMetadata);
    }

    private Map<String, String[]> getSetStrValues(TableMapEventData eventData) {
        return this.getStrValuesMap(eventData, MySQLDataType.SET);
    }

    private Map<String, String[]> getEnumStrValues(TableMapEventData eventData) {
        return this.getStrValuesMap(eventData, MySQLDataType.ENUM);
    }

    private Map<String, String[]> getStrValuesMap(TableMapEventData eventData, MySQLDataType columnType) {
        HashMap<String, String[]> strValuesMap = new HashMap<String, String[]>();
        List columnNames = eventData.getEventMetadata().getColumnNames();
        List<String[]> strValues = this.getStrValues(eventData, columnType);
        Map<String, String> tbMetadata = this.dbTableMetadata.getTableColumnDataTypeMap().get(eventData.getDatabase() + SEPARATOR + eventData.getTable());
        int j = 0;
        for (int i = 0; i < columnNames.size(); ++i) {
            String dataType = tbMetadata.get(columnNames.get(i));
            if (MySQLDataType.byDataType(dataType) != columnType) continue;
            strValuesMap.put((String)columnNames.get(i), strValues.get(j++));
        }
        return strValuesMap;
    }

    private List<String[]> getStrValues(TableMapEventData eventData, MySQLDataType columnType) {
        if (columnType == MySQLDataType.ENUM) {
            return eventData.getEventMetadata().getEnumStrValues();
        }
        if (columnType == MySQLDataType.SET) {
            return eventData.getEventMetadata().getSetStrValues();
        }
        return Collections.emptyList();
    }

    void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        LOG.debug("Handling insert event");
        WriteRowsEventData data = (WriteRowsEventData)event.getData();
        if (!this.isValidTableId(data.getTableId())) {
            return;
        }
        this.handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX), StreamEventType.INSERT);
    }

    void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        LOG.debug("Handling update event");
        UpdateRowsEventData data = (UpdateRowsEventData)event.getData();
        if (!this.isValidTableId(data.getTableId())) {
            return;
        }
        this.cascadeActionDetector.detectCascadingUpdates(event, this.parentTableMap, this.tableMetadataMap.get(data.getTableId()));
        TableMetadata tableMetadata = this.tableMetadataMap.get(data.getTableId());
        ArrayList<OpenSearchBulkActions> bulkActions = new ArrayList<OpenSearchBulkActions>();
        ArrayList<Serializable[]> rows = new ArrayList<Serializable[]>();
        for (int rowNum = 0; rowNum < data.getRows().size(); ++rowNum) {
            Map.Entry row = (Map.Entry)data.getRows().get(rowNum);
            for (int i = 0; i < ((Serializable[])row.getKey()).length; ++i) {
                if (!tableMetadata.getPrimaryKeys().contains(tableMetadata.getColumnNames().get(i)) || ((Serializable[])row.getKey())[i].equals(((Serializable[])row.getValue())[i])) continue;
                LOG.debug("Primary keys were updated");
                rows.add((Serializable[])row.getKey());
                bulkActions.add(OpenSearchBulkActions.DELETE);
                break;
            }
            rows.add((Serializable[])row.getValue());
            bulkActions.add(OpenSearchBulkActions.INDEX);
        }
        this.handleRowChangeEvent(event, data.getTableId(), rows, bulkActions, StreamEventType.UPDATE);
    }

    void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        LOG.debug("Handling delete event");
        DeleteRowsEventData data = (DeleteRowsEventData)event.getData();
        if (!this.isValidTableId(data.getTableId())) {
            return;
        }
        this.cascadeActionDetector.detectCascadingDeletes(event, this.parentTableMap, this.tableMetadataMap.get(data.getTableId()));
        this.handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE), StreamEventType.DELETE);
    }

    boolean isValidTableId(long tableId) {
        if (!this.tableMetadataMap.containsKey(tableId)) {
            LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read");
            return false;
        }
        if (!this.isTableOfInterest(this.tableMetadataMap.get(tableId).getFullTableName())) {
            LOG.debug("The event is not from a table of interest");
            return false;
        }
        return true;
    }

    void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, long tableId, List<Serializable[]> rows, List<OpenSearchBulkActions> bulkActions, StreamEventType streamEventType) {
        if (this.currentBinlogCoordinate != null) {
            EventHeaderV4 eventHeader = (EventHeaderV4)event.getHeader();
            this.currentBinlogCoordinate = new BinlogCoordinate(this.currentBinlogCoordinate.getBinlogFilename(), eventHeader.getNextPosition());
            LOG.debug("Current binlog coordinate after receiving a row change event: " + String.valueOf(this.currentBinlogCoordinate));
        }
        long recordCount = rows.size();
        AcknowledgementSet acknowledgementSet = null;
        if (this.isAcknowledgmentsEnabled) {
            acknowledgementSet = this.streamCheckpointManager.createAcknowledgmentSet(this.currentBinlogCoordinate, recordCount);
        }
        long bytes = event.toString().getBytes().length;
        this.bytesReceivedSummary.record((double)bytes);
        TableMetadata tableMetadata = this.tableMetadataMap.get(tableId);
        List<String> columnNames = tableMetadata.getColumnNames();
        List<String> primaryKeys = tableMetadata.getPrimaryKeys();
        long eventTimestampMillis = event.getHeader().getTimestamp();
        BufferAccumulator bufferAccumulator = BufferAccumulator.create(this.buffer, (int)1000, (Duration)BUFFER_TIMEOUT);
        for (int rowNum = 0; rowNum < rows.size(); ++rowNum) {
            Object[] rowDataArray = rows.get(rowNum);
            OpenSearchBulkActions bulkAction = bulkActions.get(rowNum);
            HashMap<String, Object> rowDataMap = new HashMap<String, Object>();
            for (int i = 0; i < rowDataArray.length; ++i) {
                Map<String, String> tbColumnDatatypeMap = this.dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
                String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
                Object data = MySQLDataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i), rowDataArray[i], tableMetadata);
                rowDataMap.put(columnNames.get(i), data);
            }
            JacksonEvent dataPrepperEvent = JacksonEvent.builder().withEventType(DATA_PREPPER_EVENT_TYPE).withData(rowDataMap).build();
            Event pipelineEvent = this.recordConverter.convert((Event)dataPrepperEvent, tableMetadata.getDatabaseName(), tableMetadata.getDatabaseName(), tableMetadata.getTableName(), bulkAction, primaryKeys, eventTimestampMillis, eventTimestampMillis, streamEventType);
            this.pipelineEvents.add(pipelineEvent);
        }
        this.writeToBuffer((BufferAccumulator<Record<Event>>)bufferAccumulator, acknowledgementSet);
        this.bytesProcessedSummary.record((double)bytes);
        if (this.isAcknowledgmentsEnabled) {
            acknowledgementSet.complete();
        } else {
            this.streamCheckpointManager.saveChangeEventsStatus(this.currentBinlogCoordinate, recordCount);
        }
    }

    private boolean isTableOfInterest(String tableName) {
        return this.tableNames.contains(tableName);
    }

    private void writeToBuffer(BufferAccumulator<Record<Event>> bufferAccumulator, AcknowledgementSet acknowledgementSet) {
        for (Event pipelineEvent : this.pipelineEvents) {
            this.addToBufferAccumulator(bufferAccumulator, (Record<Event>)new Record((Object)pipelineEvent));
            if (acknowledgementSet == null) continue;
            acknowledgementSet.add(pipelineEvent);
        }
        this.flushBufferAccumulator(bufferAccumulator, this.pipelineEvents.size());
        this.pipelineEvents.clear();
    }

    private void addToBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccumulator, Record<Event> record) {
        try {
            bufferAccumulator.add(record);
        }
        catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Failed to add event to buffer", (Throwable)e);
        }
    }

    private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccumulator, int eventCount) {
        try {
            bufferAccumulator.flush();
            this.changeEventSuccessCounter.increment((double)eventCount);
        }
        catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Failed to flush buffer", (Throwable)e);
            this.changeEventErrorCounter.increment((double)eventCount);
        }
    }

    private void processEvent(com.github.shyiko.mysql.binlog.event.Event event, Consumer<com.github.shyiko.mysql.binlog.event.Event> function) {
        this.binlogEventExecutorService.submit(() -> this.handleEventAndErrors(event, function));
    }

    private void handleEventAndErrors(com.github.shyiko.mysql.binlog.event.Event event, Consumer<com.github.shyiko.mysql.binlog.event.Event> function) {
        try {
            this.eventProcessingTimer.record(() -> function.accept(event));
        }
        catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Failed to process change event of type {}", (Object)event.getHeader().getEventType(), (Object)e);
            this.eventProcessingErrorCounter.increment();
        }
    }
}

