/*
 * Decompiled with CFR 0.152.
 */
package com.corundumstudio.socketio.store;

import com.corundumstudio.socketio.handler.SocketIOException;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisPubSubStore
implements PubSubStore {
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Jedis sub;
    private final Jedis pub;
    private final Long nodeId;
    private final JsonSupport jsonSupport;
    private final ConcurrentMap<String, Class> mapping = new ConcurrentHashMap<String, Class>();
    private final ConcurrentMap<String, Queue<PubSubListener>> map = new ConcurrentHashMap<String, Queue<PubSubListener>>();
    final Semaphore s = new Semaphore(1);
    final JedisPubSub jedisPubSub = new JedisPubSub(){

        public void onUnsubscribe(String channel, int subscribedChannels) {
        }

        public void onSubscribe(String channel, int subscribedChannels) {
            RedisPubSubStore.this.s.release();
        }

        public void onPUnsubscribe(String pattern, int subscribedChannels) {
        }

        public void onPSubscribe(String pattern, int subscribedChannels) {
        }

        public void onPMessage(String pattern, String channel, String message) {
        }

        public void onMessage(String channel, String message) {
            try {
                RedisPubSubStore.this.log.trace("onMessage: {}", (Object)message);
                Class clazz = (Class)RedisPubSubStore.this.mapping.get(channel);
                if (clazz == null) {
                    return;
                }
                PubSubMessage data = (PubSubMessage)RedisPubSubStore.this.jsonSupport.readValue(message, clazz);
                if (!RedisPubSubStore.this.nodeId.equals(data.getNodeId())) {
                    Queue listeners = (Queue)RedisPubSubStore.this.map.get(channel);
                    if (listeners == null) {
                        return;
                    }
                    for (PubSubListener listener : listeners) {
                        listener.onMessage(data);
                    }
                }
            }
            catch (IOException e) {
                RedisPubSubStore.this.log.error(e.getMessage(), (Throwable)e);
            }
        }
    };

    public RedisPubSubStore(Jedis pub, Jedis sub, Long nodeId, JsonSupport jsonSupport) {
        this.nodeId = nodeId;
        this.jsonSupport = jsonSupport;
        this.pub = pub;
        this.sub = sub;
    }

    @Override
    public void publish(String name, PubSubMessage msg) {
        msg.setNodeId(this.nodeId);
        this.log.trace("publish: {} msg: {}", (Object)name, (Object)msg);
        try {
            this.pub.publish(name, this.jsonSupport.writeValueAsString(msg));
        }
        catch (IOException e) {
            throw new SocketIOException(e);
        }
    }

    @Override
    public <T extends PubSubMessage> void subscribe(final String name, PubSubListener<T> listener, Class<T> clazz) {
        Queue oldList;
        Queue<PubSubListener<Object>> list = (ConcurrentLinkedQueue<PubSubListener<T>>)this.map.get(name);
        if (list == null && (oldList = (Queue)this.map.putIfAbsent(name, list = new ConcurrentLinkedQueue<PubSubListener<T>>())) != null) {
            list = oldList;
        }
        list.add(listener);
        this.mapping.put(name, clazz);
        this.s.acquireUninterruptibly();
        this.executorService.execute(new Runnable(){

            @Override
            public void run() {
                block2: {
                    try {
                        RedisPubSubStore.this.sub.subscribe(RedisPubSubStore.this.jedisPubSub, new String[]{name});
                    }
                    catch (Exception e) {
                        if (RedisPubSubStore.this.executorService.isShutdown()) break block2;
                        RedisPubSubStore.this.log.error(e.getMessage(), (Throwable)e);
                    }
                }
            }
        });
    }

    @Override
    public void unsubscribe(String name) {
        this.mapping.remove(name);
        this.map.remove(name);
    }

    @Override
    public void shutdown() {
        this.executorService.shutdownNow();
        try {
            this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            this.log.error(e.getMessage(), (Throwable)e);
        }
    }
}

