/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.lang.invoke.CallSite;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardPageRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;
import org.apache.nifi.processors.standard.QueryDatabaseTable;

@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@Tags(value={"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso(value={QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription(value="Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This processor is intended to be run on the Primary Node only.\n\nThis processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n  - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many fields, but no FlowFile attributes are available. However the properties will be evaluated using the Environment/System properties.\n  - If incoming connection(s) are specified and no FlowFile is available to a processor task, no work will be performed.\n  - If incoming connection(s) are specified and a FlowFile is available to a processor task, the FlowFile's attributes may be used in Expression Language for such fields as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes={Scope.CLUSTER}, description="After performing a query on the specified table, the maximum values for the specified column(s) will be retained for use in future executions of the query. This allows the Processor to fetch only those records that have max values greater than the retained values. This can be used for incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor per the State Management documentation")
@WritesAttributes(value={@WritesAttribute(attribute="generatetablefetch.sql.error", description="If the processor has incoming connections, and processing an incoming FlowFile causes a SQL Exception, the FlowFile is routed to failure and this attribute is set to the exception message."), @WritesAttribute(attribute="generatetablefetch.tableName", description="The name of the database table to be queried."), @WritesAttribute(attribute="generatetablefetch.columnNames", description="The comma-separated list of column names used in the query."), @WritesAttribute(attribute="generatetablefetch.whereClause", description="Where clause used in the query to get the expected rows."), @WritesAttribute(attribute="generatetablefetch.maxColumnNames", description="The comma-separated list of column names used to keep track of data that has been returned since the processor started running."), @WritesAttribute(attribute="generatetablefetch.limit", description="The number of result rows to be fetched by the SQL statement."), @WritesAttribute(attribute="generatetablefetch.offset", description="Offset to be used to retrieve the corresponding partition."), @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles generated from the same query result set will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute="fragment.count", description="This is the total number of  FlowFiles produced by a single ResultSet. This can be used in conjunction with the fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."), @WritesAttribute(attribute="fragment.index", description="This is the position of this FlowFile in the list of outgoing FlowFiles that were all generated from the same execution. 
This can be used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same execution and in what order  FlowFiles were produced")})
@DynamicProperty(name="initial.maxvalue.<max_value_column>", value="Initial maximum value for the specified column", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Specifies an initial max value for max value columns. Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified). In the case of incoming connections, the value is only used the first time for each table specified in the FlowFiles.")
public class GenerateTableFetch
extends AbstractDatabaseFetchProcessor {
    /** Number of rows each generated statement fetches; zero asks for a single statement covering all rows. */
    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
            .name("Partition Size")
            .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows in the table.")
            .defaultValue("10000")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    /** Optional single column whose values (rather than row numbers) define the pages. */
    static final PropertyDescriptor COLUMN_FOR_VALUE_PARTITIONING = new PropertyDescriptor.Builder()
            .name("Column for Value Partitioning")
            .description("The name of a column whose values will be used for partitioning. The default behavior is to use row numbers on the result set for partitioning into 'pages' to be fetched from the database, using an offset/limit strategy. However for certain databases, it can be more efficient under the right circumstances to use the column values themselves to define the 'pages'. This property should only be used when the default queries are not performing well, when there is no maximum-value column or a single maximum-value column whose type can be coerced to a long integer (i.e. not date or timestamp), and the column values are evenly distributed and not sparse, for best performance.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Controls whether an empty FlowFile is emitted when no SQL statements are generated. */
    static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = new PropertyDescriptor.Builder()
            .name("Output Empty FlowFile on Zero Results")
            .description("Depending on the specified properties, an execution of this processor may not result in any SQL statements generated. When this property is true, an empty FlowFile will be generated (having the parent of the incoming FlowFile if present) and transferred to the 'success' relationship. When this property is false, no output FlowFiles will be generated.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Optional ordering column used when no Max-Value Columns are configured and partitioning is enabled. */
    static final PropertyDescriptor CUSTOM_ORDERBY_COLUMN = new PropertyDescriptor.Builder()
            .name("Custom ORDER BY Column")
            .description("The name of a column to be used for ordering the results if Max-Value Columns are not provided and partitioning is enabled. This property is ignored if either Max-Value Columns is set or Partition Size = 0. NOTE: If neither Max-Value Columns nor Custom ORDER BY Column is set, then depending on the the database/driver, the processor may report an error and/or the generated SQL may result in missing and/or duplicate rows. This is because without an explicit ordering, fetching each partition is done using an arbitrary ordering.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Static properties exposed by this processor, in display order. */
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            DBCP_SERVICE,
            DB_TYPE,
            DATABASE_DIALECT_SERVICE,
            TABLE_NAME,
            COLUMN_NAMES,
            MAX_VALUE_COLUMN_NAMES,
            QUERY_TIMEOUT,
            PARTITION_SIZE,
            COLUMN_FOR_VALUE_PARTITIONING,
            WHERE_CLAUSE,
            CUSTOM_ORDERBY_COLUMN,
            OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS
    );

    /** Destination for incoming FlowFiles whose metadata query failed with a SQL error. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. If no incoming connection(s) are specified, this relationship is unused.")
            .build();

    /** All relationships exposed by this processor. */
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

    /**
     * Creates the processor and installs this class's fixed relationship set and
     * property descriptor list into the inherited fields.
     */
    public GenerateTableFetch() {
        relationships = RELATIONSHIPS;
        propDescriptors = PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the relationships assigned to this processor in its constructor.
     *
     * @return the relationship set held by this instance
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the static property descriptors assigned to this processor in its constructor.
     *
     * @return the property descriptor list held by this instance
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propDescriptors;
    }

    /**
     * Builds the descriptor for a user-added (dynamic) property.
     *
     * <p>The property is optional, supports Expression Language against FlowFile attributes,
     * and is validated both as a string-typed expression and as an attribute-key style name.
     *
     * @param propertyDescriptorName the name the user gave the dynamic property
     * @return a dynamic, non-required descriptor carrying that name
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        var stringExpressionValidator = StandardValidators.createAttributeExpressionLanguageValidator(
                AttributeExpression.ResultType.STRING, true);

        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .required(false)
                .addValidator(stringExpressionValidator)
                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true)
                .build();
    }

    /**
     * Adds a processor-specific check on top of the inherited validation: a literal
     * (non Expression Language) value for the value-partitioning column must not contain a comma,
     * because only a single column name is accepted.
     *
     * @param validationContext the context giving access to the configured property values
     * @return the inherited validation results plus, when applicable, the comma violation
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));

        PropertyValue partitionColumn = validationContext.getProperty(COLUMN_FOR_VALUE_PARTITIONING);
        // Expression Language values cannot be checked here; they are re-checked at trigger time
        boolean literalValue = partitionColumn.isSet() && !partitionColumn.isExpressionLanguagePresent();
        if (literalValue && partitionColumn.getValue().contains(",")) {
            String explanation = COLUMN_FOR_VALUE_PARTITIONING.getDisplayName()
                    + " requires a single column name, but a comma was detected";
            problems.add(new ValidationResult.Builder()
                    .valid(false)
                    .explanation(explanation)
                    .build());
        }
        return problems;
    }

    /**
     * Scheduling hook. Logs an error when the processor has incoming connections but none of
     * them is a non-loop connection; otherwise does nothing.
     *
     * @param context the process context used to inspect the incoming connections
     */
    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        boolean fedOnlyByLoops = context.hasIncomingConnection() && !context.hasNonLoopConnection();
        if (!fedOnlyByLoops) {
            return;
        }
        getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
    }

    /**
     * Stop hook. Clears the setup-complete flag so that {@code onTrigger} performs the
     * inherited setup again the next time it runs with a static table configuration.
     */
    @OnStopped
    public void stop() {
        setupComplete.set(false);
    }

    /**
     * Generates the paged SELECT statements for the configured (or FlowFile-specified) table.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>Optionally takes an incoming FlowFile (returns without work if a connection exists
     *       but no FlowFile is available) and evaluates all properties against it.</li>
     *   <li>Copies the cluster state into a mutable map and seeds it with initial max values
     *       that are not yet tracked under the table-qualified key.</li>
     *   <li>Runs one metadata query returning the row count (or a {@code -1} stub), the
     *       {@code MAX} of each max-value column and, for value partitioning, the
     *       {@code MAX}/{@code MIN} of the partitioning column.</li>
     *   <li>Creates one FlowFile per page containing the SQL produced by the database dialect
     *       service (or a single empty FlowFile when configured and there is nothing to fetch)
     *       and transfers them to {@code success}.</li>
     *   <li>Stores the updated max values in cluster state and commits the session.</li>
     * </ol>
     *
     * <p>On {@link SQLException}: when an incoming FlowFile is present it is tagged with
     * {@code generatetablefetch.sql.error} and routed to {@code failure}, and processing continues
     * to the commit; without an incoming FlowFile the error is rethrown as a
     * {@link ProcessException}. Any {@link ProcessException} is logged, the session is rolled back
     * and the processor yields.
     *
     * @param context        the process context providing property values and services
     * @param sessionFactory factory used to create the session for this invocation
     * @throws ProcessException declared by the processor contract; failures raised inside this
     *                          method are caught, logged and result in a rollback
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // With a static table name and static max-value columns the inherited setup is done once up front
        if (!(this.isDynamicTableName || this.isDynamicMaxValues || this.setupComplete.get())) {
            super.setup(context);
        }
        ProcessSession session = sessionFactory.createSession();
        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();
            if (fileToProcess == null) {
                // Incoming connection but nothing queued: do no work
                return;
            }
        }
        this.maxValueProperties = this.getDefaultMaxValueProperties(context, fileToProcess);
        ComponentLog logger = this.getLogger();
        DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        DatabaseDialectService databaseDialectService = this.getDatabaseDialectService((PropertyContext) context);
        String databaseType = context.getProperty(DB_TYPE).getValue();
        String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
        String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
        boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
        String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
        String customOrderByColumn = context.getProperty(CUSTOM_ORDERBY_COLUMN).evaluateAttributeExpressions(fileToProcess).getValue();
        boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();

        // fileToProcess is reassigned in the SQL error handler, so lambdas capture this copy instead
        FlowFile finalFileToProcess = fileToProcess;

        StateMap stateMap;
        try {
            stateMap = session.getState(Scope.CLUSTER);
        } catch (IOException ioe) {
            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform query until this is accomplished.", (Throwable) ioe);
            context.yield();
            return;
        }

        try {
            // Mutable copy of the state; updated from the metadata query and written back at the end
            Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());

            // Seed initial max values that are not yet tracked under the table-qualified key.
            // A value stored under the bare column name takes precedence over the configured initial value.
            for (Map.Entry<String, String> maxProp : this.maxValueProperties.entrySet()) {
                String maxPropKey = maxProp.getKey().toLowerCase();
                String fullyQualifiedMaxPropKey = GenerateTableFetch.getStateKey(tableName, maxPropKey);
                if (statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
                    continue;
                }
                String newMaxPropValue = statePropertyMap.containsKey(maxPropKey)
                        ? statePropertyMap.get(maxPropKey)
                        : maxProp.getValue();
                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
            }

            List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                    ? new ArrayList<>(0)
                    : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
            int numMaxValueColumns = maxValueColumnNameList.size();

            // WHERE conditions are plain SQL fragments joined with AND
            List<String> maxValueClauses = new ArrayList<>(numMaxValueColumns);
            Long maxValueForPartitioning = null;
            Long minValueForPartitioning = null;

            // First selected column is the row count; it is replaced by a -1 stub when it is not needed
            List<String> maxValueSelectColumns = new ArrayList<>(numMaxValueColumns + 1);
            if (useColumnValsForPaging || partitionSize == 0) {
                maxValueSelectColumns.add("-1");
            } else {
                maxValueSelectColumns.add("COUNT(*)");
            }

            // Lower bounds: strictly greater for the first max-value column, greater-or-equal for the rest
            IntStream.range(0, numMaxValueColumns).forEach(index -> {
                String colName = maxValueColumnNameList.get(index);
                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                String maxValue = this.getColumnStateMaxValue(tableName, statePropertyMap, colName);
                if (!StringUtils.isEmpty(maxValue)) {
                    if (this.columnTypeMap.isEmpty() || this.getColumnType(tableName, colName) == null) {
                        // Column type is not cached yet; run the inherited setup to populate it
                        super.setup(context, false, finalFileToProcess);
                    }
                    Integer type = this.getColumnType(tableName, colName);
                    maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + GenerateTableFetch.getLiteralByType(type, maxValue, databaseType));
                }
            });

            if (useColumnValsForPaging) {
                // Re-checked here because Expression Language values are not validated up front
                if (columnForPartitioning.contains(",")) {
                    throw new ProcessException(COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected");
                }
                maxValueSelectColumns.add("MAX(" + columnForPartitioning + ") " + columnForPartitioning);
                maxValueSelectColumns.add("MIN(" + columnForPartitioning + ") MIN_" + columnForPartitioning);
            }
            if (customWhereClause != null) {
                maxValueClauses.add("(" + customWhereClause + ")");
            }

            String maxWhereClause = StringUtils.join(maxValueClauses, " AND ");
            QueryStatementRequest queryStatementRequest = this.getMaxColumnStatementRequest(tableName, maxValueSelectColumns, maxWhereClause);
            StatementResponse statementResponse = databaseDialectService.getStatement((StatementRequest) queryStatementRequest);
            String selectQuery = statementResponse.sql();

            try (Connection con = dbcpService.getConnection(finalFileToProcess == null ? Collections.emptyMap() : finalFileToProcess.getAttributes());
                 Statement st = con.createStatement()) {
                int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                st.setQueryTimeout(queryTimeout);
                logger.debug("Executing {}", selectQuery);
                ResultSet resultSet = st.executeQuery(selectQuery);
                if (!resultSet.next()) {
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }

                long rowCount = resultSet.getLong(1);
                ResultSetMetaData rsmd = resultSet.getMetaData();
                // Columns 2..(numMaxValueColumns + 1) hold the MAX(...) of each max-value column
                for (int i = 2; i <= numMaxValueColumns + 1; ++i) {
                    String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
                    String fullyQualifiedStateKey = GenerateTableFetch.getStateKey(tableName, resultColumnName);
                    String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
                    if (StringUtils.isEmpty(resultColumnCurrentMax) && !this.isDynamicTableName) {
                        // Fall back to a value stored under the bare column name
                        resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
                    }
                    int type = rsmd.getColumnType(i);
                    if (this.isDynamicTableName) {
                        this.columnTypeMap.put(fullyQualifiedStateKey, type);
                    }
                    try {
                        String newMaxValue = GenerateTableFetch.getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax);
                        if (newMaxValue != null) {
                            statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                        }
                    } catch (IOException | ClassCastException | ParseException pice) {
                        throw new ProcessException((Throwable) pice);
                    }
                }
                if (useColumnValsForPaging) {
                    // The two columns after the max-value columns are MAX and MIN of the partitioning column
                    Object o = resultSet.getObject(numMaxValueColumns + 2);
                    maxValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
                    o = resultSet.getObject(numMaxValueColumns + 3);
                    minValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
                }

                // Upper bounds: cap each max-value column at the maximum just observed
                IntStream.range(0, numMaxValueColumns).forEach(index -> {
                    String colName = maxValueColumnNameList.get(index);
                    String maxValue = this.getColumnStateMaxValue(tableName, statePropertyMap, colName);
                    if (!StringUtils.isEmpty(maxValue)) {
                        if (this.columnTypeMap.isEmpty() || this.getColumnType(tableName, colName) == null) {
                            super.setup(context, false, finalFileToProcess);
                        }
                        Integer type = this.getColumnType(tableName, colName);
                        maxValueClauses.add(colName + " <= " + GenerateTableFetch.getLiteralByType(type, maxValue, databaseType));
                    }
                });

                // Number of pages: ceiling of (range or row count) / partition size; a single page when size is 0
                long numberOfFetches;
                if (useColumnValsForPaging) {
                    long valueRangeSize = maxValueForPartitioning == null ? 0L : maxValueForPartitioning - minValueForPartitioning + 1L;
                    numberOfFetches = partitionSize == 0 ? 1L : valueRangeSize / (long) partitionSize + (long) (valueRangeSize % (long) partitionSize == 0L ? 0 : 1);
                } else {
                    numberOfFetches = partitionSize == 0 ? 1L : rowCount / (long) partitionSize + (long) (rowCount % (long) partitionSize == 0L ? 0 : 1);
                }

                String fragmentIdentifier = UUID.randomUUID().toString();
                List<FlowFile> flowFilesToTransfer = new ArrayList<>();
                Map<String, String> baseAttributes = new HashMap<>();
                baseAttributes.put("generatetablefetch.tableName", tableName);
                if (columnNames != null) {
                    baseAttributes.put("generatetablefetch.columnNames", columnNames);
                }
                String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
                if (StringUtils.isNotBlank(maxColumnNames)) {
                    baseAttributes.put("generatetablefetch.maxColumnNames", maxColumnNames);
                }
                baseAttributes.put(FRAGMENT_ID, fragmentIdentifier);
                baseAttributes.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches));

                if (numberOfFetches == 0L && outputEmptyFlowFileOnZeroResults) {
                    FlowFile emptyFlowFile = fileToProcess == null ? session.create() : session.create(fileToProcess);
                    Map<String, String> attributesToAdd = new HashMap<>();
                    String fetchWhereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
                    attributesToAdd.put("generatetablefetch.whereClause", fetchWhereClause);
                    attributesToAdd.put("generatetablefetch.limit", null);
                    if (partitionSize != 0) {
                        attributesToAdd.put("generatetablefetch.offset", null);
                    }
                    attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0));
                    attributesToAdd.putAll(baseAttributes);
                    emptyFlowFile = session.putAllAttributes(emptyFlowFile, attributesToAdd);
                    flowFilesToTransfer.add(emptyFlowFile);
                } else {
                    Long limit = partitionSize == 0 ? null : Long.valueOf(partitionSize);
                    for (long i = 0L; i < numberOfFetches; ++i) {
                        // Last value-partitioned page: bound by the observed maximum instead of a limit
                        if (i == numberOfFetches - 1L && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
                            maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
                            limit = null;
                        }
                        String whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
                        Long offset = partitionSize == 0
                                ? null
                                : Long.valueOf(i * (long) partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0L));
                        // No ORDER BY when everything is fetched by a single statement
                        String orderByClause = partitionSize == 0 ? null : (maxColumnNames.isEmpty() ? customOrderByColumn : maxColumnNames);
                        List<String> namedColumns = columnNames == null ? List.of() : Arrays.asList(columnNames.split(", "));
                        QueryStatementRequest selectStatementRequest = this.getSelectStatementRequest(tableName, namedColumns, whereClause, orderByClause, offset, limit, columnForPartitioning);
                        StatementResponse selectStatementResponse = databaseDialectService.getStatement((StatementRequest) selectStatementRequest);
                        String query = selectStatementResponse.sql();
                        FlowFile sqlFlowFile = fileToProcess == null ? session.create() : session.create(fileToProcess);
                        sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                        Map<String, String> attributesToAdd = new HashMap<>();
                        attributesToAdd.put("generatetablefetch.whereClause", whereClause);
                        attributesToAdd.put("generatetablefetch.limit", limit == null ? null : limit.toString());
                        if (partitionSize != 0) {
                            attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset));
                        }
                        attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
                        attributesToAdd.putAll(baseAttributes);
                        sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd);
                        flowFilesToTransfer.add(sqlFlowFile);
                    }
                }
                session.transfer(flowFilesToTransfer, REL_SUCCESS);
                if (fileToProcess != null) {
                    session.remove(fileToProcess);
                }
            } catch (SQLException e) {
                if (fileToProcess != null) {
                    // Route the input to failure and fall through to the commit below; throwing here
                    // would trigger the rollback in the outer handler and undo this transfer.
                    logger.error("Routing {} to failure since unable to execute SQL select query {}", fileToProcess, selectQuery, e);
                    fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                    session.transfer(fileToProcess, REL_FAILURE);
                } else {
                    logger.error("Unable to execute SQL select query {}", selectQuery, e);
                    throw new ProcessException((Throwable) e);
                }
            }

            try {
                session.setState(statePropertyMap, Scope.CLUSTER);
            } catch (IOException ioe) {
                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. Also, any generated SQL statements may be duplicated.", this, ioe);
            }
            session.commitAsync();
        } catch (ProcessException pe) {
            // Report the underlying cause when one is available
            Throwable t = pe.getCause() == null ? pe : pe.getCause();
            logger.error("Error during processing: {}", t.getMessage(), t);
            session.rollback();
            context.yield();
        }
    }

    /**
     * Runs the inherited property migration and then renames this processor's legacy
     * property keys to the current descriptor names.
     *
     * @param config the property configuration being migrated
     */
    @Override
    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);

        // {legacy key, current name} pairs, applied in order
        String[][] renames = {
            {"gen-table-fetch-partition-size", PARTITION_SIZE.getName()},
            {"gen-table-column-for-val-partitioning", COLUMN_FOR_VALUE_PARTITIONING.getName()},
            {"gen-table-output-flowfile-on-zero-results", OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS.getName()},
            {"gen-table-custom-orderby-column", CUSTOM_ORDERBY_COLUMN.getName()},
        };
        for (String[] rename : renames) {
            config.renameProperty(rename[0], rename[1]);
        }
    }

    /**
     * Builds the SELECT request for the metadata query (count stub/row count plus MAX/MIN expressions).
     *
     * @param tableName             table to query
     * @param maxValueSelectColumns select-list expressions, each wrapped as a column definition
     * @param whereClause           WHERE text; must not be {@code null} because it is wrapped with {@link Optional#of}
     * @return a SELECT request carrying only the table definition and the where clause
     */
    private QueryStatementRequest getMaxColumnStatementRequest(String tableName, List<String> maxValueSelectColumns, String whereClause) {
        List<ColumnDefinition> selectColumns = maxValueSelectColumns.stream()
                .map(expression -> (ColumnDefinition) new StandardColumnDefinition(expression))
                .toList();
        TableDefinition table = new TableDefinition(Optional.empty(), Optional.empty(), tableName, selectColumns);
        return new StandardQueryStatementRequest(
                StatementType.SELECT,
                table,
                Optional.empty(),
                Optional.of(whereClause),
                Optional.empty(),
                Optional.empty());
    }

    /**
     * Builds the SELECT request for one page of results.
     *
     * @param tableName       table to query
     * @param namedColumns    columns to select, each wrapped as a column definition
     * @param whereClause     WHERE text, or {@code null} for none
     * @param orderByClause   ORDER BY text, or {@code null} for none
     * @param offset          page offset; {@code null} means no page request is attached
     * @param limit           page size; {@code null} means an empty limit on the page request
     * @param indexColumnName optional index column passed through to the page request
     * @return the SELECT request describing this page
     */
    private QueryStatementRequest getSelectStatementRequest(String tableName, List<String> namedColumns, String whereClause, String orderByClause, Long offset, Long limit, String indexColumnName) {
        List<ColumnDefinition> selectColumns = namedColumns.stream()
                .map(columnName -> (ColumnDefinition) new StandardColumnDefinition(columnName))
                .toList();
        TableDefinition table = new TableDefinition(Optional.empty(), Optional.empty(), tableName, selectColumns);

        // Paging is requested only when an offset was supplied
        StandardPageRequest page = null;
        if (offset != null) {
            OptionalLong pageLimit;
            if (limit == null) {
                pageLimit = OptionalLong.empty();
            } else {
                pageLimit = OptionalLong.of(limit);
            }
            page = new StandardPageRequest(offset.longValue(), pageLimit, Optional.ofNullable(indexColumnName));
        }

        return new StandardQueryStatementRequest(
                StatementType.SELECT,
                table,
                Optional.empty(),
                Optional.ofNullable(whereClause),
                Optional.ofNullable(orderByClause),
                Optional.ofNullable(page));
    }

    /**
     * Looks up the tracked maximum value for a column.
     *
     * <p>The table-qualified state key is tried first. If that value is empty and the table name
     * is not dynamic, the key built with a {@code null} table name is used instead.
     *
     * @param tableName        table the column belongs to
     * @param statePropertyMap state entries to search
     * @param colName          column whose maximum is wanted
     * @return the stored maximum, or {@code null}/empty when none is recorded
     */
    private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
        String tableScopedValue = statePropertyMap.get(GenerateTableFetch.getStateKey(tableName, colName));
        if (!StringUtils.isEmpty(tableScopedValue) || this.isDynamicTableName) {
            return tableScopedValue;
        }
        return statePropertyMap.get(GenerateTableFetch.getStateKey(null, colName));
    }

    /**
     * Looks up the cached column type for a column.
     *
     * <p>The table-qualified key is tried first. If nothing is cached there and the table name
     * is not dynamic, the key built with a {@code null} table name is used instead.
     *
     * @param tableName table the column belongs to
     * @param colName   column whose type is wanted
     * @return the cached type value, or {@code null} when none is cached
     */
    private Integer getColumnType(String tableName, String colName) {
        Integer tableScopedType = (Integer) this.columnTypeMap.get(GenerateTableFetch.getStateKey(tableName, colName));
        if (tableScopedType != null || this.isDynamicTableName) {
            return tableScopedType;
        }
        return (Integer) this.columnTypeMap.get(GenerateTableFetch.getStateKey(null, colName));
    }
}

