/*
 * Decompiled with CFR 0.152.
 */
package com.erudika.para.server.queue;

import com.erudika.para.core.listeners.DestroyListener;
import com.erudika.para.core.utils.Para;
import com.erudika.para.server.queue.River;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

public final class AWSQueueUtils {
    private static SqsAsyncClient sqsClient;
    private static final int MAX_MESSAGES = 10;
    private static final Map<String, Future<?>> POLLING_THREADS;
    private static final String LOCAL_ENDPOINT = "http://localhost:9324";
    private static final Logger logger;

    private AWSQueueUtils() {
    }

    public static SqsAsyncClient getClient() {
        if (sqsClient != null) {
            return sqsClient;
        }
        sqsClient = Para.getConfig().environment().equals("embedded") ? (SqsAsyncClient)((SqsAsyncClientBuilder)((SqsAsyncClientBuilder)SqsAsyncClient.builder().endpointOverride(URI.create(LOCAL_ENDPOINT))).credentialsProvider((AwsCredentialsProvider)StaticCredentialsProvider.create((AwsCredentials)AwsBasicCredentials.create((String)"x", (String)"x")))).build() : SqsAsyncClient.create();
        Para.addDestroyListener((DestroyListener)new DestroyListener(){

            public void onDestroy() {
                sqsClient.close();
            }
        });
        return sqsClient;
    }

    protected static String createQueue(String name) {
        if (StringUtils.isBlank((CharSequence)name)) {
            return null;
        }
        String queueURL = AWSQueueUtils.getQueueURL(name);
        if (queueURL == null) {
            try {
                queueURL = ((CreateQueueResponse)AWSQueueUtils.getClient().createQueue((T b) -> b.queueName(name)).get()).queueUrl();
            }
            catch (AwsServiceException ase) {
                AWSQueueUtils.logException(ase);
            }
            catch (SdkException ace) {
                logger.error("Could not reach SQS. {0}", (Object)ace.toString());
            }
            catch (InterruptedException | ExecutionException ex) {
                logger.error(null, (Throwable)ex);
                Thread.currentThread().interrupt();
            }
        }
        return queueURL;
    }

    protected static void deleteQueue(String queueURL) {
        if (!StringUtils.isBlank((CharSequence)queueURL)) {
            try {
                AWSQueueUtils.getClient().deleteQueue((T b) -> b.queueUrl(queueURL));
            }
            catch (AwsServiceException ase) {
                AWSQueueUtils.logException(ase);
            }
            catch (SdkException ace) {
                logger.error("Could not reach SQS. {0}", (Object)ace.toString());
            }
        }
    }

    protected static String getQueueURL(String name) {
        try {
            return ((GetQueueUrlResponse)AWSQueueUtils.getClient().getQueueUrl(b -> b.queueName(name)).get()).queueUrl();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            logger.info("Queue '{}' could not be found: {}", (Object)name, (Object)e.getMessage());
        }
        return null;
    }

    protected static List<String> listQueues() {
        ArrayList<String> list = new ArrayList();
        try {
            list = ((ListQueuesResponse)AWSQueueUtils.getClient().listQueues().get()).queueUrls();
        }
        catch (AwsServiceException ase) {
            AWSQueueUtils.logException(ase);
        }
        catch (SdkException ace) {
            logger.error("Could not reach SQS. {0}", (Object)ace.toString());
        }
        catch (InterruptedException | ExecutionException ex) {
            logger.error(null, (Throwable)ex);
            Thread.currentThread().interrupt();
        }
        return list;
    }

    protected static void pushMessages(String queueURL, List<String> messages) {
        if (!StringUtils.isBlank((CharSequence)queueURL) && messages != null) {
            try {
                int j = 0;
                ArrayList<SendMessageBatchRequestEntry> msgs = new ArrayList<SendMessageBatchRequestEntry>(10);
                for (int i = 0; i < messages.size(); ++i) {
                    String message = messages.get(i);
                    if (!StringUtils.isBlank((CharSequence)message)) {
                        msgs.add((SendMessageBatchRequestEntry)SendMessageBatchRequestEntry.builder().messageBody(message).id(Integer.toString(i)).build());
                    }
                    if (++j < 10 && i != messages.size() - 1) continue;
                    if (!msgs.isEmpty()) {
                        AWSQueueUtils.getClient().sendMessageBatch(b -> b.queueUrl(queueURL).entries((Collection)msgs));
                        msgs.clear();
                    }
                    j = 0;
                }
            }
            catch (AwsServiceException ase) {
                AWSQueueUtils.logException(ase);
            }
            catch (SdkException ace) {
                logger.error("Could not reach SQS. {}", (Object)ace.toString());
            }
        }
    }

