/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.UnchangedToastedReplicationMessageColumn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.data.Envelope;
import io.debezium.function.Predicates;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresChangeRecordEmitter
extends RelationalChangeRecordEmitter<PostgresPartition> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeRecordEmitter.class);
    // Replication message whose operation and old/new tuples this emitter converts.
    private final ReplicationMessage message;
    // In-memory schema used to look up tables and table schemas, and refreshed when a mismatch is detected.
    private final PostgresSchema schema;
    // Connector configuration; read for the unknown-datatype and toastable-data refresh settings.
    private final PostgresConnectorConfig connectorConfig;
    // Database connection used for schema refreshes and handed to column value conversion.
    private final PostgresConnection connection;
    // Id of the table the message belongs to; checked for null in the constructor.
    private final TableId tableId;
    // Values read from the old tuple of an UPDATE, keyed by unquoted column name. They replace
    // unchanged-toasted placeholder values when the new tuple is converted.
    private final Map<String, Object> cachedOldToastedValues = new HashMap<String, Object>();

    /**
     * Creates an emitter for a single replication message.
     *
     * @param partition       partition forwarded to the base emitter
     * @param offset          offset context forwarded to the base emitter
     * @param clock           clock forwarded to the base emitter
     * @param connectorConfig connector configuration; forwarded to the base emitter and kept locally
     * @param schema          in-memory schema used for table lookups and refreshes
     * @param connection      database connection used for schema refreshes and value conversion
     * @param tableId         id of the table the message belongs to; must not be {@code null}
     * @param message         the replication message to convert
     * @throws NullPointerException if {@code tableId} is {@code null}
     */
    public PostgresChangeRecordEmitter(PostgresPartition partition, OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema, PostgresConnection connection, TableId tableId, ReplicationMessage message) {
        super(partition, offset, clock, connectorConfig);

        this.message = message;
        this.schema = schema;
        this.connectorConfig = connectorConfig;
        this.connection = connection;
        this.tableId = Objects.requireNonNull(tableId);
    }

    /**
     * Maps the operation of the replication message to the corresponding envelope operation.
     *
     * @return {@code CREATE} for INSERT, {@code UPDATE} for UPDATE, {@code DELETE} for DELETE
     *         and {@code TRUNCATE} for TRUNCATE
     * @throws IllegalArgumentException if the message carries any other operation
     */
    public Envelope.Operation getOperation() {
        switch (message.getOperation()) {
            case INSERT:
                return Envelope.Operation.CREATE;
            case UPDATE:
                return Envelope.Operation.UPDATE;
            case DELETE:
                return Envelope.Operation.DELETE;
            case TRUNCATE:
                return Envelope.Operation.TRUNCATE;
            default:
                throw new IllegalArgumentException("Received event of unexpected command type: " + message.getOperation());
        }
    }

    /**
     * Brings the table schema in line with the incoming message and then delegates to the
     * base implementation using the resulting schema.
     *
     * @param schema   the schema the caller currently holds for the table
     * @param receiver the receiver handed on to the base implementation
     * @throws InterruptedException if the base implementation is interrupted
     */
    public void emitChangeRecords(DataCollectionSchema schema, ChangeRecordEmitter.Receiver<PostgresPartition> receiver) throws InterruptedException {
        final DataCollectionSchema syncedSchema = synchronizeTableSchema(schema);
        super.emitChangeRecords(syncedSchema, receiver);
    }

    /**
     * Emits a single TRUNCATE change record for the table.
     *
     * The envelope is built from the offset's source info and the clock's current instant.
     * The fourth and last arguments of {@code changeRecord} are passed as {@code null}
     * (presumably the record key and headers; confirm against the Receiver contract).
     *
     * @param receiver    the receiver the record is handed to
     * @param tableSchema schema of the truncated table; supplies the envelope schema
     * @throws InterruptedException if the receiver is interrupted
     */
    protected void emitTruncateRecord(ChangeRecordEmitter.Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        final Struct envelope = tableSchema.getEnvelopeSchema()
                .truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.TRUNCATE, null, envelope, getOffset(), null);
    }

    /**
     * Returns the column values of the old tuple of the message.
     *
     * @return {@code null} for CREATE; otherwise the values converted from the old tuple list
     *         (which may itself be {@code null} when that list is null or empty)
     * @throws ConnectException if value conversion fails with an {@link SQLException}
     */
    protected Object[] getOldColumnValues() {
        final Envelope.Operation operation = getOperation();
        if (operation == Envelope.Operation.CREATE) {
            return null;
        }
        // Only the old tuple of an UPDATE feeds the cache of old toasted values.
        final boolean sourceOfToasted = operation == Envelope.Operation.UPDATE;
        try {
            return columnValues(message.getOldTupleList(), tableId, true, sourceOfToasted, true);
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Returns the column values of the new tuple of the message.
     *
     * @return the values converted from the new tuple list for CREATE and UPDATE
     *         (which may be {@code null} when that list is null or empty); {@code null}
     *         for every other operation
     * @throws ConnectException if value conversion fails with an {@link SQLException}
     */
    protected Object[] getNewColumnValues() {
        final Envelope.Operation operation = getOperation();
        if (operation != Envelope.Operation.CREATE && operation != Envelope.Operation.UPDATE) {
            return null;
        }
        try {
            return columnValues(message.getNewTupleList(), tableId, true, false, false);
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Checks the in-memory table definition against the new tuple of the message and refreshes
     * it when they differ.
     *
     * Nothing is done for DELETE operations or when the message reports that the schema should
     * not be synchronized; the given schema is returned unchanged in those cases. When a change
     * is detected the table is first re-read from the database and then replaced by a table
     * rebuilt from the message columns.
     *
     * @param tableSchema the schema currently held for the table
     * @return {@code tableSchema} itself when no synchronization applies, otherwise the schema
     *         looked up for the table after the check
     */
    private DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSchema) {
        final boolean syncRequired = getOperation() != Envelope.Operation.DELETE
                && message.shouldSchemaBeSynchronized();
        if (!syncRequired) {
            return tableSchema;
        }

        final TableId id = (TableId) tableSchema.id();
        final Table knownTable = schema.tableFor(id);
        final List<ReplicationMessage.Column> newTuple = message.getNewTupleList();

        if (schemaChanged(newTuple, knownTable)) {
            refreshTableFromDatabase(id);
            final Table refreshedTable = schema.tableFor(id);
            schema.refresh(tableFromFromMessage(newTuple, refreshedTable));
        }
        return schema.schemaFor(id);
    }

    /**
     * Converts the columns of a replication message tuple into an array of values laid out by
     * the column positions of the in-memory table.
     *
     * The array is as long as the larger of the table's column count and the number of message
     * columns that are not flagged as toasted. Columns that cannot be mapped to a position are
     * skipped. When {@code sourceOfToasted} is set, every converted value is remembered under
     * its unquoted column name. Otherwise a value recognised as an unchanged toasted
     * placeholder is replaced by the remembered value, if one exists.
     *
     * @param columns                the message columns; may be {@code null} or empty
     * @param tableId                id of the table the columns belong to
     * @param refreshSchemaIfChanged not used by this method
     * @param sourceOfToasted        whether the converted values should be cached as old toasted values
     * @param oldValues              not used by this method
     * @return the positioned values, or {@code null} when {@code columns} is null or empty
     * @throws SQLException         if value conversion fails
     * @throws NullPointerException if the schema holds no table for {@code tableId}
     */
    private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged, boolean sourceOfToasted, boolean oldValues) throws SQLException {
        if (columns == null || columns.isEmpty()) {
            return null;
        }
        final Table table = Objects.requireNonNull(schema.tableFor(tableId));

        final List<Column> schemaColumns = table.columns();
        final List<ReplicationMessage.Column> columnsWithoutToasted = columns.stream()
                .filter(Predicates.not(ReplicationMessage.Column::isToastedColumn))
                .collect(Collectors.toList());
        // Size for whichever side has more columns so every mapped position fits.
        final Object[] values = new Object[Math.max(schemaColumns.size(), columnsWithoutToasted.size())];

        for (ReplicationMessage.Column column : columns) {
            // Table columns are looked up by the unquoted form of the message column name.
            final String columnName = Strings.unquoteIdentifierPart(column.getName());
            final int position = getPosition(columnName, table, values);
            if (position == -1) {
                continue;
            }

            Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes());
            if (sourceOfToasted) {
                cachedOldToastedValues.put(columnName, value);
            }
            else if (UnchangedToastedReplicationMessageColumn.isUnchangedToastedValue(value)) {
                final Object cached = cachedOldToastedValues.get(columnName);
                if (cached != null) {
                    value = cached;
                }
            }
            values[position] = value;
        }
        return values;
    }

    /**
     * Resolves the zero-based index in {@code values} at which the value of the named column
     * belongs, derived from the table column's position minus one.
     *
     * @param columnName name of the column to look up in the table
     * @param table      the table definition to search
     * @param values     the target array; only its length is used, for the bounds check
     * @return the index, or {@code -1} (after logging a warning) when the table has no such
     *         column or the derived index falls outside {@code values}
     */
    private int getPosition(String columnName, Table table, Object[] values) {
        final Column tableColumn = table.columnWithName(columnName);
        // An unknown column is folded into the out-of-range case; both produce the same warning.
        final int position = tableColumn == null ? -1 : tableColumn.position() - 1;
        if (position >= 0 && position < values.length) {
            return position;
        }
        LOGGER.warn("Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", columnName);
        return -1;
    }

    /**
     * Loads the definition of a table the schema does not know yet by refreshing it from the
     * database.
     *
     * @param tableId id of the table whose schema is missing
     * @return the table schema found after the refresh, or an empty {@link Optional} when the
     *         schema still has none for the table
     */
    private Optional<DataCollectionSchema> newTable(TableId tableId) {
        LOGGER.debug("Schema for table '{}' is missing", tableId);
        refreshTableFromDatabase(tableId);

        final TableSchema refreshedSchema = schema.schemaFor(tableId);
        if (refreshedSchema != null) {
            LOGGER.debug("refreshed DB schema to include table '{}'", tableId);
            return Optional.of(refreshedSchema);
        }
        LOGGER.warn("cannot load schema for table '{}'", tableId);
        return Optional.empty();
    }

    /**
     * Asks the schema to re-read the given table over the connector's connection, passing on
     * the connector's setting for skipping refreshes on missing toastable data.
     *
     * @param tableId id of the table to refresh
     * @throws ConnectException if the refresh fails with an {@link SQLException}
     */
    private void refreshTableFromDatabase(TableId tableId) {
        try {
            final boolean skipOnMissingToastableData = connectorConfig.skipRefreshSchemaOnMissingToastableData();
            schema.refresh(connection, tableId, skipOnMissingToastableData);
        }
        catch (SQLException e) {
            throw new ConnectException("Database error while refresing table schema", e);
        }
    }

    /**
     * Static entry point that loads the schema of a missing table through the given emitter.
     *
     * @param partition           not used by this method
     * @param tableId             id of the missing table; must be a {@link TableId}
     * @param changeRecordEmitter the emitter to delegate to; must be a {@code PostgresChangeRecordEmitter}
     * @return the loaded table schema, or an empty {@link Optional} when it could not be loaded
     * @throws ClassCastException if either argument is not of the expected concrete type
     */
    static Optional<DataCollectionSchema> updateSchema(Partition partition, DataCollectionId tableId, ChangeRecordEmitter<PostgresPartition> changeRecordEmitter) {
        final PostgresChangeRecordEmitter emitter = (PostgresChangeRecordEmitter) changeRecordEmitter;
        final TableId missingTableId = (TableId) tableId;
        return emitter.newTable(missingTableId);
    }

    /**
     * Decides whether the in-memory table definition differs from the columns delivered in a
     * replication message.
     *
     * Column counts are compared first. A message with fewer columns than the table counts as
     * a change, unless the connector is configured to skip refreshes on missing toastable data
     * and every missing column is toastable. A message with more columns always counts as a
     * change. If the counts give no reason to refresh, each message column is compared with
     * its table column; the first difference ends the check.
     *
     * @param columns the columns delivered in the message
     * @param table   the in-memory table definition
     * @return {@code true} when a difference was found and the schema should be refreshed
     */
    private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table table) {
        final int tableColumnCount = table.columns().size();
        final int replicationColumnCount = columns.size();

        boolean msgHasMissingColumns = tableColumnCount > replicationColumnCount;
        if (msgHasMissingColumns && connectorConfig.skipRefreshSchemaOnMissingToastableData()) {
            msgHasMissingColumns = hasMissingUntoastedColumns(table, columns);
        }
        final boolean msgHasAdditionalColumns = tableColumnCount < replicationColumnCount;

        if (msgHasMissingColumns || msgHasAdditionalColumns) {
            LOGGER.info("Different column count {} present in the server message as schema in memory contains {}; refreshing table schema", replicationColumnCount, tableColumnCount);
            return true;
        }

        for (ReplicationMessage.Column messageColumn : columns) {
            if (columnDefinitionChanged(messageColumn, table)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Compares one message column with the table column of the same name, checking in order:
     * existence, type oid (the message's root type oid is accepted as well), length, scale
     * (a table column without scale counts as 0) and optionality. The first difference is logged.
     *
     * @param messageColumn the column delivered in the message
     * @param table         the in-memory table definition
     * @return {@code true} when the table has no such column or any compared attribute differs
     */
    private boolean columnDefinitionChanged(ReplicationMessage.Column messageColumn, Table table) {
        final String columnName = messageColumn.getName();
        final Column column = table.columnWithName(columnName);
        if (column == null) {
            LOGGER.info("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", columnName);
            return true;
        }

        final int localType = column.nativeType();
        final int incomingType = messageColumn.getType().getOid();
        if (localType != incomingType) {
            // The root type is only consulted once the direct type comparison has failed.
            final int incomingRootType = messageColumn.getType().getRootType().getOid();
            if (localType != incomingRootType) {
                LOGGER.info("detected new type for column '{}', old type was {} ({}), new type is {} ({}); refreshing table schema", columnName, localType, column.typeName(), incomingType, messageColumn.getType().getName());
                return true;
            }
        }

        final int localLength = column.length();
        final int incomingLength = messageColumn.getTypeMetadata().getLength();
        if (localLength != incomingLength) {
            LOGGER.info("detected new length for column '{}', old length was {}, new length is {}; refreshing table schema", columnName, localLength, incomingLength);
            return true;
        }

        final int localScale = column.scale().orElse(0);
        final int incomingScale = messageColumn.getTypeMetadata().getScale();
        if (localScale != incomingScale) {
            LOGGER.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale, incomingScale);
            return true;
        }

        final boolean localOptional = column.isOptional();
        final boolean incomingOptional = messageColumn.isOptional();
        if (localOptional != incomingOptional) {
            LOGGER.info("detected new optional status for column '{}', old value was {}, new value is {}; refreshing table schema", columnName, localOptional, incomingOptional);
            return true;
        }
        return false;
    }

    /**
     * Tells whether the message lacks at least one table column that is not registered as
     * toastable for the table.
     *
     * Column names are compared exactly as delivered by the message and as stored in the table.
     *
     * @param table   the in-memory table definition
     * @param columns the columns delivered in the message
     * @return {@code true} when some table column is absent from the message and is not among
     *         the table's toastable columns; {@code false} when every absent column is toastable
     */
    private boolean hasMissingUntoastedColumns(Table table, List<ReplicationMessage.Column> columns) {
        final List<String> msgColumnNames = columns.stream()
                .map(ReplicationMessage.Column::getName)
                .collect(Collectors.toList());
        final List<String> missingColumnNames = table.columns().stream()
                .map(Column::name)
                .filter(name -> !msgColumnNames.contains(name))
                .collect(Collectors.toList());
        final List<String> toastableColumns = schema.getToastableColumnsForTableId(table.id());

        // The joins below are only worth doing when the message is actually going to be written.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("msg columns: '{}' --- missing columns: '{}' --- toastableColumns: '{}'",
                    String.join(",", msgColumnNames),
                    String.join(",", missingColumnNames),
                    String.join(",", toastableColumns));
        }
        return !toastableColumns.containsAll(missingColumnNames);
    }

    /**
     * Builds a table definition whose columns are taken from the replication message while the
     * remaining attributes come from an editor of the existing table.
     *
     * Primary key column names of the existing table are kept when the edited table reports
     * unique values or still contains a column of that name; any other key column is dropped
     * and an error is logged.
     *
     * @param columns the columns delivered in the message
     * @param table   the existing table definition; also the source of default value expressions
     * @return the newly created table definition
     */
    private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Table table) {
        final TableEditor editor = table.edit();

        final List<Column> messageColumns = new ArrayList<>(columns.size());
        for (ReplicationMessage.Column column : columns) {
            messageColumns.add(columnFromMessage(column, table));
        }
        final TableEditor combinedTable = editor.setColumns(messageColumns);

        final List<String> retainedKeyColumns = new ArrayList<>();
        for (String candidateName : table.primaryKeyColumnNames()) {
            if (combinedTable.hasUniqueValues() || combinedTable.columnWithName(candidateName) != null) {
                retainedKeyColumns.add(candidateName);
            }
            else {
                LOGGER.error("Potentional inconsistency in key for message {}", columns);
            }
        }
        combinedTable.setPrimaryKeyNames(retainedKeyColumns);
        return combinedTable.create();
    }

    /**
     * Creates a column definition from a message column: name, type name and optionality come
     * from the message, JDBC type and native type from the message type's root type, length and
     * scale from the type metadata. A default value expression is copied from the existing
     * table's column of the same name when that column has one.
     *
     * @param column the column delivered in the message
     * @param table  the existing table definition, consulted for the default value expression
     * @return the created column definition
     */
    private static Column columnFromMessage(ReplicationMessage.Column column, Table table) {
        final PostgresType type = column.getType();
        final ColumnEditor columnEditor = Column.editor()
                .name(column.getName())
                .jdbcType(type.getRootType().getJdbcId())
                .type(type.getName())
                .optional(column.isOptional())
                .nativeType(type.getRootType().getOid());
        columnEditor.length(column.getTypeMetadata().getLength());
        columnEditor.scale(column.getTypeMetadata().getScale());
        Optional.ofNullable(table.columnWithName(column.getName()))
                .flatMap(Column::defaultValueExpression)
                .ifPresent(columnEditor::defaultValueExpression);
        return columnEditor.create();
    }

    /**
     * Always returns {@code true}.
     *
     * NOTE(review): presumably this tells the base emitter to skip messages that carry no
     * column values; confirm against RelationalChangeRecordEmitter.
     *
     * @return {@code true}
     */
    protected boolean skipEmptyMessages() {
        return true;
    }
}

