/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.common.handler;

import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.allocate.server.AllocateMessageQueueAveragely;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.convert.RegisterNodeInfoConverter;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.dto.InstanceKey;
import com.aizuda.snailjob.server.common.dto.InstanceLiveInfo;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.handler.InstanceManager;
import com.aizuda.snailjob.server.common.register.ServerRegister;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.springframework.stereotype.Component;

@Component
public class ServerNodeBalance
implements Lifecycle,
Runnable {
    private final InstanceManager instanceManager;
    public static final Long INITIAL_DELAY = 10L;
    private final ServerNodeMapper serverNodeMapper;
    private final SystemProperties systemProperties;
    private Thread thread = null;
    private List<Integer> bucketList;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doBalance() {
        SnailJobLog.LOCAL.info("rebalance start", new Object[0]);
        DistributeInstance.RE_BALANCE_ING.set(Boolean.TRUE);
        try {
            Set<InstanceLiveInfo> instanceALiveInfoSet = this.instanceManager.getInstanceALiveInfoSet("DEFAULT_SERVER_NAMESPACE_ID", "DEFAULT_SERVER");
            Set registerNodeInfoSet = StreamUtils.toSet(instanceALiveInfoSet, InstanceLiveInfo::getNodeInfo);
            Set podIpSet = StreamUtils.toSet((Collection)registerNodeInfoSet, RegisterNodeInfo::getHostId);
            if (CollUtil.isEmpty((Collection)podIpSet)) {
                SnailJobLog.LOCAL.error("server node is empty", new Object[0]);
            }
            DistributeInstance.INSTANCE.clearConsumerBucket();
            if (CollUtil.isEmpty((Collection)podIpSet)) {
                return;
            }
            List<Integer> allocate = new AllocateMessageQueueAveragely().allocate(ServerRegister.CURRENT_CID, this.bucketList, (List<String>)new ArrayList<String>(podIpSet));
            DistributeInstance.INSTANCE.setConsumerBucket(allocate);
            SnailJobLog.LOCAL.info("rebalance complete. allocate:[{}]", new Object[]{allocate});
        }
        catch (Exception e) {
            SnailJobLog.LOCAL.error("rebalance error. ", new Object[]{e});
        }
        finally {
            DistributeInstance.RE_BALANCE_ING.set(Boolean.FALSE);
        }
    }

    @Override
    public void start() {
        int bucketTotal = this.systemProperties.getBucketTotal();
        this.bucketList = new ArrayList<Integer>(bucketTotal);
        for (int i = 0; i < bucketTotal; ++i) {
            this.bucketList.add(i);
        }
        SnailJobLog.LOCAL.info("ServerNodeBalance start", new Object[0]);
        this.thread = new Thread((Runnable)this, "server-node-balance");
        this.thread.start();
    }

    private void removeNode(Set<String> remoteHostIds, Set<String> localHostIds) {
        localHostIds.removeAll(remoteHostIds);
        for (String localHostId : localHostIds) {
            InstanceKey instanceKey = InstanceKey.builder().namespaceId("DEFAULT_SERVER_NAMESPACE_ID").groupName("DEFAULT_SERVER").hostId(localHostId).build();
            this.instanceManager.remove(instanceKey);
        }
    }

    private void refreshExpireAtCache(List<ServerNode> remotePods) {
        this.refreshCache(remotePods);
    }

    private void refreshCache(List<ServerNode> remotePods) {
        for (ServerNode node : remotePods) {
            this.instanceManager.registerOrUpdate(RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(node));
        }
    }

    @Override
    public void close() {
        this.thread.interrupt();
        SnailJobLog.LOCAL.info("ServerNodeBalance start. ", new Object[0]);
        int i = this.serverNodeMapper.delete((Wrapper)new LambdaQueryWrapper().eq(ServerNode::getHostId, (Object)ServerRegister.CURRENT_CID));
        if (1 == i) {
            SnailJobLog.LOCAL.info("delete node success. [{}]", new Object[]{ServerRegister.CURRENT_CID});
        } else {
            SnailJobLog.LOCAL.info("delete node  error. [{}]", new Object[]{ServerRegister.CURRENT_CID});
        }
        SnailJobLog.LOCAL.info("ServerNodeBalance close complete", new Object[0]);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(INITIAL_DELAY);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List remotePods = this.serverNodeMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().ge(ServerNode::getExpireAt, (Object)LocalDateTime.now())).eq(ServerNode::getNodeType, (Object)NodeTypeEnum.SERVER.getType()));
                Set<InstanceLiveInfo> instanceALiveInfoSet = this.instanceManager.getInstanceALiveInfoSet("DEFAULT_SERVER_NAMESPACE_ID", "DEFAULT_SERVER");
                Set remoteHostIds = StreamUtils.toSet((Collection)remotePods, ServerNode::getHostId);
                Set localHostIds = StreamUtils.toSet((Collection)StreamUtils.toSet(instanceALiveInfoSet, InstanceLiveInfo::getNodeInfo), RegisterNodeInfo::getHostId);
                if (CollUtil.isEmpty(instanceALiveInfoSet) || this.isNodeSizeNotEqual(instanceALiveInfoSet.size(), remotePods.size()) || this.isNodeNotMatch(remoteHostIds, localHostIds) || this.checkConsumerBucket(remoteHostIds)) {
                    this.removeNode(remoteHostIds, localHostIds);
                    this.refreshCache(remotePods);
                    this.doBalance();
                    TimeUnit.SECONDS.sleep(INITIAL_DELAY);
                    continue;
                }
                this.refreshExpireAtCache(remotePods);
            }
            catch (InterruptedException e) {
                SnailJobLog.LOCAL.info("check balance stop", new Object[0]);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("check balance error", new Object[]{e});
            }
            finally {
                try {
                    TimeUnit.SECONDS.sleep(this.systemProperties.getLoadBalanceCycleTime());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private boolean isNodeNotMatch(Set<String> remoteHostIds, Set<String> localHostIds) {
        boolean b;
        boolean bl = b = !remoteHostIds.containsAll(localHostIds);
        if (b) {
            SnailJobLog.LOCAL.info("Determine if remote nodes match local nodes. Remote host IDs:[{}] Local host IDs:[{}]", new Object[]{localHostIds, remoteHostIds});
        }
        if (CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= this.systemProperties.getBucketTotal()) {
            return true;
        }
        return b;
    }

    public boolean checkConsumerBucket(Set<String> remoteHostIds) {
        return CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= this.systemProperties.getBucketTotal();
    }

    private boolean isNodeSizeNotEqual(int localNodeSize, int remoteNodeSize) {
        boolean b;
        boolean bl = b = localNodeSize != remoteNodeSize;
        if (b) {
            SnailJobLog.LOCAL.info("If the number of nodes cached remotely and locally is inconsistent, trigger rebalance. Local node size:[{}] Remote node size:[{}]", new Object[]{localNodeSize, remoteNodeSize});
        }
        return b;
    }

    @Generated
    public ServerNodeBalance(InstanceManager instanceManager, ServerNodeMapper serverNodeMapper, SystemProperties systemProperties) {
        this.instanceManager = instanceManager;
        this.serverNodeMapper = serverNodeMapper;
        this.systemProperties = systemProperties;
    }
}

