/*
 * Decompiled with CFR 0.152.
 */
package com.erudika.para.server.queue;

import com.erudika.para.core.App;
import com.erudika.para.core.ParaObject;
import com.erudika.para.core.Sysprop;
import com.erudika.para.core.Webhook;
import com.erudika.para.core.annotations.Locked;
import com.erudika.para.core.utils.Pager;
import com.erudika.para.core.utils.Para;
import com.erudika.para.core.utils.ParaObjectUtils;
import com.erudika.para.core.utils.Utils;
import com.erudika.para.server.utils.HealthUtils;
import com.fasterxml.jackson.databind.ObjectReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queue worker which repeatedly pulls messages from a queue and turns them into
 * create/update/delete operations on the DAO, webhook deliveries, or search index
 * operations. Subclasses supply the queue-specific {@link #pullMessages()} implementation.
 * <p>
 * The worker loop in {@link #run()} stops when the executing thread is interrupted.
 */
public abstract class River
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(River.class);
    private static final CloseableHttpClient HTTP;
    /**
     * IDs which were requested for indexing but were not found in the database yet.
     * Created eagerly so that no thread can ever observe an uninitialized map.
     * NOTE(review): the map is keyed by object id only and shared by all apps and rivers.
     */
    private static final ConcurrentHashMap<String, Integer> pendingIds = new ConcurrentHashMap<>();

    static {
        int timeout = 10;
        // Connections are never reused; connect and connection-request timeouts are 10 seconds.
        // NOTE(review): no response timeout is configured here - confirm the library default is acceptable.
        HTTP = HttpClientBuilder.create()
                .setConnectionReuseStrategy((hr, hr1, hc) -> false)
                .setDefaultRequestConfig(RequestConfig.custom()
                        .setConnectTimeout((long) timeout, TimeUnit.SECONDS)
                        .setConnectionRequestTimeout((long) timeout, TimeUnit.SECONDS)
                        .build())
                .build();
    }

    /**
     * Pulls the next batch of raw JSON messages from the queue.
     * @return a list of messages; a {@code null} result is treated as an empty batch by {@link #run()}
     */
    abstract List<String> pullMessages();

    /**
     * Main worker loop: pulls messages while the system is healthy, categorizes them,
     * persists the accumulated changes and sleeps when the queue has been empty for
     * three consecutive polls (only if a positive polling wait is configured).
     */
    @Override
    public void run() {
        List<ParaObject> createList = new LinkedList<>();
        List<ParaObject> updateList = new LinkedList<>();
        List<ParaObject> deleteList = new LinkedList<>();
        ObjectReader jreader = ParaObjectUtils.getJsonReader(Map.class);
        int idleCount = 0;
        try {
            while (!Thread.interrupted()) {
                logger.debug("Waiting {}s for messages...", Para.getConfig().queuePollingIntervalSec());
                int processedHooks = 0;
                List<String> msgs = Collections.emptyList();
                if (HealthUtils.getInstance().isHealthy()) {
                    try {
                        List<String> pulled = this.pullMessages();
                        if (pulled != null) {
                            // a null batch must not escape this block - it would crash the loop below
                            msgs = pulled;
                        }
                        logger.debug("Pulled {} messages from queue.", msgs.size());
                        for (String msg : msgs) {
                            processedHooks += this.handleMessage(msg, jreader, createList, updateList, deleteList);
                        }
                    } catch (Exception e) {
                        logger.error("Batch processing operation failed:", e);
                    }
                }
                boolean hasWork = !createList.isEmpty() || !updateList.isEmpty()
                        || !deleteList.isEmpty() || processedHooks > 0;
                if (hasWork) {
                    logger.debug("River summary: {} created, {} updated, {} deleted, {} webhooks delivered.",
                            createList.size(), updateList.size(), deleteList.size(), processedHooks);
                    this.persistChanges(createList, updateList, deleteList);
                    idleCount = 0;
                    continue;
                }
                if (msgs.isEmpty()) {
                    int sleep = Para.getConfig().queuePollingWaitSec();
                    // idle polls are only counted when sleeping is enabled
                    if (sleep > 0 && ++idleCount >= 3) {
                        logger.debug("Queue is empty. Sleeping for {}s...", sleep);
                        Thread.sleep((long) sleep * 1000L);
                    }
                }
            }
        } catch (InterruptedException ex) {
            logger.info("River interrupted: {}", ex.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Parses and categorizes a single raw message. A failure is logged and isolated to
     * this message so that the remaining messages of an already pulled batch are still processed.
     * @param msg the raw message, expected to be a JSON object
     * @param jreader reader used to parse the message into a map
     * @param createList receives objects to be created
     * @param updateList receives objects to be updated
     * @param deleteList receives objects to be deleted
     * @return the number of webhook/index payloads processed for this message (0 or 1)
     */
    private int handleMessage(String msg, ObjectReader jreader, List<ParaObject> createList,
            List<ParaObject> updateList, List<ParaObject> deleteList) {
        logger.debug("Message from queue: {}", msg);
        // cheap pre-filter: messages without these two keys are never processed
        if (!StringUtils.contains(msg, "appid") || !StringUtils.contains(msg, "type")) {
            return 0;
        }
        try {
            Map<String, Object> parsed = jreader.readValue(msg);
            return this.parseAndCategorizeMessage(parsed, createList, updateList, deleteList);
        } catch (Exception e) {
            logger.error("Failed to process message from queue:", e);
            return 0;
        }
    }

    /**
     * Routes a parsed message: webhook and index payloads are processed immediately,
     * everything else is appended to one of the create/update/delete lists.
     * Messages with a blank {@code appid} or a type which does not resolve to {@link Sysprop} are ignored.
     * @param parsed the parsed message
     * @param createList receives objects to be created
     * @param updateList receives objects to be updated
     * @param deleteList receives objects to be deleted
     * @return 1 if a webhook or index payload was processed successfully, otherwise 0
     */
    private int parseAndCategorizeMessage(Map<String, Object> parsed, List<ParaObject> createList,
            List<ParaObject> updateList, List<ParaObject> deleteList) {
        String id = (String) parsed.get("id");
        String type = (String) parsed.get("type");
        String appid = (String) parsed.get("appid");
        boolean isWhitelistedType = Sysprop.class.equals(ParaObjectUtils.toClass(type));
        if (StringUtils.isBlank(appid) || !isWhitelistedType) {
            return 0;
        }
        if ("webhookpayload".equals(type)) {
            return this.processWebhookPayload(appid, id, parsed);
        }
        if ("indexpayload".equals(type)) {
            return this.processIndexPayload(appid, id, parsed);
        }
        if (id != null && "true".equals(parsed.get("_delete"))) {
            Sysprop s = new Sysprop(id);
            s.setAppid(appid);
            deleteList.add(s);
        } else if (id == null || "true".equals(parsed.get("_create"))) {
            ParaObject obj = ParaObjectUtils.setAnnotatedFields(parsed);
            if (obj != null) {
                createList.add(obj);
            }
        } else {
            ParaObject existing = Para.getDAO().read(appid, id);
            ParaObject updated = ParaObjectUtils.setAnnotatedFields(existing, parsed, Locked.class);
            // same guard as the create branch: never queue a null object for a batch update
            if (updated != null) {
                updateList.add(updated);
            }
        }
        return 0;
    }

    /**
     * Delivers a webhook payload to its target URL via HTTP POST, optionally several times in parallel.
     * Every failed attempt is recorded through {@link #updateFailureCount(String, String)}.
     * @param appid the app id
     * @param id the webhook id; blank ids are rejected
     * @param parsed the message; reads {@code targetUrl}, {@code urlEncoded}, {@code payload},
     * {@code repeatedDeliveryAttempts}, {@code signature} and {@code event}
     * @return 1 if delivery was attempted, 0 if webhooks are disabled, the message is unusable
     * or the request could not be prepared
     */
    protected int processWebhookPayload(String appid, String id, Map<String, Object> parsed) {
        if (!Para.getConfig().webhooksEnabled() || !parsed.containsKey("targetUrl") || StringUtils.isBlank(id)) {
            return 0;
        }
        try {
            // a missing flag fails here and is reported by the catch block below
            boolean urlEncoded = (Boolean) parsed.get("urlEncoded");
            String targetUrl = StringUtils.trimToEmpty((String) parsed.get("targetUrl"));
            String payload = (String) parsed.get("payload");
            int requested = Math.abs(NumberUtils.toInt("" + parsed.get("repeatedDeliveryAttempts"), 1));
            // at least one and at most 100 deliveries
            int attempts = Math.max(1, Math.min(100, requested));
            Charset charset = Charset.forName(Para.getConfig().defaultEncoding());

            HttpPost postToTarget = new HttpPost(targetUrl);
            postToTarget.addHeader("User-Agent", "Para Webhook Dispacher " + Para.getVersion());
            postToTarget.setHeader("Content-Type", urlEncoded ? "application/x-www-form-urlencoded" : "application/json");
            postToTarget.setHeader("X-Webhook-Signature", (String) parsed.get("signature"));
            postToTarget.setHeader("X-Para-Event", (String) parsed.get("event"));
            String body = urlEncoded ? "payload=".concat(Utils.urlEncode(payload)) : payload;
            postToTarget.setEntity((HttpEntity) new StringEntity(body, charset));

            // NOTE(review): the same request instance is executed by all parallel attempts -
            // confirm that the HTTP client supports sharing a request object across threads.
            IntStream.range(0, attempts).parallel().forEach(r -> {
                boolean ok = false;
                String status = "";
                try (CloseableHttpResponse resp1 = HTTP.execute((ClassicHttpRequest) postToTarget)) {
                    // status codes within 10 of 200 are treated as successful delivery
                    if (resp1 != null && Math.abs(resp1.getCode() - 200) > 10) {
                        status = resp1.getReasonPhrase();
                        logger.info("Webhook {} delivery failed! {} responded with code {} {} instead of 2xx.",
                                id, targetUrl, resp1.getCode(), resp1.getReasonPhrase());
                    } else {
                        logger.debug("Webhook {} delivered to {} successfully.", id, targetUrl);
                        ok = true;
                    }
                } catch (Exception e) {
                    // include the cause when no HTTP status is available, instead of dropping it
                    logger.info("Webhook {} not delivered! {} isn't responding. {}",
                            id, targetUrl, StringUtils.isBlank(status) ? e.getMessage() : status);
                } finally {
                    if (!ok) {
                        this.updateFailureCount(appid, id);
                    }
                }
            });
            return 1;
        } catch (Exception e) {
            logger.error("Webhook payload was not delivered:", e);
            return 0;
        }
    }

    /**
     * Executes a search index operation identified by {@code opId}.
     * Unrecognized operation ids are ignored but still counted as processed.
     * @param appid the app id
     * @param opId the operation id, one of {@code index_all_op}, {@code unindex_all_op},
     * {@code rebuild_index_op}, {@code create_index_op}, {@code delete_index_op}
     * @param parsed the message; its {@code payload} value is the operation argument
     * @return 1 if no exception was thrown, 0 if search is disabled, the input is unusable
     * or the operation failed
     */
    protected int processIndexPayload(String appid, String opId, Map<String, Object> parsed) {
        if (!Para.getConfig().isSearchEnabled() || StringUtils.isBlank(opId) || parsed.isEmpty()) {
            return 0;
        }
        Object payload = parsed.get("payload");
        try {
            switch (opId) {
                case "index_all_op":
                    this.indexAllWithRetry(appid, payload);
                    break;
                case "unindex_all_op":
                    Para.getSearch().unindexAll(appid, this.getPayloadObjects(appid, payload));
                    break;
                case "rebuild_index_op":
                    Para.getSearch().rebuildIndex(Para.getDAO(), toApp(payload), "", new Pager[0]);
                    break;
                case "create_index_op":
                    Para.getSearch().createIndex(toApp(payload));
                    break;
                case "delete_index_op":
                    Para.getSearch().deleteIndex(toApp(payload));
                    break;
                default:
                    break;
            }
            return 1;
        } catch (Exception e) {
            logger.error("Indexing operation {} failed for app '{}'!", opId, appid, e);
            return 0;
        }
    }

    /**
     * Writes the accumulated objects through the DAO (creates, then updates, then deletes)
     * and clears all three lists afterwards.
     * @param createList objects to create
     * @param updateList objects to update
     * @param deleteList objects to delete
     */
    private void persistChanges(List<ParaObject> createList, List<ParaObject> updateList, List<ParaObject> deleteList) {
        if (!createList.isEmpty()) {
            Para.getDAO().createAll(createList);
        }
        if (!updateList.isEmpty()) {
            Para.getDAO().updateAll(updateList);
        }
        if (!deleteList.isEmpty()) {
            Para.getDAO().deleteAll(deleteList);
        }
        createList.clear();
        updateList.clear();
        deleteList.clear();
    }

    /**
     * Indexes the objects with the given ids. Ids for which the DAO returned no object are
     * remembered and retried asynchronously with a growing delay (1s, 2s, ...) for up to
     * the configured number of retries; ids still missing after that are logged and dropped.
     * @param appid the app id
     * @param payload a list of object ids, or {@code null} for none
     */
    private void indexAllWithRetry(String appid, Object payload) {
        List<String> ids = toIdList(payload);
        Para.getCache().removeAll(appid, ids);
        Map<String, ParaObject> objs = Para.getDAO().readAll(appid, ids, true);
        Para.getSearch().indexAll(appid, nonNullValues(objs));
        if (!objs.containsValue(null)) {
            return;
        }
        for (Map.Entry<String, ParaObject> entry : objs.entrySet()) {
            if (entry.getValue() == null) {
                pendingIds.putIfAbsent(entry.getKey(), 1);
            }
        }
        logger.debug("Some objects are missing from local database while performing 'index_all_op': {}", pendingIds);
        Para.asyncExecute(() -> {
            try {
                for (int i = 0; i < Para.getConfig().riverMaxIndexingRetries(); ++i) {
                    Thread.sleep(1000L * (long) (i + 1));
                    Map<String, ParaObject> pending = Para.getDAO().
                            readAll(appid, new ArrayList<String>(pendingIds.keySet()), true);
                    List<ParaObject> found = new ArrayList<>();
                    for (Map.Entry<String, ParaObject> entry : pending.entrySet()) {
                        if (entry.getValue() != null) {
                            pendingIds.remove(entry.getKey());
                            found.add(entry.getValue());
                        }
                    }
                    // index only the objects that actually showed up - never pass nulls to the search
                    if (!found.isEmpty()) {
                        Para.getSearch().indexAll(appid, found);
                    }
                    if (pendingIds.isEmpty()) {
                        break;
                    }
                }
            } catch (InterruptedException ex) {
                logger.info("Retry indexing operation interrupted: {}", ex.getMessage());
                Thread.currentThread().interrupt();
            } finally {
                if (!pendingIds.isEmpty()) {
                    logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, appid);
                    pendingIds.clear();
                }
            }
        });
    }

    /**
     * Evicts the given ids from the cache and wraps each id in a bare {@link Sysprop}
     * carrying only the id and the app id.
     * @param appid the app id
     * @param payload a list of object ids, or {@code null} for none
     * @return one placeholder object per id
     */
    private List<ParaObject> getPayloadObjects(String appid, Object payload) {
        List<String> ids = toIdList(payload);
        Para.getCache().removeAll(appid, ids);
        List<ParaObject> objects = new ArrayList<>(ids.size());
        for (String id : ids) {
            Sysprop s = new Sysprop(id);
            s.setAppid(appid);
            objects.add(s);
        }
        return objects;
    }

    /**
     * Records one failed delivery for a webhook. The counter is kept in the cache; once it
     * reaches the configured maximum minus one, the webhook (if it still exists) is
     * deactivated, flagged as having too many failures, and the counter is removed.
     * @param appid the app id
     * @param id the webhook id
     */
    private void updateFailureCount(String appid, String id) {
        String countId = "failed_webhook_count" + Para.getConfig().separator() + id;
        Integer cached = Para.getCache().get(appid, countId);
        int count = (cached == null) ? 0 : cached;
        if (count < Para.getConfig().maxFailedWebhookAttempts() - 1) {
            Para.getCache().put(appid, countId, Integer.valueOf(count + 1));
            return;
        }
        Webhook hook = Para.getDAO().read(appid, id);
        if (hook != null) {
            hook.setActive(Boolean.FALSE);
            hook.setTooManyFailures(Boolean.TRUE);
            Para.getDAO().update(appid, hook);
            Para.getCache().remove(appid, countId);
            logger.info("Webhook {} was disabled - a maximum of {} failed deliveries was reached.",
                    id, Para.getConfig().maxFailedWebhookAttempts());
        }
    }

    /**
     * Interprets an index payload as a list of ids.
     * @param payload the raw payload; expected to be a list of strings when not {@code null}
     * @return the payload as a list, or an empty list if the payload is {@code null}
     * @throws ClassCastException if the payload is not a list
     */
    @SuppressWarnings("unchecked")
    private static List<String> toIdList(Object payload) {
        if (payload == null) {
            return Collections.<String>emptyList();
        }
        return (List<String>) payload;
    }

    /**
     * Converts an index payload to an {@link App} object.
     * @param payload the raw payload; expected to be a map of app fields
     * @return the object built from the map by {@code ParaObjectUtils.setAnnotatedFields}
     * @throws ClassCastException if the payload is not a map or does not describe an app
     */
    @SuppressWarnings("unchecked")
    private static App toApp(Object payload) {
        return (App) ParaObjectUtils.setAnnotatedFields((Map<String, Object>) payload);
    }

    /**
     * Collects the non-null values of a map.
     * @param objs a map which may contain null values
     * @return a new list holding every non-null value
     */
    private static List<ParaObject> nonNullValues(Map<String, ParaObject> objs) {
        return objs.values().stream().filter(v -> v != null).collect(Collectors.toList());
    }
}

