/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.source;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.QueueAttributesResolver;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.QueueAttributes;
import io.awspring.cloud.sqs.listener.QueueAttributesAware;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.SqsAsyncClientAware;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementExecutor;
import io.awspring.cloud.sqs.listener.acknowledgement.ExecutingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.SqsAcknowledgementExecutor;
import io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource;
import io.awspring.cloud.sqs.support.converter.MessageConversionContext;
import io.awspring.cloud.sqs.support.converter.SqsMessageConversionContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/**
 * Base class for polling message sources that receive {@link Message} instances
 * from an SQS queue through a {@link SqsAsyncClient}.
 *
 * <p>On configuration the relevant values are copied from
 * {@link SqsContainerOptions}; on start the queue attributes are resolved and
 * handed, together with the client, to the message conversion context and the
 * acknowledgement processor when those components declare the matching
 * {@code *Aware} interface.
 *
 * <p>Poll requests asking for more than {@value #MAX_MESSAGES_PER_POLL} messages
 * are split into several receive requests whose results are merged into a
 * single collection.
 *
 * @param <T> type parameter forwarded to {@link AbstractPollingMessageSource}
 *            and to the {@link AcknowledgementExecutor} created by this source
 */
public abstract class AbstractSqsMessageSource<T>
extends AbstractPollingMessageSource<T, Message>
implements SqsAsyncClientAware {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSqsMessageSource.class);

    /** Sentinel stored in {@link #messageVisibility} when no visibility timeout is configured. */
    private static final int MESSAGE_VISIBILITY_DISABLED = -1;

    /** Largest number of messages requested in a single receive call; larger polls are split. */
    private static final int MAX_MESSAGES_PER_POLL = 10;

    private SqsAsyncClient sqsAsyncClient;
    private String queueUrl;
    private QueueAttributes queueAttributes;
    private QueueNotFoundStrategy queueNotFoundStrategy;
    private Collection<QueueAttributeName> queueAttributeNames;
    private Collection<String> messageAttributeNames;
    private Collection<String> messageSystemAttributeNames;
    private int messageVisibility;
    private int pollTimeout;

    /**
     * Sets the client used to resolve queue attributes and to receive messages.
     *
     * @param sqsAsyncClient the client; must not be {@code null}
     */
    @Override
    public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
        Assert.notNull(sqsAsyncClient, "sqsAsyncClient cannot be null.");
        this.sqsAsyncClient = sqsAsyncClient;
    }

    /**
     * Copies the SQS specific settings out of the container options.
     *
     * @param containerOptions the options; must be an instance of {@link SqsContainerOptions}
     */
    @Override
    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
        Assert.isInstanceOf(SqsContainerOptions.class, containerOptions,
                "containerOptions must be an instance of SqsContainerOptions");
        SqsContainerOptions sqsContainerOptions = (SqsContainerOptions) containerOptions;
        // Durations are truncated to whole seconds, the unit used when building the request.
        this.pollTimeout = (int) sqsContainerOptions.getPollTimeout().getSeconds();
        this.queueAttributeNames = sqsContainerOptions.getQueueAttributeNames();
        this.messageAttributeNames = sqsContainerOptions.getMessageAttributeNames();
        this.messageSystemAttributeNames = sqsContainerOptions.getMessageSystemAttributeNames();
        this.queueNotFoundStrategy = sqsContainerOptions.getQueueNotFoundStrategy();
        this.messageVisibility = sqsContainerOptions.getMessageVisibility() != null
                ? (int) sqsContainerOptions.getMessageVisibility().getSeconds()
                : MESSAGE_VISIBILITY_DISABLED;
    }

    /**
     * Resolves the queue attributes and URL, then wires the conversion context
     * and acknowledgement processor. Requires the client and the queue attribute
     * names to have been set beforehand.
     */
    @Override
    protected void doStart() {
        Assert.notNull(this.sqsAsyncClient, "sqsAsyncClient not set");
        Assert.notNull(this.queueAttributeNames, "queueAttributeNames not set");
        this.queueAttributes = resolveQueueAttributes();
        this.queueUrl = this.queueAttributes.getQueueUrl();
        configureConversionContextAndAcknowledgement();
    }

    /**
     * Passes the client and queue attributes to the message conversion context,
     * and a freshly configured acknowledgement executor to the acknowledgement
     * processor, each only when the target is an instance of the matching type.
     */
    // The class literal for ExecutingAcknowledgementProcessor carries no type argument,
    // so setting the executor through it is an unchecked call.
    @SuppressWarnings("unchecked")
    private void configureConversionContextAndAcknowledgement() {
        ConfigUtils.INSTANCE
                .acceptIfInstance(getMessageConversionContext(), SqsAsyncClientAware.class,
                        saca -> saca.setSqsAsyncClient(this.sqsAsyncClient))
                .acceptIfInstance(getMessageConversionContext(), QueueAttributesAware.class,
                        qaa -> qaa.setQueueAttributes(this.queueAttributes))
                .acceptIfInstance(getAcknowledgmentProcessor(), ExecutingAcknowledgementProcessor.class,
                        eap -> eap.setAcknowledgementExecutor(
                                createAndConfigureAcknowledgementExecutor(this.queueAttributes)));
    }

    /**
     * Sets the payload class on the context when it is a {@link SqsMessageConversionContext}.
     *
     * @param payloadType the payload class to set
     * @param context the conversion context to configure
     */
    @Override
    protected void doConfigurePayloadTypeOnContext(Class<?> payloadType, MessageConversionContext context) {
        ConfigUtils.INSTANCE.acceptIfInstance(context, SqsMessageConversionContext.class,
                ctx -> ctx.setPayloadClass(payloadType));
    }

    /**
     * Resolves the attributes of the queue named by the polling endpoint.
     * Blocks the calling thread until the resolution completes.
     */
    private QueueAttributes resolveQueueAttributes() {
        return QueueAttributesResolver.builder()
                .queueName(getPollingEndpointName())
                .sqsAsyncClient(this.sqsAsyncClient)
                .queueAttributeNames(this.queueAttributeNames)
                .queueNotFoundStrategy(this.queueNotFoundStrategy)
                .build()
                .resolveQueueAttributes()
                .join();
    }

    /**
     * Creates an acknowledgement executor and supplies it with the queue
     * attributes and the client when it implements the matching {@code *Aware}
     * interface.
     *
     * @param queueAttributes the attributes of the queue being polled
     * @return the configured executor
     */
    protected AcknowledgementExecutor<T> createAndConfigureAcknowledgementExecutor(QueueAttributes queueAttributes) {
        AcknowledgementExecutor<T> executor = createAcknowledgementExecutorInstance();
        ConfigUtils.INSTANCE
                .acceptIfInstance(executor, QueueAttributesAware.class,
                        qaa -> qaa.setQueueAttributes(queueAttributes))
                .acceptIfInstance(executor, SqsAsyncClientAware.class,
                        saca -> saca.setSqsAsyncClient(this.sqsAsyncClient));
        return executor;
    }

    /**
     * Instantiates the acknowledgement executor. Subclasses may override to
     * supply a different implementation.
     *
     * @return a new, not yet configured executor
     */
    protected AcknowledgementExecutor<T> createAcknowledgementExecutorInstance() {
        return new SqsAcknowledgementExecutor<>();
    }

    /**
     * Requests up to {@code maxNumberOfMessages} messages from the queue, using
     * one receive call when the amount fits in a single request and several
     * calls otherwise.
     *
     * @param maxNumberOfMessages the maximum number of messages to retrieve
     * @return a future completing with the received messages
     */
    @Override
    protected CompletableFuture<Collection<Message>> doPollForMessages(int maxNumberOfMessages) {
        logger.debug("Polling queue {} for {} messages.", this.queueUrl, maxNumberOfMessages);
        if (maxNumberOfMessages <= MAX_MESSAGES_PER_POLL) {
            return executePoll(maxNumberOfMessages);
        }
        return executeMultiplePolls(maxNumberOfMessages);
    }

    /** Issues a single receive request and logs the outcome when messages are returned. */
    private CompletableFuture<Collection<Message>> executePoll(int maxNumberOfMessages) {
        return this.sqsAsyncClient.receiveMessage(createRequest(maxNumberOfMessages))
                // The explicit type argument widens the returned list to Collection<Message>.
                .<Collection<Message>>thenApply(ReceiveMessageResponse::messages)
                .whenComplete(this::logMessagesReceived);
    }

    /**
     * Builds the receive request from the configured values.
     *
     * <p>{@link #customizeRequest} runs before the configured visibility timeout
     * is applied, so a configured timeout replaces any value set by the
     * customization hook.
     */
    private ReceiveMessageRequest createRequest(int maxNumberOfMessages) {
        ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.builder()
                .queueUrl(this.queueUrl)
                .maxNumberOfMessages(maxNumberOfMessages)
                .attributeNamesWithStrings(this.messageSystemAttributeNames)
                .messageAttributeNames(this.messageAttributeNames)
                .waitTimeSeconds(this.pollTimeout);
        customizeRequest(builder);
        if (this.messageVisibility >= 0) {
            builder.visibilityTimeout(this.messageVisibility);
        }
        return builder.build();
    }

    /**
     * Hook for subclasses to adjust the receive request before it is built.
     * The default implementation does nothing.
     *
     * @param builder the request builder, already populated with the configured values
     */
    protected void customizeRequest(ReceiveMessageRequest.Builder builder) {
    }

    /**
     * Splits a poll larger than {@value #MAX_MESSAGES_PER_POLL} into full-size
     * requests plus, when needed, one request for the remainder, and merges all
     * results.
     */
    private CompletableFuture<Collection<Message>> executeMultiplePolls(int maxNumberOfMessages) {
        int remainder = maxNumberOfMessages % MAX_MESSAGES_PER_POLL;
        CompletableFuture<Collection<Message>> fullSizePolls = combinePolls(maxNumberOfMessages);
        if (remainder == 0) {
            return fullSizePolls;
        }
        return fullSizePolls.thenCombine(executePoll(remainder), this::combineBatches);
    }

    /**
     * Issues one full-size request per {@value #MAX_MESSAGES_PER_POLL} messages
     * requested and folds the results into a single future.
     */
    private CompletableFuture<Collection<Message>> combinePolls(int maxNumberOfMessages) {
        CompletableFuture<Collection<Message>> noMessages =
                CompletableFuture.completedFuture(Collections.emptyList());
        return IntStream.range(0, maxNumberOfMessages / MAX_MESSAGES_PER_POLL)
                .mapToObj(index -> executePoll(MAX_MESSAGES_PER_POLL))
                .reduce(noMessages, (first, second) -> first.thenCombine(second, this::combineBatches));
    }

    /** Returns a new collection holding the messages of the first batch followed by those of the second. */
    private Collection<Message> combineBatches(Collection<Message> firstBatch, Collection<Message> secondBatch) {
        Collection<Message> combinedBatch = new ArrayList<>(firstBatch);
        combinedBatch.addAll(secondBatch);
        return combinedBatch;
    }

    /**
     * Logs the number of received messages, plus their ids at trace level.
     * Nothing is logged when the poll produced no result; the throwable is
     * intentionally not reported here.
     * NOTE(review): presumably polling failures are reported by the caller of
     * {@link #doPollForMessages} - confirm.
     */
    private void logMessagesReceived(@Nullable Collection<Message> v, @Nullable Throwable t) {
        if (v == null) {
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Received {} messages {} from queue {}", v.size(),
                    v.stream().map(Message::messageId).collect(Collectors.toList()), this.queueUrl);
        } else {
            logger.debug("Received {} messages from queue {}", v.size(), this.queueUrl);
        }
    }
}

