/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.protocol.mqtt.repositories;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.shenyu.protocol.mqtt.repositories.BaseRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscribeRepository
implements BaseRepository<List<String>, List<Channel>> {
    private static final Logger LOG = LoggerFactory.getLogger(SubscribeRepository.class);
    private static final Map<String, List<Channel>> TOPIC_CHANNEL_FACTORY = new ConcurrentHashMap<String, List<Channel>>();

    @Override
    public void add(List<String> topics, List<Channel> channels) {
        CompletableFuture.runAsync(() -> topics.parallelStream().forEach(s -> {
            List<Channel> list = this.get((String)s);
            list.addAll(channels);
            TOPIC_CHANNEL_FACTORY.put((String)s, list);
        }));
    }

    @Override
    public void add(Channel channel, List<MqttTopicSubscription> mqttTopicSubscription) {
        CompletableFuture.runAsync(() -> mqttTopicSubscription.parallelStream().forEach(s -> {
            List<Channel> channels = this.get(s.topicName());
            channels.add(channel);
            TOPIC_CHANNEL_FACTORY.put(s.topicName(), channels);
        }));
    }

    @Override
    public void remove(List<String> topics) {
        CompletableFuture.runAsync(() -> topics.parallelStream().forEach(TOPIC_CHANNEL_FACTORY::remove));
    }

    public void remove(List<String> topics, Channel channel) {
        CompletableFuture.runAsync(() -> topics.parallelStream().forEach(topic -> TOPIC_CHANNEL_FACTORY.get(topic).remove(channel)));
    }

    @Override
    public List<Channel> get(List<String> topics) {
        CopyOnWriteArraySet channels = new CopyOnWriteArraySet();
        topics.parallelStream().forEach(s -> channels.addAll(TOPIC_CHANNEL_FACTORY.get(s)));
        return new CopyOnWriteArrayList<Channel>(channels);
    }

    @Override
    public List<Channel> get(String topic) {
        return TOPIC_CHANNEL_FACTORY.getOrDefault(topic, new CopyOnWriteArrayList());
    }
}

