/*
 * Decompiled with CFR 0.152.
 */
package cn.jmicro.pubsub;

import cn.jmicro.api.JMicroContext;
import cn.jmicro.api.async.IPromise;
import cn.jmicro.api.async.PromiseUtils;
import cn.jmicro.api.internal.async.PromiseImpl;
import cn.jmicro.api.internal.pubsub.IInternalSubRpc;
import cn.jmicro.api.monitor.LG;
import cn.jmicro.api.net.Message;
import cn.jmicro.api.net.ServerError;
import cn.jmicro.api.objectfactory.IObjectFactory;
import cn.jmicro.api.persist.IObjectStorage;
import cn.jmicro.api.pubsub.PSData;
import cn.jmicro.api.registry.ServiceMethod;
import cn.jmicro.api.registry.UniqueServiceMethodKey;
import cn.jmicro.codegenerator.AsyncClientUtils;
import cn.jmicro.common.CommonException;
import cn.jmicro.common.util.JsonUtils;
import cn.jmicro.common.util.StringUtils;
import cn.jmicro.pubsub.ISubscriberCallback;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriberCallbackImpl
implements ISubscriberCallback {
    private static final Class<?> TAG = SubscriberCallbackImpl.class;
    private static final int ARR = 1;
    private static final int SINGLE = 2;
    private static final int DATA = 3;
    private static final int NONE = 4;
    private static final Logger logger = LoggerFactory.getLogger(SubscriberCallbackImpl.class);
    private ServiceMethod sm = null;
    private Object srvProxy = null;
    private IObjectFactory of;
    private IObjectStorage os;
    private Map<String, Holder> key2Holder = new HashMap<String, Holder>();
    private IInternalSubRpc pubsubServer;
    private int type;

    public SubscriberCallbackImpl(ServiceMethod sm, Object srv, IObjectFactory of) {
        if (sm == null) {
            throw new CommonException("SubCallback service method cannot be null");
        }
        if (srv == null) {
            throw new CommonException("SubCallback service cannot be null");
        }
        this.of = of;
        this.os = (IObjectStorage)of.get(IObjectStorage.class);
        this.sm = sm;
        this.srvProxy = srv;
        this.pubsubServer = (IInternalSubRpc)of.get(IInternalSubRpc.class);
    }

    @Override
    public IPromise<PSData[]> onMessage(PSData[] items) {
        switch (this.type) {
            case 1: {
                return this.callAsArra(items);
            }
            case 2: 
            case 3: 
            case 4: {
                return this.callOneByOne(items, this.type);
            }
        }
        throw new CommonException(0, "onMessage topic:" + this.sm.getTopic() + ", type: " + this.type + "," + this.sm.getKey().toKey(false, false, false));
    }

    private void notiryResultFail(int code, String msg, Object cxt, List<PSData> fs, PromiseImpl<PSData[]> p) {
        PSData[] items = (PSData[])cxt;
        if (fs != null && !fs.isEmpty()) {
            PSData[] failItems = new PSData[fs.size()];
            fs.toArray(failItems);
            p.setResult((Object)failItems);
            this.resultItem(failItems, (byte)-7);
            PSData[] successItems = new PSData[items.length - fs.size()];
            int idx = 0;
            if (fs.size() < items.length) {
                for (PSData pd : items) {
                    boolean f = false;
                    for (PSData fpd : fs) {
                        if (pd.getId() != fpd.getId()) continue;
                        f = true;
                    }
                    if (f) continue;
                    successItems[idx++] = pd;
                }
                this.resultItem(successItems, (byte)0);
            }
        } else {
            this.resultItem(items, (byte)0);
            p.setResult(null);
        }
    }

    private IPromise<PSData[]> callAsArra(PSData[] items) {
        PromiseImpl p = new PromiseImpl();
        try {
            PromiseUtils.callService((Object)this.srvProxy, (String)this.sm.getKey().getMethod(), (Object)items, (Object[])new Object[]{items}).success((obj, ctx) -> {
                IPromise<List<PSData>> fsPro = this.notifyResult(obj, items, 0);
                fsPro.fail((code, msg, cxt) -> {
                    this.notiryResultFail(code, msg, cxt, (List)fsPro.getResult(), (PromiseImpl<PSData[]>)p);
                    p.setFail(code, msg);
                    p.done();
                }).success((rst, is) -> {
                    this.resultItem(items, (byte)0);
                    p.done();
                });
            }).fail((code, msg, pda) -> {
                IPromise<List<PSData>> fsPro = this.notifyResult(new ServerError(code, msg), items, -6);
                fsPro.fail((code0, msg0, cxt) -> {
                    logger.error("code: " + code0 + ",msg: " + msg0);
                    this.notiryResultFail(code, msg, items, Arrays.asList(items), (PromiseImpl<PSData[]>)p);
                    p.setFail(code, msg);
                    p.done();
                }).success((rst, is) -> {
                    this.resultItem(items, (byte)-6);
                    p.setFail(code, msg);
                    p.done();
                });
            });
            return p;
        }
        catch (Throwable e) {
            String msg2 = "callAsArra topic:" + this.sm.getTopic() + ",mkey:" + this.sm.getKey().toKey(false, false, false);
            logger.error(msg2, e);
            LG.log((byte)5, TAG, (String)msg2, (Throwable)e);
            this.resultItem(items, (byte)-6);
            p.setResult((Object)items);
            p.setFail(1, msg2);
            p.done();
            return p;
        }
    }

    private IPromise<List<PSData>> notifyResult(Object obj, PSData[] items, int resultCode) {
        PromiseImpl outp = new PromiseImpl();
        ArrayList<PSData> fails = new ArrayList<PSData>();
        AtomicInteger cbcnt = new AtomicInteger(0);
        for (PSData pd : items) {
            if (!StringUtils.isNotEmpty((String)pd.getCallback())) continue;
            cbcnt.incrementAndGet();
        }
        if (cbcnt.get() == 0) {
            outp.setResult(null);
            outp.done();
        } else {
            for (PSData pd : items) {
                try {
                    if (!StringUtils.isNotEmpty((String)pd.getCallback())) continue;
                    IPromise<PSData> pro = this.callback(pd, obj, resultCode);
                    pro.then((proData, fail, ctx) -> {
                        int cnt;
                        if (proData != null) {
                            fails.add((PSData)proData);
                            logger.error(fail.toString());
                        }
                        if ((cnt = cbcnt.decrementAndGet()) <= 0) {
                            outp.setResult((Object)fails);
                            outp.done();
                        }
                    });
                }
                catch (Throwable e) {
                    String msg = "callOneByOne pd:" + pd.getId() + ", topic:" + pd.getTopic() + ",mkey:" + this.sm.getKey().toKey(false, false, false);
                    logger.error(msg, e);
                    LG.log((byte)5, TAG, (String)msg, (Throwable)e);
                    fails.add(pd);
                    int cnt = cbcnt.decrementAndGet();
                    if (cnt > 0) continue;
                    outp.setResult(fails);
                    outp.done();
                }
            }
        }
        return outp;
    }

    private IPromise<PSData[]> callOneByOne(PSData[] items, int type) {
        PromiseImpl p = new PromiseImpl();
        p.setResult(null);
        ArrayList<PSData> fails = new ArrayList<PSData>();
        AtomicInteger ai = new AtomicInteger(items.length);
        for (PSData pd : items) {
            try {
                IPromise rePromise = null;
                if (type == 2) {
                    rePromise = PromiseUtils.callService((Object)this.srvProxy, (String)this.sm.getKey().getMethod(), (Object)pd, (Object[])new Object[]{pd});
                } else if (type == 4) {
                    rePromise = PromiseUtils.callService((Object)this.srvProxy, (String)this.sm.getKey().getMethod(), (Object)pd, (Object[])new Object[0]);
                } else if (type == 3) {
                    Object[] args = (Object[])pd.getData();
                    rePromise = PromiseUtils.callService((Object)this.srvProxy, (String)this.sm.getKey().getMethod(), (Object)pd, (Object[])args);
                }
                rePromise.success((result, pdItem) -> {
                    PSData pda = (PSData)pdItem;
                    if (StringUtils.isNotEmpty((String)pda.getCallback())) {
                        this.resultItem(pda, (byte)0);
                    } else {
                        IPromise<PSData> inPro = this.callback((PSData)pdItem, result, 0);
                        inPro.success((iobj, actx) -> {
                            this.resultItem(pda, (byte)0);
                            int cnt = ai.decrementAndGet();
                            if (cnt == 0) {
                                if (!fails.isEmpty()) {
                                    PSData[] pds = new PSData[fails.size()];
                                    fails.toArray(pds);
                                    p.setResult((Object)pds);
                                    p.setFail(1, "fail item in result");
                                }
                                p.done();
                            }
                        }).fail((code, msg, objItem) -> this.resultItem(pda, (byte)-7));
                    }
                }).fail((code, msg, pdItem) -> {
                    PSData pda = (PSData)pdItem;
                    logger.error("code:" + code + ", msg: " + msg);
                    fails.add(pd);
                    int cnt = ai.decrementAndGet();
                    if (cnt == 0) {
                        if (!fails.isEmpty()) {
                            PSData[] pds = new PSData[fails.size()];
                            fails.toArray(pds);
                            p.setResult((Object)pds);
                            p.setFail(1, "fail item in result");
                        }
                        p.done();
                    }
                    this.resultItem(pda, (byte)-6);
                });
            }
            catch (Throwable e) {
                String msg2 = "callOneByOne pd:" + pd.getId() + ", topic:" + pd.getTopic() + ",mkey:" + this.sm.getKey().toKey(false, false, false);
                logger.error(msg2, e);
                LG.log((byte)5, TAG, (String)msg2, (Throwable)e);
                fails.add(pd);
                this.resultItem(pd, (byte)-6);
                int cnt = ai.decrementAndGet();
                if (cnt != 0) continue;
                if (!fails.isEmpty()) {
                    PSData[] pds = new PSData[fails.size()];
                    fails.toArray(pds);
                    p.setResult((Object)pds);
                    p.setFail(1, "fail item in result");
                }
                p.done();
            }
        }
        return p;
    }

    private void resultItem(PSData pda, byte b) {
        try {
            if (pda.isPersist() || b != 0) {
                Document d = Document.parse((String)JsonUtils.getIns().toJson((Object)pda));
                d.put("result", (Object)b);
                d.put("id", (Object)pda.getId());
                this.os.updateOrSaveById("t_pubsub_items", (Object)d, Document.class, "id", true);
            }
        }
        catch (Exception e) {
            logger.error("resultItem", (Throwable)e);
        }
    }

    private void resultItem(PSData[] pdas, byte b) {
        for (PSData pd : pdas) {
            this.resultItem(pd, b);
        }
    }

    public IPromise<PSData> callback(PSData item, Object result, int statuCode) {
        PromiseImpl p = new PromiseImpl();
        p.setResult(null);
        if (StringUtils.isEmpty((String)item.getCallback())) {
            p.done();
            return p;
        }
        if (item.isCallbackMethod()) {
            return this.callbackServiceMethod(item, result, statuCode);
        }
        PSData d = new PSData();
        d.setTopic(item.getCallback());
        d.setData((Object)new Object[]{result, item.getId(), statuCode});
        d.setPersist(true);
        d.setSrcClientId(item.getSrcClientId());
        d.put("_spid_", (Object)item.getId());
        d.setPersist(item.isPersist());
        this.pubsubServer.publishItem(d);
        p.done();
        return p;
    }

    public IPromise<PSData> callbackServiceMethod(PSData item, Object result, int statuCode) {
        PromiseImpl p = new PromiseImpl();
        p.setResult(null);
        if (StringUtils.isEmpty((String)item.getCallback())) {
            p.setFail(-1, "callback is null");
            p.done();
            return p;
        }
        Map cxt = item.getContext();
        Long linkId = (Long)cxt.get("_linkerId");
        String key = item.getCallback();
        try {
            UniqueServiceMethodKey mkey = UniqueServiceMethodKey.fromKey((String)key);
            Holder h = null;
            if (this.key2Holder.containsKey(key)) {
                h = this.key2Holder.get(key);
            } else {
                h = new Holder();
                h.srv = this.of.getRemoteServie(mkey.getServiceName(), mkey.getNamespace(), mkey.getVersion(), null);
                if (h.srv == null) {
                    String msg2 = "Fail to create async service proxy src:" + this.sm.getKey().toString() + ",target:" + key;
                    LG.log((byte)5, SubscriberCallbackImpl.class, (String)msg2);
                    p.setFail(1, msg2);
                    p.done();
                    return p;
                }
                h.key = mkey;
                if (Message.is((int)item.getFlag(), (int)2)) {
                    h.m = h.srv.getClass().getMethod(AsyncClientUtils.genAsyncMethodName((String)mkey.getMethod()), result.getClass());
                } else if (Message.is((int)item.getFlag(), (int)4)) {
                    h.m = h.srv.getClass().getMethod(AsyncClientUtils.genAsyncMethodName((String)mkey.getMethod()), Integer.TYPE, Long.TYPE, Map.class);
                }
                if (h.m == null) {
                    String msg3 = "Async service method not found: src:" + this.sm.getKey().toString() + ",target:" + key;
                    LG.log((byte)5, SubscriberCallbackImpl.class, (String)msg3);
                    p.setFail(2, msg3);
                    return p;
                }
                this.key2Holder.put(key, h);
            }
            JMicroContext.get().setLong("_linkerId", linkId);
            IPromise cp = null;
            if (Message.is((int)item.getFlag(), (int)2)) {
                cp = PromiseUtils.callService((Object)h.srv, (String)h.key.getMethod(), null, (Object[])new Object[]{result});
            } else if (Message.is((int)item.getFlag(), (int)4)) {
                cp = PromiseUtils.callService((Object)h.srv, (String)h.key.getMethod(), null, (Object[])new Object[]{result, item.getId(), statuCode});
            }
            if (cp == null) {
                p.setFail(3, "Invkke error: " + key);
                p.setResult((Object)item);
                p.done();
            } else {
                cp.success((rst, actx) -> p.done()).fail((code, msg, actx) -> {
                    p.setResult((Object)item);
                    p.setFail(code, msg);
                    p.done();
                });
            }
            return p;
        }
        catch (Throwable e) {
            String msg4 = "Fail to callback src service:" + this.sm.getKey().toString() + ",c allback: " + key;
            LG.log((byte)5, SubscriberCallbackImpl.class, (String)msg4, (Throwable)e);
            logger.error("", e);
            p.setResult((Object)item);
            p.setFail(5, msg4);
            return p;
        }
    }

    public void init() {
        try {
            Class[] argsCls = UniqueServiceMethodKey.paramsClazzes((String)this.sm.getKey().getParamsStr());
            this.type = argsCls == null || argsCls.length == 0 ? 4 : (argsCls.length == 1 && argsCls[0] == PSData.class ? 2 : (argsCls.length == 1 && argsCls[0] == new PSData[0].getClass() ? 1 : 3));
        }
        catch (Throwable e) {
            logger.error("init error: " + this.sm.getKey() + ", error " + e.getMessage());
            throw new CommonException("", e);
        }
    }

    @Override
    public String info() {
        return this.sm.getKey().toKey(false, false, false);
    }

    public String toString() {
        return this.info();
    }

    public int hashCode() {
        return this.info().hashCode();
    }

    public boolean equals(Object obj) {
        return this.hashCode() == obj.hashCode();
    }

    @Override
    public ServiceMethod getSm() {
        return this.sm;
    }

    public void setSm(ServiceMethod sm) {
        this.sm = sm;
    }

    private class Holder {
        public Object srv;
        public Method m;
        public UniqueServiceMethodKey key;

        private Holder() {
        }
    }
}

