/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that sends the content of incoming FlowFiles to Azure Event Hubs.
 *
 * <p>Each invocation pulls up to "Maximum Batch Size" FlowFiles from the session, buffers the
 * content of each one fully in memory, and sends it as a single event through a shared
 * {@link EventHubProducerClient}. FlowFile attributes are copied into the event properties.
 * FlowFiles whose send call completes are routed to {@link #REL_SUCCESS}; FlowFiles whose send
 * call throws are penalized and routed to {@link #REL_FAILURE}.
 */
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Send FlowFile contents to Azure Event Hubs")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The Processor buffers FlowFile contents in memory before sending")
public class PutAzureEventHub extends AbstractProcessor implements AzureEventHubComponent {

    /** Provenance transit URI layout: namespace, service bus endpoint, event hub name. */
    private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s";

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("Event Hub Name")
            .description("Name of Azure Event Hubs destination")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
            .name("Event Hub Namespace")
            .description("Namespace of Azure Event Hubs prefixed to Service Bus Endpoint domain")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .build();

    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;

    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Name")
            .description("The name of the shared access policy. This policy must have Send claims.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;

    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;

    static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
            .name("partitioning-key-attribute-name")
            .displayName("Partitioning Key Attribute Name")
            .description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
            .build();

    // NOTE(review): the value is read with asInteger() in onTrigger, but NUMBER_VALIDATOR presumably
    // also accepts non-integer or negative numbers - consider POSITIVE_INTEGER_VALIDATOR; confirm
    // against StandardValidators before changing, since it would tighten validation of existing flows.
    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("max-batch-size")
            .displayName("Maximum Batch Size")
            .description("Maximum number of FlowFiles processed for each Processor invocation")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.NUMBER_VALIDATOR)
            .defaultValue("100")
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile that is successfully sent to the event hubs will be transferred to this Relationship.")
            .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Any FlowFile that could not be sent to the event hub will be transferred to this Relationship.")
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            NAMESPACE,
            EVENT_HUB_NAME,
            SERVICE_BUS_ENDPOINT,
            TRANSPORT_TYPE,
            ACCESS_POLICY,
            POLICY_PRIMARY_KEY,
            USE_MANAGED_IDENTITY,
            PARTITIONING_KEY_ATTRIBUTE_NAME,
            MAX_BATCH_SIZE,
            PROXY_CONFIGURATION_SERVICE
    );

    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

    /** Producer client created in {@link #createClient(ProcessContext)} and closed in {@link #closeClient()}. */
    private EventHubProducerClient eventHubProducerClient;

    /**
     * @return the immutable set containing the success and failure relationships
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the immutable, ordered list of properties supported by this processor
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Builds the producer client from the current configuration when the processor is scheduled.
     *
     * @param context process context supplying the configured property values
     * @throws ProcessException if the client cannot be created
     */
    @OnScheduled
    public final void createClient(ProcessContext context) {
        eventHubProducerClient = createEventHubProducerClient(context);
    }

    /**
     * Closes the producer client when the processor is stopped; only logs when no client was created.
     */
    @OnStopped
    public void closeClient() {
        if (eventHubProducerClient == null) {
            getLogger().info("Azure Event Hub Producer Client not configured");
        } else {
            eventHubProducerClient.close();
        }
    }

    /**
     * Delegates validation of the access policy / primary key properties to {@link AzureEventHubUtils}.
     *
     * @param context validation context holding the property values under validation
     * @return the validation results produced by the shared utility
     */
    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
    }

    /**
     * Pulls a batch of FlowFiles, sends each one to Event Hubs, then transfers every FlowFile
     * according to the outcome of its send.
     *
     * @param context process context supplying the configured property values
     * @param session session used to obtain, read and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        // Started first so the reported transfer duration covers the whole batch.
        final StopWatch stopWatch = new StopWatch(true);
        final String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();

        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
        if (flowFileBatch.isEmpty()) {
            // Nothing to send and nothing to transfer.
            return;
        }

        final List<FlowFileResultCarrier<Relationship>> flowFileResults = new ArrayList<>(flowFileBatch.size());
        for (final FlowFile flowFile : flowFileBatch) {
            flowFileResults.add(handleFlowFile(flowFile, partitioningKeyAttributeName, session));
        }

        processFlowFileResults(context, session, stopWatch, flowFileResults);
    }

    /**
     * Creates the Event Hubs producer client from the processor configuration.
     *
     * <p>The fully qualified namespace is the configured namespace concatenated with the service bus
     * endpoint. Authentication uses a managed identity credential when "Use Managed Identity" is
     * enabled, otherwise a named key credential built from the access policy name and primary key.
     * Proxy options are applied when {@link AzureEventHubUtils#getProxyOptions} returns a value.
     *
     * @param context process context supplying the configured property values
     * @return a newly built producer client
     * @throws ProcessException wrapping any exception raised while configuring or building the client
     */
    protected EventHubProducerClient createEventHubProducerClient(ProcessContext context) throws ProcessException {
        final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
        final String namespace = context.getProperty(NAMESPACE).getValue();
        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
        final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
        final AmqpTransportType transportType = context.getProperty(TRANSPORT_TYPE)
                .asAllowableValue(AzureEventHubTransportType.class)
                .asAmqpTransportType();

        try {
            final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
            eventHubClientBuilder.transportType(transportType);

            final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
            if (useManagedIdentity) {
                final TokenCredential managedIdentityCredential = new ManagedIdentityCredentialBuilder().build();
                eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
            } else {
                final String policyName = context.getProperty(ACCESS_POLICY).getValue();
                final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
                final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
                eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
            }

            AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
            return eventHubClientBuilder.buildProducerClient();
        } catch (final Exception e) {
            throw new ProcessException("EventHubClient creation failed", e);
        }
    }

    /**
     * Transfers every FlowFile of the batch according to its send result.
     *
     * <p>Successful FlowFiles get a provenance SEND event and go to {@link #REL_SUCCESS}; the others
     * are logged, penalized and sent to {@link #REL_FAILURE}. If anything in this phase throws, the
     * session is rolled back and the failure is logged together with its cause.
     *
     * @param context process context used to build the provenance transit URI
     * @param session session owning the FlowFiles
     * @param stopWatch stop watch started at the beginning of the invocation
     * @param flowFileResults per-FlowFile send outcomes
     */
    private void processFlowFileResults(ProcessContext context, ProcessSession session, StopWatch stopWatch, List<FlowFileResultCarrier<Relationship>> flowFileResults) {
        try {
            // The transit URI depends only on configuration, so build it once per batch.
            final String namespace = context.getProperty(NAMESPACE).getValue();
            final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
            final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
            final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, namespace, serviceBusEndpoint, eventHubName);

            for (final FlowFileResultCarrier<Relationship> flowFileResult : flowFileResults) {
                final FlowFile flowFile = flowFileResult.flowFile();
                if (REL_SUCCESS.equals(flowFileResult.result())) {
                    session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                    session.transfer(flowFile, REL_SUCCESS);
                } else {
                    final Throwable processException = flowFileResult.exception();
                    getLogger().error("Send failed {}", flowFile, processException);
                    session.transfer(session.penalize(flowFile), REL_FAILURE);
                }
            }
        } catch (final Exception e) {
            session.rollback();
            // Pass the exception as the trailing argument so the cause of the rollback is not lost.
            getLogger().error("FlowFile Batch Size [{}] processing failed", flowFileResults.size(), e);
        }
    }

    /**
     * Reads one FlowFile fully into memory and sends it as a single event.
     *
     * @param flowFile FlowFile whose content and attributes are sent
     * @param partitioningKeyAttributeName name of the attribute holding the partition key; may be blank
     * @param session session used to read the FlowFile content
     * @return a carrier pairing the FlowFile with {@link #REL_SUCCESS}, or with {@link #REL_FAILURE}
     *         and the exception thrown by the send
     */
    private FlowFileResultCarrier<Relationship> handleFlowFile(FlowFile flowFile, String partitioningKeyAttributeName, ProcessSession session) {
        // NOTE(review): the size is narrowed to int and the read happens outside the try block below,
        // so a read failure propagates to the caller instead of routing to failure - confirm intended.
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));

        final String partitioningKey;
        if (StringUtils.isNotBlank(partitioningKeyAttributeName)) {
            partitioningKey = flowFile.getAttribute(partitioningKeyAttributeName);
        } else {
            partitioningKey = null;
        }

        final Map<String, String> attributes = flowFile.getAttributes();
        final Map<String, ?> userProperties = attributes == null ? Collections.emptyMap() : attributes;

        try {
            sendMessage(buffer, partitioningKey, userProperties);
            return new FlowFileResultCarrier<>(flowFile, REL_SUCCESS);
        } catch (final Exception processException) {
            return new FlowFileResultCarrier<>(flowFile, REL_FAILURE, processException);
        }
    }

    /**
     * Sends one event through the producer client.
     *
     * @param buffer event body
     * @param partitioningKey partition key applied to the send options when not blank
     * @param userProperties entries copied into the event properties
     */
    private void sendMessage(byte[] buffer, String partitioningKey, Map<String, ?> userProperties) {
        final EventData eventData = new EventData(buffer);
        eventData.getProperties().putAll(userProperties);

        final SendOptions sendOptions = new SendOptions();
        if (StringUtils.isNotBlank(partitioningKey)) {
            sendOptions.setPartitionKey(partitioningKey);
        }

        eventHubProducerClient.send(Collections.singleton(eventData), sendOptions);
    }
}

