/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.data.explorer;

import java.io.InputStream;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse;
import org.apache.nifi.services.azure.data.explorer.KustoQueryService;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"Azure", "Data", "Explorer", "ADX", "Kusto"})
@CapabilityDescription(value="Query Azure Data Explorer and stream JSON results to output FlowFiles")
@WritesAttributes(value={@WritesAttribute(attribute="query.error.message", description="Azure Data Explorer query error message on failures"), @WritesAttribute(attribute="query.executed", description="Azure Data Explorer query executed"), @WritesAttribute(attribute="mime.type", description="Content Type set to application/json")})
public class QueryAzureDataExplorer
extends AbstractProcessor {
    public static final String QUERY_ERROR_MESSAGE = "query.error.message";
    public static final String QUERY_EXECUTED = "query.executed";
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("FlowFiles containing results of a successful Query").build();
    public static final Relationship FAILURE = new Relationship.Builder().name("failure").description("FlowFiles containing original input associated with a failed Query").build();
    public static final PropertyDescriptor KUSTO_QUERY_SERVICE = new PropertyDescriptor.Builder().name("Kusto Query Service").displayName("Kusto Query Service").description("Azure Data Explorer Kusto Query Service").required(true).identifiesControllerService(KustoQueryService.class).build();
    public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder().name("Database Name").displayName("Database Name").description("Azure Data Explorer Database Name for querying").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().name("Query").displayName("Query").description("Query to be run against Azure Data Explorer").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    protected static final String APPLICATION_JSON = "application/json";
    private static final Set<Relationship> RELATIONSHIPS = Set.of(SUCCESS, FAILURE);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_QUERY_SERVICE, DATABASE_NAME, QUERY);
    private volatile KustoQueryService service;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.service = (KustoQueryService)context.getProperty(KUSTO_QUERY_SERVICE).asControllerService(KustoQueryService.class);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        block10: {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
            String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
            try {
                flowFile = session.putAttribute(flowFile, QUERY_EXECUTED, query);
                KustoQueryResponse kustoQueryResponse = this.executeQuery(databaseName, query);
                if (kustoQueryResponse.isError()) {
                    this.getLogger().error("Query failed: {}", new Object[]{kustoQueryResponse.getErrorMessage()});
                    flowFile = session.putAttribute(flowFile, QUERY_ERROR_MESSAGE, kustoQueryResponse.getErrorMessage());
                    session.transfer(flowFile, FAILURE);
                    break block10;
                }
                try (InputStream responseStream = kustoQueryResponse.getResponseStream();){
                    flowFile = session.importFrom(responseStream, flowFile);
                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
                    session.transfer(flowFile, SUCCESS);
                }
            }
            catch (Exception e) {
                this.getLogger().error("Query failed", (Throwable)e);
                flowFile = session.putAttribute(flowFile, QUERY_ERROR_MESSAGE, e.getMessage());
                session.transfer(flowFile, FAILURE);
            }
        }
    }

    protected KustoQueryResponse executeQuery(String databaseName, String adxQuery) {
        return this.service.executeQuery(databaseName, adxQuery);
    }
}

