package cn.jmicro.pubsub;

import cn.jmicro.api.JMicro;
import cn.jmicro.api.JMicroContext;
import cn.jmicro.api.annotation.Cfg;
import cn.jmicro.api.annotation.Component;
import cn.jmicro.api.annotation.Inject;
import cn.jmicro.api.annotation.SMethod;
import cn.jmicro.api.annotation.Service;
import cn.jmicro.api.basket.BasketFactory;
import cn.jmicro.api.basket.IBasket;
import cn.jmicro.api.classloader.RpcClassLoader;
import cn.jmicro.api.config.Config;
import cn.jmicro.api.executor.ExecutorConfig;
import cn.jmicro.api.executor.ExecutorFactory;
import cn.jmicro.api.internal.pubsub.IInternalSubRpc;
import cn.jmicro.api.monitor.LG;
import cn.jmicro.api.objectfactory.IObjectFactory;
import cn.jmicro.api.pubsub.PSData;
import cn.jmicro.api.pubsub.PubSubManager;
import cn.jmicro.api.raft.IDataOperator;
import cn.jmicro.api.utils.TimeUtils;
import cn.jmicro.common.Utils;
import cn.jmicro.common.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPool;

@Service(clientId = -1, limit2Packages = {"cn.jmicro.api.pubsub"}, version = "0.0.1", retryCnt = 0, monitorEnable = 0, timeout = 5000)
@Component(level = 5)
/* loaded from: input_file:cn/jmicro/pubsub/PubSubServer.class */
public class PubSubServer implements IInternalSubRpc {
    private static final Logger logger = LoggerFactory.getLogger(PubSubServer.class);

    /** Object factory; handed to the managers/storage built in {@code init()} and used to look up the ExecutorFactory. */
    @Inject
    private IObjectFactory of;

    // Injected but not referenced anywhere else in this class.
    @Inject
    private PubSubManager pubsubManager;

    // Injected but not referenced anywhere else in this class.
    @Inject
    private RpcClassLoader cl;

    // Injected but not referenced anywhere else in this class.
    @Inject
    private JedisPool cache;

    // Injected but not referenced anywhere else in this class.
    @Inject
    private IDataOperator dataOp;

    /** Per-topic / per-client statistics counters updated on every publish and delivery step. */
    @Inject
    private PubsubMessageStatis sta;

    /** Created in {@code init()}; answers topic-validity questions and supplies subscriber callbacks. */
    private SubcriberManager subManager;

    /** Created in {@code init()}; receives batches that could not be delivered. */
    private ResendManager resendManager;

    /** Created in {@code init()}; overflow storage used when the in-memory baskets are considered full. */
    private ItemStorage<PSData> cacheStorage;

    /** Enables extra {@code logger.info} output; also passed to the managers built in {@code init()}. */
    @Cfg("/PubSubServer/openDebug")
    private boolean openDebug = false;

    /** Passed to the ResendManager constructor in {@code init()}. */
    @Cfg("/PubSubServer/maxFailItemCount")
    private int maxFailItemCount = 100;

    /** Upper bound compared against {@code cacheItemsCnt + batch length} before a publish is rejected with -3. */
    @Cfg(value = "/PubSubServer/maxCachePersistItem", defGlobal = true)
    private int maxCachePersistItem = 10000000;

    /** Threshold compared against {@code basketFactory.size() + batch length} to choose between baskets and cacheStorage. */
    @Cfg(value = "/PubSubServer/maxMemoryItem", defGlobal = true)
    private int maxMemoryItem = 1000;

    /** Validated (> 0) in {@code init()}; not otherwise read in this class. */
    @Cfg("/PubSubServer/reOpenThreadInterval")
    private long reOpenThreadInterval = 1000;

    // NOTE(review): the changeListener names "resetResendTimer", but no method of that name
    // is declared in this class as seen here - confirm where it is resolved.
    @Cfg(value = "/PubSubServer/doResendInterval", changeListener = "resetResendTimer")
    private long doResendInterval = 1000;

    /** Maximum number of items handed to one Worker, and the pop size used when draining cacheStorage. */
    private int batchSize = 100;

