/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.acknowledgement;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.SqsAcknowledgementException;
import io.awspring.cloud.sqs.listener.QueueAttributes;
import io.awspring.cloud.sqs.listener.QueueAttributesAware;
import io.awspring.cloud.sqs.listener.SqsAsyncClientAware;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementExecutor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;

/**
 * {@link AcknowledgementExecutor} that acknowledges messages by deleting them from an SQS queue
 * with a single {@code DeleteMessageBatch} request per {@link #execute(Collection)} call.
 *
 * <p>The queue URL/name and the {@link SqsAsyncClient} are supplied through
 * {@link #setQueueAttributes(QueueAttributes)} and {@link #setSqsAsyncClient(SqsAsyncClient)}
 * and must be set before {@link #execute(Collection)} is used.
 *
 * <p>Failures are never thrown from {@link #execute(Collection)}; they are reported through the
 * returned future as an {@link SqsAcknowledgementException}. This includes the case where the
 * batch call itself completes normally but the response lists entries that could not be deleted.
 *
 * @param <T> the payload type of the messages being acknowledged
 */
public class SqsAcknowledgementExecutor<T>
implements AcknowledgementExecutor<T>,
SqsAsyncClientAware,
QueueAttributesAware {
    private static final Logger logger = LoggerFactory.getLogger(SqsAcknowledgementExecutor.class);

    /** Acknowledgements slower than this are reported at WARN level. */
    private static final long SLOW_ACKNOWLEDGEMENT_THRESHOLD_MILLIS = 10000L;

    /** Name of the message header the receipt handle is read from. */
    private static final String RECEIPT_HANDLE_HEADER = "Sqs_ReceiptHandle";

    private SqsAsyncClient sqsAsyncClient;
    private String queueUrl;
    private String queueName;

    /**
     * Stores the URL and name of the queue the messages will be deleted from.
     *
     * @param queueAttributes the attributes of the target queue, never {@code null}
     * @throws IllegalArgumentException if {@code queueAttributes} is {@code null}
     */
    @Override
    public void setQueueAttributes(QueueAttributes queueAttributes) {
        Assert.notNull(queueAttributes, "queueAttributes cannot be null");
        this.queueUrl = queueAttributes.getQueueUrl();
        this.queueName = queueAttributes.getQueueName();
    }

    /**
     * Stores the client used to issue the delete requests.
     *
     * @param sqsAsyncClient the client, never {@code null}
     * @throws IllegalArgumentException if {@code sqsAsyncClient} is {@code null}
     */
    @Override
    public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
        Assert.notNull(sqsAsyncClient, "sqsAsyncClient cannot be null");
        this.sqsAsyncClient = sqsAsyncClient;
    }

    /**
     * Acknowledges the given messages by deleting them from the queue.
     *
     * @param messagesToAck the messages to delete; must not be {@code null} or empty
     * @return a future that completes normally once every message was deleted, or exceptionally
     *         with an {@link SqsAcknowledgementException} if the input is invalid, the request
     *         fails, or the response reports entries that were not deleted
     */
    @Override
    public CompletableFuture<Void> execute(Collection<Message<T>> messagesToAck) {
        try {
            // Validate before touching the collection so a null argument yields the assertion
            // message rather than a NullPointerException from the log statement.
            Assert.notEmpty(messagesToAck, () -> "empty collection sent to acknowledge in queue " + this.queueName);
            logger.debug("Executing acknowledgement for {} messages", messagesToAck.size());
            return this.deleteMessages(messagesToAck);
        }
        catch (Exception e) {
            // messagesToAck may be null here; building the exception must not throw again,
            // otherwise the caller gets a thrown exception instead of a failed future.
            Collection<Message<T>> failedMessages = messagesToAck != null ? messagesToAck : Collections.emptyList();
            return CompletableFutures.failedFuture(this.createAcknowledgementException(failedMessages, e));
        }
    }

    /**
     * Builds the exception for a request in which none of the messages is known to be deleted.
     */
    private SqsAcknowledgementException createAcknowledgementException(Collection<Message<T>> messagesToAck, Throwable e) {
        return this.createAcknowledgementException(Collections.emptyList(), messagesToAck, e);
    }

    /**
     * Builds the exception carrying the messages that were and were not acknowledged.
     */
    private SqsAcknowledgementException createAcknowledgementException(Collection<Message<T>> acknowledged, Collection<Message<T>> failed, Throwable cause) {
        return new SqsAcknowledgementException("Error acknowledging messages " + MessageHeaderUtils.getId(failed), this.widen(acknowledged), this.widen(failed), this.queueUrl, cause);
    }

    /** Copies the messages into a collection typed with a wildcard payload. */
    private Collection<Message<?>> widen(Collection<Message<T>> messages) {
        return messages.stream().map(message -> (Message<?>) message).collect(Collectors.toList());
    }

    /**
     * Sends the batch delete request and translates both request failures and per-entry
     * failures into an {@link SqsAcknowledgementException} on the returned future.
     */
    private CompletableFuture<Void> deleteMessages(Collection<Message<T>> messagesToAck) {
        logger.trace("Acknowledging messages for queue {}: {}", this.queueName, MessageHeaderUtils.getId(messagesToAck));
        StopWatch watch = new StopWatch();
        watch.start();
        // Remember which batch entry id belongs to which message so that failed entries in the
        // response can be mapped back to the messages they refer to.
        Map<String, Message<T>> messagesByEntryId = new LinkedHashMap<>();
        for (Message<T> message : messagesToAck) {
            messagesByEntryId.put(UUID.randomUUID().toString(), message);
        }
        CompletableFuture<DeleteMessageBatchResponse> deletion = this.sqsAsyncClient.deleteMessageBatch(this.createDeleteMessageBatchRequest(messagesByEntryId));
        return CompletableFutures
                .exceptionallyCompose(deletion, t -> CompletableFutures.failedFuture(this.createAcknowledgementException(messagesToAck, t)))
                .thenCompose(response -> this.failIfPartiallyDeleted(response, messagesByEntryId))
                .whenComplete((v, t) -> this.logAckResult(messagesToAck, t, watch));
    }

    /**
     * Checks the batch response for entries reported as failed.
     *
     * @return a completed future if no entry failed (or no response object is available),
     *         otherwise a future failed with an {@link SqsAcknowledgementException} that
     *         separates the deleted messages from the ones that were not deleted
     */
    private CompletableFuture<Void> failIfPartiallyDeleted(DeleteMessageBatchResponse response, Map<String, Message<T>> messagesByEntryId) {
        if (response == null || response.failed().isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        Set<String> failedEntryIds = response.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet());
        List<Message<T>> acknowledged = new ArrayList<>();
        List<Message<T>> failed = new ArrayList<>();
        for (Map.Entry<String, Message<T>> entry : messagesByEntryId.entrySet()) {
            (failedEntryIds.contains(entry.getKey()) ? failed : acknowledged).add(entry.getValue());
        }
        Throwable cause = new IllegalStateException("DeleteMessageBatch reported " + response.failed().size() + " failed entries for queue " + this.queueName + ": " + response.failed());
        return CompletableFutures.failedFuture(this.createAcknowledgementException(acknowledged, failed, cause));
    }

    /** Creates the batch request with one entry per message, keyed by the given entry ids. */
    private DeleteMessageBatchRequest createDeleteMessageBatchRequest(Map<String, Message<T>> messagesByEntryId) {
        List<DeleteMessageBatchRequestEntry> entries = messagesByEntryId.entrySet().stream()
                .map(entry -> this.toDeleteMessageEntry(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        return DeleteMessageBatchRequest.builder().queueUrl(this.queueUrl).entries(entries).build();
    }

    /** Creates a batch entry from the message's receipt handle header and the given entry id. */
    private DeleteMessageBatchRequestEntry toDeleteMessageEntry(String entryId, Message<T> message) {
        return DeleteMessageBatchRequestEntry.builder()
                .receiptHandle(MessageHeaderUtils.getHeaderAsString(message, RECEIPT_HANDLE_HEADER))
                .id(entryId)
                .build();
    }

    /**
     * Stops the watch and logs the outcome: WARN when the operation was slow, ERROR when it
     * failed, TRACE otherwise.
     */
    private void logAckResult(Collection<Message<T>> messagesToAck, Throwable t, StopWatch watch) {
        watch.stop();
        long totalTimeMillis = watch.getTotalTimeMillis();
        if (totalTimeMillis > SLOW_ACKNOWLEDGEMENT_THRESHOLD_MILLIS) {
            logger.warn("Acknowledgement operation took {}ms to finish in queue {} for messages {}", totalTimeMillis, this.queueName, MessageHeaderUtils.getId(messagesToAck));
        }
        if (t != null) {
            // Unwrap the CompletionException added by the future chain so the real cause is logged.
            Throwable toLog = t instanceof CompletionException ? t.getCause() : t;
            logger.error("Error acknowledging in queue {} messages {} in {}ms", this.queueName, MessageHeaderUtils.getId(messagesToAck), totalTimeMillis, toLog);
        } else {
            logger.trace("Done acknowledging in queue {} messages: {} in {}ms", this.queueName, MessageHeaderUtils.getId(messagesToAck), totalTimeMillis);
        }
    }
}