    protected static List<String> pullMessages(String queueURL, int numberOfMessages) {
        ArrayList<String> messages = new ArrayList<String>();
        if (!StringUtils.isBlank((CharSequence)queueURL)) {
            try {
                int maxForBatch;
                int batchSteps = 1;
                if (numberOfMessages > 10) {
                    batchSteps = numberOfMessages / 10 + (numberOfMessages % 10 > 0 ? 1 : 0);
                    maxForBatch = 10;
                } else {
                    maxForBatch = numberOfMessages;
                }
                for (int i = 0; i < batchSteps; ++i) {
                    List list = ((ReceiveMessageResponse)AWSQueueUtils.getClient().receiveMessage(b -> b.queueUrl(queueURL).maxNumberOfMessages(Integer.valueOf(maxForBatch)).waitTimeSeconds(Integer.valueOf(Para.getConfig().queuePollingIntervalSec()))).get()).messages();
                    if (list == null || list.isEmpty()) continue;
                    ArrayList<DeleteMessageBatchRequestEntry> del = new ArrayList<DeleteMessageBatchRequestEntry>();
                    for (Message msg : list) {
                        messages.add(msg.body());
                        del.add((DeleteMessageBatchRequestEntry)DeleteMessageBatchRequestEntry.builder().id(msg.messageId()).receiptHandle(msg.receiptHandle()).build());
                    }
                    AWSQueueUtils.getClient().deleteMessageBatch(b -> b.queueUrl(queueURL).entries((Collection)del));
                }
            }
            catch (AwsServiceException ase) {
                AWSQueueUtils.logException(ase);
            }
            catch (SdkException ace) {
                logger.error("Could not reach SQS: {}", (Object)ace.getMessage());
            }
            catch (ExecutionException ee) {
                logger.error("SQS Execution exception: {}", (Object)ee.getMessage());
            }
            catch (InterruptedException ex) {
                logger.error("Interrupted while pulling messages from queue: {}", (Object)ex.getMessage());
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                logger.error("Error while pulling from queue: {}", (Object)e.getMessage());
            }
        }
        return messages;
    }

    protected static void startPollingForMessages(final String queueURL) {
        if (!StringUtils.isBlank((CharSequence)queueURL) && !POLLING_THREADS.containsKey(queueURL)) {
            logger.info("Starting SQS river using queue {} (polling interval: {}s)", (Object)queueURL, (Object)Para.getConfig().queuePollingIntervalSec());
            POLLING_THREADS.putIfAbsent(queueURL, Para.getExecutorService().submit(new River(){

                @Override
                List<String> pullMessages() {
                    return AWSQueueUtils.pullMessages(queueURL, 10);
                }
            }));
            Para.addDestroyListener((DestroyListener)new DestroyListener(){

                public void onDestroy() {
                    AWSQueueUtils.stopPollingForMessages(queueURL);
                }
            });
        }
    }

    protected static void stopPollingForMessages(String queueURL) {
        if (!StringUtils.isBlank((CharSequence)queueURL) && POLLING_THREADS.containsKey(queueURL)) {
            logger.info("Stopping SQS river on queue {} ...", (Object)queueURL);
            POLLING_THREADS.get(queueURL).cancel(true);
            POLLING_THREADS.remove(queueURL);
        }
    }

    private static void logException(AwsServiceException ase) {
        logger.error("AmazonServiceException: error={}, statuscode={}, awserrcode={}, errmessage={}, reqid={}", new Object[]{ase.toString(), ase.statusCode(), ase.awsErrorDetails().errorCode(), ase.awsErrorDetails().errorMessage(), ase.requestId()});
    }

    static {
        POLLING_THREADS = new ConcurrentHashMap();
        logger = LoggerFactory.getLogger(AWSQueueUtils.class);
    }
}

