/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.SequencedCollection;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

/**
 * Processor that reads the records of an incoming FlowFile and writes counting
 * statistics to the FlowFile as attributes.
 *
 * <p>The total number of records is always written to {@code record.count}. In
 * addition, every dynamic property defines a category: the property name is the
 * category name and the property value is a RecordPath. For each field selected
 * by that RecordPath, two counters are maintained:
 * <ul>
 *   <li>{@code recordStats.<property>} - number of selected fields with a non-null value;</li>
 *   <li>{@code recordStats.<property>.<value>} - number of selected fields whose
 *       string form equals {@code <value>} ({@code <null>} for null values).</li>
 * </ul>
 * The per-value counters are trimmed to the top N (see {@link #LIMIT}); the
 * per-property counters and {@code record.count} are always kept.
 *
 * <p>FlowFiles are routed to {@code success} once the attributes are written, or
 * to {@code failure} if any exception is raised while processing.
 */
@SideEffectFree
@SupportsBatching
@Tags(value={"record", "stats", "metrics"})
@CapabilityDescription(value="Counts the number of Records in a record set, optionally counting the number of elements per category, where the categories are defined by user-defined properties.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name="The name of the category. For example, sport", value="The RecordPath that points to the value of the category. For example /sport", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Specifies a category that should be counted. For example, if the property name is 'sport' and the value is '/sport', the processor will count how many records have a value of 'soccer' for the /sport field, how many have a value of 'baseball' for the /sport, and so on. These counts be added as attributes named recordStats.sport.soccer, recordStats.sport.baseball.")
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="A count of the records in the record set in the FlowFile."), @WritesAttribute(attribute="recordStats.<User Defined Property Name>.count", description="A count of the records that contain a value for the user defined property."), @WritesAttribute(attribute="recordStats.<User Defined Property Name>.<value>.count", description="Each value discovered for the user defined property will have its own count attribute. Total number of top N value counts to be added is defined by the limit configuration.")})
public class CalculateRecordStats
extends AbstractProcessor {
    /** Name of the attribute that receives the total number of records read. */
    static final String RECORD_COUNT_ATTR = "record.count";

    /** Controller service used to parse the FlowFile content into records. */
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("A record reader to use for reading the records.").identifiesControllerService(RecordReaderFactory.class).required(true).build();

    /** Maximum number of per-value counters to emit; supports FlowFile-attribute expression language. */
    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder().name("Record Stats Limit").description("Limit the number of individual stats that are returned for each record path to the top N results.").required(true).defaultValue("10").addValidator(StandardValidators.INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();

    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RECORD_READER, LIMIT);

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are successfully processed, are routed to this Relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile cannot be processed for any reason, it is routed to this Relationship.").build();
    static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

    /** Cache of compiled RecordPaths; (re)created by {@link #onEnabled(ProcessContext)}. */
    private RecordPathCache cache;

    /**
     * Builds the descriptor for a user-defined (dynamic) property. The value must
     * be a valid RecordPath and may use FlowFile-attribute expression language.
     *
     * @param propertyDescriptorName name of the dynamic property (the category name)
     * @return a dynamic property descriptor validated by {@link RecordPathValidator}
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .displayName(propertyDescriptorName)
                .dynamic(true)
                .addValidator(new RecordPathValidator())
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();
    }

    /**
     * @return the fixed (non-dynamic) properties of this processor
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Creates a fresh RecordPath cache (capacity 25) each time the processor is scheduled.
     *
     * @param context the process context (unused)
     */
    @OnScheduled
    public void onEnabled(ProcessContext context) {
        this.cache = new RecordPathCache(25);
    }

    /**
     * @return the {@code success} and {@code failure} relationships
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Takes one FlowFile from the session, computes its record statistics and
     * writes them as attributes. Any exception routes the FlowFile to
     * {@link #REL_FAILURE} after logging it.
     *
     * @param context the process context supplying property values
     * @param session the session used to read, update and transfer the FlowFile
     * @throws ProcessException declared by the processor contract; not thrown directly here
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            Map<String, RecordPath> recordPaths = getRecordPaths(context, flowFile);
            Map<String, String> stats = calculateStats(flowFile, recordPaths, context, session);
            flowFile = session.putAllAttributes(flowFile, stats);
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (Exception e) {
            // Boundary catch: every failure (bad RecordPath, unreadable content, bad limit) goes to failure.
            getLogger().error("Failed to process stats for {}", flowFile, e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Renames the legacy property names to the current ones.
     *
     * @param config the property configuration being migrated
     */
    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("record-stats-reader", RECORD_READER.getName());
        config.renameProperty("record-stats-limit", LIMIT.getName());
    }

    /**
     * Compiles the RecordPath of every dynamic property, after evaluating
     * expression language against the given FlowFile.
     *
     * @param context  the process context holding the configured properties
     * @param flowFile the FlowFile whose attributes are used for expression evaluation
     * @return map from dynamic property name to its compiled RecordPath
     */
    protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
        return context.getProperties().keySet().stream()
                .filter(PropertyDescriptor::isDynamic)
                .collect(Collectors.toMap(PropertyDescriptor::getName, descriptor -> {
                    String path = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
                    return this.cache.getCompiled(path);
                }));
    }

    /**
     * Reads every record of the FlowFile and produces the attribute map to write.
     *
     * @param flowFile the FlowFile whose content is read
     * @param paths    compiled RecordPaths keyed by category (dynamic property) name
     * @param context  the process context supplying the reader factory and the limit
     * @param session  the session used to open the FlowFile content
     * @return attribute name to counter value (as decimal string), including {@code record.count}
     * @throws IOException              if the content cannot be read or a resource fails to close
     * @throws SchemaNotFoundException  if the record reader cannot determine a schema
     * @throws MalformedRecordException if the content cannot be parsed into records
     */
    protected Map<String, String> calculateStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) throws IOException, SchemaNotFoundException, MalformedRecordException {
        RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        int limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();

        // Both the stream and the reader are closed on every exit path (the reader was previously leaked).
        try (InputStream is = session.read(flowFile);
             RecordReader reader = factory.createRecordReader(flowFile, is, getLogger())) {
            Map<String, Integer> stats = new HashMap<>();
            // Per-category keys that must survive the top-N trimming.
            Set<String> baseKeys = new LinkedHashSet<>();
            int recordCount = 0;

            Record record;
            while ((record = reader.nextRecord()) != null) {
                for (Map.Entry<String, RecordPath> pathEntry : paths.entrySet()) {
                    String baseKey = "recordStats." + pathEntry.getKey();
                    RecordPathResult result = pathEntry.getValue().evaluate(record);
                    result.getSelectedFields().forEach(selectedField -> {
                        Object selectedValue = selectedField.getValue();
                        String approxValue = selectedValue == null ? "<null>" : selectedValue.toString();
                        stats.merge(baseKey + "." + approxValue, 1, Integer::sum);
                        // Null values get their own "<null>" counter but do not count towards the category total.
                        if (selectedValue != null) {
                            stats.merge(baseKey, 1, Integer::sum);
                        }
                        baseKeys.add(baseKey);
                    });
                }
                ++recordCount;
            }

            Map<String, Integer> limited = filterBySize(stats, limit, baseKeys);
            limited.put(RECORD_COUNT_ATTR, recordCount);
            return limited.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()));
        }
    }

    /**
     * Trims the per-value counters to the {@code limit} highest counts while
     * always keeping the per-category ({@code baseKeys}) counters.
     *
     * <p>If the whole map already holds no more than {@code limit} entries it is
     * returned as-is. Otherwise a new map is returned holding all base-key entries
     * plus at most {@code limit} of the remaining entries, chosen by descending
     * count. A non-positive limit keeps no per-value counters.
     *
     * <p>NOTE(review): the limit is applied across all categories together, not
     * per RecordPath as the property description suggests - confirm intent.
     *
     * @param values   all counters collected so far; not modified
     * @param limit    maximum number of non-base counters to keep
     * @param baseKeys keys of the per-category counters that must always be kept
     * @return a mutable map with the retained counters
     */
    protected Map<String, Integer> filterBySize(Map<String, Integer> values, int limit, Collection<String> baseKeys) {
        if (values.size() <= limit) {
            return values;
        }
        Map<String, Integer> toFilter = values.entrySet().stream()
                .filter(e -> !baseKeys.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        List<Map.Entry<String, Integer>> ranked = new ArrayList<>(toFilter.entrySet());
        ranked.sort(Map.Entry.comparingByValue());
        List<Map.Entry<String, Integer>> descending = ranked.reversed();

        // Clamp the cut: there may be fewer non-base entries than the limit even though
        // values.size() exceeded it (base keys are counted there), and the limit may be negative.
        int keep = Math.max(0, Math.min(limit, descending.size()));

        Map<String, Integer> limitedValues = new HashMap<>();
        values.forEach((key, count) -> {
            if (baseKeys.contains(key)) {
                limitedValues.put(key, count);
            }
        });
        for (Map.Entry<String, Integer> entry : descending.subList(0, keep)) {
            limitedValues.put(entry.getKey(), entry.getValue());
        }
        return limitedValues;
    }
}

