/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcOutputFormatBuilder {
    private static final Logger log = LoggerFactory.getLogger(JdbcOutputFormatBuilder.class);
    @NonNull
    private final JdbcDialect dialect;
    @NonNull
    private final JdbcConnectionProvider connectionProvider;
    @NonNull
    private final JdbcSinkConfig jdbcSinkConfig;
    @NonNull
    private final TableSchema tableSchema;
    @Nullable
    private final TableSchema databaseTableSchema;

    public JdbcOutputFormat build() {
        String database = this.jdbcSinkConfig.getDatabase();
        String table = this.jdbcSinkConfig.getTable();
        List<String> primaryKeys = this.jdbcSinkConfig.getPrimaryKeys();
        JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory = this.jdbcSinkConfig.isUseCopyStatement() ? () -> JdbcOutputFormatBuilder.createCopyInBufferStatementExecutor(JdbcOutputFormatBuilder.createCopyInBatchStatementExecutor(this.dialect, table, this.tableSchema)) : (StringUtils.isNotBlank(this.jdbcSinkConfig.getSimpleSql()) ? () -> JdbcOutputFormatBuilder.createSimpleBufferedExecutor(this.jdbcSinkConfig.getSimpleSql(), this.tableSchema, this.databaseTableSchema, this.dialect.getRowConverter()) : (primaryKeys == null || primaryKeys.isEmpty() ? () -> JdbcOutputFormatBuilder.createSimpleBufferedExecutor(this.dialect, database, table, this.tableSchema, this.databaseTableSchema) : () -> JdbcOutputFormatBuilder.createUpsertBufferedExecutor(this.dialect, database, table, this.tableSchema, this.databaseTableSchema, primaryKeys.toArray(new String[0]), this.jdbcSinkConfig.isEnableUpsert(), this.jdbcSinkConfig.isPrimaryKeyUpdated(), this.jdbcSinkConfig.isSupportUpsertByInsertOnly())));
        return new JdbcOutputFormat(this.connectionProvider, this.jdbcSinkConfig.getJdbcConnectionConfig(), statementExecutorFactory);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, TableSchema databaseTableSchema) {
        String insertSQL = dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames());
        return JdbcOutputFormatBuilder.createSimpleBufferedExecutor(insertSQL, tableSchema, databaseTableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(String sql, TableSchema tableSchema, TableSchema databaseTableSchema, JdbcRowConverter rowConverter) {
        JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor = JdbcOutputFormatBuilder.createSimpleExecutor(sql, tableSchema, databaseTableSchema, rowConverter);
        return new BufferedBatchStatementExecutor(simpleRowExecutor, Function.identity());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertBufferedExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, TableSchema databaseTableSchema, String[] pkNames, boolean enableUpsert, boolean isPrimaryKeyUpdated, boolean supportUpsertByInsertOnly) {
        int[] pkFields = Arrays.stream(pkNames).mapToInt(arg_0 -> ((SeaTunnelRowType)tableSchema.toPhysicalRowDataType()).indexOf(arg_0)).toArray();
        TableSchema pkSchema = TableSchema.builder().columns(Arrays.stream(pkFields).mapToObj(tableSchema.getColumns()::get).collect(Collectors.toList())).build();
        Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = JdbcOutputFormatBuilder.createKeyExtractor(pkFields);
        JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor = JdbcOutputFormatBuilder.createDeleteExecutor(dialect, database, table, pkNames, pkSchema, databaseTableSchema);
        JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor = JdbcOutputFormatBuilder.createUpsertExecutor(dialect, database, table, tableSchema, databaseTableSchema, pkNames, pkSchema, keyExtractor, enableUpsert, isPrimaryKeyUpdated, supportUpsertByInsertOnly);
        return new BufferReducedBatchStatementExecutor(upsertExecutor, deleteExecutor, keyExtractor, Function.identity());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, TableSchema databaseTableSchema, String[] pkNames, TableSchema pkTableSchema, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, boolean enableUpsert, boolean isPrimaryKeyUpdated, boolean supportUpsertByInsertOnly) {
        if (supportUpsertByInsertOnly) {
            return JdbcOutputFormatBuilder.createInsertOnlyExecutor(dialect, database, table, tableSchema, databaseTableSchema);
        }
        if (enableUpsert) {
            Optional<String> upsertSQL = dialect.getUpsertStatement(database, table, tableSchema.getFieldNames(), pkNames);
            if (upsertSQL.isPresent()) {
                return JdbcOutputFormatBuilder.createSimpleExecutor(upsertSQL.get(), tableSchema, databaseTableSchema, dialect.getRowConverter());
            }
            return JdbcOutputFormatBuilder.createInsertOrUpdateByQueryExecutor(dialect, database, table, tableSchema, databaseTableSchema, pkNames, pkTableSchema, keyExtractor, isPrimaryKeyUpdated);
        }
        return JdbcOutputFormatBuilder.createInsertOrUpdateExecutor(dialect, database, table, tableSchema, databaseTableSchema, pkNames, isPrimaryKeyUpdated);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor(CopyManagerBatchStatementExecutor copyManagerBatchStatementExecutor) {
        return new BufferedBatchStatementExecutor(copyManagerBatchStatementExecutor, Function.identity());
    }

    private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor(JdbcDialect dialect, String table, TableSchema tableSchema) {
        String columns = Arrays.stream(tableSchema.getFieldNames()).map(dialect::quoteIdentifier).collect(Collectors.joining(",", "(", ")"));
        String copyInSql = String.format("COPY %s %s FROM STDIN WITH CSV", table, columns);
        return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, TableSchema databaseTableSchema) {
        return new SimpleBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), tableSchema, databaseTableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, TableSchema databaseTableSchema, String[] pkNames, boolean isPrimaryKeyUpdated) {
        return new InsertOrUpdateBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getUpdateStatement(database, table, tableSchema.getFieldNames(), pkNames, isPrimaryKeyUpdated), tableSchema.getFieldNames()), tableSchema, databaseTableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateByQueryExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, TableSchema databaseTableSchema, String[] pkNames, TableSchema pkTableSchema, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, boolean isPrimaryKeyUpdated) {
        return new InsertOrUpdateBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getRowExistsStatement(database, table, pkNames), pkNames), connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getUpdateStatement(database, table, tableSchema.getFieldNames(), pkNames, isPrimaryKeyUpdated), tableSchema.getFieldNames()), pkTableSchema, keyExtractor, tableSchema, databaseTableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createDeleteExecutor(JdbcDialect dialect, String database, String table, String[] pkNames, TableSchema pkTableSchema, TableSchema databaseTableSchema) {
        String deleteSQL = dialect.getDeleteStatement(database, table, pkNames);
        return JdbcOutputFormatBuilder.createSimpleExecutor(deleteSQL, pkTableSchema, databaseTableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(String sql, TableSchema tableSchema, TableSchema databaseTableSchema, JdbcRowConverter rowConverter) {
        return new SimpleBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, tableSchema.getFieldNames()), tableSchema, databaseTableSchema, rowConverter);
    }

    static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
        return row -> {
            Object[] fields = new Object[pkFields.length];
            for (int i = 0; i < pkFields.length; ++i) {
                fields[i] = row.getField(pkFields[i]);
            }
            SeaTunnelRow newRow = new SeaTunnelRow(fields);
            newRow.setTableId(row.getTableId());
            return newRow;
        };
    }

    public JdbcOutputFormatBuilder(@NonNull JdbcDialect dialect, @NonNull JdbcConnectionProvider connectionProvider, @NonNull JdbcSinkConfig jdbcSinkConfig, @NonNull TableSchema tableSchema, @Nullable TableSchema databaseTableSchema) {
        if (dialect == null) {
            throw new NullPointerException("dialect is marked non-null but is null");
        }
        if (connectionProvider == null) {
            throw new NullPointerException("connectionProvider is marked non-null but is null");
        }
        if (jdbcSinkConfig == null) {
            throw new NullPointerException("jdbcSinkConfig is marked non-null but is null");
        }
        if (tableSchema == null) {
            throw new NullPointerException("tableSchema is marked non-null but is null");
        }
        this.dialect = dialect;
        this.connectionProvider = connectionProvider;
        this.jdbcSinkConfig = jdbcSinkConfig;
        this.tableSchema = tableSchema;
        this.databaseTableSchema = databaseTableSchema;
    }
}

