/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateManyModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.bson.Document;
import org.bson.conversions.Bson;

@Tags(value={"mongodb", "insert", "update", "upsert", "record", "put"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="This processor is a record-aware processor for inserting/upserting data into MongoDB. It uses a configured record reader and schema to read an incoming record set from the body of a flowfile and then inserts/upserts batches of those records into a configured MongoDB collection. This processor does not support deletes. The number of documents to insert/upsert at a time is controlled by the \"Batch Size\" configuration property. This value should be set to a reasonable size to ensure that MongoDB is not overloaded with too many operations at once.")
@ReadsAttribute(attribute="mongodb.update.mode", description="Configurable parameter for controlling update mode on a per-flowfile basis. Acceptable values are 'one' and 'many' and controls whether a single incoming record should update a single or multiple Mongo documents.")
public class PutMongoRecord
extends AbstractMongoProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder().name("Batch Size").description("The number of records to group together for one single insert/upsert operation against MongoDB.").defaultValue("100").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder().name("Ordered").description("Perform ordered or unordered operations").allowableValues(new String[]{"True", "False"}).defaultValue("False").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder().name("Bypass Validation").description("        Enable or disable bypassing document schema validation during insert or update operations.\n        Bypassing document validation is a Privilege Action in MongoDB.\n        Enabling this property can result in authorization errors for users with limited privileges.\n").allowableValues(new String[]{"True", "False"}).defaultValue("False").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder().name("Update Key Fields").description("Comma separated list of fields based on which to identify documents that need to be updated. If this property is set NiFi will attempt an upsert operation on all documents. If this property is not set all documents will be inserted.").required(false).addValidator(StandardValidators.createListValidator((boolean)true, (boolean)false, (Validator)StandardValidators.NON_EMPTY_VALIDATOR)).build();
    static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder().name("Update Mode").dependsOn(UPDATE_KEY_FIELDS, new AllowableValue[0]).description("Choose between updating a single document or multiple documents per incoming record.").allowableValues(AbstractMongoProcessor.UpdateMethod.class).defaultValue((DescribedValue)AbstractMongoProcessor.UpdateMethod.UPDATE_ONE).build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(PutMongoRecord.getCommonPropertyDescriptors().stream(), Stream.of(RECORD_READER_FACTORY, INSERT_COUNT, ORDERED, BYPASS_VALIDATION, UPDATE_KEY_FIELDS, UPDATE_MODE)).toList();

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        WriteConcern writeConcern = this.clientService.getWriteConcern();
        int ceiling = context.getProperty(INSERT_COUNT).asInteger();
        int written = 0;
        boolean error = false;
        boolean ordered = context.getProperty(ORDERED).asBoolean();
        boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
        LinkedHashMap<String, List<String>> updateKeyFieldPathToFieldChain = new LinkedHashMap<String, List<String>>();
        if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
            Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*")).forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put((String)updateKeyField, Arrays.asList(updateKeyField.split("\\."))));
        }
        BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
        bulkWriteOptions.ordered(ordered);
        bulkWriteOptions.bypassDocumentValidation(Boolean.valueOf(bypass));
        try {
            try (InputStream inStream = session.read(flowFile);
                 RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, this.getLogger());){
                Record record;
                RecordSchema schema = reader.getSchema();
                MongoCollection collection = this.getCollection(context, flowFile).withWriteConcern(writeConcern);
                ArrayList<InsertOneModel> writeModels = new ArrayList<InsertOneModel>();
                while ((record = reader.nextRecord()) != null) {
                    InsertOneModel writeModel;
                    Map contentMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)record, (DataType)RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
                    Document document = new Document();
                    for (String name : schema.getFieldNames()) {
                        document.put(name, contentMap.get(name));
                    }
                    Document readyToUpsert = this.convertArrays(document);
                    if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
                        Bson[] filters = this.buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
                        AbstractMongoProcessor.UpdateMethod mongoUpdateMode = (AbstractMongoProcessor.UpdateMethod)context.getProperty(UPDATE_MODE).asAllowableValue(AbstractMongoProcessor.UpdateMethod.class);
                        if (this.updateModeMatches(AbstractMongoProcessor.UpdateMethod.UPDATE_ONE, mongoUpdateMode, flowFile)) {
                            writeModel = new UpdateOneModel(Filters.and((Bson[])filters), (Bson)new Document("$set", (Object)readyToUpsert), new UpdateOptions().upsert(true));
                        } else {
                            if (!this.updateModeMatches(AbstractMongoProcessor.UpdateMethod.UPDATE_MANY, mongoUpdateMode, flowFile)) {
                                String flowfileUpdateMode = flowFile.getAttribute("mongodb.update.mode");
                                throw new ProcessException("Unrecognized 'mongodb.update.mode' value '" + flowfileUpdateMode + "'");
                            }
                            writeModel = new UpdateManyModel(Filters.and((Bson[])filters), (Bson)new Document("$set", (Object)readyToUpsert), new UpdateOptions().upsert(true));
                        }
                    } else {
                        writeModel = new InsertOneModel((Object)readyToUpsert);
                    }
                    writeModels.add(writeModel);
                    if (writeModels.size() != ceiling) continue;
                    collection.bulkWrite(writeModels, bulkWriteOptions);
                    written += writeModels.size();
                    writeModels = new ArrayList();
                }
                if (!writeModels.isEmpty()) {
                    collection.bulkWrite(writeModels, bulkWriteOptions);
                }
            }
            if (error) return;
        }
        catch (MongoException | IOException | ProcessException | SchemaNotFoundException | MalformedRecordException e) {
            try {
                this.getLogger().error("PutMongoRecord failed with error:", e);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            catch (Throwable throwable) {
                if (error) throw throwable;
                session.getProvenanceReporter().send(flowFile, this.clientService.getURI(), String.format("Written %d documents to MongoDB.", written));
                session.transfer(flowFile, REL_SUCCESS);
                this.getLogger().info("Written {} records into MongoDB", new Object[]{written});
                throw throwable;
            }
        }
        session.getProvenanceReporter().send(flowFile, this.clientService.getURI(), String.format("Written %d documents to MongoDB.", written));
        session.transfer(flowFile, REL_SUCCESS);
        this.getLogger().info("Written {} records into MongoDB", new Object[]{written});
    }

    @Override
    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("record-reader", RECORD_READER_FACTORY.getName());
        config.renameProperty("insert_count", INSERT_COUNT.getName());
        config.renameProperty("ordered", ORDERED.getName());
        config.renameProperty("bypass-validation", BYPASS_VALIDATION.getName());
        config.renameProperty("update-key-fields", UPDATE_KEY_FIELDS.getName());
        config.renameProperty("update-mode", UPDATE_MODE.getName());
    }

    private Document convertArrays(Document doc) {
        Document retVal = new Document();
        for (Map.Entry entry : doc.entrySet()) {
            if (entry.getValue() != null && entry.getValue().getClass().isArray()) {
                retVal.put((String)entry.getKey(), this.convertArrays((Object[])entry.getValue()));
                continue;
            }
            if (entry.getValue() != null && (entry.getValue() instanceof Map || entry.getValue() instanceof Document)) {
                retVal.put((String)entry.getKey(), (Object)this.convertArrays(new Document((Map)entry.getValue())));
                continue;
            }
            retVal.put((String)entry.getKey(), entry.getValue());
        }
        return retVal;
    }

    private List<Object> convertArrays(Object[] input) {
        ArrayList<Object> retVal = new ArrayList<Object>();
        for (Object o : input) {
            if (o != null && o.getClass().isArray()) {
                retVal.add(this.convertArrays((Object[])o));
                continue;
            }
            if (o instanceof Map) {
                retVal.add(this.convertArrays(new Document((Map)o)));
                continue;
            }
            retVal.add(o);
        }
        return retVal;
    }

    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
        Bson[] filters = (Bson[])updateKeyFieldPathToFieldChain.entrySet().stream().map(updateKeyFieldPath__fieldChain -> {
            String fieldPath = (String)updateKeyFieldPath__fieldChain.getKey();
            List fieldChain = (List)updateKeyFieldPath__fieldChain.getValue();
            Object value = readyToUpsert;
            String previousField = null;
            for (String field : fieldChain) {
                if (!(value instanceof Map)) {
                    throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
                }
                if ((value = ((Map)value).get(field)) == null) {
                    throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
                }
                previousField = field;
            }
            Bson filter = Filters.eq((String)fieldPath, (Object)value);
            return filter;
        }).toArray(Bson[]::new);
        return filters;
    }
}

