/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.bson.Document;

@EventDriven
@Tags(value={"mongodb", "insert", "record", "put"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="This processor is a record-aware processor for inserting data into MongoDB. It uses a configured record reader and schema to read an incoming record set from the body of a flowfile and then inserts batches of those records into a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled by the \"Insert Batch Size\" configuration property. This value should be set to a reasonable size to ensure that MongoDB is not overloaded with too many inserts at once.")
public class PutMongoRecord
extends AbstractMongoProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder().name("insert_count").displayName("Insert Batch Size").description("The number of records to group together for one single insert operation against MongoDB.").defaultValue("100").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        block34: {
            String url;
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
            WriteConcern writeConcern = this.getWriteConcern(context);
            ArrayList<Document> inserts = new ArrayList<Document>();
            int ceiling = context.getProperty(INSERT_COUNT).asInteger();
            int added = 0;
            boolean error = false;
            try {
                try (InputStream inStream = session.read(flowFile);
                     RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, this.getLogger());){
                    Record record;
                    MongoCollection collection = this.getCollection(context, flowFile).withWriteConcern(writeConcern);
                    RecordSchema schema = reader.getSchema();
                    while ((record = reader.nextRecord()) != null) {
                        Map contentMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)record, (DataType)RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
                        Document document = new Document();
                        for (String name : schema.getFieldNames()) {
                            document.put(name, contentMap.get(name));
                        }
                        inserts.add(this.convertArrays(document));
                        if (inserts.size() != ceiling) continue;
                        collection.insertMany(inserts);
                        added += inserts.size();
                        inserts = new ArrayList();
                    }
                    if (inserts.size() > 0) {
                        collection.insertMany(inserts);
                    }
                }
                if (error) break block34;
                url = this.clientService != null ? this.clientService.getURI() : context.getProperty(URI).evaluateAttributeExpressions().getValue();
            }
            catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
                String url2;
                try {
                    this.getLogger().error("PutMongoRecord failed with error:", e);
                    session.transfer(flowFile, REL_FAILURE);
                    error = true;
                    if (error) break block34;
                    url2 = this.clientService != null ? this.clientService.getURI() : context.getProperty(URI).evaluateAttributeExpressions().getValue();
                }
                catch (Throwable throwable) {
                    if (!error) {
                        String url3 = this.clientService != null ? this.clientService.getURI() : context.getProperty(URI).evaluateAttributeExpressions().getValue();
                        session.getProvenanceReporter().send(flowFile, url3, String.format("Added %d documents to MongoDB.", added));
                        session.transfer(flowFile, REL_SUCCESS);
                        this.getLogger().info("Inserted {} records into MongoDB", new Object[]{added});
                    }
                    throw throwable;
                }
                session.getProvenanceReporter().send(flowFile, url2, String.format("Added %d documents to MongoDB.", added));
                session.transfer(flowFile, REL_SUCCESS);
                this.getLogger().info("Inserted {} records into MongoDB", new Object[]{added});
            }
            session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
            session.transfer(flowFile, REL_SUCCESS);
            this.getLogger().info("Inserted {} records into MongoDB", new Object[]{added});
        }
        session.commit();
    }

    private Document convertArrays(Document doc) {
        Document retVal = new Document();
        for (Map.Entry entry : doc.entrySet()) {
            if (entry.getValue() != null && entry.getValue().getClass().isArray()) {
                retVal.put((String)entry.getKey(), (Object)this.convertArrays((Object[])entry.getValue()));
                continue;
            }
            if (entry.getValue() != null && (entry.getValue() instanceof Map || entry.getValue() instanceof Document)) {
                retVal.put((String)entry.getKey(), (Object)this.convertArrays(new Document((Map)entry.getValue())));
                continue;
            }
            retVal.put((String)entry.getKey(), entry.getValue());
        }
        return retVal;
    }

    private List convertArrays(Object[] input) {
        ArrayList<Object> retVal = new ArrayList<Object>();
        for (Object o : input) {
            if (o != null && o.getClass().isArray()) {
                retVal.add(this.convertArrays((Object[])o));
                continue;
            }
            if (o instanceof Map) {
                retVal.add(this.convertArrays(new Document((Map)o)));
                continue;
            }
            retVal.add(o);
        }
        return retVal;
    }

    static {
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.addAll(descriptors);
        _propertyDescriptors.add(WRITE_CONCERN);
        _propertyDescriptors.add(RECORD_READER_FACTORY);
        _propertyDescriptors.add(INSERT_COUNT);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

