/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.sqs;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.sqs.DeleteSQS;
import org.apache.nifi.processors.aws.sqs.PutSQS;
import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

@SupportsBatching
@SeeAlso(value={PutSQS.class, DeleteSQS.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
@CapabilityDescription(value="Fetches messages from an Amazon Simple Queuing Service Queue")
@WritesAttributes(value={@WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"), @WritesAttribute(attribute="hash.algorithm", description="MD5"), @WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"), @WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue")})
public class GetSQS
extends AbstractAwsSyncProcessor<SqsClient, SqsClientBuilder> {
    public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder().name("Queue URL").description("The URL of the queue to get messages from").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).required(true).build();
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("Character Set").description("The Character Set that should be used to encode the textual content of the SQS message").required(true).defaultValue("UTF-8").allowableValues(Charset.availableCharsets().keySet().toArray(new String[0])).build();
    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder().name("Auto Delete Messages").description("Specifies whether the messages should be automatically deleted by the processors once they have been received.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder().name("Visibility Timeout").description("The amount of time after a message is received but not deleted that the message is hidden from other consumers").expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).defaultValue("15 mins").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The maximum number of messages to send in a single network request").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)10L, (boolean)true)).defaultValue("10").build();
    public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder().name("Receive Message Wait Time").description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.").expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).defaultValue("0 sec").addValidator(StandardValidators.createTimePeriodValidator((long)0L, (TimeUnit)TimeUnit.SECONDS, (long)20L, (TimeUnit)TimeUnit.SECONDS)).build();
    public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(QUEUE_URL, REGION, AWS_CREDENTIALS_PROVIDER_SERVICE, SSL_CONTEXT_SERVICE, AUTO_DELETE, BATCH_SIZE, TIMEOUT, ENDPOINT_OVERRIDE, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME, PROXY_CONFIGURATION_SERVICE);

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        ReceiveMessageResponse response;
        String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions().getValue();
        SqsClient client = (SqsClient)this.getClient(context);
        ReceiveMessageRequest request = (ReceiveMessageRequest)ReceiveMessageRequest.builder().messageSystemAttributeNames(new MessageSystemAttributeName[]{MessageSystemAttributeName.ALL}).messageAttributeNames(new String[]{"All"}).maxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()).visibilityTimeout(Integer.valueOf(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue())).queueUrl(queueUrl).waitTimeSeconds(Integer.valueOf(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue())).build();
        Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
        try {
            response = client.receiveMessage(request);
        }
        catch (Exception e) {
            this.getLogger().error("Failed to receive messages from Amazon SQS", (Throwable)e);
            context.yield();
            return;
        }
        List messages = response.messages();
        if (messages.isEmpty()) {
            context.yield();
            return;
        }
        boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
        for (Message message : messages) {
            FlowFile flowFile = session.create();
            HashMap<Object, String> attributes = new HashMap<Object, String>();
            for (Map.Entry entry : message.attributes().entrySet()) {
                attributes.put("sqs." + String.valueOf(entry.getKey()), (String)entry.getValue());
            }
            for (Map.Entry entry : message.messageAttributes().entrySet()) {
                attributes.put("sqs." + (String)entry.getKey(), ((MessageAttributeValue)entry.getValue()).stringValue());
            }
            attributes.put("hash.value", message.md5OfBody());
            attributes.put("hash.algorithm", "md5");
            attributes.put("sqs.message.id", message.messageId());
            attributes.put("sqs.receipt.handle", message.receiptHandle());
            flowFile = session.putAllAttributes(flowFile, attributes);
            flowFile = session.write(flowFile, out -> out.write(message.body().getBytes(charset)));
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, queueUrl);
            this.getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
        }
        if (autoDelete) {
            session.commitAsync(() -> this.deleteMessages(client, queueUrl, messages));
        }
    }

    private void deleteMessages(SqsClient client, String queueUrl, List<Message> messages) {
        ArrayList<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<DeleteMessageBatchRequestEntry>();
        for (Message message : messages) {
            DeleteMessageBatchRequestEntry entry = (DeleteMessageBatchRequestEntry)DeleteMessageBatchRequestEntry.builder().id(message.messageId()).receiptHandle(message.receiptHandle()).build();
            deleteRequestEntries.add(entry);
        }
        DeleteMessageBatchRequest deleteRequest = (DeleteMessageBatchRequest)DeleteMessageBatchRequest.builder().queueUrl(queueUrl).entries(deleteRequestEntries).build();
        try {
            client.deleteMessageBatch(deleteRequest);
        }
        catch (Exception e) {
            this.getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated", new Object[]{messages.size(), e});
        }
    }

    protected SqsClientBuilder createClientBuilder(ProcessContext context) {
        return SqsClient.builder();
    }
}

