/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.ChangeEvent;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.RecordsProducer;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.Predicates;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;

/**
 * A {@link RecordsProducer} that reads messages from a PostgreSQL {@link ReplicationStream} on a
 * dedicated executor thread, turns INSERT/UPDATE/DELETE messages into {@link ChangeEvent}s and hands
 * them to the consumer passed to {@code start}.
 * <p>
 * The lifecycle entry points ({@code start}, {@code commit}, {@code stop}) are {@code synchronized};
 * the actual streaming loop runs on {@link #executorService}.
 */
@ThreadSafe
public class RecordsStreamProducer
extends RecordsProducer {
    // Name used for the logging context and for the streaming executor thread.
    private static final String CONTEXT_NAME = "records-stream-producer";
    // Single-thread executor that runs the streaming loop; shut down in stop().
    private final ExecutorService executorService;
    // Connection the replication stream is opened on; closed in closeConnections().
    private ReplicationConnection replicationConnection;
    // The active stream: set once in start(), reset to null in stop().
    private final AtomicReference<ReplicationStream> replicationStream;
    // Flipped to true by the first stop() call so that cleanup is executed only once.
    private final AtomicBoolean cleanupExecuted = new AtomicBoolean();
    // Lazily created by typeResolverConnection(); closed in closeConnections().
    private PgConnection typeResolverConnection = null;
    // LSN attached to every emitted ChangeEvent; advanced in process() for empty messages and for
    // messages flagged as the last event of their LSN.
    private Long lastCompletelyProcessedLsn;
    // LSN most recently passed to commit() and not yet flushed to the server; -1 means "nothing pending".
    private final AtomicLong lastCommittedLsn = new AtomicLong(-1L);
    // Pauses the streaming loop for the configured poll interval when no message was read.
    private final Metronome pauseNoMessage;
    // Emits heartbeat records from the streaming loop when no message was read.
    private final Heartbeat heartbeat;

    public RecordsStreamProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, ReplicationConnection replicationConnection) {
        super(taskContext, sourceInfo);
        this.executorService = Threads.newSingleThreadExecutor(PostgresConnector.class, (String)taskContext.config().getLogicalName(), (String)CONTEXT_NAME);
        this.replicationStream = new AtomicReference();
        this.replicationConnection = replicationConnection;
        this.heartbeat = Heartbeat.create((Configuration)taskContext.config().getConfig(), (String)taskContext.topicSelector().getHeartbeatTopic(), (String)taskContext.config().getLogicalName());
        this.pauseNoMessage = Metronome.sleeper((Duration)taskContext.getConfig().getPollInterval(), (Clock)Clock.SYSTEM);
    }

    /**
     * Creates a producer that obtains its own replication connection from the task context.
     *
     * @param taskContext the task context used to create the replication connection
     * @param sourceInfo the source position information handed to the superclass
     * @throws ConnectException wrapping the {@link SQLException} if the replication connection cannot be created
     */
    public RecordsStreamProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo) {
        this(taskContext, sourceInfo, null);
        try {
            this.replicationConnection = taskContext.createReplicationConnection(false);
        }
        catch (SQLException sqlFailure) {
            throw new ConnectException(sqlFailure);
        }
    }

    /**
     * Opens the replication stream and submits the streaming loop to the executor.
     * <p>
     * If the stored source info has a last known position, streaming starts from that LSN; otherwise
     * it starts without an explicit position. Nothing is started when the executor has already been
     * shut down.
     *
     * @param eventConsumer receives every produced {@link ChangeEvent}
     * @param failureConsumer notified by the streaming loop when it fails
     * @throws ConnectException wrapping the cause of any failure (or the failure itself when it has no cause)
     */
    @Override
    protected synchronized void start(BlockingConsumer<ChangeEvent> eventConsumer, Consumer<Throwable> failureConsumer) {
        final LoggingContext.PreviousContext loggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            if (this.executorService.isShutdown()) {
                this.logger.info("Streaming will not start, stop already requested");
                return;
            }

            final ReplicationStream openedStream;
            if (!this.sourceInfo.hasLastKnownPosition()) {
                this.logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
                openedStream = this.replicationConnection.startStreaming();
            }
            else {
                final Long storedLsn = this.sourceInfo.lsn();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("retrieved latest position from stored offset '{}'", ReplicationConnection.format(storedLsn));
                }
                openedStream = this.replicationConnection.startStreaming(storedLsn);
            }
            // Only installs the new stream when none has been registered yet.
            this.replicationStream.compareAndSet(null, openedStream);

            // Keep-alive runs on its own thread until the streaming loop takes over (see streamChanges).
            final String logicalName = this.taskContext.config().getLogicalName();
            this.replicationStream.get().startKeepAlive(
                    Threads.newSingleThreadExecutor(PostgresConnector.class, logicalName, CONTEXT_NAME + "-keep-alive"));

            this.taskContext.refreshSchema(true);
            this.taskContext.schema().assureNonEmptySchema();
            this.lastCompletelyProcessedLsn = this.sourceInfo.lsn();
            this.executorService.submit(() -> this.streamChanges(eventConsumer, failureConsumer));
        }
        catch (Throwable failure) {
            Throwable reported = failure.getCause();
            if (reported == null) {
                reported = failure;
            }
            throw new ConnectException(reported);
        }
        finally {
            loggingContext.restore();
        }
    }

    /**
     * Streaming loop executed on the producer's executor thread until that thread is interrupted.
     * <p>
     * Each iteration flushes a pending committed LSN, then asks the stream to read a pending message
     * and process it. When {@code readPending} reports {@code false}, a heartbeat is emitted (if an
     * LSN has been processed so far) and the loop pauses for the poll interval.
     *
     * @param consumer receives every produced {@link ChangeEvent}
     * @param failureConsumer notified before a failure is rethrown
     * @throws ConnectException wrapping any failure other than an interruption
     */
    private void streamChanges(BlockingConsumer<ChangeEvent> consumer, Consumer<Throwable> failureConsumer) {
        final ReplicationStream activeStream = this.replicationStream.get();
        // The keep-alive started in start() is stopped once this loop takes over the stream.
        activeStream.stopKeepAlive();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.flushLatestCommittedLsn(activeStream);
                final boolean messageRead = activeStream.readPending(
                        message -> this.process(message, activeStream.lastReceivedLsn(), consumer));
                if (!messageRead) {
                    if (this.lastCompletelyProcessedLsn != null) {
                        this.heartbeat.heartbeat(
                                this.sourceInfo.partition(),
                                this.sourceInfo.offset(),
                                record -> consumer.accept(new ChangeEvent(record, this.lastCompletelyProcessedLsn)));
                    }
                    this.pauseNoMessage.pause();
                }
            }
            catch (SQLException sqlFailure) {
                if (sqlFailure.getCause() instanceof IOException) {
                    this.logger.warn("Closing replication stream due to db connection IO exception...");
                }
                else {
                    this.logger.error("unexpected exception while streaming logical changes", sqlFailure);
                }
                failureConsumer.accept(sqlFailure);
                throw new ConnectException(sqlFailure);
            }
            catch (InterruptedException interrupted) {
                this.logger.info("Interrupted from sleep, producer termination requested");
                // Restore the flag so the loop condition sees the interruption and exits.
                Thread.currentThread().interrupt();
            }
            catch (Throwable failure) {
                this.logger.error("unexpected exception while streaming logical changes", failure);
                failureConsumer.accept(failure);
                throw new ConnectException(failure);
            }
        }
    }

    /**
     * Flushes the LSN most recently recorded by {@code commit} to the server, if there is one.
     * <p>
     * The pending value is atomically taken and reset to {@code -1}, so each committed LSN is
     * flushed at most once.
     *
     * @param stream the stream on which the LSN is flushed
     * @throws SQLException if the stream fails to flush the LSN
     */
    private void flushLatestCommittedLsn(ReplicationStream stream) throws SQLException {
        final long pendingLsn = this.lastCommittedLsn.getAndSet(-1L);
        if (pendingLsn == -1L) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Flushing LSN to server: {}", LogSequenceNumber.valueOf(pendingLsn));
        }
        stream.flushLsn(pendingLsn);
    }

    /**
     * Records the given LSN as the latest committed position.
     * <p>
     * Nothing is sent to the server here; the value is picked up later by
     * {@code flushLatestCommittedLsn}.
     *
     * @param lsn the LSN whose flush is requested
     */
    @Override
    protected synchronized void commit(long lsn) {
        final LoggingContext.PreviousContext loggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Flushing of LSN '{}' requested", LogSequenceNumber.valueOf(lsn));
            }
            this.lastCommittedLsn.set(lsn);
        }
        finally {
            loggingContext.restore();
        }
    }

    /**
     * Stops streaming and releases the producer's resources.
     * <p>
     * The first invocation performs a final LSN flush (a failure there is only logged), stops the
     * stream's keep-alive and closes the connections. Every invocation, including repeated ones,
     * clears the stream reference and shuts the executor down.
     */
    @Override
    protected synchronized void stop() {
        final LoggingContext.PreviousContext loggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            final boolean firstStop = this.cleanupExecuted.compareAndSet(false, true);
            if (firstStop) {
                final ReplicationStream activeStream = this.replicationStream.get();
                if (activeStream != null) {
                    try {
                        this.flushLatestCommittedLsn(activeStream);
                    }
                    catch (SQLException flushFailure) {
                        this.logger.error("Failed to execute the final LSN flush", flushFailure);
                    }
                    activeStream.stopKeepAlive();
                }
                this.closeConnections();
            }
            else {
                this.logger.debug("already stopped....");
            }
        }
        finally {
            this.replicationStream.set(null);
            this.executorService.shutdownNow();
            loggingContext.restore();
        }
    }

    /**
     * Closes the replication connection and then the type resolver connection.
     * <p>
     * The type resolver connection is closed even when closing the replication connection fails.
     * If only the replication connection fails, its exception is rethrown wrapped in a
     * {@link ConnectException}. If the type resolver connection fails, that exception is wrapped
     * and the earlier replication failure (if any) is attached as suppressed.
     *
     * @throws ConnectException if either connection could not be closed
     */
    private void closeConnections() {
        Exception closingException = null;
        try {
            if (this.replicationConnection != null) {
                this.logger.debug("stopping streaming...");
                this.replicationConnection.close();
            }
        }
        catch (Exception e) {
            closingException = e;
        }
        finally {
            try {
                if (this.typeResolverConnection != null) {
                    this.typeResolverConnection.close();
                }
            }
            catch (Exception e) {
                ConnectException rethrown = new ConnectException(e);
                if (closingException != null) {
                    rethrown.addSuppressed(closingException);
                }
                throw rethrown;
            }
            // Deliberately a conditional throw and not a break/return: leaving a finally block
            // abruptly would discard any Throwable still pending from the try block (e.g. an Error).
            if (closingException != null) {
                throw new ConnectException(closingException);
            }
        }
    }

    /**
     * Handles a single message read from the replication stream.
     * <p>
     * A {@code null} message only advances the last completely processed LSN. Otherwise the source
     * info is updated and, unless no schema is available for the table, the message is dispatched
     * to the create/update/delete record generator matching its operation.
     *
     * @param message the message to process; may be {@code null}
     * @param lsn the LSN associated with the message
     * @param consumer receives the generated {@link ChangeEvent}s
     * @throws SQLException if schema or column value resolution fails
     * @throws InterruptedException if the consumer is interrupted while accepting an event
     */
    private void process(ReplicationMessage message, Long lsn, BlockingConsumer<ChangeEvent> consumer) throws SQLException, InterruptedException {
        if (message == null) {
            this.logger.trace("Received empty message");
            this.lastCompletelyProcessedLsn = lsn;
            return;
        }
        if (message.isLastEventForLsn()) {
            this.lastCompletelyProcessedLsn = lsn;
        }

        final TableId tableId = PostgresSchema.parse(message.getTable());
        assert tableId != null;

        final Instant commitTime = message.getCommitTime();
        final long transactionId = message.getTransactionId();
        this.sourceInfo.update(lsn, commitTime, transactionId, tableId, this.taskContext.getSlotXmin());
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
        }

        // No schema means the table is filtered out or could not be loaded; nothing to emit.
        if (this.tableSchemaFor(tableId) == null) {
            return;
        }

        final ReplicationMessage.Operation operation = message.getOperation();
        switch (operation) {
            case INSERT: {
                final Object[] insertedRow = this.columnValues(message.getNewTupleList(), tableId, message.shouldSchemaBeSynchronized(), message.hasTypeMetadata());
                this.generateCreateRecord(tableId, insertedRow, consumer);
                break;
            }
            case UPDATE: {
                final Object[] rowAfter = this.columnValues(message.getNewTupleList(), tableId, message.shouldSchemaBeSynchronized(), message.hasTypeMetadata());
                final Object[] rowBefore = this.columnValues(message.getOldTupleList(), tableId, false, message.hasTypeMetadata());
                this.generateUpdateRecord(tableId, rowBefore, rowAfter, consumer);
                break;
            }
            case DELETE: {
                final Object[] deletedRow = this.columnValues(message.getOldTupleList(), tableId, false, message.hasTypeMetadata());
                this.generateDeleteRecord(tableId, deletedRow, consumer);
                break;
            }
            default: {
                this.logger.warn("unknown message operation: {}", operation);
            }
        }
    }

    /**
     * Builds the create event for an inserted row and passes it to the consumer.
     * <p>
     * The record is skipped (with a warning) when no row data is supplied or when the table schema
     * produces no value for the row.
     *
     * @param tableId the table the row belongs to
     * @param rowData the column values of the new row, positioned as produced by {@code columnValues}
     * @param recordConsumer receives the resulting {@link ChangeEvent}
     * @throws InterruptedException if the consumer is interrupted while accepting the event
     */
    protected void generateCreateRecord(TableId tableId, Object[] rowData, BlockingConsumer<ChangeEvent> recordConsumer) throws InterruptedException {
        if (rowData == null || rowData.length == 0) {
            // This is the create path; the message previously claimed an "update message".
            this.logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableId, this.sourceInfo);
            return;
        }
        TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert tableSchema != null;
        Object key = tableSchema.keyFromColumnData(rowData);
        this.logger.trace("key value is: {}", key);
        Struct value = tableSchema.valueFromColumnData(rowData);
        if (value == null) {
            this.logger.warn("no values found for table '{}' from create message at '{}'; skipping record", tableId, this.sourceInfo);
            return;
        }
        Schema keySchema = tableSchema.keySchema();
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topicName = this.topicSelector().topicNameFor(tableId);
        Envelope envelope = tableSchema.getEnvelopeSchema();
        SourceRecord record = new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(),
                envelope.create(value, this.sourceInfo.struct(), this.clock().currentTimeInMillis()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending create event '{}' to topic '{}'", record, topicName);
        }
        recordConsumer.accept(new ChangeEvent(record, this.lastCompletelyProcessedLsn));
    }

    /**
     * Builds the event(s) for an updated row and passes them to the consumer.
     * <p>
     * When an old key is available and differs from the new key, the update is emitted as a delete
     * of the old key (optionally followed by a tombstone, depending on configuration) and a create
     * of the new key. Otherwise a single update event is emitted. Nothing is emitted when no new
     * row data is supplied.
     *
     * @param tableId the table the row belongs to
     * @param oldRowData the column values before the update; may be {@code null} or empty
     * @param newRowData the column values after the update
     * @param recordConsumer receives the resulting {@link ChangeEvent}s
     * @throws InterruptedException if the consumer is interrupted while accepting an event
     */
    protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object[] newRowData, BlockingConsumer<ChangeEvent> recordConsumer) throws InterruptedException {
        if (newRowData == null || newRowData.length == 0) {
            this.logger.warn("no values found for table '{}' from update message at '{}'; skipping record", tableId, this.sourceInfo);
            return;
        }

        final TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert tableSchema != null;

        // The "before" image is only derived when old row data was actually delivered.
        final boolean hasOldRow = oldRowData != null && oldRowData.length > 0;
        final Object previousKey = hasOldRow ? tableSchema.keyFromColumnData(oldRowData) : null;
        final Schema previousKeySchema = hasOldRow ? tableSchema.keySchema() : null;
        final Struct previousValue = hasOldRow ? tableSchema.valueFromColumnData(oldRowData) : null;

        final Object currentKey = tableSchema.keyFromColumnData(newRowData);
        final Struct currentValue = tableSchema.valueFromColumnData(newRowData);
        final Schema currentKeySchema = tableSchema.keySchema();

        final Map<String, String> partition = this.sourceInfo.partition();
        final Map<String, ?> offset = this.sourceInfo.offset();
        final String topicName = this.topicSelector().topicNameFor(tableId);
        final Envelope envelope = tableSchema.getEnvelopeSchema();
        final Struct source = this.sourceInfo.struct();

        if (previousKey == null || Objects.equals(previousKey, currentKey)) {
            // Key unchanged (or unknown): a plain update event is sufficient.
            final SourceRecord updateRecord = new SourceRecord(partition, offset, topicName, null, currentKeySchema, currentKey, envelope.schema(),
                    envelope.update(previousValue, currentValue, source, this.clock().currentTimeInMillis()));
            recordConsumer.accept(new ChangeEvent(updateRecord, this.lastCompletelyProcessedLsn));
            return;
        }

        // Key changed: delete under the old key ...
        final SourceRecord deleteRecord = new SourceRecord(partition, offset, topicName, null, previousKeySchema, previousKey, envelope.schema(),
                envelope.delete(previousValue, source, this.clock().currentTimeInMillis()));
        final ChangeEvent deleteEvent = new ChangeEvent(deleteRecord, this.lastCompletelyProcessedLsn);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending delete event '{}' to topic '{}'", deleteEvent.getRecord(), topicName);
        }
        recordConsumer.accept(deleteEvent);

        // ... optionally a tombstone for the old key ...
        if (this.taskContext.config().isEmitTombstoneOnDelete()) {
            final SourceRecord tombstoneRecord = new SourceRecord(partition, offset, topicName, null, previousKeySchema, previousKey, null, null);
            final ChangeEvent tombstoneEvent = new ChangeEvent(tombstoneRecord, this.lastCompletelyProcessedLsn);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("sending tombstone event '{}' to topic '{}'", tombstoneEvent.getRecord(), topicName);
            }
            recordConsumer.accept(tombstoneEvent);
        }

        // ... and a create under the new key.
        final SourceRecord createRecord = new SourceRecord(partition, offset, topicName, null, currentKeySchema, currentKey, envelope.schema(),
                envelope.create(currentValue, source, this.clock().currentTimeInMillis()));
        final ChangeEvent createEvent = new ChangeEvent(createRecord, this.lastCompletelyProcessedLsn);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending create event '{}' to topic '{}'", createEvent.getRecord(), topicName);
        }
        recordConsumer.accept(createEvent);
    }

    /**
     * Builds the delete event for a removed row and passes it to the consumer, followed by a
     * tombstone event when tombstones are enabled in the configuration.
     * <p>
     * The record is skipped (with a warning) when no old row data is supplied or when the table
     * schema produces no value for it.
     *
     * @param tableId the table the row belonged to
     * @param oldRowData the column values of the deleted row
     * @param recordConsumer receives the resulting {@link ChangeEvent}s
     * @throws InterruptedException if the consumer is interrupted while accepting an event
     */
    protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, BlockingConsumer<ChangeEvent> recordConsumer) throws InterruptedException {
        final boolean noOldRow = oldRowData == null || oldRowData.length == 0;
        if (noOldRow) {
            this.logger.warn("no values found for table '{}' from delete message at '{}'; skipping record", tableId, this.sourceInfo);
            return;
        }

        final TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert tableSchema != null;
        final Object deletedKey = tableSchema.keyFromColumnData(oldRowData);
        final Struct deletedValue = tableSchema.valueFromColumnData(oldRowData);
        if (deletedValue == null) {
            this.logger.warn("ignoring delete message for table '{}' because it does not have a primary key defined and replica identity for the table is not FULL", tableId);
            return;
        }

        final Schema keySchema = tableSchema.keySchema();
        final Map<String, String> partition = this.sourceInfo.partition();
        final Map<String, ?> offset = this.sourceInfo.offset();
        final String topicName = this.topicSelector().topicNameFor(tableId);
        final Envelope envelope = tableSchema.getEnvelopeSchema();

        final SourceRecord deleteRecord = new SourceRecord(partition, offset, topicName, null, keySchema, deletedKey, envelope.schema(),
                envelope.delete(deletedValue, this.sourceInfo.struct(), this.clock().currentTimeInMillis()));
        final ChangeEvent deleteEvent = new ChangeEvent(deleteRecord, this.lastCompletelyProcessedLsn);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending delete event '{}' to topic '{}'", deleteEvent.getRecord(), topicName);
        }
        recordConsumer.accept(deleteEvent);

        if (!this.taskContext.config().isEmitTombstoneOnDelete()) {
            return;
        }
        // Tombstone: same key, no value schema and no value.
        final SourceRecord tombstoneRecord = new SourceRecord(partition, offset, topicName, null, keySchema, deletedKey, null, null);
        final ChangeEvent tombstoneEvent = new ChangeEvent(tombstoneRecord, this.lastCompletelyProcessedLsn);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending tombstone event '{}' to topic '{}'", tombstoneEvent.getRecord(), topicName);
        }
        recordConsumer.accept(tombstoneEvent);
    }

    /**
     * Converts the columns of a replication message into an array of values indexed by the
     * column's position (minus one) in the in-memory table definition.
     * <p>
     * When requested and {@code schemaChanged} reports a difference, the table schema is refreshed
     * first. Toasted columns are left out; message columns that cannot be mapped to a valid
     * position in the table are skipped with a warning.
     *
     * @param columns the columns delivered by the message; may be {@code null} or empty
     * @param tableId the table the columns belong to
     * @param refreshSchemaIfChanged whether a detected schema difference should trigger a refresh
     * @param metadataInMessage whether the message columns carry type metadata
     * @return the positioned values, or {@code null} when no columns were supplied
     * @throws SQLException if refreshing the schema or resolving a value fails
     */
    private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged, boolean metadataInMessage) throws SQLException {
        if (columns == null || columns.isEmpty()) {
            return null;
        }

        Table table = this.schema().tableFor(tableId);
        assert table != null;

        if (refreshSchemaIfChanged && this.schemaChanged(columns, table, metadataInMessage)) {
            try (PostgresConnection connection = this.taskContext.createConnection()) {
                this.schema().refresh(connection, tableId, this.taskContext.config().skipRefreshSchemaOnMissingToastableData());
                if (metadataInMessage) {
                    // Overlay the column definitions carried by the message on the refreshed table.
                    this.schema().refresh(this.tableFromFromMessage(columns, this.schema().tableFor(tableId)));
                }
                table = this.schema().tableFor(tableId);
            }
        }

        final int schemaColumnCount = table.columns().size();
        final List<ReplicationMessage.Column> untoastedColumns = columns.stream()
                .filter(candidate -> !candidate.isToastedColumn())
                .collect(Collectors.toList());

        // Sized for whichever is larger so every known position has a slot.
        final Object[] values = new Object[Math.max(untoastedColumns.size(), schemaColumnCount)];
        for (ReplicationMessage.Column messageColumn : untoastedColumns) {
            final Column tableColumn = table.columnWithName(Strings.unquoteIdentifierPart(messageColumn.getName()));
            // -1 marks "unknown column" and is rejected by the same range check as a bad position.
            final int slot = tableColumn == null ? -1 : tableColumn.position() - 1;
            if (slot < 0 || slot >= values.length) {
                this.logger.warn("Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", messageColumn.getName());
                continue;
            }
            values[slot] = messageColumn.getValue(this::typeResolverConnection, this.taskContext.config().includeUnknownDatatypes());
        }
        return values;
    }

    /**
     * Decides whether the in-memory table definition differs from the columns of a message.
     * <p>
     * A difference is reported when the column counts differ (a smaller count in the message can
     * be tolerated for toastable columns when the corresponding configuration option is on) or
     * when any single message column differs from its in-memory counterpart.
     *
     * @param columns the columns delivered by the message
     * @param table the in-memory table definition
     * @param metadataInMessage whether length, scale and optionality from the message should be compared too
     * @return {@code true} if the table schema should be refreshed
     */
    private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table table, boolean metadataInMessage) {
        final int tableColumnCount = table.columns().size();
        final int replicationColumnCount = columns.size();

        boolean msgHasMissingColumns = tableColumnCount > replicationColumnCount;
        if (msgHasMissingColumns && this.taskContext.config().skipRefreshSchemaOnMissingToastableData()) {
            // Missing columns only count when they are not all explained by toastable columns.
            msgHasMissingColumns = this.hasMissingUntoastedColumns(table, columns);
        }
        final boolean msgHasAdditionalColumns = tableColumnCount < replicationColumnCount;

        if (msgHasMissingColumns || msgHasAdditionalColumns) {
            this.logger.info("Different column count {} present in the server message as schema in memory contains {}; refreshing table schema", replicationColumnCount, tableColumnCount);
            return true;
        }
        return columns.stream().anyMatch(message -> this.columnDiffersFromSchema(message, table, metadataInMessage));
    }

    /** Compares one message column with its in-memory definition, logging the first difference found. */
    private boolean columnDiffersFromSchema(ReplicationMessage.Column message, Table table, boolean metadataInMessage) {
        final String columnName = message.getName();
        final Column column = table.columnWithName(columnName);
        if (column == null) {
            this.logger.info("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", columnName);
            return true;
        }

        final int localType = column.nativeType();
        final int incomingType = message.getType().getOid();
        if (localType != incomingType) {
            this.logger.info("detected new type for column '{}', old type was {} ({}), new type is {} ({}); refreshing table schema", columnName, localType, column.typeName(), incomingType, message.getType().getName());
            return true;
        }
        if (!metadataInMessage) {
            return false;
        }

        final int localLength = column.length();
        final int incomingLength = message.getTypeMetadata().getLength();
        if (localLength != incomingLength) {
            this.logger.info("detected new length for column '{}', old length was {}, new length is {}; refreshing table schema", columnName, localLength, incomingLength);
            return true;
        }

        // The in-memory column may carry no scale at all; an unchecked get() would throw
        // NoSuchElementException here. An absent scale is compared as 0.
        // NOTE(review): assumes 0 is what the message metadata reports for scale-less types — confirm;
        // a mismatch only costs one extra schema refresh.
        final int localScale = column.scale().orElse(0);
        final int incomingScale = message.getTypeMetadata().getScale();
        if (localScale != incomingScale) {
            this.logger.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale, incomingScale);
            return true;
        }

        final boolean localOptional = column.isOptional();
        final boolean incomingOptional = message.isOptional();
        if (localOptional != incomingOptional) {
            this.logger.info("detected new optional status for column '{}', old value was {}, new value is {}; refreshing table schema", columnName, localOptional, incomingOptional);
            return true;
        }
        return false;
    }

    /**
     * Checks whether the message lacks table columns that are not known toastable columns.
     *
     * @param table the in-memory table definition
     * @param columns the columns delivered by the message
     * @return {@code true} if at least one table column absent from the message is not listed as
     *         toastable for the table
     */
    private boolean hasMissingUntoastedColumns(Table table, List<ReplicationMessage.Column> columns) {
        final List<String> msgColumnNames = columns.stream()
                .map(ReplicationMessage.Column::getName)
                .collect(Collectors.toList());

        final List<String> missingColumnNames = table.columns().stream()
                .map(Column::name)
                .filter(name -> !msgColumnNames.contains(name))
                .collect(Collectors.toList());

        final List<String> toastableColumns = this.schema().getToastableColumnsForTableId(table.id());

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("msg columns: '{}' --- missing columns: '{}' --- toastableColumns: '{}",
                    String.join(",", msgColumnNames),
                    String.join(",", missingColumnNames),
                    String.join(",", toastableColumns));
        }

        final boolean allMissingAreToastable = toastableColumns.containsAll(missingColumnNames);
        return !allMissingAreToastable;
    }

    /**
     * Looks up the schema for a table, refreshing it from the database once if it is not yet known.
     *
     * @param tableId the table to look up
     * @return the table schema, or {@code null} when the table is filtered out or its schema cannot
     *         be loaded even after the refresh
     * @throws SQLException if the refresh fails
     */
    private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
        final PostgresSchema schema = this.schema();
        if (schema.isFilteredOut(tableId)) {
            this.logger.debug("table '{}' is filtered out, ignoring", tableId);
            return null;
        }

        final TableSchema known = schema.schemaFor(tableId);
        if (known != null) {
            return known;
        }

        // Unknown table: reload its definition and look again.
        try (PostgresConnection connection = this.taskContext.createConnection()) {
            schema.refresh(connection, tableId, this.taskContext.config().skipRefreshSchemaOnMissingToastableData());
        }

        final TableSchema refreshed = schema.schemaFor(tableId);
        if (refreshed != null) {
            this.logger.debug("refreshed DB schema to include table '{}'", tableId);
            return refreshed;
        }
        this.logger.warn("cannot load schema for table '{}'", tableId);
        return null;
    }

    /**
     * Returns the connection used for resolving column values, creating it on first use.
     *
     * @return the cached {@link PgConnection}
     * @throws SQLException if the connection cannot be created
     */
    private synchronized PgConnection typeResolverConnection() throws SQLException {
        PgConnection resolver = this.typeResolverConnection;
        if (resolver == null) {
            resolver = (PgConnection) this.taskContext.createConnection().connection();
            this.typeResolverConnection = resolver;
        }
        return resolver;
    }

    /**
     * Builds a table definition whose columns come from the message while the primary key is taken
     * from the existing table.
     * <p>
     * A primary key column of the existing table that is absent from the message columns is dropped
     * from the key (and logged as an error) unless the combined table reports unique values.
     *
     * @param columns the columns delivered by the message, including their type metadata
     * @param table the existing in-memory table definition
     * @return the combined table definition
     */
    private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Table table) {
        final List<Column> messageColumns = columns.stream()
                .map(column -> {
                    final PostgresType type = column.getType();
                    final ColumnEditor columnEditor = Column.editor()
                            .name(column.getName())
                            .jdbcType(type.getJdbcId())
                            .type(type.getName())
                            .optional(column.isOptional())
                            .nativeType(type.getOid());
                    columnEditor.length(column.getTypeMetadata().getLength());
                    columnEditor.scale(Integer.valueOf(column.getTypeMetadata().getScale()));
                    return columnEditor.create();
                })
                .collect(Collectors.toList());
        final TableEditor combinedTable = table.edit().setColumns(messageColumns);

        final List<String> pkCandidates = table.filterColumnNames(c -> table.isPrimaryKeyColumn(c.name()));
        pkCandidates.removeIf(candidateName -> {
            final boolean keepCandidate = combinedTable.hasUniqueValues() || combinedTable.columnWithName(candidateName) != null;
            if (!keepCandidate) {
                this.logger.error("Potentional inconsistency in key for message {}", columns);
            }
            return !keepCandidate;
        });

        combinedTable.setPrimaryKeyNames(pkCandidates);
        return combinedTable.create();
    }

    /**
     * Reports whether a replication stream is currently registered.
     *
     * @return {@code true} between a successful stream start and the next call to {@code stop}
     */
    boolean isStreamingRunning() {
        final ReplicationStream current = this.replicationStream.get();
        return current != null;
    }

    @FunctionalInterface
    public static interface PgConnectionSupplier {
        public PgConnection get() throws SQLException;
    }
}

