/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PgOid;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.RecordsProducer;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Strings;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.jdbc.PgConnection;

@ThreadSafe
public class RecordsStreamProducer
extends RecordsProducer {
    /** Logging context name configured at the start of {@code start}, {@code commit} and {@code stop}. */
    private static final String CONTEXT_NAME = "records-stream-producer";
    /** Single-thread executor (worker thread named "records-stream-producer-thread") that runs the streaming loop. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "records-stream-producer-thread"));
    /** Replication connection obtained from the task context in the constructor; closed by {@code closeConnections}. */
    private final ReplicationConnection replicationConnection;
    /** Currently active replication stream; {@code null} until {@code start} sets it and again after {@code stop}. */
    private final AtomicReference<ReplicationStream> replicationStream = new AtomicReference<>();
    /** Connection handed to column value decoding; created lazily by {@code typeResolverConnection()}. */
    private PgConnection typeResolverConnection = null;

    /**
     * Creates the producer and obtains a replication connection from the task context.
     *
     * @param taskContext the task context, forwarded to the superclass and used to create the replication connection
     * @param sourceInfo the source info, forwarded to the superclass
     * @throws ConnectException wrapping the {@link SQLException} if the replication connection cannot be created
     */
    public RecordsStreamProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo) {
        super(taskContext, sourceInfo);
        try {
            replicationConnection = taskContext.createReplicationConnection();
        } catch (SQLException sqlFailure) {
            throw new ConnectException(sqlFailure);
        }
    }

    /**
     * Opens the replication stream and submits the streaming loop to the executor.
     * <p>
     * Does nothing (besides logging) when the executor has already been shut down. Streaming starts at
     * the LSN held by the source info when one is known, otherwise at whatever position
     * {@code startStreaming()} chooses. The schema is refreshed before the loop is submitted.
     *
     * @param recordConsumer receives every {@link SourceRecord} produced by the streaming loop
     * @throws ConnectException wrapping the cause of any failure (or the failure itself when it has no cause)
     */
    @Override
    protected synchronized void start(Consumer<SourceRecord> recordConsumer) {
        LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            if (executorService.isShutdown()) {
                logger.info("Streaming will not start, stop already requested");
                return;
            }

            final ReplicationStream openedStream;
            if (!sourceInfo.hasLastKnownPosition()) {
                logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
                openedStream = replicationConnection.startStreaming();
            } else {
                Long lsn = sourceInfo.lsn();
                if (logger.isDebugEnabled()) {
                    logger.debug("retrieved latest position from stored offset '{}'", ReplicationConnection.format(lsn));
                }
                openedStream = replicationConnection.startStreaming(lsn);
            }
            // Only takes effect when no stream has been registered yet.
            replicationStream.compareAndSet(null, openedStream);

            taskContext.refreshSchema(true);
            executorService.submit(() -> this.streamChanges(recordConsumer));
        } catch (Throwable failure) {
            Throwable cause = failure.getCause();
            throw new ConnectException(cause != null ? cause : failure);
        } finally {
            previousContext.restore();
        }
    }

    /**
     * Streaming loop: reads from the replication stream until the current thread is interrupted,
     * handing each message to {@code process} together with the stream's last received LSN.
     * <p>
     * Any failure is logged, reported through {@code taskContext.failTask} and rethrown as a
     * {@link ConnectException}, which ends the loop.
     *
     * @param consumer receives the records generated from each message
     */
    private void streamChanges(Consumer<SourceRecord> consumer) {
        final ReplicationStream stream = replicationStream.get();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                stream.read(message -> process(message, stream.lastReceivedLSN(), consumer));
            } catch (SQLException sqlFailure) {
                // instanceof is false for null, so no separate null check is needed.
                if (sqlFailure.getCause() instanceof IOException) {
                    logger.warn("Closing replication stream due to db connection IO exception...");
                } else {
                    logger.error("unexpected exception while streaming logical changes", sqlFailure);
                }
                taskContext.failTask(sqlFailure);
                throw new ConnectException(sqlFailure);
            } catch (Throwable failure) {
                logger.error("unexpected exception while streaming logical changes", failure);
                taskContext.failTask(failure);
                throw new ConnectException(failure);
            }
        }
    }

    /**
     * Flushes the LSN on the active replication stream; only logs when no stream is active.
     *
     * @throws ConnectException wrapping the {@link SQLException} if flushing fails
     */
    @Override
    protected synchronized void commit() {
        LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            ReplicationStream activeStream = replicationStream.get();
            if (activeStream == null) {
                logger.debug("streaming has already stopped, ignoring commit callback...");
                return;
            }
            logger.debug("flushing offsets to server...");
            activeStream.flushLSN();
        } catch (SQLException sqlFailure) {
            throw new ConnectException(sqlFailure);
        } finally {
            previousContext.restore();
        }
    }

    /**
     * Stops streaming: closes the connections when a stream is registered, then, in every case,
     * clears the stream reference, shuts the executor down and restores the logging context.
     */
    @Override
    protected synchronized void stop() {
        LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            if (replicationStream.get() != null) {
                closeConnections();
            } else {
                logger.debug("already stopped....");
            }
        } finally {
            // Runs even when closing the connections throws.
            replicationStream.set(null);
            executorService.shutdownNow();
            previousContext.restore();
        }
    }

    /**
     * Closes the replication connection and then the type resolver connection.
     * <p>
     * The type resolver connection is closed even when closing the replication connection fails.
     * If only the first close fails, its exception is rethrown wrapped in a {@link ConnectException}.
     * If the second close fails, that exception is wrapped and rethrown, with the first failure
     * (if any) attached as suppressed.
     *
     * @throws ConnectException if either connection fails to close
     */
    private void closeConnections() {
        Exception closingException = null;
        try {
            if (replicationConnection != null) {
                logger.debug("stopping streaming...");
                replicationConnection.close();
            }
        } catch (Exception e) {
            // Remember the failure so the second connection still gets closed.
            closingException = e;
        } finally {
            try {
                if (typeResolverConnection != null) {
                    typeResolverConnection.close();
                }
            } catch (Exception e) {
                ConnectException rethrown = new ConnectException(e);
                if (closingException != null) {
                    rethrown.addSuppressed(closingException);
                }
                throw rethrown;
            }
        }
        // Deliberately outside the finally block: leaving a finally via break/return would
        // discard a pending Error thrown while closing the replication connection.
        if (closingException != null) {
            throw new ConnectException(closingException);
        }
    }

    /**
     * Converts one replication message into change records and hands them to the consumer.
     * <p>
     * A {@code null} message is ignored. Otherwise the source info is updated with the message
     * coordinates before anything else, so the position advances even when the message is then
     * skipped because no table schema is available or the table has no key schema.
     *
     * @param message the decoded replication message; may be {@code null}
     * @param lsn the LSN recorded in the source info for this message
     * @param consumer receives the generated records
     * @throws SQLException if resolving the table schema or the column values fails
     */
    private void process(ReplicationMessage message, Long lsn, Consumer<SourceRecord> consumer) throws SQLException {
        if (message == null) {
            return;
        }
        TableId tableId = PostgresSchema.parse(message.getTable());
        assert tableId != null;

        long commitTimeNs = message.getCommitTime();
        int txId = message.getTransactionId();
        sourceInfo.update(lsn, commitTimeNs, txId);
        if (logger.isDebugEnabled()) {
            logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
        }

        TableSchema tableSchema = tableSchemaFor(tableId);
        if (tableSchema == null) {
            return;
        }
        if (tableSchema.keySchema() == null) {
            logger.warn("ignoring message for table '{}' because it does not have a primary key defined", tableId);
            // Actually ignore it, as the warning states. Create and delete generation already drop
            // records with a null key; without this return, updates would still be emitted keyless.
            return;
        }

        ReplicationMessage.Operation operation = message.getOperation();
        switch (operation) {
            case INSERT: {
                Object[] row = columnValues(message.getNewTupleList(), tableId, true, message.hasMetadata());
                generateCreateRecord(tableId, row, consumer);
                break;
            }
            case UPDATE: {
                // The new tuple is resolved first so that a schema refresh, if needed, happens before the old tuple is read.
                Object[] newRow = columnValues(message.getNewTupleList(), tableId, true, message.hasMetadata());
                Object[] oldRow = columnValues(message.getOldTupleList(), tableId, false, message.hasMetadata());
                generateUpdateRecord(tableId, oldRow, newRow, consumer);
                break;
            }
            case DELETE: {
                Object[] row = columnValues(message.getOldTupleList(), tableId, false, message.hasMetadata());
                generateDeleteRecord(tableId, row, consumer);
                break;
            }
            default: {
                logger.warn("unknown message operation: " + operation);
            }
        }
    }

    /**
     * Emits a create event for an inserted row.
     * <p>
     * Nothing is emitted when {@code rowData} is {@code null} or empty (a warning is logged), or
     * when the table schema yields a {@code null} key or value for the row.
     *
     * @param tableId the table the row belongs to
     * @param rowData the column values of the new row
     * @param recordConsumer receives the create record
     */
    protected void generateCreateRecord(TableId tableId, Object[] rowData, Consumer<SourceRecord> recordConsumer) {
        if (rowData == null || rowData.length == 0) {
            // This is the insert path; the message previously claimed an "update message".
            logger.warn("no new values found for table '{}' from create message at '{}';skipping record", tableId, sourceInfo);
            return;
        }
        TableSchema tableSchema = schema().schemaFor(tableId);
        assert tableSchema != null;
        Object key = tableSchema.keyFromColumnData(rowData);
        Struct value = tableSchema.valueFromColumnData(rowData);
        if (key == null || value == null) {
            return;
        }
        Schema keySchema = tableSchema.keySchema();
        Map<String, String> partition = sourceInfo.partition();
        Map<String, ?> offset = sourceInfo.offset();
        String topicName = topicSelector().topicNameFor(tableId);
        Envelope envelope = createEnvelope(tableSchema, topicName);
        SourceRecord record = new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(),
                envelope.create(value, sourceInfo.source(), Long.valueOf(clock().currentTimeInMillis())));
        if (logger.isDebugEnabled()) {
            logger.debug("sending create event '{}' to topic '{}'", record, topicName);
        }
        recordConsumer.accept(record);
    }

    /**
     * Emits the record(s) for an updated row.
     * <p>
     * When an old key is available and differs from the new key, three records are emitted in
     * order: a delete event for the old key, a tombstone (null value) for the old key, and a
     * create event for the new key. Otherwise a single update event keyed by the new key is emitted.
     * Nothing is emitted when {@code newRowData} is {@code null} or empty (a warning is logged).
     *
     * @param tableId the table the row belongs to
     * @param oldRowData the previous column values; may be {@code null} or empty
     * @param newRowData the new column values
     * @param recordConsumer receives the generated records
     */
    protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object[] newRowData, Consumer<SourceRecord> recordConsumer) {
        if (newRowData == null || newRowData.length == 0) {
            logger.warn("no values found for table '{}' from update message at '{}';skipping record", tableId, sourceInfo);
            return;
        }
        TableSchema tableSchema = schema().schemaFor(tableId);
        assert tableSchema != null;

        // The "before" state is only derived when the old row was actually supplied.
        boolean hasOldRow = oldRowData != null && oldRowData.length > 0;
        Object previousKey = hasOldRow ? tableSchema.keyFromColumnData(oldRowData) : null;
        Schema previousKeySchema = hasOldRow ? tableSchema.keySchema() : null;
        Struct previousValue = hasOldRow ? tableSchema.valueFromColumnData(oldRowData) : null;

        Object currentKey = tableSchema.keyFromColumnData(newRowData);
        Struct currentValue = tableSchema.valueFromColumnData(newRowData);
        Schema currentKeySchema = tableSchema.keySchema();

        Map<String, String> partition = sourceInfo.partition();
        Map<String, ?> offset = sourceInfo.offset();
        String topicName = topicSelector().topicNameFor(tableId);
        Envelope envelope = createEnvelope(tableSchema, topicName);
        Struct source = sourceInfo.source();

        boolean keyChanged = previousKey != null && !Objects.equals(previousKey, currentKey);
        if (!keyChanged) {
            SourceRecord updateRecord = new SourceRecord(partition, offset, topicName, null, currentKeySchema, currentKey, envelope.schema(),
                    envelope.update(previousValue, currentValue, source, Long.valueOf(clock().currentTimeInMillis())));
            recordConsumer.accept(updateRecord);
            return;
        }

        SourceRecord deleteRecord = new SourceRecord(partition, offset, topicName, null, previousKeySchema, previousKey, envelope.schema(),
                envelope.delete(previousValue, source, Long.valueOf(clock().currentTimeInMillis())));
        if (logger.isDebugEnabled()) {
            logger.debug("sending delete event '{}' to topic '{}'", deleteRecord, topicName);
        }
        recordConsumer.accept(deleteRecord);

        SourceRecord tombstoneRecord = new SourceRecord(partition, offset, topicName, null, previousKeySchema, previousKey, null, null);
        if (logger.isDebugEnabled()) {
            logger.debug("sending tombstone event '{}' to topic '{}'", tombstoneRecord, topicName);
        }
        recordConsumer.accept(tombstoneRecord);

        SourceRecord createRecord = new SourceRecord(partition, offset, topicName, null, currentKeySchema, currentKey, envelope.schema(),
                envelope.create(currentValue, source, Long.valueOf(clock().currentTimeInMillis())));
        if (logger.isDebugEnabled()) {
            logger.debug("sending create event '{}' to topic '{}'", createRecord, topicName);
        }
        recordConsumer.accept(createRecord);
    }

    /**
     * Emits a delete event followed by a tombstone (null value) record for a deleted row.
     * <p>
     * Nothing is emitted when {@code oldRowData} is {@code null} or empty (a warning is logged), or
     * when the table schema yields a {@code null} key or value for the row.
     *
     * @param tableId the table the row belonged to
     * @param oldRowData the column values of the deleted row
     * @param recordConsumer receives the delete and tombstone records
     */
    protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Consumer<SourceRecord> recordConsumer) {
        if (oldRowData == null || oldRowData.length == 0) {
            // This is the delete path; the message previously claimed an "update message".
            logger.warn("no values found for table '{}' from delete message at '{}';skipping record", tableId, sourceInfo);
            return;
        }
        TableSchema tableSchema = schema().schemaFor(tableId);
        assert tableSchema != null;
        Object key = tableSchema.keyFromColumnData(oldRowData);
        Struct value = tableSchema.valueFromColumnData(oldRowData);
        if (key == null || value == null) {
            return;
        }
        Schema keySchema = tableSchema.keySchema();
        Map<String, String> partition = sourceInfo.partition();
        Map<String, ?> offset = sourceInfo.offset();
        String topicName = topicSelector().topicNameFor(tableId);
        Envelope envelope = createEnvelope(tableSchema, topicName);

        SourceRecord record = new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(),
                envelope.delete(value, sourceInfo.source(), Long.valueOf(clock().currentTimeInMillis())));
        if (logger.isDebugEnabled()) {
            logger.debug("sending delete event '{}' to topic '{}'", record, topicName);
        }
        recordConsumer.accept(record);

        // Same key with a null value schema and null value.
        record = new SourceRecord(partition, offset, topicName, null, keySchema, key, null, null);
        if (logger.isDebugEnabled()) {
            logger.debug("sending tombstone event '{}' to topic '{}'", record, topicName);
        }
        recordConsumer.accept(record);
    }

    /**
     * Resolves the values of the message columns into an array indexed by the position of each
     * column in the table definition.
     * <p>
     * When {@code refreshSchemaIfChanged} is set and {@code schemaChanged} reports a difference,
     * the table is refreshed from the database first (and, when the message carries metadata, again
     * from the message columns). The returned array is sized to the larger of the message column
     * count and the table column count; positions without a message column stay {@code null}.
     *
     * @param columns the columns carried by the message; may be {@code null} or empty
     * @param tableId the table the columns belong to
     * @param refreshSchemaIfChanged whether to refresh the table schema when the message disagrees with it
     * @param metadataInMessage whether the message columns carry type metadata
     * @return the column values, or {@code null} when {@code columns} is {@code null} or empty
     * @throws SQLException if refreshing the schema fails
     * @throws ConnectException if a message column is not part of the table definition
     */
    private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged, boolean metadataInMessage) throws SQLException {
        if (columns == null || columns.isEmpty()) {
            return null;
        }
        Table table = schema().tableFor(tableId);
        assert table != null;
        if (refreshSchemaIfChanged && schemaChanged(columns, table, metadataInMessage)) {
            try (PostgresConnection connection = taskContext.createConnection()) {
                schema().refresh(connection, tableId);
                if (metadataInMessage) {
                    schema().refresh(tableFromFromMessage(columns, schema().tableFor(tableId)));
                }
                table = schema().tableFor(tableId);
            }
        }
        List<String> columnNames = table.columnNames();
        Object[] values = new Object[Math.max(columns.size(), columnNames.size())];
        columns.forEach(message -> {
            // Message names are unquoted before being matched against the table's column names.
            String columnName = Strings.unquoteIdentifierPart(message.getName());
            int position = columnNames.indexOf(columnName);
            if (position < 0) {
                // Fail with context rather than an ArrayIndexOutOfBoundsException(-1) when assertions are off.
                throw new ConnectException("column '" + columnName + "' received for table '" + tableId
                        + "' is not part of the known table definition " + columnNames);
            }
            values[position] = message.getValue(this::typeResolverConnection, taskContext.config().includeUnknownDatatypes());
        });
        return values;
    }

    /**
     * Tells whether the columns of a message disagree with the given table definition.
     * <p>
     * A change is reported when the column counts differ, when a message column is unknown to the
     * table, or when a column's type differs. Types are compared as JDBC types when the message
     * carries metadata and as OIDs otherwise. Evaluation stops at the first difference.
     * <p>
     * NOTE(review): the column name is looked up as received, whereas {@code columnValues} unquotes
     * it before matching - confirm whether quoted names can reach this method.
     *
     * @param columns the columns carried by the message
     * @param table the currently known table definition
     * @param metadataInMessage whether the message columns carry type metadata
     * @return {@code true} if a difference was found
     */
    private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table table, boolean metadataInMessage) {
        if (table.columnNames().size() != columns.size()) {
            return true;
        }
        for (ReplicationMessage.Column message : columns) {
            String columnName = message.getName();
            Column column = table.columnWithName(columnName);
            if (column == null) {
                logger.debug("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", columnName);
                return true;
            }
            int localType;
            int incomingType;
            if (metadataInMessage) {
                localType = column.jdbcType();
                incomingType = typeNameToJdbcType(message.getTypeMetadata());
            } else {
                localType = PgOid.typeNameToOid(column.typeName());
                incomingType = message.getOidType();
            }
            if (localType != incomingType) {
                logger.debug("detected new type for column '{}', old type was '{}', new type is '{}'; refreshing table schema", columnName, localType, incomingType);
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the schema of a table, refreshing it from the database when it is not yet known.
     *
     * @param tableId the table to look up
     * @return the table schema, or {@code null} when the table is filtered out or its schema
     *         still cannot be found after the refresh
     * @throws SQLException if the refresh fails
     */
    private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
        PostgresSchema schema = schema();
        if (schema.isFilteredOut(tableId)) {
            logger.debug("table '{}' is filtered out, ignoring", tableId);
            return null;
        }
        TableSchema known = schema.schemaFor(tableId);
        if (known != null) {
            return known;
        }

        // Unknown so far: reload this table over a short-lived connection and look again.
        try (PostgresConnection connection = taskContext.createConnection()) {
            schema.refresh(connection, tableId);
        }
        TableSchema refreshed = schema.schemaFor(tableId);
        if (refreshed != null) {
            logger.debug("refreshed DB schema to include table '{}'", tableId);
        } else {
            logger.warn("cannot load schema for table '{}'", tableId);
        }
        return refreshed;
    }

    /**
     * Returns the connection used while decoding column values, creating it on first use.
     *
     * @return the cached {@link PgConnection}
     * @throws SQLException if the connection cannot be created
     */
    private synchronized PgConnection typeResolverConnection() throws SQLException {
        PgConnection current = this.typeResolverConnection;
        if (current == null) {
            current = (PgConnection) taskContext.createConnection().connection();
            this.typeResolverConnection = current;
        }
        return current;
    }

    /**
     * Builds a table definition whose columns are taken from the message columns, keeping the
     * primary key columns of the given table.
     *
     * @param columns the columns carried by the message (with type metadata)
     * @param table the currently known table definition
     * @return the edited table
     */
    private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Table table) {
        return table.edit()
                .setColumns(columns.stream().map(this::columnFromMessage).collect(Collectors.toList()))
                .setPrimaryKeyNames(table.filterColumnNames(candidate -> table.isPrimaryKeyColumn(candidate.name())))
                .create();
    }

    /**
     * Converts one message column into a relational column definition.
     * <p>
     * The literal 2003 equals {@code java.sql.Types.ARRAY}; presumably the decompiler inlined that
     * constant - TODO confirm.
     */
    private Column columnFromMessage(ReplicationMessage.Column column) {
        boolean isArray = column.getOidType() == 2003;
        int jdbcType;
        if (isArray) {
            jdbcType = 2003;
        } else {
            jdbcType = typeNameToJdbcType(column.getTypeMetadata());
        }
        ColumnEditor columnEditor = Column.editor()
                .name(column.getName())
                .jdbcType(jdbcType)
                .type(column.getTypeMetadata().getName())
                .optional(column.isOptional());
        PgOid.reconcileJdbcOidTypeConstraints(column.getTypeMetadata(), columnEditor);
        if (isArray) {
            columnEditor.componentType(column.getComponentOidType());
        }
        // Length and scale are only applied when the type metadata provides them.
        column.getTypeMetadata().getLength().ifPresent(columnEditor::length);
        column.getTypeMetadata().getScale().ifPresent(columnEditor::scale);
        return columnEditor.create();
    }

    /**
     * Maps the type name held by the column metadata to a JDBC type id, as resolved by the task
     * context's schema.
     *
     * @param columnTypeMetadata the type metadata of a message column
     * @return the JDBC type id for the metadata's type name
     */
    private int typeNameToJdbcType(ReplicationMessage.ColumnTypeMetadata columnTypeMetadata) {
        return taskContext.schema()
                .columnTypeNameToJdbcTypeId(columnTypeMetadata.getName());
    }

    @FunctionalInterface
    public static interface PgConnectionSupplier {
        public PgConnection get() throws SQLException;
    }
}

