/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.sqs.common;

import com.linecorp.armeria.client.retry.Backoff;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsRetriesExhaustedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;
import software.amazon.awssdk.services.sts.model.StsException;

public class SqsWorkerCommon {
    private static final Logger LOG = LoggerFactory.getLogger(SqsWorkerCommon.class);
    public static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
    public static final String SQS_MESSAGES_RECEIVED_METRIC_NAME = "sqsMessagesReceived";
    public static final String SQS_MESSAGES_DELETED_METRIC_NAME = "sqsMessagesDeleted";
    public static final String SQS_MESSAGES_FAILED_METRIC_NAME = "sqsMessagesFailed";
    public static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed";
    public static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
    public static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
    public static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
    public static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
    public static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound";
    private final Backoff standardBackoff;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private volatile boolean isStopped;
    private int failedAttemptCount;
    private final Counter sqsMessagesReceivedCounter;
    private final Counter sqsMessagesDeletedCounter;
    private final Counter sqsMessagesFailedCounter;
    private final Counter sqsMessagesDeleteFailedCounter;
    private final Counter acknowledgementSetCallbackCounter;
    private final Counter sqsVisibilityTimeoutChangedCount;
    private final Counter sqsVisibilityTimeoutChangeFailedCount;
    private final Counter sqsMessageAccessDeniedCounter;
    private final Counter sqsMessageThrottledCounter;
    private final Counter sqsResourceNotFoundCounter;