    /** Compared against {@code curTime - lastSendTime}; presumably milliseconds - TODO confirm TimeUtils.getCurTime() units. */
    private int sendInterval = 300;

    /** Running count of items pushed to cacheStorage by publishItems. */
    private final AtomicInteger cacheItemsCnt = new AtomicInteger();

    /** Created in {@code init()}; in-memory hand-off between publishing threads and the check thread. */
    private BasketFactory<PSData> basketFactory = null;

    /** Items read out of the baskets, grouped by topic; only touched from doCheck()/doSend(). */
    private final Map<String, List<PSData>> sendCache = new HashMap<>();

    /**
     * Last dispatch time per topic. Written by publishItems() on the calling (RPC) threads and
     * iterated/updated by the doCheck() thread, hence a concurrent map rather than a plain HashMap.
     */
    private final Map<String, Long> lastSendTimes = new ConcurrentHashMap<>();

    /** Created in {@code init()}; runs the Worker delivery tasks. */
    private ExecutorService executor = null;

    /** Monitor the check thread waits on; publishItems() notifies it after accepting items. */
    private final Object syncLocker = new Object();

    /* loaded from: input_file:cn/jmicro/pubsub/PubSubServer$PubsubServerAbortPolicy.class */
    private class PubsubServerAbortPolicy implements RejectedExecutionHandler {
        public PubsubServerAbortPolicy() {
        }

        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            String str = "JMcro Pubsub Server Task " + runnable.toString() + " rejected from " + threadPoolExecutor.toString();
            LG.log((byte) 5, getClass(), str);
            throw new RejectedExecutionException(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cn/jmicro/pubsub/PubSubServer$Worker.class */
    public class Worker implements Runnable {
        private PSData[] items;
        private Set<ISubscriberCallback> subscribers;
        private String topic;

        public Worker(PSData[] pSDataArr, String str) {
            this.items = null;
            this.subscribers = null;
            this.topic = null;
            this.items = pSDataArr;
            this.topic = str;
            this.subscribers = PubSubServer.this.subManager.getCallback(str);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                TimeUtils.getCurTime();
                if (this.subscribers == null || this.subscribers.isEmpty()) {
                    PubSubServer.this.resendManager.queueItem(new SendItem(1, null, this.items, 0));
                    PubSubServer.this.foreachItem(this.items, pSData -> {
                        PubSubServer.this.sta.getSc(pSData.getTopic(), Integer.valueOf(pSData.getSrcClientId())).add((short) 66, 1L);
                    });
                    String str = "Push to resend component topic:" + this.topic;
                    LG.log((byte) 5, getClass(), str);
                    PubSubServer.logger.error(str);
                } else {
                    for (ISubscriberCallback iSubscriberCallback : this.subscribers) {
                        try {
                            iSubscriberCallback.onMessage(this.items).then((pSDataArr, asyncFailResult, obj) -> {
                                if (pSDataArr == null || pSDataArr.length <= 0) {
                                    return;
                                }
                                if (asyncFailResult != null) {
                                    PubSubServer.logger.error(asyncFailResult.toString());
                                }
                                PubSubServer.this.resendManager.queueItem(new SendItem(1, iSubscriberCallback, pSDataArr, 0));
                                PubSubServer.this.foreachItem(this.items, pSData2 -> {
                                    PubSubServer.this.sta.getSc(pSData2.getTopic(), Integer.valueOf(pSData2.getSrcClientId())).add((short) 65, 1L);
                                });
                                String str2 = "Push to resend component:" + iSubscriberCallback.getSm().getKey().toKey(true, true, true);
                                LG.log((byte) 5, getClass(), str2);
                                PubSubServer.logger.error(str2);
                            });
                            PubSubServer.this.foreachItem(this.items, pSData2 -> {
                                PubSubServer.this.sta.getSc(pSData2.getTopic(), Integer.valueOf(pSData2.getSrcClientId())).add((short) 53, 1L);
                            });
                        } catch (Throwable th) {
                            PubSubServer.this.resendManager.queueItem(new SendItem(1, iSubscriberCallback, this.items, 0));
                            PubSubServer.this.foreachItem(this.items, pSData3 -> {
                                PubSubServer.this.sta.getSc(pSData3.getTopic(), Integer.valueOf(pSData3.getSrcClientId())).add((short) 65, 1L);
                            });
                            LG.log((byte) 5, getClass(), "Worker get exception", th);
                            PubSubServer.logger.error("Worker get exception", th);
                        }
                    }
                }
                Thread.currentThread().setContextClassLoader(null);
            } catch (Throwable th2) {
                Thread.currentThread().setContextClassLoader(null);
                throw th2;
            }
        }
    }

