/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PgOid;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.RecordsProducer;
import io.debezium.connector.postgresql.RecordsStreamProducer;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.data.Envelope;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Strings;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.util.PGmoney;

/**
 * Producer that reads the current contents of the database tables inside a single transaction and
 * hands one "read" {@link SourceRecord} per row to a consumer. When constructed with
 * {@code continueStreamingAfterCompletion == true}, a {@link RecordsStreamProducer} is started once
 * the snapshot has finished.
 *
 * <p>The snapshot runs asynchronously on a dedicated single-thread executor. The most recently
 * generated record is always held back in {@link #currentRecord} and only sent when the next row is
 * read, so that the very last record can be re-created with the "last snapshot record" offset
 * before it is sent.
 */
@ThreadSafe
public class RecordsSnapshotProducer
extends RecordsProducer {
    private static final String CONTEXT_NAME = "records-snapshot-producer";

    /** PostgreSQL type OID of {@code money}; read as text and parsed through {@link PGmoney}. */
    private static final int MONEY_OID = 790;

    /** PostgreSQL type OID of {@code bit}; read as its string representation. */
    private static final int BIT_OID = 1560;

    private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, CONTEXT_NAME + "-thread"));
    private final Optional<RecordsStreamProducer> streamProducer;

    /** The last generated record which has not been sent to the consumer yet. */
    private final AtomicReference<SourceRecord> currentRecord = new AtomicReference<>();

    /**
     * @param taskContext the task context, passed on to the superclass and to the optional stream producer
     * @param sourceInfo the source information, passed on to the superclass and to the optional stream producer
     * @param continueStreamingAfterCompletion whether a {@link RecordsStreamProducer} should be created and
     *        started after the snapshot completes
     */
    public RecordsSnapshotProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, boolean continueStreamingAfterCompletion) {
        super(taskContext, sourceInfo);
        this.streamProducer = continueStreamingAfterCompletion ? Optional.of(new RecordsStreamProducer(taskContext, sourceInfo)) : Optional.empty();
    }

    /**
     * Schedules the snapshot on the executor, followed by streaming (if configured). Any failure of
     * either stage is routed to {@link #handleException(Throwable)}.
     *
     * @param recordConsumer receives every produced record
     */
    @Override
    protected void start(Consumer<SourceRecord> recordConsumer) {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            CompletableFuture.runAsync(() -> this.takeSnapshot(recordConsumer), this.executorService)
                    .thenRun(() -> this.startStreaming(recordConsumer))
                    .exceptionally(this::handleException);
        }
        finally {
            previousContext.restore();
        }
    }

    /**
     * Logs the failure (preferring its cause, if any), stops this producer and fails the task.
     *
     * @return always {@code null}; the return type only exists to fit {@code exceptionally}
     */
    private Void handleException(Throwable t) {
        this.logger.error("unexpected exception", t.getCause() != null ? t.getCause() : t);
        this.stop();
        this.taskContext.failTask(t);
        return null;
    }

    /** Starts the stream producer, if one was configured, and always releases the snapshot resources. */
    private void startStreaming(Consumer<SourceRecord> consumer) {
        try {
            this.streamProducer.ifPresent(producer -> {
                this.logger.info("Snapshot finished, continuing streaming changes from {}", ReplicationConnection.format(this.sourceInfo.lsn()));
                producer.start(consumer);
            });
        }
        finally {
            this.cleanup();
        }
    }

    /** Delegates to the stream producer, if present; the snapshot itself has nothing to commit here. */
    @Override
    protected void commit() {
        this.streamProducer.ifPresent(RecordsStreamProducer::commit);
    }

    /** Stops the stream producer, if present, and always releases the snapshot resources. */
    @Override
    protected void stop() {
        try {
            this.streamProducer.ifPresent(RecordsStreamProducer::stop);
        }
        finally {
            this.cleanup();
        }
    }

    /** Drops the held-back record and shuts the snapshot executor down. */
    private void cleanup() {
        this.currentRecord.set(null);
        this.executorService.shutdownNow();
    }

    /**
     * Opens a connection and runs the snapshot on it.
     *
     * <p>The rollback is issued from a nested {@code try}: a try-with-resources statement closes its
     * resource <em>before</em> its own {@code catch} clauses run, so rolling back from the outer
     * {@code catch} would act on a connection that has presumably already been closed. Runtime
     * failures (for example the {@link ConnectException} raised while exporting a table) trigger the
     * rollback as well and are rethrown unchanged.
     *
     * @param consumer receives every produced record
     * @throws ConnectException if any database operation fails
     */
    private void takeSnapshot(Consumer<SourceRecord> consumer) {
        long snapshotStart = this.clock().currentTimeInMillis();
        try (PostgresConnection connection = this.taskContext.createConnection()) {
            Connection jdbcConnection = connection.connection();
            try {
                this.runSnapshot(connection, jdbcConnection, consumer);
            }
            catch (SQLException | RuntimeException e) {
                this.rollbackQuietly(jdbcConnection);
                throw e;
            }
            this.logger.info("Snapshot completed in '{}'", Strings.duration(this.clock().currentTimeInMillis() - snapshotStart));
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /** Rolls the snapshot transaction back, logging (but not propagating) a failure to do so. */
    private void rollbackQuietly(Connection jdbcConnection) {
        if (jdbcConnection == null) {
            return;
        }
        try {
            jdbcConnection.rollback();
        }
        catch (SQLException se) {
            this.logger.error("Cannot rollback snapshot transaction", se);
        }
    }

    /** Executes snapshot steps 0 to 5 on the given connection and marks the snapshot as completed. */
    private void runSnapshot(PostgresConnection connection, Connection jdbcConnection, Consumer<SourceRecord> consumer) throws SQLException {
        this.logger.info("Step 0: disabling autocommit");
        connection.setAutoCommit(false);
        long lockTimeoutMillis = this.taskContext.config().snapshotLockTimeoutMillis();

        this.logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'", connection.database(), connection.username());
        connection.executeWithoutCommitting("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;");
        PostgresSchema schema = this.schema();
        schema.refresh(connection, false);

        this.logger.info("Step 2: locking each of the database tables, waiting a maximum of '{}' seconds for each lock", (double)lockTimeoutMillis / 1000.0);
        connection.executeWithoutCommitting(this.lockStatements(schema, lockTimeoutMillis));
        // refresh once more now that the locks are held
        schema.refresh(connection, false);

        long xlogStart = connection.currentXLogLocation();
        int txId = connection.currentTransactionId().intValue();
        this.logger.info("\t read xlogStart at '{}' from transaction '{}'", ReplicationConnection.format(xlogStart), txId);
        this.sourceInfo.startSnapshot();
        this.sourceInfo.update(xlogStart, this.clock().currentTimeInMicros(), txId);

        this.logger.info("Step 3: reading and exporting the contents of each table");
        schema.tables().forEach(tableId -> this.exportTable(connection, schema, tableId, consumer));

        this.logger.info("Step 4: committing transaction '{}'", txId);
        jdbcConnection.commit();

        this.logger.info("Step 5: sending the last snapshot record");
        this.sendLastSnapshotRecord(consumer);
        this.sourceInfo.completeSnapshot();
    }

    /** Builds the script that sets the lock timeout and locks every table known to the schema. */
    private String lockStatements(PostgresSchema schema, long lockTimeoutMillis) {
        String lineSeparator = System.lineSeparator();
        StringBuilder statements = new StringBuilder();
        statements.append("SET lock_timeout = ").append(lockTimeoutMillis).append(";").append(lineSeparator);
        schema.tables().forEach(tableId -> statements.append("LOCK TABLE ").append(tableId.toString()).append(" IN SHARE UPDATE EXCLUSIVE MODE;").append(lineSeparator));
        return statements.toString();
    }

    /**
     * Reads all rows of one table, unless the table is filtered out.
     *
     * @throws ConnectException wrapping any {@link SQLException}; unchecked because this is invoked from a lambda
     */
    private void exportTable(PostgresConnection connection, PostgresSchema schema, TableId tableId, Consumer<SourceRecord> consumer) {
        if (schema.isFilteredOut(tableId)) {
            this.logger.info("\t table '{}' is filtered out, ignoring", tableId);
            return;
        }
        long exportStart = this.clock().currentTimeInMillis();
        this.logger.info("\t exporting data from table '{}'", tableId);
        AtomicInteger rowsCounter = new AtomicInteger(0);
        try {
            connection.query("SELECT * FROM " + tableId, this::readTableStatement, rs -> this.readTable(tableId, rs, consumer, rowsCounter));
            this.logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", rowsCounter.get(), tableId, Strings.duration(this.clock().currentTimeInMillis() - exportStart));
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Re-creates the held-back record with the offset produced after marking the last snapshot
     * record, and sends it. Does nothing if no record is being held.
     */
    private void sendLastSnapshotRecord(Consumer<SourceRecord> consumer) {
        SourceRecord lastRecord = this.currentRecord.get();
        if (lastRecord == null) {
            return;
        }
        this.sourceInfo.markLastSnapshotRecord();
        this.currentRecord.set(new SourceRecord(lastRecord.sourcePartition(), this.sourceInfo.offset(), lastRecord.topic(), lastRecord.kafkaPartition(), lastRecord.keySchema(), lastRecord.key(), lastRecord.valueSchema(), lastRecord.value()));
        this.sendCurrentRecord(consumer);
    }

    /** Creates the statement used to read a table, applying the configured fetch size. */
    private Statement readTableStatement(Connection conn) throws SQLException {
        Statement statement = conn.createStatement();
        statement.setFetchSize(this.taskContext.config().rowsFetchSize());
        return statement;
    }

    /**
     * Iterates over the result set of one table. For every row, the previously generated record is
     * sent first and a new record for the current row is then generated and held back.
     *
     * @param rowsCounter incremented once per row read
     */
    private void readTable(TableId tableId, ResultSet rs, Consumer<SourceRecord> consumer, AtomicInteger rowsCounter) throws SQLException {
        Table table = this.schema().tableFor(tableId);
        assert (table != null);
        int numColumns = table.columns().size();
        Object[] row = new Object[numColumns];
        ResultSetMetaData metaData = rs.getMetaData();
        while (rs.next()) {
            rowsCounter.incrementAndGet();
            // NOTE(review): sendCurrentRecord does not clear the held record, so if generateReadRecord
            // skips a row (empty row, null key or null value) the same record is sent again on the next
            // iteration - confirm whether that is intended.
            this.sendCurrentRecord(consumer);
            for (int i = 0; i < numColumns; ++i) {
                // JDBC column indexes are 1-based
                row[i] = this.valueForColumn(rs, i + 1, metaData);
            }
            this.generateReadRecord(tableId, row);
        }
    }

    /**
     * Reads a single column value, with special handling for {@code money} and {@code bit} columns.
     *
     * @return the column value, or {@code null} for a SQL NULL {@code money} value (and whatever the
     *         driver returns for NULL in all other cases)
     */
    private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaData) throws SQLException {
        try {
            String columnTypeName = metaData.getColumnTypeName(colIdx);
            int colOid = PgOid.valueOf(columnTypeName);
            switch (colOid) {
                case MONEY_OID: {
                    String money = rs.getString(colIdx);
                    if (money == null) {
                        // SQL NULL: PGmoney cannot parse a null string
                        return null;
                    }
                    return new PGmoney(money).val;
                }
                case BIT_OID: {
                    return rs.getString(colIdx);
                }
                default: {
                    return rs.getObject(colIdx);
                }
            }
        }
        catch (SQLException e) {
            // type lookup or special handling failed: fall back to the driver's default mapping
            return rs.getObject(colIdx);
        }
    }

    /**
     * Builds a "read" record for one row and stores it as the current (not yet sent) record. Nothing
     * is stored when the row is empty or when no key or no value can be derived from it.
     *
     * @param tableId the table the row belongs to
     * @param rowData the column values of the row
     */
    protected void generateReadRecord(TableId tableId, Object[] rowData) {
        if (rowData.length == 0) {
            return;
        }
        TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert (tableSchema != null);
        Object key = tableSchema.keyFromColumnData(rowData);
        Struct value = tableSchema.valueFromColumnData(rowData);
        if (key == null || value == null) {
            return;
        }
        Schema keySchema = tableSchema.keySchema();
        this.sourceInfo.update(this.clock().currentTimeInMicros());
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topicName = this.topicSelector().topicNameFor(tableId);
        Envelope envelope = this.createEnvelope(tableSchema, topicName);
        this.currentRecord.set(new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(), envelope.read(value, this.sourceInfo.source(), Long.valueOf(this.clock().currentTimeInMillis()))));
    }

    /** Sends the held-back record to the consumer, if there is one; the record stays held afterwards. */
    private void sendCurrentRecord(Consumer<SourceRecord> consumer) {
        SourceRecord record = this.currentRecord.get();
        if (record == null) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending read event '{}'", record);
        }
        consumer.accept(record);
    }
}

