/*
 * Decompiled with CFR 0.152.
 */
package cn.jmicro.pubsub;

import cn.jmicro.api.config.Config;
import cn.jmicro.api.executor.ExecutorConfig;
import cn.jmicro.api.executor.ExecutorFactory;
import cn.jmicro.api.monitor.LG;
import cn.jmicro.api.objectfactory.IObjectFactory;
import cn.jmicro.api.pubsub.PSData;
import cn.jmicro.api.timer.TimerTicker;
import cn.jmicro.common.util.JsonUtils;
import cn.jmicro.pubsub.ISubscriberCallback;
import cn.jmicro.pubsub.ItemStorage;
import cn.jmicro.pubsub.PubSubServer;
import cn.jmicro.pubsub.SendItem;
import cn.jmicro.pubsub.SubcriberManager;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Re-delivers pub/sub items whose first delivery did not complete.
 *
 * <p>Items handed to {@link #queueItem(SendItem)} are pushed to a per-topic resend storage.
 * A periodic timer listener ({@link #doResend()}) pops small batches from that storage and
 * submits each item to an executor, where a {@link Worker} invokes the subscriber callback(s)
 * again. Items that exceeded the retry limit, or that arrive while the resend storage for their
 * topic is full, are pushed to a separate fail storage instead.
 *
 * <p>{@link #setSubManager(SubcriberManager)} must be called before any item can be resent;
 * timer ticks that happen before that are no-ops.
 */
class ResendManager {
    private static final String RESEND_TIMER = "PubsubServerResendTimer";
    private static final Logger logger = LoggerFactory.getLogger(ResendManager.class);

    /** Value used when the constructor is given a non-positive {@code maxFailItemCount}. */
    private static final int DEFAULT_MAX_FAIL_ITEM_COUNT = 10000;
    /** Items whose {@code retryCnt} is greater than this go to the fail storage. */
    private static final int MAX_RETRY_COUNT = 2;
    /** Maximum number of items popped per topic per load, and submitted per topic per tick. */
    private static final int RESEND_BATCH_SIZE = 10;
    /** Loading stops once more than this many items have been popped in one pass. */
    private static final int RESEND_LOAD_LIMIT = 500;

    private final ItemStorage<SendItem> resendStorage;
    private final ItemStorage<SendItem> failStorage;
    private final boolean openDebug;
    private final IObjectFactory of;
    private final Map<Long, TimerTicker> resendTimers = new ConcurrentHashMap<Long, TimerTicker>();
    /** Items popped from the resend storage and not yet submitted; only touched by doResend(). */
    private final Map<String, List<SendItem>> sendItems = new HashMap<String, List<SendItem>>();
    private final int maxFailItemCount;
    private final long doResendInterval;
    /** Written by setSubManager() and read from the timer callback, hence volatile. */
    private volatile SubcriberManager subManager;
    private final ExecutorService executor;

    /**
     * Creates the manager, its two storages and its executor, and registers the resend timer.
     *
     * @param of               object factory used to build the storages and to look up the
     *                         {@link ExecutorFactory}
     * @param openDebug        debug flag forwarded to the timer
     * @param maxFailItemCount maximum number of items kept per topic in the resend storage;
     *                         non-positive values are replaced by
     *                         {@link #DEFAULT_MAX_FAIL_ITEM_COUNT}
     * @param doResendInterval interval key used to obtain the resend timer
     */
    ResendManager(IObjectFactory of, boolean openDebug, int maxFailItemCount, long doResendInterval) {
        this.openDebug = openDebug;
        this.of = of;
        this.resendStorage = new ItemStorage<SendItem>(of, "/" + Config.getClientId() + "/pubsubResend/");
        this.failStorage = new ItemStorage<SendItem>(of, "/" + Config.getClientId() + "/failItem/");
        this.doResendInterval = doResendInterval;
        if (maxFailItemCount <= 0) {
            logger.warn("Invalid maxFailItemCount: {}, set to default:{}", maxFailItemCount, DEFAULT_MAX_FAIL_ITEM_COUNT);
            this.maxFailItemCount = DEFAULT_MAX_FAIL_ITEM_COUNT;
        } else {
            this.maxFailItemCount = maxFailItemCount;
        }
        ExecutorConfig config = new ExecutorConfig();
        config.setMsCoreSize(1);
        config.setMsMaxSize(30);
        config.setTaskQueueSize(5000);
        config.setThreadNamePrefix("ResendManager");
        this.executor = ((ExecutorFactory)of.get(ExecutorFactory.class)).createExecutor(config);
        this.resetResendTimer();
    }

    /**
     * Removes any previously registered resend listener and registers a fresh one that calls
     * {@link #doResend()}. Failures inside a tick are logged and never propagate to the timer.
     */
    private void resetResendTimer() {
        logger.info("Reset timer with doResendInterval:{}", this.doResendInterval);
        // The timer is looked up twice on purpose, exactly as before: what removeListener(.., true)
        // does to the timer registration is not visible here, so the instance is not reused.
        TimerTicker.getTimer(this.resendTimers, (Long)this.doResendInterval).removeListener(RESEND_TIMER, true);
        TimerTicker.getTimer(this.resendTimers, (Long)this.doResendInterval).setOpenDebug(this.openDebug).addListener(RESEND_TIMER, null, (key, att) -> {
            try {
                this.doResend();
            }
            catch (Throwable e) {
                logger.error("Submit doResend fail: ", e);
            }
        });
    }

    /**
     * One timer tick: refills the in-memory buffer from the resend storage when it is empty,
     * then submits up to {@link #RESEND_BATCH_SIZE} buffered items per topic to the executor.
     *
     * <p>Drained topics are removed from the buffer so that the next tick sees an empty buffer
     * and loads again; leaving empty lists in the map would block every later refill.
     */
    private void doResend() {
        if (this.sendItems.isEmpty()) {
            this.loadPendingItems();
        }
        if (this.sendItems.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<String, List<SendItem>>> entries = this.sendItems.entrySet().iterator();
        while (entries.hasNext()) {
            List<SendItem> pending = entries.next().getValue();
            if (pending != null) {
                synchronized (pending) {
                    Iterator<SendItem> ite = pending.iterator();
                    for (int i = 0; ite.hasNext() && i < RESEND_BATCH_SIZE; ++i) {
                        SendItem si = ite.next();
                        // Submit first, remove second: if submit throws, the item stays buffered
                        // and is retried on the next tick.
                        this.executor.submit(new Worker(si));
                        ite.remove();
                    }
                }
            }
            if (pending == null || pending.isEmpty()) {
                entries.remove();
            }
        }
    }

    /**
     * Pops up to {@link #RESEND_BATCH_SIZE} items per known topic from the resend storage into
     * {@link #sendItems}, stopping once more than {@link #RESEND_LOAD_LIMIT} items were loaded.
     * Does nothing while no subscriber manager has been set.
     */
    private void loadPendingItems() {
        SubcriberManager manager = this.subManager;
        if (manager == null) {
            // The timer is started in the constructor, before setSubManager() can be called.
            return;
        }
        Set<String> topics = manager.topics();
        if (topics == null) {
            return;
        }
        int loaded = 0;
        for (String topic : topics) {
            long available = this.resendStorage.len(topic);
            if (available <= 0L) {
                continue;
            }
            long wanted = Math.min(available, (long)RESEND_BATCH_SIZE);
            List<SendItem> batch = this.resendStorage.pops(topic, wanted);
            if (batch == null || batch.isEmpty()) {
                continue;
            }
            this.sendItems.put(topic, batch);
            loaded += batch.size();
            if (loaded > RESEND_LOAD_LIMIT) {
                break;
            }
        }
    }

    /**
     * Queues an item for a later resend attempt, or parks it in the fail storage.
     *
     * <p>The item goes to the fail storage when its {@code retryCnt} is greater than
     * {@link #MAX_RETRY_COUNT}, or when the resend storage for its topic already holds
     * {@code maxFailItemCount} items. This method does not modify {@code retryCnt}.
     *
     * @param item the item to queue; its {@code topic} selects the storage key
     */
    void queueItem(SendItem item) {
        if (item.retryCnt > MAX_RETRY_COUNT) {
            this.failStorage.push(item.topic, item);
            logger.error("Fail item:" + JsonUtils.getIns().toJson((Object)item));
            return;
        }
        long queued = this.resendStorage.len(item.topic);
        if (queued < (long)this.maxFailItemCount) {
            this.resendStorage.push(item.topic, item);
            return;
        }
        this.failStorage.push(item.topic, item);
        // Message text means "cached message count has reached the upper limit".
        String msg = "\u7f13\u5b58\u6d88\u606f\u91cf\u5df2\u7ecf\u8fbe\u4e0a\u9650\uff1a" + JsonUtils.getIns().toJson((Object)item);
        logger.error(msg);
        LG.log((byte)5, PubSubServer.class, (String)msg);
    }

    /**
     * Injects the subscriber manager used to enumerate topics and resolve callbacks.
     *
     * @param subManager the subscriber manager
     */
    public void setSubManager(SubcriberManager subManager) {
        this.subManager = subManager;
    }

    /**
     * Executor task that re-delivers one {@link SendItem}.
     *
     * <p>Callbacks are resolved at construction time: by topic when the item has no {@code sm},
     * otherwise the single callback bound to {@code sm}.
     */
    private class Worker
    implements Runnable {
        private SendItem item = null;
        private Set<ISubscriberCallback> callbacks = null;
        private ISubscriberCallback callback = null;

        /**
         * @param item the item to re-deliver
         */
        public Worker(SendItem item) {
            this.item = item;
            if (item.sm == null) {
                this.callbacks = ResendManager.this.subManager.getCallback(item.topic);
            } else {
                this.callback = ResendManager.this.subManager.getCallback(item.sm);
            }
        }

        /**
         * Delivers the item to the resolved callback(s). The item is re-queued when no callback
         * is available or when delivery throws; data returned by a callback's promise
         * (presumably the entries that were not accepted; confirm against ISubscriberCallback)
         * is re-queued as well.
         */
        @Override
        public void run() {
            try {
                if (this.callback == null && this.callbacks == null) {
                    ResendManager.this.queueItem(this.item);
                } else if (this.callback != null) {
                    this.callback.onMessage(this.item.items).then((psds, fail, ctx) -> {
                        if (psds != null && ((PSData[])psds).length > 0) {
                            this.item.items = psds;
                            ResendManager.this.queueItem(this.item);
                        } else if (fail != null) {
                            logger.error(fail.toString());
                        }
                    });
                } else if (this.callbacks.isEmpty()) {
                    ResendManager.this.queueItem(this.item);
                } else {
                    for (ISubscriberCallback c : this.callbacks) {
                        c.onMessage(this.item.items).then((psds, fail, ctx) -> {
                            if (psds != null && ((PSData[])psds).length > 0) {
                                // Re-queue only the returned data, bound to this one subscriber.
                                SendItem si = new SendItem(1, c, (PSData[])psds, this.item.retryCnt);
                                ResendManager.this.queueItem(si);
                            } else if (fail != null) {
                                logger.error(fail.toString());
                            }
                        });
                    }
                }
            }
            catch (Throwable e) {
                logger.error("", e);
                ResendManager.this.queueItem(this.item);
            }
        }
    }
}

