/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.session.registry;

import com.alipay.sofa.registry.common.model.store.BaseInfo;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.common.model.store.StoreData;
import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.common.model.store.Watcher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.node.NodeManager;
import com.alipay.sofa.registry.server.session.node.service.DataNodeService;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.store.DataStore;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.Watchers;
import com.alipay.sofa.registry.server.session.strategy.SessionRegistryStrategy;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;

public class SessionRegistry
implements Registry {
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionRegistry.class);
    private static final Logger TASK_LOGGER = LoggerFactory.getLogger(SessionRegistry.class, (String)"[Task]");
    @Autowired
    private Interests sessionInterests;
    @Autowired
    private Watchers sessionWatchers;
    @Autowired
    private DataStore sessionDataStore;
    @Autowired
    private DataNodeService dataNodeService;
    @Autowired
    private TaskListenerManager taskListenerManager;
    @Autowired
    private NodeManager dataNodeManager;
    @Autowired
    private SessionServerConfig sessionServerConfig;
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private SessionRegistryStrategy sessionRegistryStrategy;

    public void register(StoreData storeData) {
        this.checkConnect(storeData);
        switch (storeData.getDataType()) {
            case PUBLISHER: {
                Publisher publisher = (Publisher)storeData;
                this.dataNodeService.register(publisher);
                this.sessionDataStore.add(publisher);
                this.sessionRegistryStrategy.afterPublisherRegister(publisher);
                break;
            }
            case SUBSCRIBER: {
                Subscriber subscriber = (Subscriber)storeData;
                this.sessionInterests.add(subscriber);
                this.sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                break;
            }
            case WATCHER: {
                Watcher watcher = (Watcher)storeData;
                this.sessionWatchers.add(watcher);
                this.sessionRegistryStrategy.afterWatcherRegister(watcher);
                break;
            }
        }
    }

    @Override
    public void unRegister(StoreData<String> storeData) {
        switch (storeData.getDataType()) {
            case PUBLISHER: {
                Publisher publisher = (Publisher)storeData;
                this.sessionDataStore.deleteById(storeData.getId(), publisher.getDataInfoId());
                this.dataNodeService.unregister(publisher);
                this.sessionRegistryStrategy.afterPublisherUnRegister(publisher);
                break;
            }
            case SUBSCRIBER: {
                Subscriber subscriber = (Subscriber)storeData;
                this.sessionInterests.deleteById(storeData.getId(), subscriber.getDataInfoId());
                this.sessionRegistryStrategy.afterSubscriberUnRegister(subscriber);
                break;
            }
            case WATCHER: {
                Watcher watcher = (Watcher)storeData;
                this.sessionWatchers.deleteById(watcher.getId(), watcher.getDataInfoId());
                this.sessionRegistryStrategy.afterWatcherUnRegister(watcher);
                break;
            }
        }
    }

    @Override
    public void cancel(List<String> connectIds) {
        TaskEvent taskEvent = new TaskEvent(connectIds, TaskEvent.TaskType.CANCEL_DATA_TASK);
        TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", (Object)taskEvent);
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    @Override
    public void fetchChangData() {
        if (!this.sessionServerConfig.isBeginDataFetchTask()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500L);
            }
            catch (InterruptedException e) {
                LOGGER.error("fetchChangData task sleep InterruptedException", (Throwable)e);
            }
            return;
        }
        this.fetchChangDataProcess();
    }

    @Override
    public void fetchChangDataProcess() {
        ArrayList<String> checkDataInfoIds = new ArrayList<String>();
        this.sessionInterests.getInterestDataInfoIds().forEach(dataInfoId -> {
            Collection<Subscriber> subscribers = this.sessionInterests.getInterests((String)dataInfoId);
            if (subscribers != null && !subscribers.isEmpty()) {
                checkDataInfoIds.add((String)dataInfoId);
            }
        });
        Map<String, Collection<String>> map = this.calculateDataNode(checkDataInfoIds);
        map.forEach((address, dataInfoIds) -> {
            Map<String, Map<String, Long>> dataVersions = this.dataNodeService.fetchDataVersion(URL.valueOf((String)address), (Collection<String>)dataInfoIds);
            if (dataVersions != null) {
                this.sessionRegistryStrategy.doFetchChangDataProcess(dataVersions);
            } else {
                LOGGER.warn("Fetch no change data versions info from {}", address);
            }
        });
    }

    private Map<String, Collection<String>> calculateDataNode(Collection<String> dataInfoIds) {
        HashMap<String, Collection<String>> map = new HashMap<String, Collection<String>>();
        if (dataInfoIds != null) {
            dataInfoIds.forEach(dataInfoId -> {
                Object dataNode = this.dataNodeManager.getNode((String)dataInfoId);
                URL url = new URL(dataNode.getNodeUrl().getIpAddress(), this.sessionServerConfig.getDataServerPort());
                Collection list = map.computeIfAbsent(url.getAddressString(), k -> new ArrayList());
                list.add(dataInfoId);
            });
        }
        return map;
    }

    private void checkConnect(StoreData storeData) {
        BaseInfo baseInfo = (BaseInfo)storeData;
        Server sessionServer = this.boltExchange.getServer(Integer.valueOf(this.sessionServerConfig.getServerPort()));
        Channel channel = sessionServer.getChannel(baseInfo.getSourceAddress());
        if (channel == null) {
            throw new RuntimeException(String.format("Register address %s  has not connected session server!", baseInfo.getSourceAddress()));
        }
    }

    public Interests getSessionInterests() {
        return this.sessionInterests;
    }

    public DataStore getSessionDataStore() {
        return this.sessionDataStore;
    }

    public TaskListenerManager getTaskListenerManager() {
        return this.taskListenerManager;
    }
}

