/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection.pgoutput;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.UnchangedToastedReplicationMessageColumn;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.MessageDecoderContext;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.TransactionMessage;
import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.connection.pgoutput.ColumnMetaData;
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputRelationMetaData;
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputReplicationMessage;
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputTruncateReplicationMessage;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PgOutputMessageDecoder
extends AbstractMessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(PgOutputMessageDecoder.class);
    private static final Instant PG_EPOCH = LocalDate.of(2000, 1, 1).atStartOfDay().toInstant(ZoneOffset.UTC);
    private static final byte SPACE = 32;
    private final MessageDecoderContext decoderContext;
    private final PostgresConnection connection;
    private Instant commitTimestamp;
    private int transactionId;

    public PgOutputMessageDecoder(MessageDecoderContext decoderContext) {
        this.decoderContext = decoderContext;
        this.connection = new PostgresConnection(decoderContext.getConfig().jdbcConfig(), decoderContext.getSchema().getTypeRegistry());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Lsn startLsn, WalPositionLocator walPosition) {
        int position = buffer.position();
        try {
            MessageType type = MessageType.forType((char)buffer.get());
            LOGGER.trace("Message Type: {}", (Object)type);
            boolean candidateForSkipping = super.shouldMessageBeSkipped(buffer, lastReceivedLsn, startLsn, walPosition);
            switch (type) {
                case COMMIT: 
                case BEGIN: 
                case RELATION: {
                    LOGGER.trace("{} messages are always reprocessed", (Object)type);
                    boolean bl = false;
                    return bl;
                }
            }
            boolean bl = candidateForSkipping;
            return bl;
        }
        finally {
            buffer.position(position);
        }
    }

    @Override
    public void processNotEmptyMessage(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
        if (LOGGER.isTraceEnabled()) {
            if (!buffer.hasArray()) {
                throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
            }
            byte[] source = buffer.array();
            byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length + 2);
            int lastPos = content.length - 1;
            content[lastPos - 1] = 32;
            content[lastPos] = 32;
            LOGGER.trace("Message arrived from database {}", (Object)HexConverter.convertToHexString((byte[])content));
        }
        MessageType messageType = MessageType.forType((char)buffer.get());
        switch (messageType) {
            case BEGIN: {
                this.handleBeginMessage(buffer, processor);
                break;
            }
            case COMMIT: {
                this.handleCommitMessage(buffer, processor);
                break;
            }
            case RELATION: {
                this.handleRelationMessage(buffer, typeRegistry);
                break;
            }
            case INSERT: {
                this.decodeInsert(buffer, typeRegistry, processor);
                break;
            }
            case UPDATE: {
                this.decodeUpdate(buffer, typeRegistry, processor);
                break;
            }
            case DELETE: {
                this.decodeDelete(buffer, typeRegistry, processor);
                break;
            }
            case TRUNCATE: {
                if (this.decoderContext.getConfig().truncateHandlingMode() == PostgresConnectorConfig.TruncateHandlingMode.INCLUDE) {
                    this.decodeTruncate(buffer, typeRegistry, processor);
                    break;
                }
                LOGGER.trace("Message Type {} skipped, not processed.", (Object)messageType);
                break;
            }
            default: {
                LOGGER.trace("Message Type {} skipped, not processed.", (Object)messageType);
            }
        }
    }

    @Override
    public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
        return builder.withSlotOption("proto_version", 1).withSlotOption("publication_names", this.decoderContext.getConfig().publicationName());
    }

    @Override
    public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
        return builder;
    }

    private void handleBeginMessage(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
        Lsn lsn = Lsn.valueOf(buffer.getLong());
        this.commitTimestamp = PG_EPOCH.plus(buffer.getLong(), ChronoUnit.MICROS);
        this.transactionId = buffer.getInt();
        LOGGER.trace("Event: {}", (Object)MessageType.BEGIN);
        LOGGER.trace("Final LSN of transaction: {}", (Object)lsn);
        LOGGER.trace("Commit timestamp of transaction: {}", (Object)this.commitTimestamp);
        LOGGER.trace("XID of transaction: {}", (Object)this.transactionId);
        processor.process(new TransactionMessage(ReplicationMessage.Operation.BEGIN, this.transactionId, this.commitTimestamp));
    }

    private void handleCommitMessage(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
        byte flags = buffer.get();
        Lsn lsn = Lsn.valueOf(buffer.getLong());
        Lsn endLsn = Lsn.valueOf(buffer.getLong());
        Instant commitTimestamp = PG_EPOCH.plus(buffer.getLong(), ChronoUnit.MICROS);
        LOGGER.trace("Event: {}", (Object)MessageType.COMMIT);
        LOGGER.trace("Flags: {} (currently unused and most likely 0)", (Object)flags);
        LOGGER.trace("Commit LSN: {}", (Object)lsn);
        LOGGER.trace("End LSN of transaction: {}", (Object)endLsn);
        LOGGER.trace("Commit timestamp of transaction: {}", (Object)commitTimestamp);
        processor.process(new TransactionMessage(ReplicationMessage.Operation.COMMIT, this.transactionId, commitTimestamp));
    }

    private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) throws SQLException {
        int relationId = buffer.getInt();
        String schemaName = PgOutputMessageDecoder.readString(buffer);
        String tableName = PgOutputMessageDecoder.readString(buffer);
        byte replicaIdentityId = buffer.get();
        short columnCount = buffer.getShort();
        LOGGER.trace("Event: {}, RelationId: {}, Replica Identity: {}, Columns: {}", new Object[]{MessageType.RELATION, relationId, (int)replicaIdentityId, columnCount});
        LOGGER.trace("Schema: '{}', Table: '{}'", (Object)schemaName, (Object)tableName);
        DatabaseMetaData databaseMetadata = this.connection.connection().getMetaData();
        TableId tableId = new TableId(null, schemaName, tableName);
        List<Column> readColumns = this.getTableColumnsFromDatabase(this.connection, databaseMetadata, tableId);
        Map<String, Optional> columnDefaults = readColumns.stream().filter(Column::hasDefaultValue).collect(Collectors.toMap(Column::name, column -> Optional.ofNullable(column.defaultValue())));
        Map<String, Boolean> columnOptionality = readColumns.stream().collect(Collectors.toMap(Column::name, Column::isOptional));
        List primaryKeyColumns = this.connection.readPrimaryKeyNames(databaseMetadata, tableId);
        if (primaryKeyColumns == null || primaryKeyColumns.isEmpty()) {
            LOGGER.warn("Primary keys are not defined for table '{}', defaulting to unique indices", (Object)tableName);
            primaryKeyColumns = this.connection.readTableUniqueIndices(databaseMetadata, tableId);
        }
        ArrayList<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();
        HashSet<String> columnNames = new HashSet<String>();
        for (short i = 0; i < columnCount; i = (short)(i + 1)) {
            byte flags = buffer.get();
            String columnName = Strings.unquoteIdentifierPart((String)PgOutputMessageDecoder.readString(buffer));
            int columnType = buffer.getInt();
            int attypmod = buffer.getInt();
            PostgresType postgresType = typeRegistry.get(columnType);
            boolean key = this.isColumnInPrimaryKey(schemaName, tableName, columnName, primaryKeyColumns);
            Boolean optional = columnOptionality.get(columnName);
            if (optional == null) {
                LOGGER.warn("Column '{}' optionality could not be determined, defaulting to true", (Object)columnName);
                optional = true;
            }
            boolean hasDefault = columnDefaults.containsKey(columnName);
            Object defaultValue = columnDefaults.getOrDefault(columnName, Optional.empty()).orElse(null);
            columns.add(new ColumnMetaData(columnName, postgresType, key, optional, hasDefault, defaultValue, attypmod));
            columnNames.add(columnName);
        }
        primaryKeyColumns.retainAll(columnNames);
        Table table = this.resolveRelationFromMetadata(new PgOutputRelationMetaData(relationId, schemaName, tableName, columns, primaryKeyColumns));
        this.decoderContext.getSchema().applySchemaChangesForTable(relationId, table);
    }

    private List<Column> getTableColumnsFromDatabase(PostgresConnection connection, DatabaseMetaData databaseMetadata, TableId tableId) {
        ArrayList<Column> readColumns = new ArrayList<Column>();
        try (ResultSet columnMetadata = databaseMetadata.getColumns(null, tableId.schema(), tableId.table(), null);){
            while (columnMetadata.next()) {
                connection.readColumnForDecoder(columnMetadata, tableId, this.decoderContext.getConfig().getColumnFilter()).ifPresent(readColumns::add);
            }
        }
        catch (SQLException e) {
            LOGGER.warn("Failed to read column metadata for '{}.{}'", (Object)tableId.schema(), (Object)tableId.table());
        }
        return readColumns;
    }

    private boolean isColumnInPrimaryKey(String schemaName, String tableName, String columnName, List<String> primaryKeyColumns) {
        Table existingTable;
        if (!primaryKeyColumns.isEmpty() && primaryKeyColumns.contains(columnName)) {
            return true;
        }
        return primaryKeyColumns.isEmpty() && (existingTable = this.decoderContext.getSchema().tableFor(new TableId(null, schemaName, tableName))) != null && existingTable.primaryKeyColumnNames().contains(columnName);
    }

    private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
        int relationId = buffer.getInt();
        char tupleType = (char)buffer.get();
        LOGGER.trace("Event: {}, Relation Id: {}, Tuple Type: {}", new Object[]{MessageType.INSERT, relationId, Character.valueOf(tupleType)});
        Optional<Table> resolvedTable = this.resolveRelation(relationId);
        if (!resolvedTable.isPresent()) {
            processor.process(new ReplicationMessage.NoopMessage(this.transactionId, this.commitTimestamp));
        } else {
            Table table = resolvedTable.get();
            List<ReplicationMessage.Column> columns = PgOutputMessageDecoder.resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
            processor.process(new PgOutputReplicationMessage(ReplicationMessage.Operation.INSERT, table.id().toDoubleQuotedString(), this.commitTimestamp, this.transactionId, null, columns));
        }
    }

    private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
        int relationId = buffer.getInt();
        LOGGER.trace("Event: {}, RelationId: {}", (Object)MessageType.UPDATE, (Object)relationId);
        Optional<Table> resolvedTable = this.resolveRelation(relationId);
        if (!resolvedTable.isPresent()) {
            processor.process(new ReplicationMessage.NoopMessage(this.transactionId, this.commitTimestamp));
        } else {
            Table table = resolvedTable.get();
            List<ReplicationMessage.Column> oldColumns = null;
            char tupleType = (char)buffer.get();
            if ('O' == tupleType || 'K' == tupleType) {
                oldColumns = PgOutputMessageDecoder.resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
                tupleType = (char)buffer.get();
            }
            List<ReplicationMessage.Column> columns = PgOutputMessageDecoder.resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
            processor.process(new PgOutputReplicationMessage(ReplicationMessage.Operation.UPDATE, table.id().toDoubleQuotedString(), this.commitTimestamp, this.transactionId, oldColumns, columns));
        }
    }

    private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
        int relationId = buffer.getInt();
        char tupleType = (char)buffer.get();
        LOGGER.trace("Event: {}, RelationId: {}, Tuple Type: {}", new Object[]{MessageType.DELETE, relationId, Character.valueOf(tupleType)});
        Optional<Table> resolvedTable = this.resolveRelation(relationId);
        if (!resolvedTable.isPresent()) {
            processor.process(new ReplicationMessage.NoopMessage(this.transactionId, this.commitTimestamp));
        } else {
            Table table = resolvedTable.get();
            List<ReplicationMessage.Column> columns = PgOutputMessageDecoder.resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
            processor.process(new PgOutputReplicationMessage(ReplicationMessage.Operation.DELETE, table.id().toDoubleQuotedString(), this.commitTimestamp, this.transactionId, columns, null));
        }
    }

    private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
        int numberOfRelations = buffer.getInt();
        byte optionBits = buffer.get();
        List<String> truncateOptions = this.getTruncateOptions(optionBits);
        int[] relationIds = new int[numberOfRelations];
        for (int i = 0; i < numberOfRelations; ++i) {
            relationIds[i] = buffer.getInt();
        }
        ArrayList tables = new ArrayList();
        for (int relationId : relationIds) {
            Optional<Table> resolvedTable = this.resolveRelation(relationId);
            resolvedTable.ifPresent(tables::add);
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Event: {}, RelationIds: {}, OptionBits: {}", new Object[]{MessageType.TRUNCATE, Arrays.toString(relationIds), (int)optionBits});
        }
        int noOfResolvedTables = tables.size();
        for (int i = 0; i < noOfResolvedTables; ++i) {
            Table table = (Table)tables.get(i);
            boolean lastTableInTruncate = i + 1 == noOfResolvedTables;
            processor.process(new PgOutputTruncateReplicationMessage(ReplicationMessage.Operation.TRUNCATE, table.id().toDoubleQuotedString(), this.commitTimestamp, this.transactionId, lastTableInTruncate));
        }
    }

    private List<String> getTruncateOptions(int flag) {
        switch (flag) {
            case 1: {
                return Collections.singletonList("CASCADE");
            }
            case 2: {
                return Collections.singletonList("RESTART IDENTITY");
            }
            case 3: {
                return Arrays.asList("RESTART IDENTITY", "CASCADE");
            }
        }
        return null;
    }

    private Optional<Table> resolveRelation(int relationId) {
        return Optional.ofNullable(this.decoderContext.getSchema().tableFor(relationId));
    }

    private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) {
        ArrayList<Column> columns = new ArrayList<Column>();
        for (ColumnMetaData columnMetadata : metadata.getColumns()) {
            ColumnEditor editor = Column.editor().name(columnMetadata.getColumnName()).jdbcType(columnMetadata.getPostgresType().getRootType().getJdbcId()).nativeType(columnMetadata.getPostgresType().getRootType().getOid()).optional(columnMetadata.isOptional()).type(columnMetadata.getPostgresType().getName(), columnMetadata.getTypeName()).length(columnMetadata.getLength()).scale(Integer.valueOf(columnMetadata.getScale()));
            if (columnMetadata.hasDefaultValue()) {
                editor.defaultValue(columnMetadata.getDefaultValue());
            }
            columns.add(editor.create());
        }
        Table table = Table.editor().addColumns(columns).setPrimaryKeyNames(metadata.getPrimaryKeyNames()).tableId(metadata.getTableId()).create();
        LOGGER.trace("Resolved '{}' as '{}'", (Object)table.id(), (Object)table);
        return table;
    }

    private static String readString(ByteBuffer buffer) {
        StringBuilder sb = new StringBuilder();
        byte b = 0;
        while ((b = buffer.get()) != 0) {
            sb.append((char)b);
        }
        return sb.toString();
    }

    private static String readColumnValueAsString(ByteBuffer buffer) {
        int length = buffer.getInt();
        byte[] value = new byte[length];
        buffer.get(value, 0, length);
        return new String(value, Charset.forName("UTF-8"));
    }

    private static List<ReplicationMessage.Column> resolveColumnsFromStreamTupleData(ByteBuffer buffer, final TypeRegistry typeRegistry, Table table) {
        short numberOfColumns = buffer.getShort();
        ArrayList<ReplicationMessage.Column> columns = new ArrayList<ReplicationMessage.Column>(numberOfColumns);
        for (short i = 0; i < numberOfColumns; i = (short)(i + 1)) {
            Column column = (Column)table.columns().get(i);
            final String columnName = column.name();
            String typeName = column.typeName();
            final PostgresType columnType = typeRegistry.get(typeName);
            final String typeExpression = column.typeExpression();
            boolean optional = column.isOptional();
            char type = (char)buffer.get();
            if (type == 't') {
                final String valueStr = PgOutputMessageDecoder.readColumnValueAsString(buffer);
                columns.add(new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true){

                    @Override
                    public Object getValue(PostgresStreamingChangeEventSource.PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
                        return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes, typeRegistry);
                    }

                    public String toString() {
                        return columnName + "(" + typeExpression + ")=" + valueStr;
                    }
                });
                continue;
            }
            if (type == 'u') {
                columns.add(new UnchangedToastedReplicationMessageColumn(columnName, columnType, typeExpression, optional, true){

                    public String toString() {
                        return columnName + "(" + typeExpression + ") - Unchanged toasted column";
                    }
                });
                continue;
            }
            if (type != 'n') continue;
            columns.add(new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, true, true){

                @Override
                public Object getValue(PostgresStreamingChangeEventSource.PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
                    return null;
                }
            });
        }
        columns.forEach(c -> LOGGER.trace("Column: {}", c));
        return columns;
    }

    @Override
    public void close() {
        this.connection.close();
    }

    public static enum MessageType {
        RELATION,
        BEGIN,
        COMMIT,
        INSERT,
        UPDATE,
        DELETE,
        TYPE,
        ORIGIN,
        TRUNCATE;


        public static MessageType forType(char type) {
            switch (type) {
                case 'R': {
                    return RELATION;
                }
                case 'B': {
                    return BEGIN;
                }
                case 'C': {
                    return COMMIT;
                }
                case 'I': {
                    return INSERT;
                }
                case 'U': {
                    return UPDATE;
                }
                case 'D': {
                    return DELETE;
                }
                case 'Y': {
                    return TYPE;
                }
                case 'O': {
                    return ORIGIN;
                }
                case 'T': {
                    return TRUNCATE;
                }
            }
            throw new IllegalArgumentException("Unsupported message type: " + type);
        }
    }
}

