/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils;

import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.time.Conversions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresTypeUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresUtils {
    private static final Logger log = LoggerFactory.getLogger(PostgresUtils.class);
    private static final int DEFAULT_FETCH_SIZE = 1024;
    private static final JdbcDialect JDBC_DIALECT = new PostgresDialect();

    private PostgresUtils() {
    }

    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName, Column column) throws SQLException {
        columnName = PostgresUtils.quote(columnName);
        if (column != null) {
            columnName = JDBC_DIALECT.convertType(columnName, column.typeName());
        }
        String minMaxQuery = String.format("SELECT MIN(%s), MAX(%s) FROM %s", columnName, columnName, PostgresUtils.quote(tableId));
        return (Object[])jdbc.queryAndMap(minMaxQuery, rs -> {
            if (!rs.next()) {
                throw new SQLException(String.format("No result returned after running query [%s]", minMaxQuery));
            }
            return SourceRecordUtils.rowToArray((ResultSet)rs, (int)2);
        });
    }

    public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
        String rowCountQuery = String.format("SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", tableId.table());
        return (Long)jdbc.queryAndMap(rowCountQuery, rs -> {
            if (!rs.next()) {
                throw new SQLException(String.format("No result returned after running query [%s]", rowCountQuery));
            }
            return rs.getLong(1);
        });
    }

    public static Object queryMin(JdbcConnection jdbc, TableId tableId, String columnName, Column column, Object excludedLowerBound) throws SQLException {
        columnName = PostgresUtils.quote(columnName);
        if (column != null) {
            columnName = JDBC_DIALECT.convertType(columnName, column.typeName());
        }
        String minQuery = String.format("SELECT MIN(%s) FROM %s WHERE %s > ?", columnName, PostgresUtils.quote(tableId), columnName);
        return jdbc.prepareQueryAndMap(minQuery, ps -> ps.setObject(1, excludedLowerBound), rs -> {
            if (!rs.next()) {
                throw new SQLException(String.format("No result returned after running query [%s]", minQuery));
            }
            return rs.getObject(1);
        });
    }

    public static Object[] sampleDataFromColumn(JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws SQLException {
        String minQuery = String.format("SELECT %s FROM %s WHERE MOD((%s - (SELECT MIN(%s) FROM %s)), %s) = 0 ORDER BY %s", PostgresUtils.quote(columnName), PostgresUtils.quote(tableId), PostgresUtils.quote(columnName), PostgresUtils.quote(columnName), PostgresUtils.quote(tableId), inverseSamplingRate, PostgresUtils.quote(columnName));
        return (Object[])jdbc.queryAndMap(minQuery, resultSet -> {
            ArrayList<Object> results = new ArrayList<Object>();
            while (resultSet.next()) {
                results.add(resultSet.getObject(1));
            }
            return results.toArray();
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Object[] skipReadAndSortSampleData(JdbcConnection jdbc, TableId tableId, String columnName, Column column, int inverseSamplingRate) throws Exception {
        columnName = PostgresUtils.quote(columnName);
        if (column != null) {
            columnName = JDBC_DIALECT.convertType(columnName, column.typeName());
        }
        String sampleQuery = String.format("SELECT %s FROM %s", columnName, PostgresUtils.quote(tableId));
        Statement stmt = null;
        ResultSet rs = null;
        ArrayList<Object> results = new ArrayList<Object>();
        try {
            stmt = jdbc.connection().createStatement(1003, 1007);
            stmt.setFetchSize(1024);
            rs = stmt.executeQuery(sampleQuery);
            int count = 0;
            while (rs.next()) {
                if (++count % 100000 == 0) {
                    log.info("Processing row index: {}", (Object)count);
                }
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("Thread interrupted");
                }
                if (count % inverseSamplingRate != 0) continue;
                results.add(rs.getObject(1));
            }
        }
        finally {
            if (rs != null) {
                try {
                    rs.close();
                }
                catch (SQLException e) {
                    log.error("Failed to close ResultSet", (Throwable)e);
                }
            }
            if (stmt != null) {
                try {
                    stmt.close();
                }
                catch (SQLException e) {
                    log.error("Failed to close Statement", (Throwable)e);
                }
            }
        }
        Object[] resultsArray = results.toArray();
        Arrays.sort(resultsArray);
        return resultsArray;
    }

    public static Object queryNextChunkMax(JdbcConnection jdbc, TableId tableId, String splitColumnName, Column splitColumn, int chunkSize, Object includedLowerBound) throws SQLException {
        String quotedColumn = PostgresUtils.quote(splitColumnName);
        if (splitColumn != null) {
            quotedColumn = JDBC_DIALECT.convertType(quotedColumn, splitColumn.typeName());
        }
        String query = String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s) AS T", quotedColumn, quotedColumn, PostgresUtils.quote(tableId), quotedColumn, quotedColumn, chunkSize);
        return jdbc.prepareQueryAndMap(query, ps -> ps.setObject(1, includedLowerBound), rs -> {
            if (!rs.next()) {
                throw new SQLException(String.format("No result returned after running query [%s]", query));
            }
            return rs.getObject(1);
        });
    }

    public static SeaTunnelRowType getSplitType(Table table) {
        List primaryKeys = table.primaryKeyColumns();
        if (primaryKeys.isEmpty()) {
            throw new SeaTunnelException(String.format("Incremental snapshot for tables requires primary key, but table %s doesn't have primary key.", table.id()));
        }
        return PostgresUtils.getSplitType((Column)primaryKeys.get(0));
    }

    public static SeaTunnelRowType getSplitType(Column splitColumn) {
        return new SeaTunnelRowType(new String[]{splitColumn.name()}, new SeaTunnelDataType[]{PostgresTypeUtils.convertFromColumn(splitColumn)});
    }

    public static Offset getLsnPosition(SourceRecord record) {
        return PostgresUtils.getLsnPosition(record.sourceOffset());
    }

    public static LsnOffset getLsnPosition(Map<String, ?> offset) {
        HashMap<String, String> offsetStrMap = new HashMap<String, String>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            offsetStrMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return new LsnOffset(offsetStrMap);
    }

    public static LsnOffset currentLsn(PostgresConnection jdbcConnection) {
        Long txId;
        Long lsn;
        try {
            lsn = jdbcConnection.currentXLogLocation();
            txId = jdbcConnection.currentTransactionId();
            log.trace("Read xlogStart at '{}' from transaction '{}'", (Object)Lsn.valueOf(lsn), (Object)txId);
        }
        catch (SQLException e) {
            throw new SeaTunnelException("Error getting current Lsn/txId " + e.getMessage(), (Throwable)e);
        }
        try {
            jdbcConnection.commit();
        }
        catch (SQLException e) {
            throw new SeaTunnelException("JDBC connection fails to commit: " + e.getMessage(), (Throwable)e);
        }
        HashMap<String, String> offsetMap = new HashMap<String, String>();
        offsetMap.put("lsn", lsn.toString());
        if (txId != null) {
            offsetMap.put("txId", txId.toString());
        }
        offsetMap.put("ts_usec", String.valueOf(Conversions.toEpochMicros((Instant)Instant.MIN)));
        return LsnOffset.of(offsetMap);
    }

    public static String buildSplitScanQuery(Table table, SeaTunnelRowType rowType, boolean isFirstSplit, boolean isLastSplit) {
        return PostgresUtils.buildSplitQuery(table, rowType, isFirstSplit, isLastSplit, -1, true);
    }

    public static PreparedStatement readTableSplitDataStatement(JdbcConnection jdbc, String sql, boolean isFirstSplit, boolean isLastSplit, Object[] splitStart, Object[] splitEnd, SeaTunnelRowType splitKeyType, int fetchSize) {
        try {
            PreparedStatement statement = PostgresUtils.initStatement(jdbc, sql, fetchSize);
            if (isFirstSplit && isLastSplit) {
                return statement;
            }
            int primaryKeyNum = splitKeyType.getTotalFields();
            if (isFirstSplit) {
                for (int i = 0; i < primaryKeyNum; ++i) {
                    statement.setObject(i + 1, splitEnd[i]);
                    statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
                }
            } else if (isLastSplit) {
                for (int i = 0; i < primaryKeyNum; ++i) {
                    statement.setObject(i + 1, splitStart[i]);
                }
            } else {
                for (int i = 0; i < primaryKeyNum; ++i) {
                    statement.setObject(i + 1, splitStart[i]);
                    statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
                    statement.setObject(i + 1 + 2 * primaryKeyNum, splitEnd[i]);
                }
            }
            return statement;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to build the split data read statement.", e);
        }
    }

    private static String getPrimaryKeyColumnsProjection(SeaTunnelRowType rowType) {
        StringBuilder sql = new StringBuilder();
        Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
        while (fieldNamesIt.hasNext()) {
            sql.append((String)fieldNamesIt.next());
            if (!fieldNamesIt.hasNext()) continue;
            sql.append(" , ");
        }
        return sql.toString();
    }

    private static String buildSplitQuery(Table table, SeaTunnelRowType rowType, boolean isFirstSplit, boolean isLastSplit, int limitSize, boolean isScanningData) {
        StringBuilder sql;
        String condition;
        if (isFirstSplit && isLastSplit) {
            condition = null;
        } else if (isFirstSplit) {
            sql = new StringBuilder();
            PostgresUtils.addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?");
            if (isScanningData) {
                sql.append(" AND NOT (");
                PostgresUtils.addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?");
                sql.append(")");
            }
            condition = sql.toString();
        } else if (isLastSplit) {
            sql = new StringBuilder();
            PostgresUtils.addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?");
            condition = sql.toString();
        } else {
            sql = new StringBuilder();
            PostgresUtils.addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?");
            if (isScanningData) {
                sql.append(" AND NOT (");
                PostgresUtils.addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?");
                sql.append(")");
            }
            sql.append(" AND ");
            PostgresUtils.addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?");
            condition = sql.toString();
        }
        if (isScanningData) {
            return PostgresUtils.buildSelectWithRowLimits(table.id(), limitSize, "*", Optional.ofNullable(condition), Optional.empty());
        }
        String orderBy = String.join((CharSequence)", ", rowType.getFieldNames());
        return PostgresUtils.buildSelectWithBoundaryRowLimits(table.id(), limitSize, PostgresUtils.getPrimaryKeyColumnsProjection(rowType), PostgresUtils.getMaxPrimaryKeyColumnsProjection(rowType), Optional.ofNullable(condition), orderBy);
    }

    private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize) throws SQLException {
        Connection connection = jdbc.connection();
        connection.setAutoCommit(false);
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setFetchSize(fetchSize);
        return statement;
    }

    private static String getMaxPrimaryKeyColumnsProjection(SeaTunnelRowType rowType) {
        StringBuilder sql = new StringBuilder();
        Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
        while (fieldNamesIt.hasNext()) {
            sql.append("MAX(" + (String)fieldNamesIt.next() + ")");
            if (!fieldNamesIt.hasNext()) continue;
            sql.append(" , ");
        }
        return sql.toString();
    }

    private static String buildSelectWithRowLimits(TableId tableId, int limit, String projection, Optional<String> condition, Optional<String> orderBy) {
        StringBuilder sql = new StringBuilder("SELECT ");
        if (limit > 0) {
            sql.append(" TOP( ").append(limit).append(") ");
        }
        sql.append(projection).append(" FROM ");
        sql.append(PostgresUtils.quoteSchemaAndTable(tableId));
        if (condition.isPresent()) {
            sql.append(" WHERE ").append(condition.get());
        }
        if (orderBy.isPresent()) {
            sql.append(" ORDER BY ").append(orderBy.get());
        }
        return sql.toString();
    }

    private static String quoteSchemaAndTable(TableId tableId) {
        StringBuilder quoted = new StringBuilder();
        if (tableId.schema() != null && !tableId.schema().isEmpty()) {
            quoted.append(PostgresUtils.quote(tableId.schema())).append(".");
        }
        quoted.append(PostgresUtils.quote(tableId.table()));
        return quoted.toString();
    }

    public static String quote(String dbOrTableName) {
        return "\"" + dbOrTableName + "\"";
    }

    public static String quote(TableId tableId) {
        return "\"" + tableId.schema() + "\".\"" + tableId.table() + "\"";
    }

    private static void addPrimaryKeyColumnsToCondition(Table table, SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
        for (int i = 0; i < rowType.getTotalFields(); ++i) {
            String fieldName = PostgresUtils.quote(rowType.getFieldName(i));
            fieldName = JDBC_DIALECT.convertType(fieldName, table.columnWithName(rowType.getFieldName(i)).typeName());
            sql.append(fieldName).append(predicate);
            if (i >= rowType.getTotalFields() - 1) continue;
            sql.append(" AND ");
        }
    }

    private static String buildSelectWithBoundaryRowLimits(TableId tableId, int limit, String projection, String maxColumnProjection, Optional<String> condition, String orderBy) {
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append(maxColumnProjection);
        sql.append(" FROM (");
        sql.append("SELECT ");
        sql.append(" TOP( ").append(limit).append(") ");
        sql.append(projection);
        sql.append(" FROM ");
        sql.append(PostgresUtils.quoteSchemaAndTable(tableId));
        if (condition.isPresent()) {
            sql.append(" WHERE ").append(condition.get());
        }
        sql.append(" ORDER BY ").append(orderBy);
        sql.append(") T");
        return sql.toString();
    }
}

