/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.couchbase;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor;
import org.apache.nifi.processors.couchbase.DocumentType;
import org.apache.nifi.stream.io.StreamUtils;

@Tags(value={"nosql", "couchbase", "database", "put"})
@CapabilityDescription(value="Put a document to Couchbase Server via Key/Value access.")
@SeeAlso(value={CouchbaseClusterControllerService.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttributes(value={@ReadsAttribute(attribute="uuid", description="Used as a document id if 'Document Id' is not specified")})
@WritesAttributes(value={@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."), @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."), @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")})
public class PutCouchbaseKey
extends AbstractCouchbaseProcessor {
    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To").description("Durability constraint about disk persistence.").required(true).allowableValues((Enum[])PersistTo.values()).defaultValue(PersistTo.NONE.toString()).build();
    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To").description("Durability constraint about replication.").required(true).allowableValues((Enum[])ReplicateTo.values()).defaultValue(ReplicateTo.NONE.toString()).build();

    @Override
    protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
        descriptors.add(DOCUMENT_TYPE);
        descriptors.add(DOC_ID);
        descriptors.add(PERSIST_TO);
        descriptors.add(REPLICATE_TO);
    }

    @Override
    protected void addSupportedRelationships(Set<Relationship> relationships) {
        relationships.add(REL_SUCCESS);
        relationships.add(REL_RETRY);
        relationships.add(REL_FAILURE);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        ProcessorLog logger = this.getLogger();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final byte[] content = new byte[(int)flowFile.getSize()];
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.fillBuffer((InputStream)in, (byte[])content, (boolean)true);
            }
        });
        String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
        if (!StringUtils.isEmpty((CharSequence)context.getProperty(DOC_ID).getValue())) {
            docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
        }
        try {
            BinaryDocument doc = null;
            DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
            switch (documentType) {
                case Json: {
                    doc = RawJsonDocument.create((String)docId, (String)new String(content, StandardCharsets.UTF_8));
                    break;
                }
                case Binary: {
                    ByteBuf buf = Unpooled.copiedBuffer((byte[])content);
                    doc = BinaryDocument.create((String)docId, (ByteBuf)buf);
                    break;
                }
            }
            PersistTo persistTo = PersistTo.valueOf((String)context.getProperty(PERSIST_TO).getValue());
            ReplicateTo replicateTo = ReplicateTo.valueOf((String)context.getProperty(REPLICATE_TO).getValue());
            doc = this.openBucket(context).upsert((Document)doc, persistTo, replicateTo);
            HashMap<String, String> updatedAttrs = new HashMap<String, String>();
            updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
            updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
            updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
            updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
            flowFile = session.putAllAttributes(flowFile, updatedAttrs);
            session.getProvenanceReporter().send(flowFile, this.getTransitUrl(context, docId));
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (CouchbaseException e) {
            String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", new Object[]{docId, flowFile, e});
            this.handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
        }
    }
}