    /**
     * Stand-alone entry point: starts the JMicro object factory with the given
     * arguments, then calls {@code Utils.getIns().waitForShutdown()}.
     *
     * @param strArr command-line arguments, passed through unchanged
     */
    public static void main(String[] strArr) {
        JMicro.getObjectFactoryAndStart(strArr);
        Utils.getIns().waitForShutdown();
    }

    /**
     * Builds the collaborators this server owns: subscriber manager, resend manager,
     * overflow item storage, the publish executor and the basket factory. Also resets
     * {@code reOpenThreadInterval} to 1000 when it was configured as zero or negative.
     */
    public void init() {
        SubcriberManager subscribers = new SubcriberManager(this.of, this.openDebug);
        this.subManager = subscribers;

        ResendManager resender = new ResendManager(this.of, this.openDebug, this.maxFailItemCount, this.doResendInterval);
        this.resendManager = resender;
        resender.setSubManager(subscribers);

        String storagePrefix = "/" + Config.getClientId() + "/pubsubCache/";
        this.cacheStorage = new ItemStorage<>(this.of, storagePrefix);

        if (this.reOpenThreadInterval <= 0) {
            logger.warn("Invalid reOpenThreadInterval: {}, set to default:{}", Long.valueOf(this.reOpenThreadInterval), 1000);
            this.reOpenThreadInterval = 1000L;
        }

        ExecutorFactory executorFactory = (ExecutorFactory) this.of.get(ExecutorFactory.class);
        this.executor = executorFactory.createExecutor(newPublishExecutorConfig());
        this.basketFactory = new BasketFactory<>(2000, 20);
    }

    /** Settings for the publish executor: core 10, max 100, queue 10000, rejecting via PubsubServerAbortPolicy. */
    private ExecutorConfig newPublishExecutorConfig() {
        ExecutorConfig settings = new ExecutorConfig();
        settings.setMsCoreSize(10);
        settings.setMsMaxSize(100);
        settings.setTaskQueueSize(10000);
        settings.setThreadNamePrefix("PublishExecurot");
        settings.setRejectedExecutionHandler(new PubsubServerAbortPolicy());
        return settings;
    }

    /**
     * Starts the daemon thread that runs {@code doCheck()}. The thread is named
     * {@code "JMicro-" + instance name + "-PubSubServer"}.
     */
    public void ready() {
        String threadName = "JMicro-" + Config.getInstanceName() + "-PubSubServer";
        Thread checker = new Thread(this::doCheck, threadName);
        checker.setDaemon(true);
        checker.start();
    }

    /**
     * Asks the subscriber manager whether the topic is valid.
     *
     * @param str the topic name
     * @return whatever {@code subManager.isValidTopic(str)} reports
     */
    public boolean hasTopic(String str) {
        boolean valid = this.subManager.isValidTopic(str);
        return valid;
    }

