/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.sqs.javamessaging;

import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
import com.amazon.sqs.javamessaging.PrefetchManager;
import com.amazon.sqs.javamessaging.SQSMessageConsumer;
import com.amazon.sqs.javamessaging.SQSQueueDestination;
import com.amazon.sqs.javamessaging.SQSSessionCallbackScheduler;
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
import com.amazon.sqs.javamessaging.message.SQSBytesMessage;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import com.amazon.sqs.javamessaging.util.ExponentialBackoffStrategy;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public class SQSMessageConsumerPrefetch
implements Runnable,
PrefetchManager {
    private static final Logger LOG = LoggerFactory.getLogger(SQSMessageConsumerPrefetch.class);
    protected static int WAIT_TIME_SECONDS = 20;
    protected static final String ALL = "All";
    private final AmazonSQSMessagingClientWrapper amazonSQSClient;
    private final String queueUrl;
    private final int numberOfMessagesToPrefetch;
    private final SQSQueueDestination sqsDestination;
    protected final ArrayDeque<MessageManager> messageQueue;
    private final Acknowledger acknowledger;
    private final NegativeAcknowledger negativeAcknowledger;
    private volatile MessageListener messageListener;
    private SQSMessageConsumer messageConsumer;
    private final SQSSessionCallbackScheduler sqsSessionRunnable;
    protected int messagesPrefetched = 0;
    protected int messagesRequested = 0;
    protected volatile boolean closed = false;
    protected volatile boolean running = false;
    protected int retriesAttempted = 0;
    private final Object stateLock = new Object();
    protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy(25L, 25L, 2000L);

    SQSMessageConsumerPrefetch(SQSSessionCallbackScheduler sqsSessionRunnable, Acknowledger acknowledger, NegativeAcknowledger negativeAcknowledger, SQSQueueDestination sqsDestination, AmazonSQSMessagingClientWrapper amazonSQSClient, int numberOfMessagesToPrefetch) {
        this.amazonSQSClient = amazonSQSClient;
        this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch;
        this.acknowledger = acknowledger;
        this.negativeAcknowledger = negativeAcknowledger;
        this.queueUrl = sqsDestination.getQueueUrl();
        this.sqsDestination = sqsDestination;
        this.sqsSessionRunnable = sqsSessionRunnable;
        this.messageQueue = new ArrayDeque(numberOfMessagesToPrefetch);
    }

    MessageListener getMessageListener() {
        return this.messageListener;
    }

    void setMessageConsumer(SQSMessageConsumer messageConsumer) {
        this.messageConsumer = messageConsumer;
    }

    @Override
    public SQSMessageConsumer getMessageConsumer() {
        return this.messageConsumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
        if (messageListener == null || this.isClosed()) {
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            if (!this.running || this.isClosed()) {
                return;
            }
            ArrayList<MessageManager> allPrefetchedMessages = new ArrayList<MessageManager>(this.messageQueue);
            this.sqsSessionRunnable.scheduleCallBacks(messageListener, allPrefetchedMessages);
            this.messageQueue.clear();
            this.messageListenerReady();
        }
    }

    private int numberOfMessagesToFetch() {
        int numberOfMessagesNeeded = Math.max(this.numberOfMessagesToPrefetch, this.messagesRequested);
        return Math.max(numberOfMessagesNeeded - this.messagesPrefetched, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        while (true) {
            boolean nackQueueMessages = false;
            List<software.amazon.awssdk.services.sqs.model.Message> messages = null;
            try {
                int prefetchBatchSize;
                if (this.isClosed()) break;
                Object object = this.stateLock;
                synchronized (object) {
                    this.waitForStart();
                    this.waitForPrefetch();
                    prefetchBatchSize = Math.min(this.numberOfMessagesToFetch(), 10);
                }
                if (!this.isClosed()) {
                    messages = this.getMessagesWithBackoff(prefetchBatchSize);
                }
                if (messages == null || messages.isEmpty()) continue;
                this.processReceivedMessages(messages);
                continue;
            }
            catch (InterruptedException e) {
                nackQueueMessages = true;
            }
            catch (Throwable e) {
                LOG.error("Unexpected exception when prefetch messages:", e);
                nackQueueMessages = true;
                throw new RuntimeException(e);
            }
            finally {
                if (!this.isClosed() && !nackQueueMessages) continue;
                this.nackQueueMessages();
                continue;
            }
            break;
        }
    }

    protected List<software.amazon.awssdk.services.sqs.model.Message> getMessages(int batchSize, int waitTimeSeconds) throws JMSException {
        assert (batchSize > 0);
        ReceiveMessageRequest.Builder receiveMessageRequestBuilder = ReceiveMessageRequest.builder().queueUrl(this.queueUrl).maxNumberOfMessages(Integer.valueOf(batchSize)).attributeNamesWithStrings(new String[]{ALL}).messageAttributeNames(new String[]{ALL}).waitTimeSeconds(Integer.valueOf(waitTimeSeconds));
        if (this.sqsDestination.isFifo()) {
            receiveMessageRequestBuilder.receiveRequestAttemptId(UUID.randomUUID().toString());
        }
        ReceiveMessageResponse receivedMessageResult = this.amazonSQSClient.receiveMessage((ReceiveMessageRequest)receiveMessageRequestBuilder.build());
        return receivedMessageResult.messages();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processReceivedMessages(List<software.amazon.awssdk.services.sqs.model.Message> messages) {
        ArrayList<String> nackMessages = new ArrayList<String>();
        ArrayList<MessageManager> messageManagers = new ArrayList<MessageManager>();
        for (software.amazon.awssdk.services.sqs.model.Message message : messages) {
            try {
                Message jmsMessage = this.convertToJMSMessage(message);
                messageManagers.add(new MessageManager(this, jmsMessage));
            }
            catch (JMSException e) {
                LOG.warn("Caught exception while converting received messages", (Throwable)e);
                nackMessages.add(message.receiptHandle());
            }
        }
        Object object = this.stateLock;
        synchronized (object) {
            if (this.messageListener != null) {
                this.sqsSessionRunnable.scheduleCallBacks(this.messageListener, messageManagers);
            } else {
                this.messageQueue.addAll(messageManagers);
            }
            this.messagesPrefetched += messageManagers.size();
            this.notifyStateChange();
        }
        try {
            this.negativeAcknowledger.action(this.queueUrl, nackMessages);
        }
        catch (JMSException e) {
            LOG.warn("Caught exception while nacking received messages", (Throwable)e);
        }
    }

    protected List<software.amazon.awssdk.services.sqs.model.Message> getMessagesWithBackoff(int batchSize) throws InterruptedException {
        try {
            List<software.amazon.awssdk.services.sqs.model.Message> result = this.getMessages(batchSize, WAIT_TIME_SECONDS);
            this.retriesAttempted = 0;
            return result;
        }
        catch (JMSException e) {
            LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", (Throwable)e);
            try {
                this.sleep(this.backoffStrategy.delayBeforeNextRetry(this.retriesAttempted++));
                return Collections.emptyList();
            }
            catch (InterruptedException ex) {
                LOG.warn("Interrupted while retrying on receive", (Throwable)ex);
                throw ex;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForPrefetch() throws InterruptedException {
        Object object = this.stateLock;
        synchronized (object) {
            while (this.numberOfMessagesToFetch() <= 0 && !this.isClosed()) {
                try {
                    this.stateLock.wait();
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting on prefetch", (Throwable)e);
                    throw e;
                }
            }
        }
    }

    protected Message convertToJMSMessage(software.amazon.awssdk.services.sqs.model.Message message) throws JMSException {
        MessageAttributeValue correlationIdAttribute;
        SQSMessage jmsMessage;
        MessageAttributeValue messageTypeAttribute = (MessageAttributeValue)message.messageAttributes().get("JMS_SQSMessageType");
        if (messageTypeAttribute == null) {
            jmsMessage = new SQSTextMessage(this.acknowledger, this.queueUrl, message);
        } else {
            String messageType = messageTypeAttribute.stringValue();
            if ("byte".equals(messageType)) {
                try {
                    jmsMessage = new SQSBytesMessage(this.acknowledger, this.queueUrl, message);
                }
                catch (JMSException e) {
                    LOG.warn("MessageReceiptHandle - " + message.receiptHandle() + "cannot be serialized to BytesMessage", (Throwable)e);
                    throw e;
                }
            } else if ("object".equals(messageType)) {
                jmsMessage = new SQSObjectMessage(this.acknowledger, this.queueUrl, message);
            } else if ("text".equals(messageType)) {
                jmsMessage = new SQSTextMessage(this.acknowledger, this.queueUrl, message);
            } else {
                throw new JMSException("Not a supported JMS message type");
            }
        }
        jmsMessage.setJMSDestination(this.sqsDestination);
        MessageAttributeValue replyToQueueNameAttribute = (MessageAttributeValue)message.messageAttributes().get("JMS_SQSReplyToQueueName");
        MessageAttributeValue replyToQueueUrlAttribute = (MessageAttributeValue)message.messageAttributes().get("JMS_SQSReplyToQueueURL");
        if (replyToQueueNameAttribute != null && replyToQueueUrlAttribute != null) {
            String replyToQueueUrl = replyToQueueUrlAttribute.stringValue();
            String replyToQueueName = replyToQueueNameAttribute.stringValue();
            SQSQueueDestination replyToQueue = new SQSQueueDestination(replyToQueueName, replyToQueueUrl);
            jmsMessage.setJMSReplyTo(replyToQueue);
        }
        if ((correlationIdAttribute = (MessageAttributeValue)message.messageAttributes().get("JMS_SQSCorrelationID")) != null) {
            jmsMessage.setJMSCorrelationID(correlationIdAttribute.stringValue());
        }
        jmsMessage.setJMSTimestamp(this.getJMSTimestamp(message));
        return jmsMessage;
    }

    private long getJMSTimestamp(software.amazon.awssdk.services.sqs.model.Message message) {
        Map systemAttributes = message.attributesAsStrings();
        String timestamp = (String)systemAttributes.get("SentTimestamp");
        if (timestamp != null) {
            return Long.parseLong(timestamp);
        }
        return 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void nackQueueMessages() {
        Object object = this.stateLock;
        synchronized (object) {
            try {
                this.negativeAcknowledger.bulkAction(this.messageQueue, this.queueUrl);
            }
            catch (JMSException e) {
                LOG.warn("Caught exception while nacking queued messages", (Throwable)e);
            }
            finally {
                this.notifyStateChange();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForStart() throws InterruptedException {
        Object object = this.stateLock;
        synchronized (object) {
            while (!this.running && !this.isClosed()) {
                try {
                    this.stateLock.wait();
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting on consumer start", (Throwable)e);
                    throw e;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void messageDispatched() {
        Object object = this.stateLock;
        synchronized (object) {
            --this.messagesPrefetched;
            --this.messagesRequested;
            if (this.numberOfMessagesToFetch() > 0) {
                this.notifyStateChange();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void messageListenerReady() {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.messagesRequested <= 0 && !this.isClosed() && this.messageListener != null) {
                this.requestMessage();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void requestMessage() {
        Object object = this.stateLock;
        synchronized (object) {
            ++this.messagesRequested;
            this.notifyStateChange();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unrequestMessage() {
        Object object = this.stateLock;
        synchronized (object) {
            --this.messagesRequested;
            this.notifyStateChange();
        }
    }

    Message receive() throws JMSException {
        return this.receive(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    Message receive(long timeout) throws JMSException {
        if (this.cannotDeliver()) {
            return null;
        }
        if (timeout < 0L) {
            timeout = 0L;
        }
        MessageManager messageManager = null;
        Object object = this.stateLock;
        synchronized (object) {
            this.requestMessage();
            try {
                if (this.messageQueue.isEmpty()) {
                    long startTime = System.currentTimeMillis();
                    long waitTime = 0L;
                    while (this.messageQueue.isEmpty() && !this.isClosed() && (timeout == 0L || (waitTime = this.getWaitTime(timeout, startTime)) > 0L)) {
                        try {
                            this.stateLock.wait(waitTime);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            Message message = null;
                            if (messageManager != null) return message;
                            this.unrequestMessage();
                            return message;
                        }
                    }
                    if (this.messageQueue.isEmpty() || this.isClosed()) {
                        Message message = null;
                        return message;
                    }
                }
                messageManager = this.messageQueue.pollFirst();
            }
            finally {
                if (messageManager == null) {
                    this.unrequestMessage();
                }
            }
            return this.messageHandler(messageManager);
        }
    }

    private long getWaitTime(long timeout, long startTime) {
        return timeout - (System.currentTimeMillis() - startTime);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void notifyStateChange() {
        Object object = this.stateLock;
        synchronized (object) {
            this.stateLock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Message receiveNoWait() throws JMSException {
        MessageManager messageManager;
        if (this.cannotDeliver()) {
            return null;
        }
        Object object = this.stateLock;
        synchronized (object) {
            List<software.amazon.awssdk.services.sqs.model.Message> messages;
            if (this.messageQueue.isEmpty() && this.numberOfMessagesToPrefetch == 0 && (messages = this.getMessages(1, 0)) != null && !messages.isEmpty()) {
                this.processReceivedMessages(messages);
            }
            messageManager = this.messageQueue.pollFirst();
        }
        if (messageManager != null) {
            this.requestMessage();
        }
        return this.messageHandler(messageManager);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void start() {
        if (this.isClosed() || this.running) {
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            this.running = true;
            this.messageListenerReady();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stop() {
        if (this.isClosed() || !this.running) {
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            this.running = false;
            this.notifyStateChange();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void close() {
        if (this.isClosed()) {
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            this.closed = true;
            this.notifyStateChange();
            this.messageListener = null;
        }
    }

    private Message messageHandler(MessageManager messageManager) throws JMSException {
        if (messageManager == null) {
            return null;
        }
        Message message = messageManager.message();
        this.messageDispatched();
        this.acknowledger.notifyMessageReceived((SQSMessage)message);
        return message;
    }

    private boolean cannotDeliver() throws JMSException {
        if (!this.running) {
            return true;
        }
        if (this.isClosed()) {
            throw new JMSException("Cannot receive messages when the consumer is closed");
        }
        if (this.messageListener != null) {
            throw new JMSException("Cannot receive messages synchronously after a message listener is set");
        }
        return false;
    }

    protected void sleep(long sleepTimeMillis) throws InterruptedException {
        Thread.sleep(sleepTimeMillis);
    }

    protected boolean isClosed() {
        return this.closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affectedGroups) throws JMSException {
        ArrayList<SQSMessageIdentifier> purgedMessages = new ArrayList<SQSMessageIdentifier>();
        Object object = this.stateLock;
        synchronized (object) {
            Iterator<MessageManager> managerIterator = this.messageQueue.iterator();
            while (managerIterator.hasNext()) {
                MessageManager messageManager = managerIterator.next();
                SQSMessage prefetchedMessage = (SQSMessage)messageManager.message();
                SQSMessageIdentifier messageIdentifier = SQSMessageIdentifier.fromSQSMessage(prefetchedMessage);
                if (!affectedGroups.contains(messageIdentifier.getGroupId())) continue;
                purgedMessages.add(messageIdentifier);
                managerIterator.remove();
                --this.messagesPrefetched;
            }
            this.notifyStateChange();
        }
        return purgedMessages;
    }

    public record MessageManager(PrefetchManager prefetchManager, Message message) {
    }
}