    public SqsWorkerCommon(Backoff standardBackoff, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager) {
        this.standardBackoff = standardBackoff;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.isStopped = false;
        this.failedAttemptCount = 0;
        this.sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME);
        this.sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME);
        this.sqsMessagesFailedCounter = pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME);
        this.sqsMessagesDeleteFailedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME);
        this.acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
        this.sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME);
        this.sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
        this.sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
        this.sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
        this.sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
    }

    public List<Message> pollSqsMessages(String queueUrl, SqsClient sqsClient, Integer maxNumberOfMessages, Duration waitTime, Duration visibilityTimeout) {
        try {
            ReceiveMessageRequest request = this.createReceiveMessageRequest(queueUrl, maxNumberOfMessages, waitTime, visibilityTimeout);
            List messages = sqsClient.receiveMessage(request).messages();
            this.failedAttemptCount = 0;
            if (!messages.isEmpty()) {
                this.sqsMessagesReceivedCounter.increment((double)messages.size());
            }
            return messages;
        }
        catch (SqsException | StsException e) {
            LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", (Object)e.getMessage());
            this.recordSqsException((AwsServiceException)e);
            this.applyBackoff();
            return Collections.emptyList();
        }
    }

    private ReceiveMessageRequest createReceiveMessageRequest(String queueUrl, Integer maxNumberOfMessages, Duration waitTime, Duration visibilityTimeout) {
        ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder().queueUrl(queueUrl).attributeNamesWithStrings(new String[]{"All"}).messageAttributeNames(new String[]{"All"});
        if (waitTime != null) {
            requestBuilder.waitTimeSeconds(Integer.valueOf((int)waitTime.getSeconds()));
        }
        if (maxNumberOfMessages != null) {
            requestBuilder.maxNumberOfMessages(maxNumberOfMessages);
        }
        if (visibilityTimeout != null) {
            requestBuilder.visibilityTimeout(Integer.valueOf((int)visibilityTimeout.getSeconds()));
        }
        return (ReceiveMessageRequest)requestBuilder.build();
    }

    public void applyBackoff() {
        long delayMillis;
        if ((delayMillis = this.standardBackoff.nextDelayMillis(++this.failedAttemptCount)) < 0L) {
            Thread.currentThread().interrupt();
            throw new SqsRetriesExhaustedException("SQS retries exhausted. Check your SQS configuration.");
        }
        Duration delayDuration = Duration.ofMillis(delayMillis);
        LOG.info("Pausing SQS processing for {}.{} seconds due to an error.", (Object)delayDuration.getSeconds(), (Object)delayDuration.toMillisPart());
        try {
            Thread.sleep(delayMillis);
        }
        catch (InterruptedException e) {
            LOG.error("Thread interrupted during SQS backoff sleep.", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    public void deleteSqsMessages(String queueUrl, SqsClient sqsClient, List<DeleteMessageBatchRequestEntry> entries) {
        if (entries == null || entries.isEmpty() || this.isStopped) {
            return;
        }
        try {
            DeleteMessageBatchRequest request = (DeleteMessageBatchRequest)DeleteMessageBatchRequest.builder().queueUrl(queueUrl).entries(entries).build();
            DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(request);
            if (response.hasSuccessful()) {
                int successCount = response.successful().size();
                this.sqsMessagesDeletedCounter.increment((double)successCount);
                LOG.debug("Deleted {} messages from SQS queue [{}]", (Object)successCount, (Object)queueUrl);
            }
            if (response.hasFailed()) {
                int failCount = response.failed().size();
                this.sqsMessagesDeleteFailedCounter.increment((double)failCount);
                LOG.error("Failed to delete {} messages from SQS queue [{}].", (Object)failCount, (Object)queueUrl);
            }
        }
        catch (SdkException e) {
            this.sqsMessagesDeleteFailedCounter.increment((double)entries.size());
            LOG.error("Failed to delete messages from SQS queue [{}]: {}", (Object)queueUrl, (Object)e.getMessage());
        }
    }

    public void increaseVisibilityTimeout(String queueUrl, SqsClient sqsClient, String receiptHandle, int newVisibilityTimeoutSeconds, String messageIdForLogging) {
        if (this.isStopped) {
            LOG.info("Skipping visibility timeout extension because worker is stopping. ID: {}", (Object)messageIdForLogging);
            return;
        }
        try {
            ChangeMessageVisibilityRequest request = (ChangeMessageVisibilityRequest)ChangeMessageVisibilityRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).visibilityTimeout(Integer.valueOf(newVisibilityTimeoutSeconds)).build();
            sqsClient.changeMessageVisibility(request);
            this.sqsVisibilityTimeoutChangedCount.increment();
            LOG.debug("Set visibility timeout for message {} to {} seconds", (Object)messageIdForLogging, (Object)newVisibilityTimeoutSeconds);
        }
        catch (Exception e) {
            this.sqsVisibilityTimeoutChangeFailedCount.increment();
            LOG.error("Failed to set visibility timeout for message {} to {}. Reason: {}", new Object[]{messageIdForLogging, newVisibilityTimeoutSeconds, e.getMessage()});
        }
    }

    public DeleteMessageBatchRequestEntry buildDeleteMessageBatchRequestEntry(String messageId, String receiptHandle) {
        return (DeleteMessageBatchRequestEntry)DeleteMessageBatchRequestEntry.builder().id(messageId).receiptHandle(receiptHandle).build();
    }

    public Timer createTimer(String timerName) {
        return this.pluginMetrics.timer(timerName);
    }

    public Counter getSqsMessagesFailedCounter() {
        return this.sqsMessagesFailedCounter;
    }

    public Counter getSqsMessagesDeletedCounter() {
        return this.sqsMessagesDeletedCounter;
    }

    public void stop() {
        this.isStopped = true;
    }

    public void recordSqsException(AwsServiceException e) {
        if (e.statusCode() == 403 || e instanceof KmsAccessDeniedException) {
            this.sqsMessageAccessDeniedCounter.increment();
        } else if (e.statusCode() == 404 || e instanceof QueueDoesNotExistException || e instanceof KmsNotFoundException) {
            this.sqsResourceNotFoundCounter.increment();
        } else if (e.isThrottlingException() || e instanceof KmsThrottledException) {
            this.sqsMessageThrottledCounter.increment();
        }
    }
}