    /**
     * Publishes a single item under the item's own topic.
     *
     * @param pSData the item to publish; {@code null} is discarded
     * @return the result of {@code publishItems} for a one-element batch, or -2
     *         (the code {@code publishItems} uses for a null/empty batch) when
     *         {@code pSData} is {@code null}
     */
    @SMethod(timeout = 5000, retryCnt = 0, asyncable = false, debugMode = 0)
    public int publishItem(PSData pSData) {
        if (pSData == null) {
            // Previously this dereferenced the null item and failed with a NullPointerException.
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), " PUB_SERVER_DISCARD null item");
            }
            return -2;
        }
        return publishItems(pSData.getTopic(), new PSData[]{pSData});
    }

    /**
     * Wraps a string payload in a new {@code PSData} and publishes it.
     *
     * @param str  the topic
     * @param str2 the payload stored via {@code setData}
     * @return -4 when the subscriber manager reports the topic as invalid,
     *         otherwise the result of {@code publishItems}
     */
    @SMethod(timeout = 5000, retryCnt = 0, asyncable = false, debugMode = 0)
    public int publishString(String str, String str2) {
        if (!this.subManager.isValidTopic(str)) {
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), " PUB_TOPIC_INVALID for: " + str);
            }
            return -4;
        }
        PSData item = new PSData();
        item.setTopic(str);
        item.setData(str2);
        PSData[] batch = new PSData[]{item};
        return publishItems(str, batch);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies {@code consumer} to every non-null element of the array, in order.
     *
     * @param pSDataArr the items to visit; a {@code null} array is treated as empty
     *                  (callers in this class reach this method on paths where the
     *                  array is known to be null)
     * @param consumer  the action to run for each non-null item
     */
    public void foreachItem(PSData[] pSDataArr, Consumer<PSData> consumer) {
        if (pSDataArr == null) {
            return;
        }
        for (PSData item : pSDataArr) {
            if (item != null) {
                consumer.accept(item);
            }
        }
    }

    /**
     * Accepts a batch of items for a topic and places it either into the in-memory
     * baskets or into the overflow storage, then wakes the check thread.
     *
     * @param str       the topic the batch is published to
     * @param pSDataArr the items; may be {@code null} or empty
     * @return 0 when the batch was accepted; -4 when the subscriber manager reports
     *         the topic as invalid; -2 when the batch is null/empty or the topic is
     *         empty; -3 when both the memory threshold and the cache limit would be
     *         exceeded
     */
    @SMethod(timeout = 5000, retryCnt = 0, asyncable = false, debugMode = 0)
    public int publishItems(String str, PSData[] pSDataArr) {
        long curTime = TimeUtils.getCurTime();
        // Every statistics update below is skipped for a null/empty batch; previously the
        // null case was passed on to foreachItem and failed with a NullPointerException.
        boolean hasItems = pSDataArr != null && pSDataArr.length > 0;
        if (hasItems) {
            countItems(pSDataArr, (short) 50);
        }

        if (!this.subManager.isValidTopic(str)) {
            if (hasItems) {
                countItems(pSDataArr, (short) 60);
            }
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), " PUB_TOPIC_INVALID for: " + str);
            }
            return -4;
        }

        if (!hasItems || StringUtils.isEmpty(str)) {
            if (hasItems) {
                countItems(pSDataArr, (short) 61);
            }
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), " PUB_SERVER_DISCARD null items for: " + str);
            }
            return -2;
        }

        long size = this.basketFactory.size() + pSDataArr.length;
        if (size > this.maxMemoryItem && pSDataArr.length + this.cacheItemsCnt.get() > this.maxCachePersistItem) {
            countItems(pSDataArr, (short) 62);
            if (LG.isLoggable(4, new int[0])) {
                LG.log((byte) 4, getClass(), " PUB_SERVER_BUSUY : " + str + "send len: " + pSDataArr.length + " max " + this.maxCachePersistItem);
            }
            return -3;
        }

        // Only the first publish for a topic seeds its last-send time.
        this.lastSendTimes.putIfAbsent(str, Long.valueOf(curTime));
        countItems(pSDataArr, (short) 55);

        if (size < this.maxMemoryItem) {
            // Same basket-writing loop that doCheck() uses when draining the cache.
            // NOTE(review): if writeBasket stops early the unwritten tail is neither cached
            // nor reported to the caller (0 is still returned) - confirm this is intended.
            writeBasket(pSDataArr, curTime);
        } else {
            this.cacheStorage.push(str, pSDataArr, 0, pSDataArr.length);
            this.cacheItemsCnt.addAndGet(pSDataArr.length);
            countItems(pSDataArr, (short) 63);
            String message = "push to cache :" + pSDataArr.length + ",total:" + this.cacheItemsCnt.get();
            LG.log((byte) 4, getClass(), message);
            logger.warn(message);
        }

        synchronized (this.syncLocker) {
            this.syncLocker.notifyAll();
        }
        if (JMicroContext.get().isDebug()) {
            JMicroContext.get().appendCurUseTime("pubsub server finishTime", true);
        }
        return 0;
    }

    /** Adds 1 to the given statistics counter for every non-null item of the batch. */
    private void countItems(PSData[] batch, short statType) {
        foreachItem(batch, item -> {
            this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add(statType, 1L);
        });
    }

    /**
     * Copies the items into write baskets borrowed from the basket factory, one
     * basket at a time, until everything is written or a step fails.
     *
     * @param pSDataArr the items to write
     * @param j         not used by this method
     * @return the index of the first item that was NOT written; equals
     *         {@code pSDataArr.length} when everything was written
     */
    private int writeBasket(PSData[] pSDataArr, long j) {
        int offset = 0;
        while (offset < pSDataArr.length) {
            int pending = pSDataArr.length - offset;

            IBasket basket = this.basketFactory.borrowWriteBasket(new boolean[]{true});
            if (basket == null) {
                foreachItem(pSDataArr, item -> {
                    this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add((short) 59, 1L);
                    this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add((short) 51, 1L);
                });
                String message = "Fail size: " + pending;
                LG.log((byte) 5, getClass(), message);
                logger.error(message);
                return offset;
            }

            // Write as much as this basket still has room for, but never more than is left.
            int chunk = Math.min(basket.remainding(), pending);

            if (!basket.write(pSDataArr, offset, chunk)) {
                this.basketFactory.returnWriteBasket(basket, true);
                String message = "Fail write basket size: " + pending;
                LG.log((byte) 5, getClass(), message);
                logger.error(message);
                foreachItem(pSDataArr, item -> {
                    this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add((short) 51, 1L);
                });
                return offset;
            }

            if (!this.basketFactory.returnWriteBasket(basket, true)) {
                foreachItem(pSDataArr, item -> {
                    this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add((short) 59, 1L);
                    this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add((short) 51, 1L);
                });
                String message = "Fail to return basket fail size: " + pending;
                LG.log((byte) 5, getClass(), message);
                logger.error(message);
                return offset;
            }

            offset += chunk;
        }
        return offset;
    }

    /**
     * Body of the check thread; never returns. Each pass:
     * <ol>
     *   <li>when the baskets hold fewer than {@code batchSize} items, moves up to
     *       {@code batchSize} cached items per due topic from the overflow storage
     *       back into the baskets;</li>
     *   <li>when the baskets are empty, dispatches any due topic from
     *       {@code sendCache} or otherwise waits up to one second on
     *       {@code syncLocker};</li>
     *   <li>when the baskets are not empty, drains them into {@code sendCache} and
     *       dispatches.</li>
     * </ol>
     * Any Throwable raised during a pass (including interruption of the wait) is
     * logged and the loop continues.
     */
    private void doCheck() {
        while (true) {
            try {
                long size = this.basketFactory.size();
                long curTime = TimeUtils.getCurTime();

                if (size < this.batchSize) {
                    for (Map.Entry<String, Long> entry : this.lastSendTimes.entrySet()) {
                        String topic = entry.getKey();
                        boolean due = curTime - entry.getValue().longValue() > this.sendInterval;
                        if (due && this.cacheStorage.len(topic) > 0) {
                            List<PSData> pops = this.cacheStorage.pops(topic, this.batchSize);
                            if (pops != null && pops.size() != 0) {
                                PSData[] popped = new PSData[pops.size()];
                                pops.toArray(popped);
                                int written = writeBasket(popped, curTime);
                                if (written < popped.length) {
                                    // Put the tail that did not fit back into the overflow storage.
                                    this.cacheStorage.push(topic, popped, written, popped.length - written);
                                }
                                // Every item moved into a basket has left the cache, whether or not
                                // the whole batch fit; previously a fully written batch never
                                // decremented the counter.
                                this.cacheItemsCnt.addAndGet(-written);
                                if (LG.isLoggable(2, new int[0])) {
                                    LG.log((byte) 2, getClass(), "begin get items from cache topic:" + topic);
                                }
                                if (this.openDebug) {
                                    logger.info("begin get items from cache topic:{}", topic);
                                }
                            }
                        }
                    }
                }

                if (this.basketFactory.size() == 0) {
                    boolean anyDue = false;
                    for (Map.Entry<String, List<PSData>> entry : this.sendCache.entrySet()) {
                        List<PSData> pending = entry.getValue();
                        if (pending.isEmpty()) {
                            continue;
                        }
                        // A topic without a recorded send time counts as never sent (time 0)
                        // instead of failing on a null unboxing.
                        Long last = this.lastSendTimes.get(entry.getKey());
                        long lastSent = last == null ? 0L : last.longValue();
                        if (pending.size() > this.batchSize || curTime - lastSent > this.sendInterval) {
                            anyDue = true;
                            break;
                        }
                    }
                    if (anyDue) {
                        doSend(curTime);
                    } else {
                        synchronized (this.syncLocker) {
                            this.syncLocker.wait(1000L);
                        }
                    }
                } else {
                    // This iterator signals exhaustion by returning null from next().
                    Iterator<?> it = this.basketFactory.iterator(true);
                    IBasket basket;
                    while ((basket = (IBasket) it.next()) != null) {
                        int remaining = basket.remainding();
                        if (remaining <= 0) {
                            continue;
                        }
                        PSData[] read = new PSData[remaining];
                        if (basket.readAll(read)) {
                            this.basketFactory.returnReadSlot(basket, true);
                            // The first item's topic is used as the key for the whole basket content.
                            String topic = read[0].getTopic();
                            this.sendCache.computeIfAbsent(topic, key -> new ArrayList<>()).addAll(Arrays.asList(read));
                        } else {
                            this.basketFactory.returnReadSlot(basket, false);
                            // Report the size captured before the read; the previous code
                            // dereferenced a variable that was always null here.
                            if (LG.isLoggable(2, new int[0])) {
                                LG.log((byte) 2, getClass(), "Fail to get element from basket remaiding:" + remaining);
                            }
                            if (this.openDebug) {
                                logger.info("Fail to get element from basket remaiding:{}", Integer.valueOf(remaining));
                            }
                        }
                    }
                    if (!this.sendCache.isEmpty()) {
                        doSend(curTime);
                    }
                }
            } catch (Throwable th) {
                logger.error("doCheck异常", th);
                LG.log((byte) 5, getClass(), "", th);
            }
        }
    }

    /**
     * Dispatches at most one batch per topic from {@code sendCache} to the executor.
     * A topic is dispatched when it has pending items and either at least
     * {@code batchSize} of them or at least {@code sendInterval} has elapsed since
     * its recorded last send time.
     *
     * @param j the current time, as obtained by the caller from TimeUtils.getCurTime()
     */
    private void doSend(long j) {
        for (Map.Entry<String, List<PSData>> entry : this.sendCache.entrySet()) {
            String topic = entry.getKey();
            List<PSData> pending = entry.getValue();
            if (pending.isEmpty()) {
                continue;
            }

            // A topic without a recorded send time counts as never sent (time 0)
            // instead of failing on a null unboxing.
            Long last = this.lastSendTimes.get(topic);
            long lastSent = last == null ? 0L : last.longValue();
            if (pending.size() < this.batchSize && j - lastSent < this.sendInterval) {
                continue;
            }
            this.lastSendTimes.put(topic, Long.valueOf(j));

            // Detach the first min(size, batchSize) items in one step.
            int count = Math.min(pending.size(), this.batchSize);
            List<PSData> head = pending.subList(0, count);
            PSData[] batch = head.toArray(new PSData[0]);
            head.clear();

            try {
                this.executor.submit(new Worker(batch, topic));
            } catch (RejectedExecutionException rejected) {
                // The executor's rejection handler throws; without this the detached
                // items would be lost. Put them back at the front, in their original order.
                pending.addAll(0, Arrays.asList(batch));
                String message = "Publish executor rejected " + batch.length + " items of topic " + topic + ", kept for retry";
                LG.log((byte) 5, getClass(), message, rejected);
                logger.error(message, rejected);
                continue;
            }

            foreachItem(batch, item -> {
                this.sta.getSc(item.getTopic(), Integer.valueOf(item.getSrcClientId())).add((short) 64, 1L);
            });
        }
    }
}
