package com.corundumstudio.socketio.store;

import com.corundumstudio.socketio.handler.SocketIOException;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import java.io.IOException;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:com/corundumstudio/socketio/store/RedisPubSubStore.class */
public class RedisPubSubStore implements PubSubStore {
    private final Jedis sub;
    private final Jedis pub;
    private final Long nodeId;
    private final JsonSupport jsonSupport;
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ConcurrentMap<String, Class> mapping = new ConcurrentHashMap();
    private final ConcurrentMap<String, Queue<PubSubListener>> map = new ConcurrentHashMap();
    final Semaphore s = new Semaphore(1);
    final JedisPubSub jedisPubSub = new JedisPubSub() { // from class: com.corundumstudio.socketio.store.RedisPubSubStore.1
        public void onUnsubscribe(String str, int i) {
        }

        public void onSubscribe(String str, int i) {
            RedisPubSubStore.this.s.release();
        }

        public void onPUnsubscribe(String str, int i) {
        }

        public void onPSubscribe(String str, int i) {
        }

        public void onPMessage(String str, String str2, String str3) {
        }

        public void onMessage(String str, String str2) {
            try {
                RedisPubSubStore.this.log.trace("onMessage: {}", str2);
                Class cls = (Class) RedisPubSubStore.this.mapping.get(str);
                if (cls == null) {
                    return;
                }
                PubSubMessage pubSubMessage = (PubSubMessage) RedisPubSubStore.this.jsonSupport.readValue(str2, cls);
                if (!RedisPubSubStore.this.nodeId.equals(pubSubMessage.getNodeId())) {
                    Queue queue = (Queue) RedisPubSubStore.this.map.get(str);
                    if (queue == null) {
                        return;
                    }
                    Iterator it = queue.iterator();
                    while (it.hasNext()) {
                        ((PubSubListener) it.next()).onMessage(pubSubMessage);
                    }
                }
            } catch (IOException e) {
                RedisPubSubStore.this.log.error(e.getMessage(), e);
            }
        }
    };

    public RedisPubSubStore(Jedis jedis, Jedis jedis2, Long l, JsonSupport jsonSupport) {
        this.nodeId = l;
        this.jsonSupport = jsonSupport;
        this.pub = jedis;
        this.sub = jedis2;
    }

    @Override // com.corundumstudio.socketio.store.pubsub.PubSubStore
    public void publish(String str, PubSubMessage pubSubMessage) {
        pubSubMessage.setNodeId(this.nodeId);
        this.log.trace("publish: {} msg: {}", str, pubSubMessage);
        try {
            this.pub.publish(str, this.jsonSupport.writeValueAsString(pubSubMessage));
        } catch (IOException e) {
            throw new SocketIOException(e);
        }
    }

    @Override // com.corundumstudio.socketio.store.pubsub.PubSubStore
    public <T extends PubSubMessage> void subscribe(final String str, PubSubListener<T> pubSubListener, Class<T> cls) {
        Queue<PubSubListener> queue = this.map.get(str);
        if (queue == null) {
            queue = new ConcurrentLinkedQueue();
            Queue<PubSubListener> putIfAbsent = this.map.putIfAbsent(str, queue);
            if (putIfAbsent != null) {
                queue = putIfAbsent;
            }
        }
        queue.add(pubSubListener);
        this.mapping.put(str, cls);
        this.s.acquireUninterruptibly();
        this.executorService.execute(new Runnable() { // from class: com.corundumstudio.socketio.store.RedisPubSubStore.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    RedisPubSubStore.this.sub.subscribe(RedisPubSubStore.this.jedisPubSub, new String[]{str});
                } catch (Exception e) {
                    if (RedisPubSubStore.this.executorService.isShutdown()) {
                        return;
                    }
                    RedisPubSubStore.this.log.error(e.getMessage(), e);
                }
            }
        });
    }

    @Override // com.corundumstudio.socketio.store.pubsub.PubSubStore
    public void unsubscribe(String str) {
        this.mapping.remove(str);
        this.map.remove(str);
    }

    @Override // com.corundumstudio.socketio.store.pubsub.PubSubStore
    public void shutdown() {
        this.executorService.shutdownNow();
        try {
            this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            this.log.error(e.getMessage(), e);
        }
    }
}
