/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor;
import org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch;
import org.apache.nifi.processors.elasticsearch.SearchElasticsearch;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.util.StringUtils;

/**
 * Source processor that repeatedly runs a paginated Elasticsearch query and tracks progress
 * through a Range ("gt") filter on a configured field.
 *
 * <p>The last tracked field value is kept in processor state under {@link #STATE_RANGE_VALUE}
 * and is used as the lower bound of the Range filter on subsequent queries. When no value has
 * been stored yet, the optional "Initial Value" property is used instead.</p>
 */
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="application/json"), @WritesAttribute(attribute="page.number", description="The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"), @WritesAttribute(attribute="hit.count", description="The number of hits that are in the output flowfile"), @WritesAttribute(attribute="elasticsearch.query.error", description="The error message provided by Elasticsearch if there is an error querying the index.")})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@PrimaryNodeOnly
@DefaultSchedule(period="1 min")
@Tags(value={"elasticsearch", "elasticsearch7", "elasticsearch8", "elasticsearch9", "query", "scroll", "page", "search", "json"})
@CapabilityDescription(value="A processor that repeatedly runs a paginated query against a field using a Range query to consume new Documents from an Elasticsearch index/query. The processor will retrieve multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached, after which the Range query will automatically update the field constraint based on the last retrieved Document value.")
@SeeAlso(value={SearchElasticsearch.class, PaginatedJsonQueryElasticsearch.class})
@DynamicProperties(value={@DynamicProperty(name="The name of the HTTP request header", value="A Record Path expression to retrieve the HTTP request header value", expressionLanguageScope=ExpressionLanguageScope.ENVIRONMENT, description="Prefix: HEADER: - adds the specified property name/value as a HTTP request header in the Elasticsearch request. If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), @DynamicProperty(name="The name of a URL query parameter to add", value="The value of the URL query parameter", expressionLanguageScope=ExpressionLanguageScope.ENVIRONMENT, description="Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the query request body. For SCROLL type queries, these parameters are only used in the initial (first page) query as the Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.")})
@Stateful(scopes={Scope.CLUSTER}, description="The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, trackingRangeValue) is retained in between invocations of this processor until the Scroll/PiT has expired (when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="Care should be taken on the size of each page because each response from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class ConsumeElasticsearch
extends SearchElasticsearch {
    /** State key under which the last tracked Range field value is stored. */
    static final String STATE_RANGE_VALUE = "trackingRangeValue";

    /** Sort order value meaning "ascending"; also the default of {@link #RANGE_FIELD_SORT_ORDER}. */
    private static final String ORDER_ASCENDING = "asc";
    /** Sort order value meaning "descending". */
    private static final String ORDER_DESCENDING = "desc";

    // Copies of the shared descriptors with their property dependencies removed.
    public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(ElasticsearchRestProcessor.SIZE)
            .clearDependsOn()
            .build();
    public static final PropertyDescriptor AGGREGATIONS = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(ElasticsearchRestProcessor.AGGREGATIONS)
            .clearDependsOn()
            .build();
    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(ElasticsearchRestProcessor.SORT)
            .clearDependsOn()
            .build();
    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(ElasticsearchRestProcessor.FIELDS)
            .clearDependsOn()
            .build();
    public static final PropertyDescriptor SCRIPT_FIELDS = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(ElasticsearchRestProcessor.SCRIPT_FIELDS)
            .clearDependsOn()
            .build();

    public static final PropertyDescriptor RANGE_FIELD = new PropertyDescriptor.Builder()
            .name("es-rest-range-field")
            .displayName("Range Query Field")
            .description("Field to be tracked as part of an Elasticsearch Range query using a \"gt\" bound match. This field must exist within the Elasticsearch document for it to be retrieved.")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .build();
    public static final PropertyDescriptor RANGE_FIELD_SORT_ORDER = new PropertyDescriptor.Builder()
            .name("es-rest-sort-order")
            .displayName("Sort Order")
            .description("The order in which to sort the \"" + RANGE_FIELD.getDisplayName() + "\". A \"sort\" clause for the \"" + RANGE_FIELD.getDisplayName() + "\" field will be prepended to any provided \"" + SORT.getDisplayName() + "\" clauses. If a \"sort\" clause already exists for the \"" + RANGE_FIELD.getDisplayName() + "\" field, it will not be updated.")
            .allowableValues(new String[]{ORDER_ASCENDING, ORDER_DESCENDING})
            .defaultValue(ORDER_ASCENDING)
            .required(true)
            .build();
    public static final PropertyDescriptor RANGE_INITIAL_VALUE = new PropertyDescriptor.Builder()
            .name("es-rest-range-initial-value")
            .displayName("Initial Value")
            .description("The initial value to use for the query if the processor has not run previously. If the processor has run previously and stored a value in its state, this property will be ignored. If no value is provided, and the processor has not previously run, no Range query bounds will be used, i.e. all documents will be retrieved in the specified \"" + RANGE_FIELD_SORT_ORDER.getDisplayName() + "\".")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(false)
            .build();
    public static final PropertyDescriptor RANGE_DATE_FORMAT = new PropertyDescriptor.Builder()
            .name("es-rest-range-format")
            .displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Format")
            .description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to a date with this format. If not specified, Elasticsearch will use the date format provided by the \"" + RANGE_FIELD.getDisplayName() + "\"'s mapping. For valid syntax, see https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .dependsOn(RANGE_INITIAL_VALUE, new AllowableValue[0])
            .required(false)
            .build();
    public static final PropertyDescriptor RANGE_TIME_ZONE = new PropertyDescriptor.Builder()
            .name("es-rest-range-time-zone")
            .displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Time Zone")
            .description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to UTC with this time zone. Valid values are ISO 8601 UTC offsets, such as \"+01:00\" or \"-08:00\", and IANA time zone IDs, such as \"Europe/London\".")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .dependsOn(RANGE_INITIAL_VALUE, new AllowableValue[0])
            .required(false)
            .build();
    public static final PropertyDescriptor ADDITIONAL_FILTERS = new PropertyDescriptor.Builder()
            .name("es-rest-additional-filters")
            .displayName("Additional Filters")
            .description("One or more query filters in JSON syntax, not Lucene syntax. Ex: [{\"match\":{\"somefield\":\"somevalue\"}}, {\"match\":{\"anotherfield\":\"anothervalue\"}}]. These filters wil be used as part of a Bool query's filter.")
            .addValidator((Validator)JsonValidator.INSTANCE)
            .required(false)
            .build();

    /**
     * Supported properties: the Range-tracking properties first, followed by the inherited
     * scroll properties minus the query-definition and restart properties, with the shared
     * descriptors swapped for their dependency-free copies.
     */
    private static final List<PropertyDescriptor> propertyDescriptors = Stream.concat(
            Stream.of(RANGE_FIELD, RANGE_FIELD_SORT_ORDER, RANGE_INITIAL_VALUE, RANGE_DATE_FORMAT, RANGE_TIME_ZONE, ADDITIONAL_FILTERS),
            scrollPropertyDescriptors.stream()
                    .filter(pd -> !QUERY.equals(pd) && !QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd) && !RESTART_ON_FINISH.equals(pd))
                    .map(ConsumeElasticsearch::withoutDependencies)
    ).toList();

    /** Range field name captured in {@link #onScheduled(ProcessContext)}; {@code null} while stopped. */
    protected String trackingRangeField;
    /** Sort order captured in {@link #onScheduled(ProcessContext)}; {@code null} while stopped. */
    protected String trackingSortOrder;

    /**
     * Maps a shared descriptor to this class's dependency-free copy, or returns it unchanged.
     *
     * @param property a descriptor taken from the inherited scroll property list
     * @return the local copy for SIZE, AGGREGATIONS, SORT, FIELDS and SCRIPT_FIELDS; otherwise {@code property}
     */
    private static PropertyDescriptor withoutDependencies(final PropertyDescriptor property) {
        // Identity comparison is deliberate: only the shared descriptor instances are replaced.
        if (property == ElasticsearchRestProcessor.SIZE) {
            return SIZE;
        }
        if (property == ElasticsearchRestProcessor.AGGREGATIONS) {
            return AGGREGATIONS;
        }
        if (property == ElasticsearchRestProcessor.SORT) {
            return SORT;
        }
        if (property == ElasticsearchRestProcessor.FIELDS) {
            return FIELDS;
        }
        if (property == ElasticsearchRestProcessor.SCRIPT_FIELDS) {
            return SCRIPT_FIELDS;
        }
        return property;
    }

    /**
     * @return the fixed list of properties supported by this processor
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * @return {@link Scope#CLUSTER}, the scope used to read the tracked value from state
     */
    @Override
    Scope getStateScope() {
        return Scope.CLUSTER;
    }

    /**
     * Runs the superclass scheduling logic, then caches the Range field and sort order.
     *
     * @param context the process context providing the configured property values
     */
    @Override
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        super.onScheduled(context);
        this.trackingRangeField = context.getProperty(RANGE_FIELD).getValue();
        this.trackingSortOrder = context.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
    }

    /**
     * Runs the superclass stop logic, then clears the cached Range field and sort order.
     */
    @Override
    @OnStopped
    public void onStopped() {
        super.onStopped();
        this.trackingRangeField = null;
        this.trackingSortOrder = null;
    }

    /**
     * Resolves the Range field name, preferring the value cached at scheduling time.
     *
     * @param context the process context to fall back to; may be {@code null}
     * @return the field name, or {@code null} if nothing is cached and no context is given
     */
    private String getTrackingRangeField(final ProcessContext context) {
        if (this.trackingRangeField != null) {
            return this.trackingRangeField;
        }
        return context == null ? null : context.getProperty(RANGE_FIELD).getValue();
    }

    /**
     * Resolves the sort order, preferring the value cached at scheduling time.
     *
     * @param context the process context to fall back to; may be {@code null}
     * @return the sort order, or {@code null} if nothing is cached and no context is given
     */
    private String getTrackingSortOrder(final ProcessContext context) {
        if (this.trackingSortOrder != null) {
            return this.trackingSortOrder;
        }
        return context == null ? null : context.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
    }

    /**
     * Builds the query parameters via the superclass and seeds them with the current tracking value.
     *
     * @param input the incoming flowfile passed through to the superclass
     * @param context the process context
     * @param session the process session
     * @return the query parameters carrying the stored (or initial) tracking value
     * @throws IOException if the superclass fails or the processor state cannot be read
     */
    @Override
    PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
        final PaginatedJsonQueryParameters parameters = super.buildJsonQueryParameters(input, context, session);
        parameters.setTrackingRangeValue(getTrackingRangeValueOrDefault(context));
        return parameters;
    }

    /**
     * Adds a Bool query whose "filter" holds the Range filter (when a tracking value is
     * available) followed by any configured Additional Filters. Nothing is added to
     * {@code query} when there are no filters at all.
     *
     * @param query the query body being assembled; receives the "query" entry
     * @param attributes flowfile attributes (unused here)
     * @param context the process context providing property values and state
     * @param mapper the mapper used to parse the Additional Filters JSON
     * @throws IOException if the state cannot be read or the Additional Filters JSON cannot be parsed
     */
    @Override
    public void addQueryClause(final Map<String, Object> query, final Map<String, String> attributes, final ProcessContext context, final ObjectMapper mapper) throws IOException {
        final List<Map<String, Object>> filters = new ArrayList<>(10);

        final String trackingRangeValue = getTrackingRangeValueOrDefault(context);
        if (StringUtils.isNotBlank(trackingRangeValue)) {
            final Map<String, Object> rangeFilter = Collections.singletonMap("range",
                    Collections.singletonMap(getTrackingRangeField(context), buildRangeBounds(trackingRangeValue, context)));
            filters.add(rangeFilter);
        }

        if (context.getProperty(ADDITIONAL_FILTERS).isSet()) {
            final JsonNode additionalFilters = mapper.readTree(context.getProperty(ADDITIONAL_FILTERS).getValue());
            if (additionalFilters.isArray()) {
                filters.addAll(mapper.convertValue(additionalFilters, new TypeReference<List<Map<String, Object>>>() { }));
            } else {
                // A single filter may be given without the surrounding JSON array.
                filters.add(mapper.convertValue(additionalFilters, new TypeReference<Map<String, Object>>() { }));
            }
        }

        if (!filters.isEmpty()) {
            final Map<String, Object> bool = Collections.singletonMap("bool", Collections.singletonMap("filter", filters));
            query.put("query", bool);
        }
    }

    /**
     * Builds the bounds of the Range filter: a "gt" bound plus the optional "format" and "time_zone".
     *
     * @param lowerBound the exclusive lower bound for the tracked field
     * @param context the process context providing the date format and time zone properties
     * @return a mutable map of Range query bounds
     */
    private static Map<String, String> buildRangeBounds(final String lowerBound, final ProcessContext context) {
        final Map<String, String> bounds = new HashMap<>(3, 1.0f);
        bounds.put("gt", lowerBound);
        // Format and time zone are only considered when an Initial Value is configured.
        if (context.getProperty(RANGE_INITIAL_VALUE).isSet()) {
            if (context.getProperty(RANGE_DATE_FORMAT).isSet()) {
                bounds.put("format", context.getProperty(RANGE_DATE_FORMAT).getValue());
            }
            if (context.getProperty(RANGE_TIME_ZONE).isSet()) {
                bounds.put("time_zone", context.getProperty(RANGE_TIME_ZONE).getValue());
            }
        }
        return bounds;
    }

    /**
     * Lets the superclass add its sort clause, then prepends a sort on the Range field unless
     * one of the existing sort entries already uses that field as a key.
     *
     * @param query the query body being assembled; its "sort" entry is created if absent
     * @param attributes flowfile attributes passed through to the superclass
     * @param context the process context
     * @param mapper the mapper passed through to the superclass
     * @throws IOException if the superclass fails to build its sort clause
     */
    @Override
    public void addSortClause(final Map<String, Object> query, final Map<String, String> attributes, final ProcessContext context, final ObjectMapper mapper) throws IOException {
        super.addSortClause(query, attributes, context, mapper);

        // Cast to the List interface rather than a concrete class so any list the superclass stored is accepted.
        @SuppressWarnings("unchecked")
        final List<Map<String, Object>> existingSort = (List<Map<String, Object>>) query.get("sort");
        final List<Map<String, Object>> sort;
        if (existingSort != null) {
            sort = existingSort;
        } else {
            sort = new ArrayList<>(1);
            query.put("sort", sort);
        }

        final String trackingField = getTrackingRangeField(context);
        if (sort.stream().noneMatch(clause -> clause.containsKey(trackingField))) {
            sort.add(0, Collections.singletonMap(trackingField, getTrackingSortOrder(context)));
        }
    }

    /**
     * Adds the current tracking value to the state that is about to be stored.
     *
     * @param newStateMap the state map being assembled
     * @param paginatedJsonQueryParameters the parameters holding the tracking value
     */
    @Override
    void additionalState(final Map<String, String> newStateMap, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) {
        newStateMap.put(STATE_RANGE_VALUE, paginatedJsonQueryParameters.getTrackingRangeValue());
    }

    /**
     * Updates the tracking value from the latest response after the superclass has updated
     * its own parameters.
     *
     * <p>For descending order the value is taken from the first hit of the first page only;
     * for ascending order it is taken from the last hit of every page. The tracking value is
     * left unchanged when the hit has no "_source", when the tracked field is missing from it,
     * or when its value is blank.</p>
     *
     * @param paginatedJsonQueryParameters the parameters to update
     * @param response the search response that was just received
     */
    @Override
    void updateQueryParameters(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final SearchResponse response) {
        super.updateQueryParameters(paginatedJsonQueryParameters, response);
        if (response.getHits().isEmpty()) {
            return;
        }

        final String sortOrder = getTrackingSortOrder(null);
        final int trackingHitIndex;
        if (ORDER_DESCENDING.equals(sortOrder) && paginatedJsonQueryParameters.getPageCount() == 1) {
            trackingHitIndex = 0;
        } else if (ORDER_ASCENDING.equals(sortOrder)) {
            trackingHitIndex = response.getHits().size() - 1;
        } else {
            return;
        }

        final String trackingField = getTrackingRangeField(null);
        if (trackingField == null) {
            return;
        }

        final Object hit = response.getHits().get(trackingHitIndex);
        final Object fieldValue = readSourceField(hit, trackingField);
        if (fieldValue == null) {
            // String.valueOf(null) would produce the literal "null" and corrupt the tracked value.
            return;
        }

        final String nextValue = String.valueOf(fieldValue);
        if (StringUtils.isNotBlank(nextValue)) {
            paginatedJsonQueryParameters.setTrackingRangeValue(nextValue);
        }
    }

    /**
     * Reads a field from the "_source" map of a search hit.
     *
     * @param hit the search hit, expected to be a map containing a "_source" map
     * @param field the name of the field to read
     * @return the field value, or {@code null} if the hit, its "_source" or the field is absent
     */
    private static Object readSourceField(final Object hit, final String field) {
        if (hit instanceof Map<?, ?> hitMap && hitMap.get("_source") instanceof Map<?, ?> source) {
            return source.get(field);
        }
        return null;
    }

    /**
     * Returns the tracking value stored in state, falling back to the Initial Value property.
     *
     * @param context the process context providing the state manager and properties
     * @return the stored tracking value, or the configured Initial Value (possibly {@code null})
     * @throws IOException if the processor state cannot be read
     */
    private String getTrackingRangeValueOrDefault(final ProcessContext context) throws IOException {
        final StateMap stateMap = context.getStateManager().getState(getStateScope());
        final String storedValue = stateMap == null ? null : stateMap.get(STATE_RANGE_VALUE);
        return storedValue != null ? storedValue : context.getProperty(RANGE_INITIAL_VALUE).getValue();
    }
}

