/*
 * Decompiled with CFR 0.152.
 */
package cn.jmicro.pubsub;

import cn.jmicro.api.JMicro;
import cn.jmicro.api.JMicroContext;
import cn.jmicro.api.annotation.Cfg;
import cn.jmicro.api.annotation.Component;
import cn.jmicro.api.annotation.Inject;
import cn.jmicro.api.annotation.SMethod;
import cn.jmicro.api.annotation.Service;
import cn.jmicro.api.basket.BasketFactory;
import cn.jmicro.api.basket.IBasket;
import cn.jmicro.api.classloader.RpcClassLoader;
import cn.jmicro.api.config.Config;
import cn.jmicro.api.executor.ExecutorConfig;
import cn.jmicro.api.executor.ExecutorFactory;
import cn.jmicro.api.internal.pubsub.IInternalSubRpc;
import cn.jmicro.api.monitor.LG;
import cn.jmicro.api.objectfactory.IObjectFactory;
import cn.jmicro.api.pubsub.PSData;
import cn.jmicro.api.pubsub.PubSubManager;
import cn.jmicro.api.raft.IDataOperator;
import cn.jmicro.api.utils.TimeUtils;
import cn.jmicro.common.Utils;
import cn.jmicro.common.util.StringUtils;
import cn.jmicro.pubsub.ISubscriberCallback;
import cn.jmicro.pubsub.ItemStorage;
import cn.jmicro.pubsub.PubsubMessageStatis;
import cn.jmicro.pubsub.ResendManager;
import cn.jmicro.pubsub.SendItem;
import cn.jmicro.pubsub.SubcriberManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPool;

