/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.ColumnType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.model.MessageType;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationClient;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer;
import org.postgresql.replication.LogSequenceNumber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decodes PostgreSQL logical replication stream messages and turns row changes into
 * pipeline events.
 *
 * <p>Each message handed to {@link #process(ByteBuffer)} starts with a one-byte type tag.
 * RELATION messages populate a per-table metadata cache; INSERT/UPDATE/DELETE messages are
 * decoded against that cache and queued as events; a COMMIT message flushes the queued
 * events to the buffer and records the checkpoint (either through an acknowledgement set
 * or directly, depending on {@code sourceConfig.isAcknowledgmentsEnabled()}).
 *
 * <p>NOTE(review): the byte layouts read here look like the {@code pgoutput} plugin's
 * message formats — confirm against the replication slot configuration.
 *
 * <p>The class keeps mutable per-transaction state ({@code currentLsn}, queued events) in
 * plain fields without synchronization, so it is presumably driven by a single thread.
 */
public class LogicalReplicationEventProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(LogicalReplicationEventProcessor.class);

    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60L);
    static final int DEFAULT_BUFFER_BATCH_SIZE = 1000;
    static final int NUM_OF_RETRIES = 3;
    static final int BACKOFF_IN_MILLIS = 500;
    static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed";
    static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors";
    static final String BYTES_RECEIVED = "bytesReceived";
    static final String BYTES_PROCESSED = "bytesProcessed";
    static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime";
    static final String REPLICATION_LOG_PROCESSING_ERROR_COUNT = "replicationLogEntryProcessingErrors";

    /** Microseconds between the Unix epoch (1970-01-01) and 2000-01-01T00:00:00Z. */
    private static final long POSTGRES_EPOCH_OFFSET_MICROS = 946_684_800_000_000L;

    /** Tuple column marker: the column value is NULL. */
    private static final char COLUMN_NULL = 'n';
    /** Tuple column marker: the column value follows as length-prefixed text. */
    private static final char COLUMN_TEXT = 't';

    private final StreamPartition streamPartition;
    private final RdsSourceConfig sourceConfig;
    private final StreamRecordConverter recordConverter;
    private final Buffer<Record<Event>> buffer;
    private final BufferAccumulator<Record<Event>> bufferAccumulator;
    /** Events decoded since the last flush; written to the buffer on COMMIT. */
    private final List<Event> pipelineEvents;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final LogicalReplicationClient logicalReplicationClient;
    private final StreamCheckpointer streamCheckpointer;
    private final StreamCheckpointManager streamCheckpointManager;
    private final Counter changeEventSuccessCounter;
    private final Counter changeEventErrorCounter;
    private final DistributionSummary bytesReceivedSummary;
    private final DistributionSummary bytesProcessedSummary;
    private final Timer eventProcessingTimer;
    private final Counter eventProcessingErrorCounter;
    /** Relation id to table metadata, filled in by RELATION messages. */
    private final Map<Long, TableMetadata> tableMetadataMap;

    /** LSN read from the most recent BEGIN message. */
    private long currentLsn;
    /** Timestamp (epoch milliseconds) derived from the most recent BEGIN message. */
    private long currentEventTimestamp;
    /** Capacity of the most recent row-change message handled by {@code doProcess}. */
    private long bytesReceived;

    /**
     * Creates the processor, builds its checkpoint manager and starts it.
     *
     * @param streamPartition           partition whose progress state supplies primary keys and enum columns
     * @param sourceConfig              source configuration (database name, acknowledgement settings, ...)
     * @param buffer                    destination buffer for converted events
     * @param s3Prefix                  prefix handed to the {@link StreamRecordConverter}
     * @param pluginMetrics             registry used to create counters, summaries and the timer
     * @param logicalReplicationClient  client that {@link #stopClient()} disconnects
     * @param streamCheckpointer        checkpointer handed to the checkpoint manager
     * @param acknowledgementSetManager manager handed to the checkpoint manager
     */
    public LogicalReplicationEventProcessor(StreamPartition streamPartition, RdsSourceConfig sourceConfig, Buffer<Record<Event>> buffer, String s3Prefix, PluginMetrics pluginMetrics, LogicalReplicationClient logicalReplicationClient, StreamCheckpointer streamCheckpointer, AcknowledgementSetManager acknowledgementSetManager) {
        this.streamPartition = streamPartition;
        this.sourceConfig = sourceConfig;
        this.recordConverter = new StreamRecordConverter(s3Prefix, sourceConfig.getPartitionCount());
        this.buffer = buffer;
        this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.logicalReplicationClient = logicalReplicationClient;
        this.streamCheckpointer = streamCheckpointer;
        this.streamCheckpointManager = new StreamCheckpointManager(streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(), acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), sourceConfig.getEngine(), pluginMetrics);
        this.streamCheckpointManager.start();
        this.tableMetadataMap = new HashMap<>();
        this.pipelineEvents = new ArrayList<>();
        this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
        this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
        this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
        this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
        this.eventProcessingTimer = pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME);
        this.eventProcessingErrorCounter = pluginMetrics.counter(REPLICATION_LOG_PROCESSING_ERROR_COUNT);
    }

    /**
     * Static factory equivalent to calling the public constructor with the same arguments.
     *
     * @return a new, started processor
     */
    public static LogicalReplicationEventProcessor create(StreamPartition streamPartition, RdsSourceConfig sourceConfig, Buffer<Record<Event>> buffer, String s3Prefix, PluginMetrics pluginMetrics, LogicalReplicationClient logicalReplicationClient, StreamCheckpointer streamCheckpointer, AcknowledgementSetManager acknowledgementSetManager) {
        return new LogicalReplicationEventProcessor(streamPartition, sourceConfig, buffer, s3Prefix, pluginMetrics, logicalReplicationClient, streamCheckpointer, acknowledgementSetManager);
    }

    /**
     * Reads the one-byte message type from {@code msg} and dispatches the remainder of the
     * message to the matching handler, with retries.
     *
     * <p>Unknown type bytes are logged at WARN and skipped; known but unhandled types are
     * logged at DEBUG and skipped.
     *
     * @param msg replication message positioned at its type byte
     */
    public void process(ByteBuffer msg) {
        final char typeChar = (char) msg.get();
        final MessageType messageType;
        try {
            messageType = MessageType.from(typeChar);
        } catch (IllegalArgumentException e) {
            LOG.warn("Unknown message type {} received from stream. Skipping.", typeChar);
            return;
        }

        final Consumer<ByteBuffer> handler = handlerFor(messageType);
        if (handler == null) {
            LOG.debug("Replication message type '{}' is not supported. Skipping.", messageType);
            return;
        }
        handleMessageWithRetries(msg, handler, messageType);
    }

    /** Returns the handler for a message type, or {@code null} when the type is not handled. */
    private Consumer<ByteBuffer> handlerFor(MessageType messageType) {
        switch (messageType) {
            case BEGIN:
                return this::processBeginMessage;
            case RELATION:
                return this::processRelationMessage;
            case INSERT:
                return this::processInsertMessage;
            case UPDATE:
                return this::processUpdateMessage;
            case DELETE:
                return this::processDeleteMessage;
            case COMMIT:
                return this::processCommitMessage;
            case TYPE:
                return this::processTypeMessage;
            default:
                return null;
        }
    }

    /** Disconnects the logical replication client; a failure to disconnect is logged, not thrown. */
    public void stopClient() {
        try {
            logicalReplicationClient.disconnect();
            LOG.info("Logical replication client disconnected.");
        } catch (Exception e) {
            LOG.error("Logical replication client failed to disconnect.", e);
        }
    }

    /** Stops the checkpoint manager that was started by the constructor. */
    public void stopCheckpointManager() {
        streamCheckpointManager.stop();
    }

    /**
     * Handles a BEGIN message: remembers the LSN and the event timestamp (converted to epoch
     * milliseconds) for the row changes that follow.
     *
     * @param msg message positioned just after the type byte
     */
    void processBeginMessage(ByteBuffer msg) {
        currentLsn = msg.getLong();
        final long epochMicro = msg.getLong();
        currentEventTimestamp = convertPostgresEventTimestamp(epochMicro);
        final int transactionId = msg.getInt();
        LOG.debug("Processed BEGIN message with LSN: {}, Timestamp: {}, TransactionId: {}", currentLsn, currentEventTimestamp, transactionId);
    }

    /**
     * Handles a RELATION message: decodes schema, table and column definitions and caches the
     * resulting {@link TableMetadata} under the relation id.
     *
     * @param msg message positioned just after the type byte
     */
    void processRelationMessage(ByteBuffer msg) {
        final long tableId = msg.getInt();
        final String databaseName = sourceConfig.getDatabase();
        final String schemaName = getNullTerminatedString(msg);
        final String tableName = getNullTerminatedString(msg);
        final byte replicaId = msg.get();
        final int numberOfColumns = msg.getShort();

        final List<String> columnNames = new ArrayList<>();
        final List<String> columnTypes = new ArrayList<>();
        for (int i = 0; i < numberOfColumns; i++) {
            msg.get(); // per-column flag byte; not used, but must be consumed
            final String columnName = getNullTerminatedString(msg);
            final ColumnType columnType = resolveColumnType(msg.getInt(), databaseName, schemaName, tableName, columnName);
            columnTypes.add(columnType.getTypeName());
            msg.getInt(); // type modifier; not used, but must be consumed
            columnNames.add(columnName);
        }

        final List<String> primaryKeys = getPrimaryKeys(databaseName, schemaName, tableName);
        final TableMetadata tableMetadata = TableMetadata.builder()
                .withDatabaseName(databaseName)
                .withSchemaName(schemaName)
                .withTableName(tableName)
                .withColumnNames(columnNames)
                .withColumnTypes(columnTypes)
                .withPrimaryKeys(primaryKeys)
                .build();
        tableMetadataMap.put(tableId, tableMetadata);

        LOG.debug("Processed an Relation message with RelationId: {} Namespace: {} RelationName: {} ReplicaId: {}", tableId, schemaName, tableName, (int) replicaId);
    }

    /**
     * Maps a type id to a {@link ColumnType}. When the id is not recognised, the column is
     * looked up in the table's enum columns from the progress state: a match yields
     * {@code ColumnType.getByTypeId(-1)}, anything else yields {@link ColumnType#UNKNOWN}.
     */
    private ColumnType resolveColumnType(int typeId, String databaseName, String schemaName, String tableName, String columnName) {
        try {
            return ColumnType.getByTypeId(typeId);
        } catch (IllegalArgumentException e) {
            final Set<String> enumColumns = getEnumColumns(databaseName, schemaName, tableName);
            if (enumColumns != null && enumColumns.contains(columnName)) {
                return ColumnType.getByTypeId(-1);
            }
            return ColumnType.UNKNOWN;
        }
    }

    /**
     * Handles a COMMIT message: flushes the queued events to the buffer and records the
     * checkpoint for the current LSN.
     *
     * <p>With acknowledgements enabled the events are attached to an acknowledgement set that
     * is completed after the flush; otherwise the checkpoint manager is told directly.
     *
     * @param msg message positioned just after the type byte
     * @throws RuntimeException if the commit LSN differs from the LSN of the last BEGIN message;
     *                          the queued events are discarded in that case
     */
    void processCommitMessage(ByteBuffer msg) {
        final byte flag = msg.get();
        final long commitLsn = msg.getLong();
        final long endLsn = msg.getLong();
        final long epochMicro = msg.getLong();

        if (currentLsn != commitLsn) {
            pipelineEvents.clear();
            throw new RuntimeException("Commit LSN does not match current LSN, skipping");
        }

        final long recordCount = pipelineEvents.size();
        if (recordCount == 0L) {
            LOG.debug("No records found in current batch. Skipping.");
            return;
        }

        final boolean acknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled();
        AcknowledgementSet acknowledgementSet = null;
        if (acknowledgmentsEnabled) {
            acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(LogSequenceNumber.valueOf(currentLsn), recordCount);
        }

        writeToBuffer(bufferAccumulator, acknowledgementSet);
        bytesProcessedSummary.record((double) bytesReceived);
        LOG.debug("Processed a COMMIT message with Flag: {} CommitLsn: {} EndLsn: {} Timestamp: {}", (int) flag, commitLsn, endLsn, epochMicro);

        if (acknowledgmentsEnabled) {
            acknowledgementSet.complete();
        } else {
            streamCheckpointManager.saveChangeEventsStatus(LogSequenceNumber.valueOf(currentLsn), recordCount);
        }
    }

    /**
     * Handles an INSERT message: queues one INDEX event for the new row.
     *
     * @param msg message positioned just after the type byte
     * @throws IllegalStateException if no RELATION metadata is cached for the table
     */
    void processInsertMessage(ByteBuffer msg) {
        final long tableId = msg.getInt();
        msg.get(); // tuple marker byte preceding the row data; value not needed
        final TableMetadata tableMetadata = getTableMetadata(tableId);
        doProcess(msg, tableMetadata, OpenSearchBulkActions.INDEX, StreamEventType.INSERT);
        LOG.debug("Processed an INSERT message with table id: {}", tableId);
    }

    /**
     * Handles an UPDATE message.
     *
     * <p>When only the new tuple is present, one INDEX event is queued. When an old or key
     * tuple precedes the new tuple, a DELETE event for the old row is queued first if any
     * primary key value changed, followed by the INDEX event for the new row.
     *
     * @param msg message positioned just after the type byte
     * @throws IllegalStateException    if no RELATION metadata is cached for the table
     * @throws IllegalArgumentException if the tuple marker is not one of N, K or O
     */
    void processUpdateMessage(ByteBuffer msg) {
        final long tableId = msg.getInt();
        final TableMetadata tableMetadata = getTableMetadata(tableId);
        final List<String> columnNames = tableMetadata.getColumnNames();
        final List<String> columnTypes = tableMetadata.getColumnTypes();
        final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
        final long eventTimestampMillis = currentEventTimestamp;

        final TupleDataType tupleDataType = TupleDataType.fromValue((char) msg.get());
        if (tupleDataType == TupleDataType.NEW) {
            doProcess(msg, tableMetadata, OpenSearchBulkActions.INDEX, StreamEventType.UPDATE);
        } else {
            // OLD or KEY: the previous tuple comes first, then a marker byte, then the new tuple.
            final Map<String, Object> oldRowDataMap = getRowDataMap(msg, columnNames, columnTypes);
            msg.get(); // marker byte in front of the new tuple
            final Map<String, Object> newRowDataMap = getRowDataMap(msg, columnNames, columnTypes);
            if (isPrimaryKeyChanged(oldRowDataMap, newRowDataMap, primaryKeys)) {
                LOG.debug("Primary keys were changed");
                createPipelineEvent(oldRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE, StreamEventType.UPDATE);
            }
            createPipelineEvent(newRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.UPDATE);
        }
        LOG.debug("Processed an UPDATE message with table id: {}", tableId);
    }

    /**
     * Reports whether any primary key column has a different value in the two rows.
     * Values are compared null-safely, so a key that is NULL or absent on either side is
     * compared rather than causing a {@link NullPointerException}.
     */
    private boolean isPrimaryKeyChanged(Map<String, Object> oldRowDataMap, Map<String, Object> newRowDataMap, List<String> primaryKeys) {
        for (String primaryKey : primaryKeys) {
            if (!Objects.equals(oldRowDataMap.get(primaryKey), newRowDataMap.get(primaryKey))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Handles a DELETE message: queues one DELETE event for the removed row.
     *
     * @param msg message positioned just after the type byte
     * @throws IllegalStateException if no RELATION metadata is cached for the table
     */
    void processDeleteMessage(ByteBuffer msg) {
        final long tableId = msg.getInt();
        msg.get(); // tuple marker byte preceding the row data; value not needed
        final TableMetadata tableMetadata = getTableMetadata(tableId);
        doProcess(msg, tableMetadata, OpenSearchBulkActions.DELETE, StreamEventType.DELETE);
        LOG.debug("Processed a DELETE message with table id: {}", tableId);
    }

    /**
     * Handles a TYPE message: decodes and logs it; nothing is stored.
     *
     * @param msg message positioned just after the type byte
     */
    void processTypeMessage(ByteBuffer msg) {
        final int dataTypeId = msg.getInt();
        final String schemaName = getNullTerminatedString(msg);
        final String typeName = getNullTerminatedString(msg);
        LOG.debug("Processed a TYPE message with TypeId: {} Namespace: {} TypeName: {}", dataTypeId, schemaName, typeName);
    }

    /**
     * Looks up cached metadata for a relation id.
     *
     * @throws IllegalStateException if no RELATION message has been processed for that id
     */
    private TableMetadata getTableMetadata(long tableId) {
        final TableMetadata tableMetadata = tableMetadataMap.get(tableId);
        if (tableMetadata == null) {
            throw new IllegalStateException("No table metadata found for relation id " + tableId);
        }
        return tableMetadata;
    }

    /**
     * Records the received-bytes metric for the message, decodes one tuple from it and queues
     * the resulting event, stamped with the current transaction timestamp.
     */
    private void doProcess(ByteBuffer msg, TableMetadata tableMetadata, OpenSearchBulkActions bulkAction, StreamEventType streamEventType) {
        bytesReceived = msg.capacity();
        bytesReceivedSummary.record((double) bytesReceived);
        final Map<String, Object> rowDataMap = getRowDataMap(msg, tableMetadata.getColumnNames(), tableMetadata.getColumnTypes());
        createPipelineEvent(rowDataMap, tableMetadata, tableMetadata.getPrimaryKeys(), currentEventTimestamp, bulkAction, streamEventType);
    }

    /**
     * Decodes one tuple into a column-name to value map.
     *
     * <p>Columns marked {@code 'n'} map to {@code null}; columns marked {@code 't'} carry a
     * length-prefixed UTF-8 text value that is converted according to the column type. Any
     * other marker is logged and the column is left out of the map.
     */
    private Map<String, Object> getRowDataMap(ByteBuffer msg, List<String> columnNames, List<String> columnTypes) {
        final Map<String, Object> rowDataMap = new HashMap<>();
        final int numberOfColumns = msg.getShort();
        for (int i = 0; i < numberOfColumns; i++) {
            final char marker = (char) msg.get();
            if (marker == COLUMN_NULL) {
                rowDataMap.put(columnNames.get(i), null);
            } else if (marker == COLUMN_TEXT) {
                final byte[] bytes = new byte[msg.getInt()];
                msg.get(bytes);
                final String value = new String(bytes, StandardCharsets.UTF_8);
                final String columnName = columnNames.get(i);
                final Object data = PostgresDataTypeHelper.getDataByColumnType(PostgresDataType.byDataType(columnTypes.get(i)), columnName, value);
                rowDataMap.put(columnName, data);
            } else {
                LOG.warn("Unknown column type: {}", marker);
            }
        }
        return rowDataMap;
    }

    /** Wraps the row data in an event, runs it through the record converter and queues the result. */
    private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata tableMetadata, List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction, StreamEventType streamEventType) {
        final Event dataPrepperEvent = JacksonEvent.builder()
                .withEventType("event")
                .withData(rowDataMap)
                .build();
        final Event pipelineEvent = recordConverter.convert(dataPrepperEvent, tableMetadata.getDatabaseName(), tableMetadata.getSchemaName(), tableMetadata.getTableName(), bulkAction, primaryKeys, eventTimestampMillis, eventTimestampMillis, streamEventType);
        pipelineEvents.add(pipelineEvent);
    }

    /**
     * Adds every queued event to the accumulator (and to the acknowledgement set, when one is
     * given), flushes the accumulator and empties the queue.
     */
    private void writeToBuffer(BufferAccumulator<Record<Event>> bufferAccumulator, AcknowledgementSet acknowledgementSet) {
        for (Event pipelineEvent : pipelineEvents) {
            addToBufferAccumulator(bufferAccumulator, new Record<>(pipelineEvent));
            if (acknowledgementSet != null) {
                acknowledgementSet.add(pipelineEvent);
            }
        }
        flushBufferAccumulator(bufferAccumulator, pipelineEvents.size());
        pipelineEvents.clear();
    }

    /** Adds one record to the accumulator; a failure is logged and otherwise ignored (best effort). */
    private void addToBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccumulator, Record<Event> record) {
        try {
            bufferAccumulator.add(record);
        } catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Failed to add event to buffer", e);
        }
    }

    /**
     * Flushes the accumulator and bumps the success counter by {@code eventCount}; on failure
     * the error is logged and the error counter is bumped instead.
     */
    private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccumulator, int eventCount) {
        try {
            bufferAccumulator.flush();
            changeEventSuccessCounter.increment((double) eventCount);
        } catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Failed to flush buffer", e);
            changeEventErrorCounter.increment((double) eventCount);
        }
    }

    /**
     * Converts a timestamp in microseconds relative to 2000-01-01 into milliseconds relative
     * to the Unix epoch.
     */
    private long convertPostgresEventTimestamp(long postgresMicro) {
        return (postgresMicro + POSTGRES_EPOCH_OFFSET_MICROS) / 1000L;
    }

    /**
     * Reads bytes up to (and consuming) the next zero byte, or to the end of the buffer when
     * there is none, and decodes them as UTF-8 so that multi-byte identifiers survive intact.
     */
    private String getNullTerminatedString(ByteBuffer msg) {
        final int start = msg.position();
        final int limit = msg.limit();
        int end = start;
        while (end < limit && msg.get(end) != 0) {
            end++;
        }
        final byte[] bytes = new byte[end - start];
        msg.get(bytes);
        if (msg.hasRemaining()) {
            msg.get(); // consume the terminating zero byte
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Looks up the primary keys recorded in the progress state under {@code database.schema.table}. */
    private List<String> getPrimaryKeys(String databaseName, String schemaName, String tableName) {
        final StreamProgressState progressState = streamPartition.getProgressState().get();
        return progressState.getPrimaryKeyMap().get(databaseName + "." + schemaName + "." + tableName);
    }

    /** Looks up the enum columns recorded in the progress state under {@code database.schema.table}. */
    private Set<String> getEnumColumns(String databaseName, String schemaName, String tableName) {
        final StreamProgressState progressState = streamPartition.getProgressState().get();
        return progressState.getPostgresStreamState().getEnumColumnsByTable().get(databaseName + "." + schemaName + "." + tableName);
    }

    /**
     * Runs {@code function} on {@code message} under the processing timer, retrying up to
     * {@link #NUM_OF_RETRIES} times with a fixed backoff between attempts.
     *
     * <p>Handlers consume the buffer and may queue events before failing, so before every
     * retry the buffer is rewound to where the first attempt started and any events queued
     * by the failed attempt are dropped; otherwise a retry would parse from the middle of
     * the message or queue duplicates. When every attempt fails, the failure is logged and
     * the processing error counter is incremented; no exception escapes.
     */
    private void handleMessageWithRetries(ByteBuffer message, Consumer<ByteBuffer> function, MessageType messageType) {
        final int startPosition = message.position();
        final int queuedEventsBefore = pipelineEvents.size();

        for (int attempt = 0; attempt <= NUM_OF_RETRIES; attempt++) {
            if (attempt > 0) {
                applyBackoff();
                rewindForRetry(message, startPosition, queuedEventsBefore);
            }
            try {
                eventProcessingTimer.record(() -> function.accept(message));
                return;
            } catch (Exception e) {
                if (attempt < NUM_OF_RETRIES) {
                    LOG.warn(DataPrepperMarkers.NOISY, "Error when processing change event of type {}, will retry", messageType, e);
                } else {
                    LOG.warn(DataPrepperMarkers.NOISY, "Error when processing change event of type {}, no retries left", messageType, e);
                }
            }
        }

        LOG.error("Failed to process change event of type {} after {} retries", messageType, NUM_OF_RETRIES);
        eventProcessingErrorCounter.increment();
    }

    /** Restores the buffer position and drops events queued beyond {@code queuedEventsBefore}. */
    private void rewindForRetry(ByteBuffer message, int startPosition, int queuedEventsBefore) {
        message.position(startPosition);
        final int queuedNow = pipelineEvents.size();
        if (queuedNow > queuedEventsBefore) {
            pipelineEvents.subList(queuedEventsBefore, queuedNow).clear();
        }
    }

    /** Sleeps for the retry backoff; if interrupted, restores the thread's interrupt flag. */
    private void applyBackoff() {
        try {
            Thread.sleep(BACKOFF_IN_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Marker byte that introduces a tuple inside an UPDATE message. */
    enum TupleDataType {
        NEW('N'),
        KEY('K'),
        OLD('O');

        private final char value;

        TupleDataType(char value) {
            this.value = value;
        }

        public char getValue() {
            return value;
        }

        /**
         * Resolves the constant for a marker character.
         *
         * @throws IllegalArgumentException if {@code value} matches no constant
         */
        public static TupleDataType fromValue(char value) {
            for (TupleDataType type : values()) {
                if (type.value == value) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Invalid TupleDataType value: " + value);
        }
    }
}

