/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
import org.bson.Document;
import org.bson.conversions.Bson;

@Tags(value={"mongo", "aggregation", "aggregate"})
@CapabilityDescription(value="A processor that runs an aggregation query whenever a flowfile is received.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
public class RunMongoAggregation
extends AbstractMongoProcessor {
    static final Relationship REL_ORIGINAL = new Relationship.Builder().description("The input flowfile gets sent to this relationship when the query succeeds.").name("original").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().description("The input flowfile gets sent to this relationship when the query fails.").name("failure").build();
    static final Relationship REL_RESULTS = new Relationship.Builder().description("The result set of the aggregation will be sent to this relationship.").name("results").build();
    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().name("mongo-agg-query").displayName("Query").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).description("The aggregation query to be executed.").required(true).addValidator((Validator)JsonValidator.INSTANCE).build();
    static final PropertyDescriptor ALLOW_DISK_USE = new PropertyDescriptor.Builder().name("allow-disk-use").displayName("Allow Disk Use").description("Set this to true to enable writing data to temporary files to prevent exceeding the maximum memory use limit during aggregation pipeline staged when handling large datasets.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_RESULTS, REL_ORIGINAL, REL_FAILURE);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(RunMongoAggregation.getCommonPropertyDescriptors().stream(), Stream.of(CHARSET, QUERY, ALLOW_DISK_USE, JSON_TYPE, QUERY_ATTRIBUTE, BATCH_SIZE, RESULTS_PER_FLOWFILE, DATE_FORMAT)).toList();

    static final List<Bson> buildAggregationQuery(String query) throws IOException {
        ArrayList<Bson> result = new ArrayList<Bson>();
        ObjectMapper mapper = new ObjectMapper();
        List querySteps = (List)mapper.readValue(query, List.class);
        for (Map queryStep : querySteps) {
            BasicDBObject bson = BasicDBObject.parse((String)mapper.writeValueAsString((Object)queryStep));
            result.add((Bson)bson);
        }
        return result;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    private String buildBatch(List<Document> batch) {
        String retVal;
        try {
            retVal = this.objectMapper.writeValueAsString((Object)(batch.size() > 1 ? batch : batch.getFirst()));
        }
        catch (Exception e) {
            retVal = null;
        }
        return retVal;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = null;
        if (context.hasIncomingConnection() && (flowFile = session.get()) == null && context.hasNonLoopConnection()) {
            return;
        }
        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        Boolean allowDiskUse = context.getProperty(ALLOW_DISK_USE).asBoolean();
        String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
        String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
        String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
        this.configureMapper(jsonTypeSetting, dateFormat);
        HashMap<String, String> attrs = new HashMap<String, String>();
        if (queryAttr != null && !queryAttr.trim().isEmpty()) {
            attrs.put(queryAttr, query);
        }
        try (MongoCursor iter = null;){
            MongoCollection<Document> collection = this.getCollection(context, flowFile);
            List<Bson> aggQuery = RunMongoAggregation.buildAggregationQuery(query);
            AggregateIterable it = collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);
            it.batchSize(batchSize != null ? batchSize : 1);
            iter = it.iterator();
            ArrayList<Object> batch = new ArrayList<Document>();
            Boolean doneSomething = false;
            while (iter.hasNext()) {
                batch.add((Document)iter.next());
                if (batch.size() != resultsPerFlowfile.intValue()) continue;
                this.writeBatch(this.buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
                batch = new ArrayList();
                doneSomething = doneSomething | true;
            }
            if (!batch.isEmpty()) {
                this.writeBatch(this.buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
            } else if (!doneSomething.booleanValue()) {
                this.writeBatch("", flowFile, context, session, attrs, REL_RESULTS);
            }
            if (flowFile != null) {
                session.transfer(flowFile, REL_ORIGINAL);
            }
        }
    }
}

