/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import java.io.InputStream;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.nifi.amqp.processors.AMQPException;
import org.apache.nifi.amqp.processors.AMQPPublisher;
import org.apache.nifi.amqp.processors.AMQPRollbackException;
import org.apache.nifi.amqp.processors.AbstractAMQPProcessor;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;

@Tags(value={"amqp", "rabbit", "put", "message", "send", "publish"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Creates an AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange. In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.")
@SystemResourceConsideration(resource=SystemResource.MEMORY)
@ReadsAttributes(value={@ReadsAttribute(attribute="amqp$appId", description="The App ID field to set on the AMQP Message"), @ReadsAttribute(attribute="amqp$contentEncoding", description="The Content Encoding to set on the AMQP Message"), @ReadsAttribute(attribute="amqp$contentType", description="The Content Type to set on the AMQP Message"), @ReadsAttribute(attribute="amqp$headers", description="The headers to set on the AMQP Message, if 'Header Source' is set to use it. See additional details of the processor."), @ReadsAttribute(attribute="amqp$deliveryMode", description="The numeric indicator for the Message's Delivery Mode"), @ReadsAttribute(attribute="amqp$priority", description="The Message priority"), @ReadsAttribute(attribute="amqp$correlationId", description="The Message's Correlation ID"), @ReadsAttribute(attribute="amqp$replyTo", description="The value of the Message's Reply-To field"), @ReadsAttribute(attribute="amqp$expiration", description="The Message Expiration"), @ReadsAttribute(attribute="amqp$messageId", description="The unique ID of the Message"), @ReadsAttribute(attribute="amqp$timestamp", description="The timestamp of the Message, as the number of milliseconds since epoch"), @ReadsAttribute(attribute="amqp$type", description="The type of message"), @ReadsAttribute(attribute="amqp$userId", description="The ID of the user"), @ReadsAttribute(attribute="amqp$clusterId", description="The ID of the AMQP Cluster")})
public class PublishAMQP
extends AbstractAMQPProcessor<AMQPPublisher> {
    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder().name("Exchange Name").description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.").required(true).defaultValue("").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder().name("Routing Key").description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). Usually provided by the administrator (e.g., 'myKey')In the event when messages are sent to a default exchange this property corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set (usually by the AMQP administrator)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder().name("Headers Source").description("The source of the headers which will be applied to the published message.").required(true).allowableValues(InputHeaderSource.class).defaultValue((DescribedValue)InputHeaderSource.AMQP_HEADERS_ATTRIBUTE).build();
    public static final PropertyDescriptor HEADERS_PATTERN = new PropertyDescriptor.Builder().name("Headers Pattern").description("Regular expression that will be evaluated against the FlowFile attributes to select the matching attributes and put as AMQP headers. Attribute name will be used as header key.").required(true).dependsOn(HEADERS_SOURCE, (DescribedValue)InputHeaderSource.FLOWFILE_ATTRIBUTES, new DescribedValue[0]).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.createRegexValidator((int)0, (int)Integer.MAX_VALUE, (boolean)true)).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder().name("Header Separator").description("The character that is used to split key-value for headers. The value must only one character. Otherwise you will get an error message").defaultValue(",").dependsOn(HEADERS_SOURCE, (DescribedValue)InputHeaderSource.AMQP_HEADERS_ATTRIBUTE, new DescribedValue[0]).addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR).required(false).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are sent to the AMQP destination are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(Stream.of(EXCHANGE, ROUTING_KEY, HEADERS_SOURCE, HEADERS_PATTERN, HEADER_SEPARATOR), PublishAMQP.getCommonPropertyDescriptors().stream()).toList();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

    @Override
    protected void processResource(Connection connection, AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
        if (routingKey == null) {
            throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" + String.valueOf(context.getProperty(ROUTING_KEY)) + "' after evaluating it as expression against incoming FlowFile.");
        }
        InputHeaderSource selectedHeaderSource = (InputHeaderSource)context.getProperty(HEADERS_SOURCE).asAllowableValue(InputHeaderSource.class);
        Pattern pattern = this.getPattern(context, selectedHeaderSource);
        Character headerSeparator = this.getHeaderSeparator(context, selectedHeaderSource);
        AMQP.BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile, selectedHeaderSource, headerSeparator, pattern);
        String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
        byte[] messageContent = this.extractMessage(flowFile, session);
        try {
            publisher.publish(messageContent, amqpProperties, routingKey, exchange);
        }
        catch (AMQPRollbackException e) {
            session.rollback();
            throw e;
        }
        catch (AMQPException e) {
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            throw e;
        }
        session.transfer(flowFile, REL_SUCCESS);
        session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected AMQPPublisher createAMQPWorker(ProcessContext context, Connection connection) {
        return new AMQPPublisher(connection, this.getLogger());
    }

    @Override
    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("header.separator", HEADER_SEPARATOR.getName());
    }

    private byte[] extractMessage(FlowFile flowFile, ProcessSession session) {
        byte[] messageContent = new byte[(int)flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer((InputStream)in, (byte[])messageContent, (boolean)true));
        return messageContent;
    }

    private void readAmqpAttribute(FlowFile flowFile, String attributeKey, Consumer<String> updater) {
        String attributeValue = flowFile.getAttribute(attributeKey);
        if (attributeValue == null) {
            return;
        }
        try {
            updater.accept(attributeValue);
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to update AMQP Message Property [{}]", new Object[]{attributeKey, e});
        }
    }

    private AMQP.BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, InputHeaderSource selectedHeaderSource, Character separator, Pattern pattern) {
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        this.readAmqpAttribute(flowFile, "amqp$contentType", arg_0 -> ((AMQP.BasicProperties.Builder)builder).contentType(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$contentEncoding", arg_0 -> ((AMQP.BasicProperties.Builder)builder).contentEncoding(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$deliveryMode", mode -> builder.deliveryMode(Integer.valueOf(Integer.parseInt(mode))));
        this.readAmqpAttribute(flowFile, "amqp$priority", pri -> builder.priority(Integer.valueOf(Integer.parseInt(pri))));
        this.readAmqpAttribute(flowFile, "amqp$correlationId", arg_0 -> ((AMQP.BasicProperties.Builder)builder).correlationId(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$replyTo", arg_0 -> ((AMQP.BasicProperties.Builder)builder).replyTo(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$expiration", arg_0 -> ((AMQP.BasicProperties.Builder)builder).expiration(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$messageId", arg_0 -> ((AMQP.BasicProperties.Builder)builder).messageId(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$timestamp", ts -> builder.timestamp(new Date(Long.parseLong(ts))));
        this.readAmqpAttribute(flowFile, "amqp$type", arg_0 -> ((AMQP.BasicProperties.Builder)builder).type(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$userId", arg_0 -> ((AMQP.BasicProperties.Builder)builder).userId(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$appId", arg_0 -> ((AMQP.BasicProperties.Builder)builder).appId(arg_0));
        this.readAmqpAttribute(flowFile, "amqp$clusterId", arg_0 -> ((AMQP.BasicProperties.Builder)builder).clusterId(arg_0));
        Map<String, Object> headers = this.prepareAMQPHeaders(flowFile, selectedHeaderSource, separator, pattern);
        builder.headers(headers);
        return builder.build();
    }

    private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, InputHeaderSource selectedHeaderSource, Character headerSeparator, Pattern pattern) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        if (InputHeaderSource.FLOWFILE_ATTRIBUTES.equals((Object)selectedHeaderSource)) {
            headers.putAll(this.getMatchedAttributes(flowFile.getAttributes(), pattern));
        } else if (InputHeaderSource.AMQP_HEADERS_ATTRIBUTE.equals((Object)selectedHeaderSource)) {
            this.readAmqpAttribute(flowFile, "amqp$headers", value -> headers.putAll(this.validateAMQPHeaderProperty((String)value, headerSeparator)));
        }
        return headers;
    }

    private Map<String, String> getMatchedAttributes(Map<String, String> attributes, Pattern pattern) {
        HashMap<String, String> headers = new HashMap<String, String>();
        for (Map.Entry<String, String> attributeEntry : attributes.entrySet()) {
            if (!pattern.matcher(attributeEntry.getKey()).matches()) continue;
            headers.put(attributeEntry.getKey(), attributeEntry.getValue());
        }
        return headers;
    }

    private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Character splitValue) {
        String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
        HashMap<String, Object> headers = new HashMap<String, Object>();
        for (String strEntry : strEntries) {
            String[] kv = strEntry.split("=", -1);
            if (kv.length == 2) {
                headers.put(kv[0].trim(), kv[1].trim());
                continue;
            }
            if (kv.length == 1) {
                headers.put(kv[0].trim(), null);
                continue;
            }
            this.getLogger().warn("Malformed key value pair in AMQP header property ({}): {}", new Object[]{amqpPropValue, strEntry});
        }
        return headers;
    }

    protected Pattern getPattern(ProcessContext context, InputHeaderSource selectedHeaderSource) {
        return switch (selectedHeaderSource.ordinal()) {
            default -> throw new MatchException(null, null);
            case 0 -> Pattern.compile(context.getProperty(HEADERS_PATTERN).evaluateAttributeExpressions().getValue());
            case 1 -> null;
        };
    }

    protected Character getHeaderSeparator(ProcessContext context, InputHeaderSource selectedHeaderSource) {
        return switch (selectedHeaderSource.ordinal()) {
            default -> throw new MatchException(null, null);
            case 0 -> null;
            case 1 -> context.getProperty(HEADER_SEPARATOR).isSet() ? Character.valueOf(context.getProperty(HEADER_SEPARATOR).getValue().charAt(0)) : null;
        };
    }

    public static enum InputHeaderSource implements DescribedValue
    {
        FLOWFILE_ATTRIBUTES("FlowFile Attributes", "Select FlowFile Attributes based on regular expression pattern for event headers. Key of the matching attribute will be used as header key"),
        AMQP_HEADERS_ATTRIBUTE("AMQP Headers Attribute", "Prepare headers from 'amqp$headers' attribute string");

        private final String name;
        private final String description;

        private InputHeaderSource(String displayName, String description) {
            this.name = displayName;
            this.description = description;
        }

        public String getValue() {
            return this.name();
        }

        public String getDisplayName() {
            return this.name;
        }

        public String getDescription() {
            return this.description;
        }
    }
}

