/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.record.sink.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

@Tags(value={"db", "jdbc", "database", "connection", "record"})
@CapabilityDescription(value="Provides a service to write records using a configured database connection.")
public class DatabaseRecordSink
extends AbstractControllerService
implements RecordSinkService {
    // Allowable values offered by the "Unmatched Field Behavior" property.
    static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields", "Any field in the document that cannot be mapped to a column in the database is ignored");
    static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail on Unmatched Fields", "Fail on Unmatched Fields", "If the document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
    // Allowable values offered by the "Unmatched Column Behavior" property.
    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns", "Any column in the database that does not have a field in the document will be assumed to not be required.  No notification will be logged");
    static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns", "Warn on Unmatched Columns", "Any column in the database that does not have a field in the document will be assumed to not be required.  A warning will be logged");
    static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns", "Fail on Unmatched Columns", "A flow will fail if any column in the database that does not have a field in the document.  An error will be logged");
    // Connection pool used to obtain the JDBC connection for every sendData call.
    static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").description("The Controller Service that is used to obtain a connection to the database for sending records.").required(true).identifiesControllerService(DBCPService.class).build();
    // Target table coordinates; catalog and schema are optional, table name is required.
    static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder().name("Catalog Name").description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder().name("Schema Name").description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the table that the statement should affect.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Controls whether field and column names are normalized (see normalizeColumnName) before matching.
    static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder().name("Translate Field Names").description("If true, the Processor will attempt to translate field names into the appropriate column names for the table specified. If false, the field names must match the column names exactly, or the column will not be updated").allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    // Policies for record fields without a column, and for required columns without a record field.
    static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder().name("Unmatched Field Behavior").description("If an incoming record has a field that does not map to any of the database table's columns, this property specifies how to handle the situation").allowableValues(new DescribedValue[]{IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD}).defaultValue(IGNORE_UNMATCHED_FIELD.getValue()).build();
    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder().name("Unmatched Column Behavior").description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation").allowableValues(new DescribedValue[]{IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN, FAIL_UNMATCHED_COLUMN}).defaultValue(FAIL_UNMATCHED_COLUMN.getValue()).build();
    // Identifier quoting switches used when the INSERT statement is generated.
    static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder().name("Quote Column Identifiers").description("Enabling this option will cause all column names to be quoted, allowing you to use reserved words as column names in your tables.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER = new PropertyDescriptor.Builder().name("Quote Table Identifiers").description("Enabling this option will cause the table name to be quoted to support the use of special characters in the table name.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    // Statement timeout; sendData converts the configured period to whole seconds.
    static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").description("The maximum amount of time allowed for a running SQL statement , zero means there is no limit. Max time less than 1 second will be equal to zero.").defaultValue("0 seconds").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    // Immutable list returned by getSupportedPropertyDescriptors.
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(DBCP_SERVICE, CATALOG_NAME, SCHEMA_NAME, TABLE_NAME, TRANSLATE_FIELD_NAMES, UNMATCHED_FIELD_BEHAVIOR, UNMATCHED_COLUMN_BEHAVIOR, QUOTED_IDENTIFIERS, QUOTED_TABLE_IDENTIFIER, QUERY_TIMEOUT);
    // Both fields are assigned in onEnabled and read by sendData.
    private volatile ConfigurationContext context;
    private volatile DBCPService dbcpService;

    /**
     * Returns the property descriptors supported by this service.
     *
     * @return the shared {@code PROPERTY_DESCRIPTORS} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Stores the configuration context and resolves the configured connection pool service so
     * that {@code sendData} can use both later.
     *
     * @param context the configuration this service was enabled with
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.context = context;
        final DBCPService connectionPool = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        dbcpService = connectionPool;
    }

    /**
     * Inserts every record of {@code recordSet} into the configured table as one JDBC batch.
     *
     * <p>The batch runs in a single transaction when the driver allows auto-commit to be
     * disabled: it is committed after a successful {@code executeBatch} and rolled back when
     * anything fails, before the connection's original auto-commit mode is restored.</p>
     *
     * @param recordSet the records to insert; its schema drives the generated INSERT statement
     * @param attributes attributes used to obtain the connection, to evaluate the catalog,
     *        schema and table name expressions, and to populate the returned result
     * @param sendZeroResults not consulted by this implementation
     * @return a result carrying the number of records read from {@code recordSet} and the
     *         supplied attributes
     * @throws IOException if the table name is empty, or wrapping any other failure raised
     *         while preparing or executing the batch
     */
    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
        // Catalog, schema and table name are declared with FLOWFILE_ATTRIBUTES scope, so they
        // must be evaluated against the supplied attributes; never pass a null map.
        final Map<String, String> expressionAttributes = attributes == null ? Map.of() : attributes;
        Boolean originalAutoCommit = null;
        Connection connection = null;
        WriteResult writeResult = null;
        try {
            connection = dbcpService.getConnection(attributes);
            originalAutoCommit = connection.getAutoCommit();
            if (originalAutoCommit) {
                try {
                    connection.setAutoCommit(false);
                } catch (SQLFeatureNotSupportedException sfnse) {
                    getLogger().debug("setAutoCommit(false) not supported by this driver");
                }
            }

            final DMLSettings settings = new DMLSettings(context);
            final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(expressionAttributes).getValue();
            final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(expressionAttributes).getValue();
            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(expressionAttributes).getValue();
            // The timeout property is ENVIRONMENT scoped, so no attributes are involved.
            final int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
            if (StringUtils.isEmpty(tableName)) {
                throw new IOException("Cannot process because Table Name is null or empty");
            }

            final TableSchema tableSchema = TableSchema.from(connection, catalog, schemaName, tableName, settings.translateFieldNames);

            // Build the [catalog.][schema.]table name used in the INSERT statement.
            final StringBuilder tableNameBuilder = new StringBuilder();
            if (catalog != null) {
                tableNameBuilder.append(catalog).append(".");
            }
            if (schemaName != null) {
                tableNameBuilder.append(schemaName).append(".");
            }
            tableNameBuilder.append(tableName);
            final String fqTableName = tableNameBuilder.toString();

            final RecordSchema recordSchema = recordSet.getSchema();
            if (recordSchema == null) {
                throw new IllegalArgumentException("No record schema specified!");
            }
            final SqlAndIncludedColumns sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);

            int recordCount = 0;
            try (PreparedStatement ps = connection.prepareStatement(sqlHolder.getSql())) {
                try {
                    ps.setQueryTimeout(queryTimeout);
                } catch (SQLException se) {
                    // A driver without timeout support is acceptable only when no timeout was requested.
                    if (queryTimeout > 0) {
                        throw se;
                    }
                }

                final List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
                Record currentRecord;
                while ((currentRecord = recordSet.next()) != null) {
                    final Object[] values = currentRecord.getValues();
                    final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
                    if (values != null) {
                        bindValues(ps, values, dataTypes, fieldIndexes);
                        ps.addBatch();
                    }
                    // Records without values are still counted, they just add nothing to the batch.
                    ++recordCount;
                }
                ps.executeBatch();
            }

            // Commit explicitly rather than relying on the implicit commit performed when
            // auto-commit is re-enabled; this also covers connections that were handed out
            // with auto-commit already disabled. commit() is illegal in auto-commit mode.
            if (!connection.getAutoCommit()) {
                connection.commit();
            }
            writeResult = WriteResult.of(recordCount, attributes);
        } catch (IOException ioe) {
            rollbackQuietly(connection);
            throw ioe;
        } catch (Exception e) {
            rollbackQuietly(connection);
            throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
        } finally {
            if (connection != null) {
                if (originalAutoCommit != null) {
                    try {
                        connection.setAutoCommit(originalAutoCommit);
                    } catch (Exception e) {
                        getLogger().debug("Error restoring auto-commit", e);
                    }
                }
                try {
                    connection.close();
                } catch (Exception e) {
                    getLogger().debug("Error closing connection", e);
                }
            }
        }
        return writeResult;
    }

    /**
     * Binds one record's values to the statement parameters, using each field's record data
     * type to choose the SQL type. When {@code fieldIndexes} is null all values are bound in
     * order; otherwise only the listed value positions are bound, in list order.
     */
    private static void bindValues(PreparedStatement ps, Object[] values, List<DataType> dataTypes, List<Integer> fieldIndexes) throws SQLException {
        final int parameterCount = fieldIndexes != null ? fieldIndexes.size() : values.length;
        for (int i = 0; i < parameterCount; i++) {
            final int fieldIndex = fieldIndexes != null ? fieldIndexes.get(i) : i;
            final int sqlType = DataTypeUtils.getSQLTypeValue(dataTypes.get(fieldIndex));
            ps.setObject(i + 1, values[fieldIndex], sqlType);
        }
    }

    /**
     * Rolls back the open transaction, if any, so that restoring auto-commit afterwards does
     * not commit a partially executed batch. Failures are only logged because the caller is
     * already propagating the original error.
     */
    private void rollbackQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            if (!connection.getAutoCommit()) {
                connection.rollback();
            }
        } catch (Exception e) {
            getLogger().debug("Error rolling back transaction", e);
        }
    }

    /**
     * Renames properties stored under the {@code db-record-sink-*} keys to the names of the
     * corresponding property descriptors, after delegating to the superclass migration.
     *
     * @param config the property configuration to migrate
     */
    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        // Each row is {stored key, descriptor name}; rows are applied in declaration order.
        final String[][] renames = {
            {"db-record-sink-dcbp-service", DBCP_SERVICE.getName()},
            {"db-record-sink-catalog-name", CATALOG_NAME.getName()},
            {"db-record-sink-schema-name", SCHEMA_NAME.getName()},
            {"db-record-sink-table-name", TABLE_NAME.getName()},
            {"db-record-sink-translate-field-names", TRANSLATE_FIELD_NAMES.getName()},
            {"db-record-sink-unmatched-field-behavior", UNMATCHED_FIELD_BEHAVIOR.getName()},
            {"db-record-sink-unmatched-column-behavior", UNMATCHED_COLUMN_BEHAVIOR.getName()},
            {"db-record-sink-quoted-identifiers", QUOTED_IDENTIFIERS.getName()},
            {"db-record-sink-quoted-table-identifiers", QUOTED_TABLE_IDENTIFIER.getName()},
            {"db-record-sink-query-timeout", QUERY_TIMEOUT.getName()},
        };
        for (final String[] rename : renames) {
            config.renameProperty(rename[0], rename[1]);
        }
    }

    /**
     * Normalizes a column or field name for matching.
     *
     * @param colName the name to normalize; may be null
     * @param translateColumnNames when true the name is upper-cased and underscores are removed;
     *        when false the name is returned unchanged
     * @return the normalized name, or null when {@code colName} is null
     */
    private static String normalizeColumnName(String colName, boolean translateColumnNames) {
        if (colName == null) {
            return null;
        }
        if (!translateColumnNames) {
            return colName;
        }
        // Locale.ROOT keeps the mapping stable regardless of the JVM default locale
        // (for example, a Turkish default would turn 'i' into a dotted capital I).
        return colName.toUpperCase(Locale.ROOT).replace("_", "");
    }

    /**
     * Collects the normalized form of every field name declared by the given schema.
     *
     * @param schema the record schema; may be null
     * @param translateFieldNames passed through to {@code normalizeColumnName}
     * @return a new mutable set of normalized names, empty when {@code schema} is null
     */
    private Set<String> getNormalizedColumnNames(RecordSchema schema, boolean translateFieldNames) {
        final Set<String> normalized = new HashSet<>();
        if (schema == null) {
            return normalized;
        }
        for (final String fieldName : schema.getFieldNames()) {
            normalized.add(normalizeColumnName(fieldName, translateFieldNames));
        }
        return normalized;
    }

    /**
     * Builds the parameterized INSERT statement for the given record schema and records which
     * record field positions feed its parameters.
     *
     * @param recordSchema schema of the records to be inserted
     * @param tableName table name to place after {@code INSERT INTO}
     * @param tableSchema column metadata of the target table
     * @param settings name translation, quoting and unmatched field/column policies
     * @return the SQL text together with the indexes of the record fields that were mapped
     * @throws IllegalArgumentException if a required column has no matching record field and
     *         the settings demand failure
     * @throws SQLException if a field cannot be mapped and unmatched fields are not ignored,
     *         or if no field maps to any column
     */
    private SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, SQLException {
        final boolean translate = settings.translateFieldNames;

        // Step 1: every required column must be covered by a record field, subject to policy.
        final Set<String> recordColumns = getNormalizedColumnNames(recordSchema, translate);
        for (final String requiredColumn : tableSchema.getRequiredColumnNames()) {
            if (recordColumns.contains(normalizeColumnName(requiredColumn, translate))) {
                continue;
            }
            final String message = "Record does not have a value for the Required column '" + requiredColumn + "'";
            if (settings.failUnmappedColumns) {
                getLogger().error(message);
                throw new IllegalArgumentException(message);
            }
            if (settings.warningUnmappedColumns) {
                getLogger().warn(message);
            }
        }

        // Step 2: statement head.
        final String identifierQuote = tableSchema.getQuotedIdentifierString();
        final StringBuilder sql = new StringBuilder("INSERT INTO ");
        if (settings.quoteTableName) {
            sql.append(identifierQuote).append(tableName).append(identifierQuote);
        } else {
            sql.append(tableName);
        }
        sql.append(" (");

        // Step 3: column list and placeholders, one per record field that maps to a column.
        final List<Integer> includedColumns = new ArrayList<>();
        final List<String> fieldNames = recordSchema.getFieldNames();
        if (fieldNames != null) {
            final Map<String, ColumnDescription> tableColumns = tableSchema.getColumns();
            final int fieldCount = fieldNames.size();
            for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
                final String fieldName = recordSchema.getField(fieldIndex).getFieldName();
                final ColumnDescription column = tableColumns.get(normalizeColumnName(fieldName, translate));
                if (column == null) {
                    if (!settings.ignoreUnmappedFields) {
                        throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                    }
                    continue;
                }
                // The list is still empty only for the first mapped column.
                if (!includedColumns.isEmpty()) {
                    sql.append(", ");
                }
                if (settings.escapeColumnNames) {
                    sql.append(identifierQuote).append(column.getColumnName()).append(identifierQuote);
                } else {
                    sql.append(column.getColumnName());
                }
                includedColumns.add(fieldIndex);
            }
            if (includedColumns.isEmpty()) {
                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
            }
            sql.append(") VALUES (")
                .append(StringUtils.repeat("?", ",", includedColumns.size()))
                .append(")");
        }
        return new SqlAndIncludedColumns(sql.toString(), includedColumns);
    }

    /**
     * Immutable snapshot of the property values that influence statement generation.
     */
    private static class DMLSettings {
        private final boolean translateFieldNames;
        private final boolean ignoreUnmappedFields;
        private final boolean failUnmappedColumns;
        private final boolean warningUnmappedColumns;
        private final boolean escapeColumnNames;
        private final boolean quoteTableName;

        /**
         * Reads all settings from the given context.
         *
         * @param context source of the configured property values
         */
        private DMLSettings(PropertyContext context) {
            translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();

            final String unmatchedFieldBehavior = context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue();
            ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(unmatchedFieldBehavior);

            // One configured value decides both the "fail" and the "warn" flag.
            final String unmatchedColumnBehavior = context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue();
            failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(unmatchedColumnBehavior);
            warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(unmatchedColumnBehavior);

            escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
            quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
        }
    }

    /**
     * Column metadata of one database table: all columns keyed by normalized name, the names
     * of the required columns, and the identifier quote string reported by the driver.
     */
    static class TableSchema {
        private final List<String> requiredColumnNames = new ArrayList<>();
        private final Map<String, ColumnDescription> columns = new HashMap<>();
        private final String quotedIdentifierString;

        /**
         * Indexes the given columns by normalized name and records the original names of the
         * required ones.
         */
        private TableSchema(List<ColumnDescription> columnDescriptions, boolean translateColumnNames, String quotedIdentifierString) {
            this.quotedIdentifierString = quotedIdentifierString;
            for (ColumnDescription desc : columnDescriptions) {
                this.columns.put(DatabaseRecordSink.normalizeColumnName(desc.columnName, translateColumnNames), desc);
                if (desc.isRequired()) {
                    this.requiredColumnNames.add(desc.columnName);
                }
            }
        }

        /** @return the columns keyed by normalized column name */
        Map<String, ColumnDescription> getColumns() {
            return this.columns;
        }

        /** @return the un-normalized names of all required columns */
        List<String> getRequiredColumnNames() {
            return this.requiredColumnNames;
        }

        /** @return the identifier quote string obtained from the database metadata */
        String getQuotedIdentifierString() {
            return this.quotedIdentifierString;
        }

        /**
         * Reads the table's column metadata through the connection's {@link DatabaseMetaData}.
         *
         * @param conn connection used to query the metadata
         * @param catalog catalog passed to the metadata queries; may be null
         * @param schema schema passed to the metadata queries; may be null
         * @param tableName name of the table to describe
         * @param translateColumnNames whether column names are normalized before being indexed
         * @return the schema of the table
         * @throws SQLException if the table is not found or the metadata cannot be read
         */
        static TableSchema from(Connection conn, String catalog, String schema, String tableName, boolean translateColumnNames) throws SQLException {
            final DatabaseMetaData dmd = conn.getMetaData();

            // Close the existence-check result set as well; leaving it open leaks a cursor
            // on the pooled connection.
            try (ResultSet tables = dmd.getTables(catalog, schema, tableName, null)) {
                if (!tables.next()) {
                    throw new SQLException("Table " + tableName + " does not exist in the database");
                }
            }

            final List<ColumnDescription> cols = new ArrayList<>();
            try (ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
                while (colrs.next()) {
                    cols.add(ColumnDescription.from(colrs));
                }
            }
            return new TableSchema(cols, translateColumnNames, dmd.getIdentifierQuoteString());
        }
    }

    /**
     * Pairs a generated SQL statement with the indexes of the record fields that supply its
     * parameters.
     */
    static class SqlAndIncludedColumns {
        /** The SQL text. */
        String sql;
        /** Record field indexes, in parameter order. */
        List<Integer> fieldIndexes;

        /**
         * @param sql the SQL text
         * @param fieldIndexes the record field indexes; stored as given, not copied
         */
        SqlAndIncludedColumns(String sql, List<Integer> fieldIndexes) {
            this.fieldIndexes = fieldIndexes;
            this.sql = sql;
        }

        /** @return the SQL text */
        String getSql() {
            return sql;
        }

        /** @return the record field indexes passed to the constructor */
        List<Integer> getFieldIndexes() {
            return fieldIndexes;
        }
    }

    protected static class ColumnDescription {
        private final String columnName;
        private final int dataType;
        private final boolean required;
        private final Integer columnSize;

        ColumnDescription(String columnName, int dataType, boolean required, Integer columnSize) {
            this.columnName = columnName;
            this.dataType = dataType;
            this.required = required;
            this.columnSize = columnSize;
        }

        public int getDataType() {
            return this.dataType;
        }

        public Integer getColumnSize() {
            return this.columnSize;
        }

        public String getColumnName() {
            return this.columnName;
        }

        public boolean isRequired() {
            return this.required;
        }

        public static ColumnDescription from(ResultSet resultSet) throws SQLException {
            ResultSetMetaData md = resultSet.getMetaData();
            ArrayList<String> columns = new ArrayList<String>();
            for (int i = 1; i < md.getColumnCount() + 1; ++i) {
                columns.add(md.getColumnName(i));
            }
            String defaultValue = resultSet.getString("COLUMN_DEF");
            String columnName = resultSet.getString("COLUMN_NAME");
            int dataType = resultSet.getInt("DATA_TYPE");
            int colSize = resultSet.getInt("COLUMN_SIZE");
            String nullableValue = resultSet.getString("IS_NULLABLE");
            boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
            String autoIncrementValue = "NO";
            if (columns.contains("IS_AUTOINCREMENT")) {
                autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
            }
            boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
            boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : Integer.valueOf(colSize));
        }
    }
}

