/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.RecordsProducer;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.geometric.PGpoint;

/**
 * A {@link RecordsProducer} that reads decoded row messages from a PostgreSQL
 * {@link ReplicationStream} and turns them into Kafka Connect {@link SourceRecord}s.
 *
 * <p>The read loop runs on a dedicated single-thread executor, while {@code commit()} and
 * {@code stop()} are {@code synchronized} and may be invoked from other threads.
 */
@ThreadSafe
public class RecordsStreamProducer
extends RecordsProducer {
    /** Name handed to the task context whenever the logging context is configured. */
    private static final String CONTEXT_NAME = "records-stream-producer";
    /** Single-thread executor (worker thread named "records-stream-producer-thread") that runs the streaming loop. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "records-stream-producer-thread"));
    /** Replication connection obtained from the task context in the constructor. */
    private final ReplicationConnection replicationConnection;
    /** The active replication stream; holds {@code null} until {@code start()} sets it and again after {@code stop()}. */
    private final AtomicReference<ReplicationStream> replicationStream = new AtomicReference();

    public RecordsStreamProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo) {
        super(taskContext, sourceInfo);
        try {
            this.replicationConnection = taskContext.createReplicationConnection();
        }
        catch (SQLException e) {
            throw new ConnectException((Throwable)e);
        }
    }

    /**
     * Opens the replication stream, refreshes the schema and submits the streaming loop to the
     * executor.
     *
     * <p>Streaming resumes from the stored LSN when the source info has a last known position;
     * otherwise it starts from wherever the replication connection decides.
     *
     * @param recordConsumer receives every record produced by the streaming loop
     * @throws ConnectException wrapping the cause of any failure (or the failure itself when it
     *         has no cause)
     */
    @Override
    protected void start(Consumer<SourceRecord> recordConsumer) {
        LoggingContext.PreviousContext loggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            ReplicationStream openedStream;
            if (!this.sourceInfo.hasLastKnownPosition()) {
                this.logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
                openedStream = this.replicationConnection.startStreaming();
            } else {
                Long lastLsn = this.sourceInfo.lsn();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("retrieved latest position from stored offset '{}'", (Object)ReplicationConnection.format(lastLsn));
                }
                openedStream = this.replicationConnection.startStreaming(lastLsn);
            }
            // only the first successfully opened stream is kept
            this.replicationStream.compareAndSet(null, openedStream);
            this.taskContext.refreshSchema(true);
            this.executorService.submit(() -> this.streamChanges(recordConsumer));
        }
        catch (Throwable failure) {
            Throwable rootCause = failure.getCause();
            throw new ConnectException(rootCause != null ? rootCause : failure);
        }
        finally {
            loggingContext.restore();
        }
    }

    /**
     * Reads messages from the replication stream and processes them until the current thread is
     * interrupted.
     *
     * <p>This method runs as a task on the executor. Because the {@code Future} returned by
     * {@code submit} is discarded by the caller, any exception escaping from here would vanish
     * without a trace, so every failure is logged before it is rethrown.
     *
     * @param consumer receives the records generated from each message
     * @throws ConnectException if reading from the stream or processing a message fails with a
     *         {@link SQLException}
     */
    private void streamChanges(Consumer<SourceRecord> consumer) {
        ReplicationStream stream = this.replicationStream.get();
        if (stream == null) {
            // stop() cleared the stream before this task got a chance to run
            this.logger.debug("replication stream is no longer available, not streaming any changes...");
            return;
        }
        while (!Thread.currentThread().isInterrupted()) {
            try {
                PgProto.RowMessage message = stream.read();
                this.process(message, stream.lastReceivedLSN(), consumer);
            }
            catch (SQLException e) {
                Throwable cause = e.getCause();
                if (cause instanceof IOException) {
                    this.logger.warn("Closing replication stream due to db connection IO exception...");
                } else {
                    this.logger.error("unexpected exception while streaming logical changes", (Throwable)e);
                }
                this.taskContext.failTask(e);
                throw new ConnectException((Throwable)e);
            }
            catch (RuntimeException e) {
                // NOTE(review): the task should probably be failed here as well; failTask's accepted
                // parameter type is not visible from this file, so only the logging is added.
                this.logger.error("unexpected exception while streaming logical changes", (Throwable)e);
                throw e;
            }
        }
    }

    /**
     * Flushes the LSN of the active replication stream back to the server; does nothing (besides
     * logging) when streaming has already been stopped.
     *
     * @throws ConnectException if flushing fails with a {@link SQLException}
     */
    @Override
    protected synchronized void commit() {
        LoggingContext.PreviousContext loggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            ReplicationStream activeStream = this.replicationStream.get();
            if (activeStream == null) {
                this.logger.debug("streaming has already stopped, ignoring commit callback...");
                return;
            }
            this.logger.debug("flushing offsets to server...");
            activeStream.flushLSN();
        }
        catch (SQLException failure) {
            throw new ConnectException(failure);
        }
        finally {
            loggingContext.restore();
        }
    }

    /**
     * Closes the replication connection (when a stream is still active), clears the stream
     * reference and shuts the executor down.
     *
     * <p>The cleanup in the {@code finally} block runs on every call, including when the producer
     * was already stopped or when closing the connection failed.
     *
     * @throws ConnectException wrapping the cause of a close failure (or the failure itself when
     *         it has no cause)
     */
    @Override
    protected synchronized void stop() {
        LoggingContext.PreviousContext loggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            if (this.replicationStream.get() != null) {
                if (this.replicationConnection != null) {
                    this.logger.debug("stopping streaming...");
                    this.replicationConnection.close();
                }
            } else {
                this.logger.debug("already stopped....");
            }
        }
        catch (Exception failure) {
            Throwable rootCause = failure.getCause();
            throw new ConnectException(rootCause != null ? rootCause : failure);
        }
        finally {
            this.replicationStream.set(null);
            this.executorService.shutdownNow();
            loggingContext.restore();
        }
    }

    /**
     * Turns one decoded row message into change records and hands them to the consumer.
     *
     * <p>The source position is updated from the message before any filtering takes place. The
     * message is then dropped when no table schema is available for its table, or when the table
     * has no key schema.
     *
     * @param message the decoded row message; {@code null} is silently ignored
     * @param lsn the position at which the message was received
     * @param consumer receives the generated records
     * @throws SQLException if reloading table metadata fails
     */
    private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord> consumer) throws SQLException {
        if (message == null) {
            return;
        }
        TableId tableId = PostgresSchema.parse(message.getTable());
        assert (tableId != null);
        long commitTimeNs = message.getCommitTime();
        int txId = message.getTransactionId();
        this.sourceInfo.update(lsn, commitTimeNs, txId);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("received new message at position {}\n{}", (Object)ReplicationConnection.format(lsn), (Object)message);
        }
        TableSchema tableSchema = this.tableSchemaFor(tableId);
        if (tableSchema == null) {
            return;
        }
        if (tableSchema.keySchema() == null) {
            this.logger.warn("ignoring message for table '{}' because it does not have a primary key defined", (Object)tableId);
            // actually ignore the message, as the warning says; previously processing carried on
            // and UPDATE messages produced records without a key
            return;
        }
        PgProto.Op operation = message.getOp();
        switch (operation) {
            case INSERT: {
                Object[] row = this.columnValues(message.getNewTupleList(), tableId, true);
                this.generateCreateRecord(tableId, row, consumer);
                break;
            }
            case UPDATE: {
                Object[] oldRow = this.columnValues(message.getOldTupleList(), tableId, true);
                Object[] newRow = this.columnValues(message.getNewTupleList(), tableId, true);
                this.generateUpdateRecord(tableId, oldRow, newRow, consumer);
                break;
            }
            case DELETE: {
                // the old tuple is read without triggering a schema refresh
                Object[] row = this.columnValues(message.getOldTupleList(), tableId, false);
                this.generateDeleteRecord(tableId, row, consumer);
                break;
            }
            default: {
                this.logger.warn("unknown message operation: " + operation);
            }
        }
    }

    /**
     * Emits a create event for the given row.
     *
     * <p>Nothing is emitted when the row data is missing or empty (a warning is logged), or when
     * either the key or the value derived from the row is {@code null}.
     *
     * @param tableId the table the row belongs to
     * @param rowData the column values of the row
     * @param recordConsumer receives the create event
     */
    protected void generateCreateRecord(TableId tableId, Object[] rowData, Consumer<SourceRecord> recordConsumer) {
        boolean hasRowData = rowData != null && rowData.length > 0;
        if (!hasRowData) {
            this.logger.warn("no new values found for table '{}' from update message at '{}';skipping record", (Object)tableId, (Object)this.sourceInfo);
            return;
        }
        TableSchema schemaOfTable = this.schema().schemaFor(tableId);
        assert (schemaOfTable != null);
        Object recordKey = schemaOfTable.keyFromColumnData(rowData);
        Struct rowValue = schemaOfTable.valueFromColumnData(rowData);
        if (recordKey == null || rowValue == null) {
            return;
        }
        Schema schemaOfKey = schemaOfTable.keySchema();
        Map<String, String> sourcePartition = this.sourceInfo.partition();
        Map<String, ?> sourceOffset = this.sourceInfo.offset();
        String topic = this.topicSelector().topicNameFor(tableId);
        Envelope envelope = this.createEnvelope(schemaOfTable, topic);
        Schema envelopeSchema = envelope.schema();
        Struct source = this.sourceInfo.source();
        Long timestamp = Long.valueOf(this.clock().currentTimeInMillis());
        SourceRecord createEvent = new SourceRecord(sourcePartition, sourceOffset, topic, null, schemaOfKey, recordKey, envelopeSchema, (Object)envelope.create((Object)rowValue, source, timestamp));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending create event '{}' to topic '{}'", (Object)createEvent, (Object)topic);
        }
        recordConsumer.accept(createEvent);
    }

    /**
     * Emits the events describing an update of a row.
     *
     * <p>When the old row is known and its key differs from the new key, the update is expressed
     * as a delete event for the old key, a tombstone for the old key and a create event for the
     * new key. Otherwise a single update event is emitted.
     *
     * <p>Nothing is emitted when the new row data is missing or empty (a warning is logged), or
     * when the key or value derived from the new row is {@code null} - the same rule the create
     * and delete generators apply.
     *
     * @param tableId the table the row belongs to
     * @param oldRowData the column values before the update; may be {@code null} or empty
     * @param newRowData the column values after the update
     * @param recordConsumer receives the generated events
     */
    protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object[] newRowData, Consumer<SourceRecord> recordConsumer) {
        if (newRowData == null || newRowData.length == 0) {
            this.logger.warn("no values found for table '{}' from update message at '{}';skipping record", (Object)tableId, (Object)this.sourceInfo);
            return;
        }
        Schema oldKeySchema = null;
        Struct oldValue = null;
        Object oldKey = null;
        TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert (tableSchema != null);
        if (oldRowData != null && oldRowData.length > 0) {
            oldKey = tableSchema.keyFromColumnData(oldRowData);
            oldKeySchema = tableSchema.keySchema();
            oldValue = tableSchema.valueFromColumnData(oldRowData);
        }
        Object newKey = tableSchema.keyFromColumnData(newRowData);
        Struct newValue = tableSchema.valueFromColumnData(newRowData);
        if (newKey == null || newValue == null) {
            // consistent with generateCreateRecord/generateDeleteRecord: never emit keyless or valueless records
            return;
        }
        Schema newKeySchema = tableSchema.keySchema();
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topicName = this.topicSelector().topicNameFor(tableId);
        Envelope envelope = this.createEnvelope(tableSchema, topicName);
        Struct source = this.sourceInfo.source();
        if (oldKey != null && !Objects.equals(oldKey, newKey)) {
            // the key changed: retire the old key (delete + tombstone) and create the row under the new key
            SourceRecord record = new SourceRecord(partition, offset, topicName, null, oldKeySchema, oldKey, envelope.schema(), (Object)envelope.delete((Object)oldValue, source, Long.valueOf(this.clock().currentTimeInMillis())));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("sending delete event '{}' to topic '{}'", (Object)record, (Object)topicName);
            }
            recordConsumer.accept(record);
            record = new SourceRecord(partition, offset, topicName, null, oldKeySchema, oldKey, null, null);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("sending tombstone event '{}' to topic '{}'", (Object)record, (Object)topicName);
            }
            recordConsumer.accept(record);
            record = new SourceRecord(partition, offset, topicName, null, newKeySchema, newKey, envelope.schema(), (Object)envelope.create((Object)newValue, source, Long.valueOf(this.clock().currentTimeInMillis())));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("sending create event '{}' to topic '{}'", (Object)record, (Object)topicName);
            }
            recordConsumer.accept(record);
        } else {
            SourceRecord record = new SourceRecord(partition, offset, topicName, null, newKeySchema, newKey, envelope.schema(), (Object)envelope.update((Object)oldValue, newValue, source, Long.valueOf(this.clock().currentTimeInMillis())));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("sending update event '{}' to topic '{}'", (Object)record, (Object)topicName);
            }
            recordConsumer.accept(record);
        }
    }

    /**
     * Emits a delete event followed by a tombstone (same key, no value) for the given row.
     *
     * <p>Nothing is emitted when the row data is missing or empty (a warning is logged), or when
     * either the key or the value derived from the row is {@code null}.
     *
     * @param tableId the table the row belonged to
     * @param oldRowData the column values of the deleted row
     * @param recordConsumer receives the delete event and the tombstone, in that order
     */
    protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Consumer<SourceRecord> recordConsumer) {
        boolean hasRowData = oldRowData != null && oldRowData.length > 0;
        if (!hasRowData) {
            this.logger.warn("no values found for table '{}' from update message at '{}';skipping record", (Object)tableId, (Object)this.sourceInfo);
            return;
        }
        TableSchema schemaOfTable = this.schema().schemaFor(tableId);
        assert (schemaOfTable != null);
        Object recordKey = schemaOfTable.keyFromColumnData(oldRowData);
        Struct deletedValue = schemaOfTable.valueFromColumnData(oldRowData);
        if (recordKey == null || deletedValue == null) {
            return;
        }
        Schema schemaOfKey = schemaOfTable.keySchema();
        Map<String, String> sourcePartition = this.sourceInfo.partition();
        Map<String, ?> sourceOffset = this.sourceInfo.offset();
        String topic = this.topicSelector().topicNameFor(tableId);
        Envelope envelope = this.createEnvelope(schemaOfTable, topic);
        Schema envelopeSchema = envelope.schema();
        Struct source = this.sourceInfo.source();
        Long timestamp = Long.valueOf(this.clock().currentTimeInMillis());

        SourceRecord deleteEvent = new SourceRecord(sourcePartition, sourceOffset, topic, null, schemaOfKey, recordKey, envelopeSchema, (Object)envelope.delete((Object)deletedValue, source, timestamp));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending delete event '{}' to topic '{}'", (Object)deleteEvent, (Object)topic);
        }
        recordConsumer.accept(deleteEvent);

        // the tombstone carries the key only: value schema and value are both null
        SourceRecord tombstone = new SourceRecord(sourcePartition, sourceOffset, topic, null, schemaOfKey, recordKey, null, null);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending tombstone event '{}' to topic '{}'", (Object)tombstone, (Object)topic);
        }
        recordConsumer.accept(tombstone);
    }

    /**
     * Converts the datum messages of one tuple into an array of column values laid out in the
     * column order of the table metadata.
     *
     * <p>The array is large enough for every table column, so a tuple that carries only some of
     * the table's columns leaves the remaining positions {@code null} instead of overflowing the
     * array.
     *
     * @param messageList the datum messages of the tuple; may be {@code null} or empty
     * @param tableId the table the tuple belongs to
     * @param refreshSchemaIfChanged whether to reload the table metadata when the tuple does not
     *        match it
     * @return the column values, or {@code null} when there are no datum messages
     * @throws SQLException if reloading the table metadata fails
     * @throws ConnectException if the tuple contains a column that is not part of the table
     *         metadata
     */
    private Object[] columnValues(List<PgProto.DatumMessage> messageList, TableId tableId, boolean refreshSchemaIfChanged) throws SQLException {
        if (messageList == null || messageList.isEmpty()) {
            return null;
        }
        Table table = this.schema().tableFor(tableId);
        assert (table != null);
        if (refreshSchemaIfChanged && this.schemaChanged(messageList, table)) {
            // close the short-lived JDBC connection once the metadata has been reloaded
            try (PostgresConnection connection = this.taskContext.createConnection()) {
                this.schema().refresh(connection, tableId);
            }
            table = this.schema().tableFor(tableId);
        }
        List<String> columnNames = table.columnNames();
        // values are indexed by table column position, which can exceed the number of datums in the tuple
        Object[] values = new Object[Math.max(columnNames.size(), messageList.size())];
        for (PgProto.DatumMessage message : messageList) {
            String columnName = message.getColumnName();
            int position = columnNames.indexOf(columnName);
            if (position < 0) {
                throw new ConnectException("column '" + columnName + "' of table '" + tableId + "' is not part of the table metadata");
            }
            values[position] = this.extractValueFromMessage(message);
        }
        return values;
    }

    /**
     * Tells whether the tuple described by the datum messages no longer matches the table
     * metadata.
     *
     * <p>A mismatch is reported when the number of datums differs from the number of table
     * columns, when a datum names a column the table does not have, or when the schema's
     * {@code isType} check fails for a column. Datums are inspected in order and the check stops
     * at the first mismatch.
     *
     * @param messageList the datum messages of the tuple
     * @param table the table metadata to compare against
     * @return {@code true} if the table metadata should be refreshed
     */
    private boolean schemaChanged(List<PgProto.DatumMessage> messageList, Table table) {
        if (table.columnNames().size() != messageList.size()) {
            return true;
        }
        for (PgProto.DatumMessage datum : messageList) {
            String name = datum.getColumnName();
            Column knownColumn = table.columnWithName(name);
            if (knownColumn == null) {
                this.logger.debug("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", (Object)name);
                return true;
            }
            boolean typeMatches = this.schema().isType(knownColumn.typeName(), knownColumn.jdbcType());
            if (!typeMatches) {
                this.logger.debug("detected new type for column '{}', old type was '{}', new type is '{}'; refreshing table schema", new Object[]{name, knownColumn.jdbcType(), datum.getColumnType()});
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the schema of a table, reloading the table metadata from the database once when
     * the schema is not known yet.
     *
     * @param tableId the table to look up
     * @return the table schema, or {@code null} when the table is filtered out or its schema
     *         cannot be loaded
     * @throws SQLException if reloading the table metadata fails
     */
    private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
        PostgresSchema schema = this.schema();
        if (schema.isFilteredOut(tableId)) {
            this.logger.debug("table '{}' is filtered out, ignoring", (Object)tableId);
            return null;
        }
        TableSchema tableSchema = schema.schemaFor(tableId);
        if (tableSchema != null) {
            return tableSchema;
        }
        // close the short-lived JDBC connection once the metadata has been reloaded
        try (PostgresConnection connection = this.taskContext.createConnection()) {
            schema.refresh(connection, tableId);
        }
        tableSchema = schema.schemaFor(tableId);
        if (tableSchema == null) {
            this.logger.warn("cannot load schema for table '{}'", (Object)tableId);
            return null;
        }
        this.logger.debug("refreshed DB schema to include table '{}'", (Object)tableId);
        return tableSchema;
    }

    /**
     * Extracts the Java value carried by a datum message, based on the message's column type.
     *
     * <p>For every handled type the value is {@code null} when the corresponding datum field is
     * not set. Unhandled types are logged and returned as a byte array.
     *
     * <p>NOTE(review): the numeric case labels look like PostgreSQL {@code pg_type} OIDs inlined
     * by the decompiler (e.g. 16 = bool, 23 = int4, 600 = point, 1114 = timestamp); the names in
     * the comments below follow that reading - confirm against the catalog.
     *
     * @param datumMessage the datum message of a single column
     * @return the extracted value, possibly {@code null}
     */
    protected Object extractValueFromMessage(PgProto.DatumMessage datumMessage) {
        int columnType = (int)datumMessage.getColumnType();
        switch (columnType) {
            case 16: { // bool
                return datumMessage.hasDatumBool() ? Boolean.valueOf(datumMessage.getDatumBool()) : null;
            }
            case 21: // int2
            case 23: { // int4
                return datumMessage.hasDatumInt32() ? Integer.valueOf(datumMessage.getDatumInt32()) : null;
            }
            case 20: // int8
            case 26: // oid
            case 790: { // money
                return datumMessage.hasDatumInt64() ? Long.valueOf(datumMessage.getDatumInt64()) : null;
            }
            case 700: { // float4
                return datumMessage.hasDatumFloat() ? Float.valueOf(datumMessage.getDatumFloat()) : null;
            }
            case 701: // float8
            case 1700: { // numeric
                return datumMessage.hasDatumDouble() ? Double.valueOf(datumMessage.getDatumDouble()) : null;
            }
            case 18: // char
            case 25: // text
            case 114: // json
            case 142: // xml
            case 1042: // bpchar
            case 1043: // varchar
            case 1560: // bit
            case 1562: // varbit
            case 2950: // uuid
            case 3802: { // jsonb
                return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null;
            }
            case 1082: { // date: the 32-bit datum is widened to a Long
                return datumMessage.hasDatumInt32() ? Long.valueOf(datumMessage.getDatumInt32()) : null;
            }
            case 1083: // time
            case 1114: // timestamp
            case 1184: { // timestamptz
                if (!datumMessage.hasDatumInt64()) {
                    return null;
                }
                // the datum is converted from microseconds to nanoseconds
                return TimeUnit.NANOSECONDS.convert(datumMessage.getDatumInt64(), TimeUnit.MICROSECONDS);
            }
            case 1266: { // timetz
                if (!datumMessage.hasDatumDouble()) {
                    return null;
                }
                // the double datum is scaled by 1000 and truncated to a long
                return BigDecimal.valueOf(datumMessage.getDatumDouble() * 1000.0).longValue();
            }
            case 1186: { // interval
                return datumMessage.hasDatumDouble() ? Double.valueOf(datumMessage.getDatumDouble()) : null;
            }
            case 17: { // bytea
                return datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
            }
            case 600: { // point
                if (!datumMessage.hasDatumPoint()) {
                    // an unset point must stay null rather than turn into the default point (0, 0)
                    return null;
                }
                PgProto.Point datumPoint = datumMessage.getDatumPoint();
                return new PGpoint(datumPoint.getX(), datumPoint.getY());
            }
            case 3910: { // tstzrange: the bytes are decoded as a UTF-8 string
                return datumMessage.hasDatumBytes() ? new String(datumMessage.getDatumBytes().toByteArray(), StandardCharsets.UTF_8) : null;
            }
        }
        this.logger.warn("processing column '{}' with unknown data type '{}' as byte array", (Object)datumMessage.getColumnName(), (Object)datumMessage.getColumnType());
        return datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
    }
}

