/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.graph;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.graph.AbstractGraphExecutor;
import org.apache.nifi.util.StringUtils;

@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@SupportsBatching
@Tags(value={"cypher", "neo4j", "graph", "network", "insert", "update", "delete", "put", "get", "node", "relationship", "connection", "executor", "gremlin", "tinkerpop"})
@CapabilityDescription(value="This processor is designed to execute queries in either the Cypher query language or the Tinkerpop Gremlin DSL. It delegates most of the logic to a configured client service that handles the interaction with the remote data source. All of the output is written out as JSON data.")
@WritesAttributes(value={@WritesAttribute(attribute="graph.error.message", description="GraphDB error message"), @WritesAttribute(attribute="graph.labels.added", description="Number of labels added"), @WritesAttribute(attribute="graph.nodes.created", description="Number of nodes created"), @WritesAttribute(attribute="graph.nodes.deleted", description="Number of nodes deleted"), @WritesAttribute(attribute="graph.properties.set", description="Number of properties set"), @WritesAttribute(attribute="graph.relations.created", description="Number of relationships created"), @WritesAttribute(attribute="graph.relations.deleted", description="Number of relationships deleted"), @WritesAttribute(attribute="graph.rows.returned", description="Number of rows returned"), @WritesAttribute(attribute="query.took", description="The amount of time in milliseconds that the querytook to execute.")})
public class ExecuteGraphQuery
extends AbstractGraphExecutor {
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_ORIGINAL, REL_FAILURE);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(CLIENT_SERVICE, QUERY);
    public static final String EXECUTION_TIME = "query.took";
    protected ObjectMapper mapper = new ObjectMapper();
    private volatile GraphClientService clientService;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
        this.clientService = (GraphClientService)context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        FlowFile output = flowFile != null ? session.create(flowFile) : session.create();
        try (OutputStream os = session.write(output);){
            String query = this.getQuery(context, session, flowFile);
            long startTimeMillis = System.currentTimeMillis();
            os.write("[".getBytes());
            Map resultAttrs = this.clientService.executeQuery(query, this.getParameters(context, output), (record, hasMore) -> {
                try {
                    String obj = this.mapper.writeValueAsString((Object)record);
                    os.write(obj.getBytes());
                    if (hasMore) {
                        os.write(",".getBytes());
                    }
                }
                catch (Exception ex) {
                    throw new ProcessException((Throwable)ex);
                }
            });
            os.write("]".getBytes());
            os.close();
            long endTimeMillis = System.currentTimeMillis();
            String executionTime = String.valueOf(endTimeMillis - startTimeMillis);
            resultAttrs.put(EXECUTION_TIME, executionTime);
            resultAttrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
            output = session.putAllAttributes(output, resultAttrs);
            session.transfer(output, REL_SUCCESS);
            session.getProvenanceReporter().invokeRemoteProcess(output, this.clientService.getTransitUrl(), String.format("The following query was executed in %s milliseconds: \"%s\"", executionTime, query));
            if (flowFile != null) {
                session.transfer(flowFile, REL_ORIGINAL);
            }
        }
        catch (Exception exception) {
            this.getLogger().error("Failed to execute graph statement due to {}", new Object[]{exception.getLocalizedMessage(), exception});
            session.remove(output);
            if (flowFile != null) {
                flowFile = session.putAttribute(flowFile, "graph.error.message", String.valueOf(exception.getMessage()));
                session.transfer(flowFile, REL_FAILURE);
            }
            context.yield();
        }
    }

    protected String getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
        String query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
        if (StringUtils.isEmpty((String)query) && input != null) {
            try {
                if (input.getSize() > 65536L) {
                    throw new Exception("Input bigger than 64kb. Cannot assume this is a valid query for Gremlin Server or Neo4J.");
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                session.exportTo(input, (OutputStream)out);
                out.close();
                query = new String(out.toByteArray());
            }
            catch (Exception ex) {
                throw new ProcessException((Throwable)ex);
            }
        }
        return query;
    }
}

