/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.databend.sink;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException;
import org.apache.seatunnel.connectors.seatunnel.databend.schema.SchemaChangeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatabendSinkWriter
implements SinkWriter<SeaTunnelRow, Void, Void>,
SupportSchemaEvolutionSinkWriter {
    private static final Logger log = LoggerFactory.getLogger(DatabendSinkWriter.class);
    private final Connection connection;
    private final SinkWriter.Context context;
    private final CatalogTable catalogTable;
    private String insertSql;
    private final int batchSize;
    private final int executeTimeoutSec;
    private TableSchema tableSchema;
    private final TablePath sinkTablePath;
    protected TableSchemaChangeEventDispatcher tableSchemaChanger = new TableSchemaChangeEventDispatcher();
    private SchemaChangeManager schemaChangeManager;
    private PreparedStatement preparedStatement;
    private int batchCount = 0;
    private DatabendSinkConfig databendSinkConfig;

    public DatabendSinkWriter(SinkWriter.Context context, Connection connection, CatalogTable catalogTable, DatabendSinkConfig databendSinkConfig, String customSql, String database, String table, int batchSize, int executeTimeoutSec) {
        this.context = context;
        this.connection = connection;
        this.catalogTable = catalogTable;
        this.databendSinkConfig = databendSinkConfig;
        this.batchSize = batchSize;
        this.executeTimeoutSec = executeTimeoutSec;
        this.tableSchema = catalogTable.getTableSchema();
        this.sinkTablePath = TablePath.of((String)database, (String)table);
        log.info("DatabendSinkWriter constructor - catalogTable: {}", (Object)catalogTable);
        log.info("DatabendSinkWriter constructor - tableSchema: {}", (Object)this.tableSchema);
        log.info("DatabendSinkWriter constructor - rowType: {}", (Object)catalogTable.getSeaTunnelRowType());
        log.info("DatabendSinkWriter constructor - target table path: {}", (Object)this.sinkTablePath);
        if (customSql != null && !customSql.isEmpty()) {
            this.insertSql = customSql;
            log.info("Using custom SQL: {}", (Object)this.insertSql);
            try {
                this.schemaChangeManager = new SchemaChangeManager(databendSinkConfig);
                this.preparedStatement = connection.prepareStatement(this.insertSql);
                this.preparedStatement.setQueryTimeout(executeTimeoutSec);
                log.info("PreparedStatement created successfully with custom SQL");
            }
            catch (SQLException e) {
                throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to prepare custom statement: " + e.getMessage(), e);
            }
        }
        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
        if (rowType == null || rowType.getFieldNames().length == 0) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SCHEMA_NOT_FOUND, "Source table schema is empty or null");
        }
        try {
            if (!this.tableExists(database, table)) {
                log.info("Target table {}.{} does not exist, creating with source schema", (Object)database, (Object)table);
                this.createTable(database, table, rowType);
            } else {
                log.info("Target table {}.{} exists, verifying schema", (Object)database, (Object)table);
                this.verifyTableSchema(database, table, rowType);
            }
        }
        catch (SQLException e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to verify/create target table: " + e.getMessage(), e);
        }
        this.insertSql = this.generateInsertSql(database, table, rowType);
        log.info("Generated insert SQL: {}", (Object)this.insertSql);
        try {
            this.schemaChangeManager = new SchemaChangeManager(databendSinkConfig);
            this.preparedStatement = connection.prepareStatement(this.insertSql);
            this.preparedStatement.setQueryTimeout(executeTimeoutSec);
            log.info("PreparedStatement created successfully");
        }
        catch (SQLException e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to prepare statement: " + e.getMessage(), e);
        }
    }

    public void applySchemaChange(SchemaChangeEvent event) {
        try {
            this.executeBatch();
            this.tableSchema = this.tableSchemaChanger.reset(this.tableSchema).apply(event);
            this.schemaChangeManager.applySchemaChange(this.sinkTablePath, event);
            if (this.preparedStatement != null) {
                try {
                    this.preparedStatement.close();
                }
                catch (SQLException e) {
                    log.warn("Failed to close PreparedStatement during schema change", e);
                }
                finally {
                    this.preparedStatement = null;
                }
            }
            this.insertSql = this.generateInsertSql(this.catalogTable, this.tableSchema);
            this.batchCount = 0;
            log.info("Schema change applied successfully for table {}", (Object)this.sinkTablePath.getFullName());
        }
        catch (Exception e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to apply schema change: " + e.getMessage(), e);
        }
    }

    private String generateInsertSql(CatalogTable catalogTable, TableSchema tableSchema) {
        String tableName = catalogTable.getTablePath().getFullName();
        List columnNames = tableSchema.getColumns().stream().map(column -> "`" + column.getName() + "`").collect(Collectors.toList());
        String placeholders = String.join((CharSequence)", ", Collections.nCopies(columnNames.size(), "?"));
        return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, String.join((CharSequence)", ", columnNames), placeholders);
    }

    public void write(SeaTunnelRow row) {
        try {
            log.info("Writing row: {}", (Object)row);
            if (row == null || row.getFields() == null || row.getFields().length == 0) {
                log.warn("Received empty row data, skipping");
                return;
            }
            if (this.preparedStatement == null) {
                log.info("PreparedStatement is null, initializing...");
                this.initializePreparedStatement(row);
                log.info("PreparedStatement initialized successfully");
            }
            boolean allFieldsNull = true;
            for (Object field : row.getFields()) {
                if (field == null) continue;
                allFieldsNull = false;
                break;
            }
            if (allFieldsNull) {
                log.warn("All fields in row are null, skipping");
                return;
            }
            this.processRow(row);
            ++this.batchCount;
            log.info("Batch count after adding row: {}", (Object)this.batchCount);
            if (this.batchCount >= this.batchSize) {
                log.info("Batch size {} reached, executing batch", (Object)this.batchSize);
                this.executeBatch();
                log.info("Batch executed successfully");
            }
        }
        catch (Exception e) {
            log.error("Failed to write row: {}", (Object)row, (Object)e);
            try {
                if (this.batchCount > 0) {
                    log.info("Attempting to execute remaining batch after error");
                    this.executeBatch();
                }
            }
            catch (Exception ex) {
                log.error("Failed to execute remaining batch after error", ex);
            }
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to write data to Databend: " + e.getMessage(), e);
        }
    }

    private void initializePreparedStatement(SeaTunnelRow row) throws SQLException {
        log.info("Initializing PreparedStatement based on row data");
        String database = this.sinkTablePath.getDatabaseName();
        String table = this.sinkTablePath.getTableName();
        log.info("Querying target table schema for {}.{}", (Object)database, (Object)table);
        SeaTunnelRowType actualTableSchema = this.queryTableSchema(database, table);
        if (actualTableSchema != null) {
            log.info("Using actual table schema: {}", (Object)actualTableSchema);
            this.insertSql = this.generateInsertSql(database, table, actualTableSchema);
        } else {
            log.warn("Could not query table schema, using inferred schema from data");
            SeaTunnelRowType inferredRowType = this.inferRowTypeFromRow(row);
            log.info("Inferred row type from data: {}", (Object)inferredRowType);
            this.insertSql = this.generateInsertSql(database, table, inferredRowType);
        }
        log.info("Generated insert SQL from schema: {}", (Object)this.insertSql);
        this.preparedStatement = this.connection.prepareStatement(this.insertSql);
        this.preparedStatement.setQueryTimeout(this.executeTimeoutSec);
        log.info("PreparedStatement initialized successfully");
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private SeaTunnelRowType queryTableSchema(String database, String table) {
        try {
            this.connection.createStatement().execute("USE " + database);
            String describeSQL = String.format("DESCRIBE %s.%s", database, table);
            log.info("Executing describe table SQL: {}", (Object)describeSQL);
            try (PreparedStatement stmt = this.connection.prepareStatement(describeSQL);
                 ResultSet rs = stmt.executeQuery();){
                ArrayList<String> fieldNames = new ArrayList<String>();
                ArrayList fieldTypes = new ArrayList();
                while (rs.next()) {
                    String columnName = rs.getString("Field");
                    String columnType = rs.getString("Type");
                    fieldNames.add(columnName);
                    fieldTypes.add(this.convertDatabendTypeNameToSeaTunnelType(columnType));
                    log.info("Found column: {} {}", (Object)columnName, (Object)columnType);
                }
                if (fieldNames.isEmpty()) return null;
                SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType[0]));
                return seaTunnelRowType;
            }
        }
        catch (Exception e) {
            log.warn("Failed to query table schema: {}", (Object)e.getMessage());
        }
        return null;
    }

    private SeaTunnelDataType<?> convertDatabendTypeNameToSeaTunnelType(String typeName) {
        if (typeName == null) {
            return BasicType.STRING_TYPE;
        }
        if ((typeName = typeName.toUpperCase()).contains("VARCHAR") || typeName.contains("STRING") || typeName.contains("TEXT")) {
            return BasicType.STRING_TYPE;
        }
        if (typeName.contains("INT") && !typeName.contains("BIGINT")) {
            return BasicType.INT_TYPE;
        }
        if (typeName.contains("BIGINT")) {
            return BasicType.LONG_TYPE;
        }
        if (typeName.contains("DOUBLE") || typeName.contains("FLOAT64")) {
            return BasicType.DOUBLE_TYPE;
        }
        if (typeName.contains("FLOAT") || typeName.contains("FLOAT32")) {
            return BasicType.FLOAT_TYPE;
        }
        if (typeName.contains("BOOLEAN")) {
            return BasicType.BOOLEAN_TYPE;
        }
        return BasicType.STRING_TYPE;
    }

    private SeaTunnelRowType inferRowTypeFromRow(SeaTunnelRow row) {
        Object[] fields = row.getFields();
        String[] fieldNames = new String[fields.length];
        SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[fields.length];
        if (this.catalogTable != null && this.catalogTable.getSeaTunnelRowType() != null) {
            String[] sourceFieldNames = this.catalogTable.getSeaTunnelRowType().getFieldNames();
            if (sourceFieldNames.length == fields.length) {
                fieldNames = sourceFieldNames;
            } else {
                log.warn("Source table field count ({}) doesn't match row field count ({}), using default column names", (Object)sourceFieldNames.length, (Object)fields.length);
                for (int i = 0; i < fields.length; ++i) {
                    fieldNames[i] = "column_" + (i + 1);
                }
            }
        } else {
            log.warn("No source table schema available, can't get column names");
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SCHEMA_NOT_FOUND, "Source table schema is empty or null, cannot infer row type");
        }
        for (int i = 0; i < fields.length; ++i) {
            Object field = fields[i];
            fieldTypes[i] = field == null ? BasicType.STRING_TYPE : (field instanceof String ? BasicType.STRING_TYPE : (field instanceof Integer ? BasicType.INT_TYPE : (field instanceof Long ? BasicType.LONG_TYPE : (field instanceof Double ? BasicType.DOUBLE_TYPE : (field instanceof Float ? BasicType.FLOAT_TYPE : (field instanceof Boolean ? BasicType.BOOLEAN_TYPE : BasicType.STRING_TYPE))))));
        }
        return new SeaTunnelRowType(fieldNames, fieldTypes);
    }

    private void processRow(SeaTunnelRow row) throws SQLException {
        log.info("Processing row with {} fields", (Object)row.getFields().length);
        for (int i = 0; i < row.getFields().length; ++i) {
            Object field = row.getFields()[i];
            if (field == null) {
                log.warn("Field {} is null, setting to NULL in prepared statement", (Object)(i + 1));
                this.preparedStatement.setNull(i + 1, 12);
                continue;
            }
            log.info("Setting parameter {}: {} ({})", i + 1, field, field.getClass().getSimpleName());
            this.preparedStatement.setObject(i + 1, field);
        }
        this.preparedStatement.addBatch();
        log.info("Added row to batch, current batch count: {}", (Object)(this.batchCount + 1));
    }

    private void executeBatch() {
        if (this.batchCount > 0) {
            try {
                log.info("Executing batch of {} records", (Object)this.batchCount);
                int[] results = this.preparedStatement.executeBatch();
                int totalAffected = 0;
                for (int result : results) {
                    totalAffected += result;
                }
                log.info("Batch executed successfully, total affected rows: {}", (Object)totalAffected);
                this.batchCount = 0;
            }
            catch (SQLException e) {
                log.error("Failed to execute batch", e);
                throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to execute batch: " + e.getMessage(), e);
            }
        } else {
            log.debug("No rows in batch to execute");
        }
    }

    public Optional<Void> prepareCommit() throws IOException {
        log.info("Preparing to commit, executing remaining batch");
        this.executeBatch();
        log.info("Commit prepared successfully");
        return Optional.empty();
    }

    public void abortPrepare() {
        try {
            if (this.connection != null && !this.connection.getAutoCommit()) {
                log.info("Aborting prepared transaction");
                this.connection.rollback();
            }
            this.batchCount = 0;
        }
        catch (SQLException e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to abort transaction: " + e.getMessage(), e);
        }
    }

    private String generateInsertSql(String database, String table, SeaTunnelRowType rowType) {
        String tableName = database + "." + table;
        String[] fieldNames = rowType.getFieldNames();
        ArrayList<String> columnNames = new ArrayList<String>();
        for (String fieldName : fieldNames) {
            columnNames.add("`" + fieldName + "`");
        }
        String placeholders = String.join((CharSequence)", ", Collections.nCopies(columnNames.size(), "?"));
        return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, String.join((CharSequence)", ", columnNames), placeholders);
    }

    public void close() throws IOException {
        log.info("Closing DatabendSinkWriter");
        try {
            if (this.preparedStatement != null) {
                log.info("Executing final batch before closing");
                this.executeBatch();
                log.info("Closing PreparedStatement");
                this.preparedStatement.close();
            }
            if (this.connection != null) {
                if (!this.connection.getAutoCommit()) {
                    log.info("Committing transaction");
                    this.connection.commit();
                }
                log.info("Closing connection");
                this.connection.close();
            }
            log.info("DatabendSinkWriter closed successfully");
        }
        catch (SQLException e) {
            log.error("Failed to close DatabendSinkWriter", e);
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to close connection: " + e.getMessage(), e);
        }
    }

    private boolean tableExists(String database, String table) throws SQLException {
        try (ResultSet rs = this.connection.getMetaData().getTables(null, database, table, new String[]{"TABLE"});){
            boolean bl = rs.next();
            return bl;
        }
    }

    private void createTable(String database, String table, SeaTunnelRowType rowType) throws SQLException {
        StringBuilder createTableSql = new StringBuilder();
        createTableSql.append("CREATE TABLE ").append(database).append(".").append(table).append(" (");
        String[] fieldNames = rowType.getFieldNames();
        SeaTunnelDataType[] fieldTypes = rowType.getFieldTypes();
        ArrayList<String> columns = new ArrayList<String>();
        for (int i = 0; i < fieldNames.length; ++i) {
            String columnName = fieldNames[i];
            SeaTunnelDataType dataType = fieldTypes[i];
            columns.add(String.format("`%s` %s", columnName, this.convertToDatabendType(dataType)));
        }
        createTableSql.append(String.join((CharSequence)", ", columns));
        createTableSql.append(")");
        log.info("Creating table with SQL: {}", (Object)createTableSql);
        try (Statement stmt = this.connection.createStatement();){
            stmt.execute(createTableSql.toString());
        }
    }

    private void verifyTableSchema(String database, String table, SeaTunnelRowType expectedRowType) throws SQLException {
        String[] expectedFieldNames = expectedRowType.getFieldNames();
        HashMap<String, String> existingColumns = new HashMap<String, String>();
        ResultSet rs = this.connection.getMetaData().getColumns(null, database, table, null);
        Object object = null;
        try {
            while (rs.next()) {
                String columnName = rs.getString("COLUMN_NAME");
                String columnType = rs.getString("TYPE_NAME");
                existingColumns.put(columnName.toLowerCase(), columnType);
            }
        }
        catch (Throwable columnName) {
            object = columnName;
            throw columnName;
        }
        finally {
            if (rs != null) {
                if (object != null) {
                    try {
                        rs.close();
                    }
                    catch (Throwable columnName) {
                        ((Throwable)object).addSuppressed(columnName);
                    }
                } else {
                    rs.close();
                }
            }
        }
        ArrayList<String> missingColumns = new ArrayList<String>();
        for (String fieldName : expectedFieldNames) {
            if (existingColumns.containsKey(fieldName.toLowerCase())) continue;
            missingColumns.add(fieldName);
        }
        if (!missingColumns.isEmpty()) {
            log.info("Found missing columns in target table: {}", (Object)missingColumns);
            for (String columnName : missingColumns) {
                int columnIndex = Arrays.asList(expectedFieldNames).indexOf(columnName);
                SeaTunnelDataType columnType = expectedRowType.getFieldTypes()[columnIndex];
                String databendType = this.convertToDatabendType(columnType);
                String alterTableSql = String.format("ALTER TABLE %s.%s ADD COLUMN `%s` %s", database, table, columnName, databendType);
                log.info("Executing ALTER TABLE to add column: {}", (Object)alterTableSql);
                try {
                    Statement stmt = this.connection.createStatement();
                    Throwable throwable = null;
                    try {
                        stmt.execute(alterTableSql);
                        log.info("Successfully added column {} to table {}.{}", columnName, database, table);
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (stmt == null) continue;
                        if (throwable != null) {
                            try {
                                stmt.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        stmt.close();
                    }
                }
                catch (SQLException e) {
                    throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, String.format("Failed to add column %s to table %s.%s: %s", columnName, database, table, e.getMessage()), e);
                }
            }
        }
    }

    private String convertToDatabendType(SeaTunnelDataType<?> dataType) {
        switch (dataType.getSqlType()) {
            case STRING: {
                return "VARCHAR";
            }
            case BOOLEAN: {
                return "BOOLEAN";
            }
            case TINYINT: {
                return "TINYINT";
            }
            case SMALLINT: {
                return "SMALLINT";
            }
            case INT: {
                return "INT";
            }
            case BIGINT: {
                return "BIGINT";
            }
            case FLOAT: {
                return "FLOAT";
            }
            case DOUBLE: {
                return "DOUBLE";
            }
            case DECIMAL: {
                return "DECIMAL";
            }
            case BYTES: {
                return "VARBINARY";
            }
            case DATE: {
                return "DATE";
            }
            case TIME: {
                return "TIMESTAMP";
            }
            case TIMESTAMP: {
                return "TIMESTAMP";
            }
        }
        return "VARCHAR";
    }
}