@Service(clientId=-1, limit2Packages={"cn.jmicro.api.pubsub"}, version="0.0.1", retryCnt=0, monitorEnable=0, timeout=5000)
@Component(level=5)
public class PubSubServer
implements IInternalSubRpc {
    private static final Logger logger = LoggerFactory.getLogger(PubSubServer.class);
    // Bound to config key /PubSubServer/openDebug. Passed to SubcriberManager and
    // ResendManager in init() and gates the extra logger.info output in doCheck().
    @Cfg(value="/PubSubServer/openDebug")
    private boolean openDebug = false;
    // Bound to /PubSubServer/maxFailItemCount; handed to ResendManager in init().
    @Cfg(value="/PubSubServer/maxFailItemCount")
    private int maxFailItemCount = 100;
    // Bound to /PubSubServer/maxCachePersistItem. publishItems() compares
    // (incoming item count + cacheItemsCnt) against this value before answering -3.
    @Cfg(value="/PubSubServer/maxCachePersistItem", defGlobal=true)
    private int maxCachePersistItem = 10000000;
    // Bound to /PubSubServer/maxMemoryItem. publishItems() compares
    // (basketFactory.size() + incoming item count) against this value to choose
    // between the in-memory baskets and cacheStorage.
    @Cfg(value="/PubSubServer/maxMemoryItem", defGlobal=true)
    private int maxMemoryItem = 1000;
    // Bound to /PubSubServer/reOpenThreadInterval. init() resets non-positive values
    // to 1000; the visible code does not read it anywhere else.
    @Cfg(value="/PubSubServer/reOpenThreadInterval")
    private long reOpenThreadInterval = 1000L;
    // Bound to /PubSubServer/doResendInterval; handed to ResendManager in init().
    // NOTE(review): the annotation names a change listener "resetResendTimer", but no
    // method of that name is visible in this class as shown - confirm where it lives.
    @Cfg(value="/PubSubServer/doResendInterval", changeListener="resetResendTimer")
    private long doResendInterval = 1000L;
    // Maximum number of items popped from cacheStorage per topic in doCheck() and
    // maximum number of items per Worker in doSend().
    private int batchSize = 100;
    // Compared against (current time - last send time) in doCheck() and doSend();
    // presumably milliseconds, depending on the unit of TimeUtils.getCurTime().
    private int sendInterval = 300;
    // Running count of items pushed to cacheStorage: increased in publishItems(),
    // decreased in doCheck() when cached items are written into baskets.
    private AtomicInteger cacheItemsCnt = new AtomicInteger();
    // Injected object factory; used in init() to build the managers, the item storage
    // and to look up the ExecutorFactory.
    @Inject
    private IObjectFactory of;
    // Injected; not referenced by the code visible in this class.
    @Inject
    private PubSubManager pubsubManager;
    // Injected; not referenced by the code visible in this class.
    @Inject
    private RpcClassLoader cl;
    // Injected; not referenced by the code visible in this class.
    @Inject
    private JedisPool cache;
    // Injected; not referenced by the code visible in this class.
    @Inject
    private IDataOperator dataOp;
    // Injected statistics component; every counter update goes through sta.getSc(topic, srcClientId).
    @Inject
    private PubsubMessageStatis sta;
    // Created in init(); answers isValidTopic(...) and getCallback(topic).
    private SubcriberManager subManager;
    // Created in init(); receives SendItem instances for batches that could not be delivered.
    private ResendManager resendManager;
    // Created in init() with the prefix "/<clientId>/pubsubCache/"; holds items that
    // publishItems() did not place into the in-memory baskets.
    private ItemStorage<PSData> cacheStorage;
    // Created in init(); publishItems()/writeBasket() write into it, doCheck() reads from it.
    private BasketFactory<PSData> basketFactory = null;
    // topic -> items read from baskets by doCheck() and not yet handed to a Worker by doSend().
    private Map<String, List<PSData>> sendCache = new HashMap<String, List<PSData>>();
    // topic -> timestamp; first set in publishItems() when a topic is seen, updated in doSend().
    private Map<String, Long> lastSendTimes = new HashMap<String, Long>();
    // Created in init(); runs the Worker tasks submitted by doSend().
    private ExecutorService executor = null;
    // Monitor object: publishItems() calls notifyAll() on it, doCheck() waits on it (up to 1000 ms).
    private Object syncLocker = new Object();

    /**
     * Standalone entry point: hands the command-line arguments to
     * {@code JMicro.getObjectFactoryAndStart} and then calls
     * {@code Utils.getIns().waitForShutdown()}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        JMicro.getObjectFactoryAndStart(args);
        Utils utils = Utils.getIns();
        utils.waitForShutdown();
    }

    /**
     * Builds the collaborators this server owns: the subscriber manager, the
     * resend manager (wired to the subscriber manager), the item storage under
     * {@code "/<clientId>/pubsubCache/"}, the publish executor and the basket
     * factory. A non-positive {@code reOpenThreadInterval} is replaced by 1000.
     */
    public void init() {
        this.subManager = new SubcriberManager(this.of, this.openDebug);
        this.resendManager = new ResendManager(this.of, this.openDebug, this.maxFailItemCount, this.doResendInterval);
        this.resendManager.setSubManager(this.subManager);

        String storagePrefix = "/" + Config.getClientId() + "/pubsubCache/";
        this.cacheStorage = new ItemStorage<>(this.of, storagePrefix);

        if (this.reOpenThreadInterval <= 0L) {
            logger.warn("Invalid reOpenThreadInterval: {}, set to default:{}", this.reOpenThreadInterval, 1000);
            this.reOpenThreadInterval = 1000L;
        }

        // Executor used by doSend() to run Worker tasks.
        ExecutorConfig executorConfig = new ExecutorConfig();
        executorConfig.setMsCoreSize(10);
        executorConfig.setMsMaxSize(100);
        executorConfig.setTaskQueueSize(10000);
        executorConfig.setThreadNamePrefix("PublishExecurot");
        executorConfig.setRejectedExecutionHandler(new PubsubServerAbortPolicy());
        ExecutorFactory executorFactory = (ExecutorFactory)this.of.get(ExecutorFactory.class);
        this.executor = executorFactory.createExecutor(executorConfig);

        this.basketFactory = new BasketFactory<>(2000, 20);
    }

    /**
     * Starts the daemon thread that runs {@link #doCheck()}. The thread is
     * named {@code "JMicro-<instanceName>-PubSubServer"}.
     */
    public void ready() {
        String threadName = "JMicro-" + Config.getInstanceName() + "-PubSubServer";
        Runnable checkLoop = () -> this.doCheck();
        Thread checker = new Thread(checkLoop, threadName);
        checker.setDaemon(true);
        checker.start();
    }

    /**
     * Reports whether the subscriber manager considers {@code topic} valid.
     *
     * @param topic topic name to look up
     * @return the result of {@code subManager.isValidTopic(topic)}
     */
    public boolean hasTopic(String topic) {
        boolean valid = this.subManager.isValidTopic(topic);
        return valid;
    }

    /**
     * Publishes a single item under the topic the item itself carries.
     *
     * @param item the item to publish; a {@code null} item is discarded
     * @return the result of {@link #publishItems(String, PSData[])} for a
     *         one-element batch, or {@code -2} (the discard code that
     *         {@code publishItems} uses for missing items) when {@code item}
     *         is {@code null}
     */
    @SMethod(timeout=5000, retryCnt=0, asyncable=false, debugMode=0)
    public int publishItem(PSData item) {
        // Previously a null item failed with a NullPointerException on getTopic().
        if (item == null) {
            if (LG.isLoggable((int)2, (int[])new int[0])) {
                LG.log((byte)2, this.getClass(), (String)" PUB_SERVER_DISCARD null item");
            }
            return -2;
        }
        return this.publishItems(item.getTopic(), new PSData[]{item});
    }

    /**
     * Wraps {@code content} in a new {@code PSData} for {@code topic} and
     * publishes it as a one-element batch.
     *
     * @param topic   target topic
     * @param content payload stored as the item's data
     * @return {@code -4} when the subscriber manager rejects the topic,
     *         otherwise the result of {@link #publishItems(String, PSData[])}
     */
    @SMethod(timeout=5000, retryCnt=0, asyncable=false, debugMode=0)
    public int publishString(String topic, String content) {
        if (this.subManager.isValidTopic(topic)) {
            PSData payload = new PSData();
            payload.setTopic(topic);
            payload.setData((Object)content);
            PSData[] batch = new PSData[]{payload};
            return this.publishItems(topic, batch);
        }
        boolean loggable = LG.isLoggable((int)2, (int[])new int[0]);
        if (loggable) {
            LG.log((byte)2, this.getClass(), (String)(" PUB_TOPIC_INVALID for: " + topic));
        }
        return -4;
    }

    /**
     * Applies {@code c} to every non-null element of {@code items}, in array
     * order.
     *
     * @param items the batch to walk; may be {@code null}, in which case
     *              nothing happens (callers such as {@code publishItems}
     *              invoke this before they have validated the array)
     * @param c     action to run for each non-null element
     */
    private void foreachItem(PSData[] items, Consumer<PSData> c) {
        // An enhanced for over a null array throws NullPointerException.
        if (items == null) {
            return;
        }
        for (PSData pd : items) {
            if (pd != null) {
                c.accept(pd);
            }
        }
    }

    /**
     * Accepts a batch of items for {@code topic}.
     *
     * <p>Accepted items are written into the in-memory baskets while
     * {@code basketFactory.size() + items.length} stays below
     * {@code maxMemoryItem}; otherwise they are pushed to {@code cacheStorage}.
     * Afterwards every thread waiting on {@code syncLocker} (the checker
     * thread in {@code doCheck()}) is notified.
     *
     * <p>A {@code null} batch is answered with a return code; it used to fail
     * with a {@code NullPointerException} while updating the statistics.
     *
     * @param topic topic the batch is published under
     * @param items items to publish; {@code null} elements are skipped by the
     *              statistics counters
     * @return {@code 0} when the batch was taken over, {@code -4} when the
     *         topic is not valid, {@code -2} when {@code items} is
     *         {@code null}/empty or {@code topic} is empty, {@code -3} when
     *         both the memory budget and the persistent-cache budget are
     *         exceeded
     */
    @SMethod(timeout=5000, retryCnt=0, asyncable=false, debugMode=0)
    public int publishItems(String topic, PSData[] items) {
        long curTime = TimeUtils.getCurTime();
        boolean hasItems = items != null && items.length > 0;
        this.countItems(items, (short)50);

        if (!this.subManager.isValidTopic(topic)) {
            this.countItems(items, (short)60);
            if (LG.isLoggable((int)2, (int[])new int[0])) {
                LG.log((byte)2, this.getClass(), (String)(" PUB_TOPIC_INVALID for: " + topic));
            }
            return -4;
        }

        if (!hasItems || StringUtils.isEmpty((String)topic)) {
            this.countItems(items, (short)61);
            if (LG.isLoggable((int)2, (int[])new int[0])) {
                LG.log((byte)2, this.getClass(), (String)(" PUB_SERVER_DISCARD null items for: " + topic));
            }
            return -2;
        }

        long size = this.basketFactory.size() + items.length;
        if (size > (long)this.maxMemoryItem && items.length + this.cacheItemsCnt.get() > this.maxCachePersistItem) {
            this.countItems(items, (short)62);
            if (LG.isLoggable((int)4, (int[])new int[0])) {
                LG.log((byte)4, this.getClass(), (String)(" PUB_SERVER_BUSUY : " + topic + "send len: " + items.length + " max " + this.maxCachePersistItem));
            }
            return -3;
        }

        // Remember when the topic was first seen; doSend() refreshes the value on every dispatch.
        this.lastSendTimes.putIfAbsent(topic, curTime);
        this.countItems(items, (short)55);

        if (size < (long)this.maxMemoryItem) {
            // writeBasket() logs and counts its own failures.
            // NOTE(review): a partial or failed basket write still ends in "return 0",
            // exactly as the previous inline loop did - confirm that callers expect this.
            this.writeBasket(items, curTime);
        } else {
            this.cacheStorage.push(topic, (PSData[])items, 0, items.length);
            this.cacheItemsCnt.addAndGet(items.length);
            this.countItems(items, (short)63);
            String errMsg = "push to cache :" + items.length + ",total:" + this.cacheItemsCnt.get();
            LG.log((byte)4, this.getClass(), (String)errMsg);
            logger.warn(errMsg);
        }

        synchronized (this.syncLocker) {
            this.syncLocker.notifyAll();
        }
        if (JMicroContext.get().isDebug()) {
            JMicroContext.get().appendCurUseTime("pubsub server finishTime", true);
        }
        return 0;
    }

    /**
     * Adds 1 to the statistics counter {@code type} for every non-null item.
     * Does nothing when {@code items} is {@code null}.
     */
    private void countItems(PSData[] items, short type) {
        if (items == null) {
            return;
        }
        this.foreachItem(items, pd -> this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf(type), 1L));
    }

    /**
     * Copies {@code items} into write baskets borrowed from
     * {@code basketFactory}, one basket-sized chunk at a time, and stops at
     * the first failure (no basket available, write refused, or basket could
     * not be returned). Each failure is logged and counted in the statistics.
     *
     * @param items   items to write
     * @param curTime not used by this method
     * @return the index of the first item that was not confirmed as written;
     *         equals {@code items.length} when everything was written
     */
    private int writeBasket(PSData[] items, long curTime) {
        int offset = 0;
        while (offset < items.length) {
            IBasket basket = this.basketFactory.borrowWriteBasket(new boolean[]{true});

            if (basket == null) {
                this.foreachItem(items, pd -> {
                    this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf((short)59), 1L);
                    this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf((short)51), 1L);
                });
                String noBasketMsg = "Fail size: " + (items.length - offset);
                LG.log((byte)5, this.getClass(), (String)noBasketMsg);
                logger.error(noBasketMsg);
                return offset;
            }

            // Write as much as the basket can still take, but never past the end of the batch.
            int chunk = Math.min(basket.remainding(), items.length - offset);

            if (!basket.write((Object[])items, offset, chunk)) {
                this.basketFactory.returnWriteBasket(basket, true);
                String writeFailMsg = "Fail write basket size: " + (items.length - offset);
                LG.log((byte)5, this.getClass(), (String)writeFailMsg);
                logger.error(writeFailMsg);
                this.foreachItem(items, pd -> this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf((short)51), 1L));
                return offset;
            }

            boolean returned = this.basketFactory.returnWriteBasket(basket, true);
            if (!returned) {
                this.foreachItem(items, pd -> {
                    this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf((short)59), 1L);
                    this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf((short)51), 1L);
                });
                String returnFailMsg = "Fail to return basket fail size: " + (items.length - offset);
                LG.log((byte)5, this.getClass(), (String)returnFailMsg);
                logger.error(returnFailMsg);
                return offset;
            }

            offset += chunk;
        }
        return offset;
    }

    /**
     * Endless loop of the checker thread started by {@code ready()}.
     *
     * <p>Each round it
     * <ol>
     *   <li>moves cached items back into the baskets while fewer than
     *       {@code batchSize} items sit in them,</li>
     *   <li>if the baskets are empty, either dispatches a topic that is due or
     *       waits up to 1000 ms on {@code syncLocker},</li>
     *   <li>otherwise drains the readable baskets into {@code sendCache} and
     *       calls {@code doSend}.</li>
     * </ol>
     * Any {@code Throwable} raised in a round is logged and the loop goes on.
     *
     * <p>NOTE(review): an {@code InterruptedException} from {@code wait} is
     * also caught here and only logged, as before. {@code lastSendTimes} is a
     * plain {@code HashMap} that {@code publishItems} writes as well - confirm
     * whether that needs synchronization.
     */
    private void doCheck() {
        while (true) {
            try {
                long len = this.basketFactory.size();
                long curTime = TimeUtils.getCurTime();
                if (len < (long)this.batchSize) {
                    this.refillFromCache(curTime);
                }

                len = this.basketFactory.size();
                if (len == 0L) {
                    if (this.hasDueTopic(curTime)) {
                        this.doSend(curTime);
                    } else {
                        synchronized (this.syncLocker) {
                            this.syncLocker.wait(1000L);
                        }
                    }
                    continue;
                }

                this.drainBaskets();
                if (!this.sendCache.isEmpty()) {
                    this.doSend(curTime);
                }
            }
            catch (Throwable e) {
                logger.error("doCheck\u5f02\u5e38", e);
                LG.log((byte)5, this.getClass(), (String)"", (Throwable)e);
            }
        }
    }

    /**
     * For every topic whose last send is older than {@code sendInterval} and
     * that has cached items, pops up to {@code batchSize} items from
     * {@code cacheStorage} and writes them into baskets. Items that did not
     * fit are pushed back to the cache.
     */
    private void refillFromCache(long curTime) {
        for (Map.Entry<String, Long> e : this.lastSendTimes.entrySet()) {
            String topic = e.getKey();
            if (curTime - e.getValue() <= (long)this.sendInterval || this.cacheStorage.len(topic) <= 0L) {
                continue;
            }
            List<PSData> items = this.cacheStorage.pops(topic, this.batchSize);
            if (items == null || items.isEmpty()) {
                continue;
            }
            PSData[] arr = items.toArray(new PSData[0]);
            int written = this.writeBasket(arr, curTime);
            if (written < arr.length) {
                this.cacheStorage.push(topic, (PSData[])arr, written, arr.length - written);
            }
            // 'written' items have left the cache whether or not a remainder was pushed
            // back. The counter used to be reduced only for partial writes, so it drifted
            // upward after every fully written batch.
            this.cacheItemsCnt.addAndGet(-written);
            if (LG.isLoggable((int)2, (int[])new int[0])) {
                String errMsg = "begin get items from cache topic:" + topic;
                LG.log((byte)2, this.getClass(), (String)errMsg);
            }
            if (this.openDebug) {
                logger.info("begin get items from cache topic:{}", topic);
            }
        }
    }

    /**
     * Returns true when some topic in {@code sendCache} has pending items and
     * either holds more than {@code batchSize} of them or was last sent more
     * than {@code sendInterval} ago. A topic without a recorded send time
     * counts as due instead of failing on unboxing {@code null}.
     */
    private boolean hasDueTopic(long curTime) {
        for (Map.Entry<String, List<PSData>> e : this.sendCache.entrySet()) {
            List<PSData> pending = e.getValue();
            if (pending.isEmpty()) {
                continue;
            }
            if (pending.size() > this.batchSize) {
                return true;
            }
            Long last = this.lastSendTimes.get(e.getKey());
            if (last == null || curTime - last > (long)this.sendInterval) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reads every readable basket completely and appends its content to the
     * {@code sendCache} list of the first item's topic.
     */
    private void drainBaskets() {
        Iterator readIte = this.basketFactory.iterator(true);
        IBasket rb;
        // The loop relies on next() yielding null once no readable basket is left.
        while ((rb = (IBasket)readIte.next()) != null) {
            int rm = rb.remainding();
            if (rm <= 0) {
                continue;
            }
            PSData[] psd = new PSData[rm];
            if (!rb.readAll((Object[])psd)) {
                this.basketFactory.returnReadSlot(rb, false);
                // Report the count taken before the read attempt. The old code cleared
                // the basket reference first and then dereferenced it for this message.
                if (LG.isLoggable((int)2, (int[])new int[0])) {
                    String errMsg = "Fail to get element from basket remaiding:" + rm;
                    LG.log((byte)2, this.getClass(), (String)errMsg);
                }
                if (this.openDebug) {
                    logger.info("Fail to get element from basket remaiding:{}", (Object)rm);
                }
                continue;
            }
            this.basketFactory.returnReadSlot(rb, true);
            String topic = psd[0].getTopic();
            this.sendCache.computeIfAbsent(topic, k -> new ArrayList<PSData>()).addAll(Arrays.asList(psd));
        }
    }

    /**
     * Dispatches pending items from {@code sendCache}. For every topic that
     * has pending items and either holds at least {@code batchSize} of them
     * or was last sent at least {@code sendInterval} ago, up to
     * {@code batchSize} items are removed from the head of the list and
     * submitted to the executor as one {@code Worker}.
     *
     * <p>A topic without an entry in {@code lastSendTimes} is treated as due;
     * it used to fail with a {@code NullPointerException} on unboxing.
     *
     * <p>NOTE(review): the batch is removed from the pending list before
     * {@code submit}; if the executor rejects the task the exception
     * propagates and those items are not re-queued - confirm this is intended.
     *
     * @param curTime timestamp recorded as the topic's new last send time
     */
    private void doSend(long curTime) {
        for (Map.Entry<String, List<PSData>> entry : this.sendCache.entrySet()) {
            String topic = entry.getKey();
            List<PSData> pending = entry.getValue();
            if (pending.isEmpty()) {
                continue;
            }
            Long lastSendTime = this.lastSendTimes.get(topic);
            boolean intervalElapsed = lastSendTime == null || curTime - lastSendTime >= (long)this.sendInterval;
            if (pending.size() < this.batchSize && !intervalElapsed) {
                continue;
            }
            this.lastSendTimes.put(topic, curTime);

            // Take the first min(size, batchSize) items in one step; clearing the
            // sub-list view removes them from the pending list.
            List<PSData> head = pending.subList(0, Math.min(pending.size(), this.batchSize));
            PSData[] items = head.toArray(new PSData[0]);
            head.clear();

            this.executor.submit(new Worker(items, topic));
            this.foreachItem(items, pd -> this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf((short)64), 1L));
        }
    }

    /**
     * Rejection handler installed on the publish executor in {@code init()}:
     * logs the rejected task through {@code LG} and then throws a
     * {@link RejectedExecutionException} carrying the same message.
     */
    private class PubsubServerAbortPolicy
    implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            StringBuilder text = new StringBuilder("JMcro Pubsub Server Task ");
            text.append(r.toString());
            text.append(" rejected from ");
            text.append(e.toString());
            String errMsg = text.toString();
            LG.log((byte)5, this.getClass(), (String)errMsg);
            throw new RejectedExecutionException(errMsg);
        }
    }

    /**
     * Task that delivers one batch of items of a single topic to the
     * subscriber callbacks looked up at construction time. Batches that have
     * no subscriber, that a callback hands back, or whose delivery throws are
     * queued on the resend manager.
     */
    private class Worker
    implements Runnable {
        private final PSData[] items;
        private final Set<ISubscriberCallback> subscribers;
        private final String topic;

        /**
         * @param items batch to deliver
         * @param topic topic used to look up the subscriber callbacks
         */
        public Worker(PSData[] items, String topic) {
            this.items = items;
            this.topic = topic;
            this.subscribers = PubSubServer.this.subManager.getCallback(topic);
        }

        /**
         * Delivers the batch to every subscriber, or queues it for resend when
         * there is none. The thread's context class loader is set to
         * {@code null} when the method ends, whatever the outcome.
         */
        @Override
        public void run() {
            try {
                // Call kept from the original body; its result is not used.
                TimeUtils.getCurTime();
                boolean nobodyListening = this.subscribers == null || this.subscribers.isEmpty();
                if (nobodyListening) {
                    this.queueWithoutSubscriber();
                    return;
                }
                for (ISubscriberCallback cb : this.subscribers) {
                    this.deliver(cb);
                }
            }
            finally {
                Thread.currentThread().setContextClassLoader(null);
            }
        }

        /** Queues the whole batch for resend without a target callback and records counter 66. */
        private void queueWithoutSubscriber() {
            SendItem resend = new SendItem(1, null, this.items, 0);
            PubSubServer.this.resendManager.queueItem(resend);
            this.countBatch((short)66);
            String errMsg = "Push to resend component topic:" + this.topic;
            LG.log((byte)5, this.getClass(), (String)errMsg);
            logger.error(errMsg);
        }

        /**
         * Sends the batch to one callback. Items the callback hands back in its
         * completion handler, or the whole batch if the call throws, are queued
         * for resend to that callback.
         */
        private void deliver(ISubscriberCallback cb) {
            try {
                cb.onMessage(this.items).then((psds, fail, ctx) -> {
                    if (psds == null || ((PSData[])psds).length <= 0) {
                        return;
                    }
                    if (fail != null) {
                        logger.error(fail.toString());
                    }
                    SendItem resend = new SendItem(1, cb, (PSData[])psds, 0);
                    PubSubServer.this.resendManager.queueItem(resend);
                    this.countBatch((short)65);
                    String errMsg = "Push to resend component:" + cb.getSm().getKey().toKey(true, true, true);
                    LG.log((byte)5, this.getClass(), (String)errMsg);
                    logger.error(errMsg);
                });
                this.countBatch((short)53);
            }
            catch (Throwable e) {
                SendItem resend = new SendItem(1, cb, this.items, 0);
                PubSubServer.this.resendManager.queueItem(resend);
                this.countBatch((short)65);
                String errMsg = "Worker get exception";
                LG.log((byte)5, this.getClass(), (String)errMsg, (Throwable)e);
                logger.error(errMsg, e);
            }
        }

        /** Adds 1 to statistics counter {@code type} for every non-null item of this batch. */
        private void countBatch(short type) {
            PubSubServer.this.foreachItem(this.items, pd -> PubSubServer.this.sta.getSc(pd.getTopic(), pd.getSrcClientId()).add(Short.valueOf(type), 1L));
        }
    }
}

