/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedisPubSubTopicListenerWrapper;
import org.redisson.RedissonObject;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;

/**
 * {@link RTopic} implementation that publishes messages to, and registers
 * listeners on, the pub/sub channel named by {@link #getName()}.
 *
 * <p>{@link #subscribe()} must have been called before listeners are added;
 * it performs the channel subscription at most once per instance and stores
 * the resulting {@link ConnectionManager.PubSubEntry} used by the listener
 * methods and by {@link #close()}.
 *
 * @param <M> type of the messages carried by the topic
 */
public class RedissonTopic<M>
extends RedissonObject
implements RTopic<M> {
    /** Opened by the subscription callback once this topic's channel is reported as subscribed. */
    private final CountDownLatch subscribeLatch = new CountDownLatch(1);
    /** Ensures that only the first caller of {@link #subscribe()} issues the subscription. */
    private final AtomicBoolean subscribeOnce = new AtomicBoolean();
    /** Registered listener wrappers, keyed by the id handed back from {@link #addListener}. */
    private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners = new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>();
    private final ConnectionManager connectionManager;
    /**
     * Set once by the thread that performs the subscription and read by any
     * thread that adds/removes listeners or closes the topic; volatile so the
     * write is visible to those readers.
     */
    private volatile ConnectionManager.PubSubEntry pubSubEntry;

    /**
     * Creates a topic bound to the given name. No subscription is made until
     * {@link #subscribe()} is called.
     *
     * @param connectionManager source of connections and pub/sub subscriptions
     * @param name              channel name, passed to the superclass
     */
    RedissonTopic(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }

    /**
     * Subscribes to this topic's channel (first call only) and blocks until
     * the subscription callback for the channel has been received.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored
     * and the method returns without waiting further.
     */
    public void subscribe() {
        if (this.subscribeOnce.compareAndSet(false, true)) {
            RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>(){

                @Override
                public void subscribed(String channel, long count) {
                    // Only the confirmation for our own channel releases waiters.
                    if (channel.equals(RedissonTopic.this.getName())) {
                        RedissonTopic.this.subscribeLatch.countDown();
                    }
                }
            };
            this.pubSubEntry = this.connectionManager.subscribe(listener, this.getName());
        }
        // NOTE(review): a caller that loses the compareAndSet above only waits on
        // the latch; if the subscribed callback can fire before
        // connectionManager.subscribe(...) returns, such a caller may leave this
        // method before pubSubEntry has been stored — confirm against
        // ConnectionManager.
        try {
            this.subscribeLatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes a message to this topic's channel using a connection borrowed
     * from the connection manager. The connection is always released, even if
     * publishing throws.
     *
     * @param message message to publish
     */
    @Override
    public void publish(M message) {
        RedisConnection<String, M> conn = this.connectionManager.connection();
        try {
            conn.publish(this.getName(), message);
        }
        finally {
            this.connectionManager.release(conn);
        }
    }

    /**
     * Registers a listener on the subscribed channel.
     *
     * @param listener listener to wrap and register
     * @return the id of the registration (the wrapper's hash code), to be
     *         passed to {@link #removeListener(int)}
     * @throws IllegalStateException if {@link #subscribe()} has not yet
     *         established a subscription
     */
    @Override
    public int addListener(MessageListener<M> listener) {
        // Check the precondition first so a failed call leaves no stale map entry.
        ConnectionManager.PubSubEntry entry = this.pubSubEntry;
        if (entry == null) {
            throw new IllegalStateException("Topic '" + this.getName() + "' is not subscribed; call subscribe() before addListener()");
        }
        RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, this.getName());
        int listenerId = pubSubListener.hashCode();
        this.listeners.put(listenerId, pubSubListener);
        entry.addListener(pubSubListener);
        return listenerId;
    }

    /**
     * Removes a previously registered listener. Unknown or already removed
     * ids are ignored.
     *
     * @param listenerId id returned by {@link #addListener(MessageListener)}
     */
    @Override
    public void removeListener(int listenerId) {
        RedisPubSubTopicListenerWrapper<String, M> pubSubListener = this.listeners.remove(listenerId);
        ConnectionManager.PubSubEntry entry = this.pubSubEntry;
        // Nothing to detach when the id is unknown; never hand null to the entry.
        if (pubSubListener != null && entry != null) {
            entry.removeListener(pubSubListener);
        }
    }

    /**
     * Unsubscribes from this topic's channel. Does nothing if no subscription
     * was ever established.
     */
    @Override
    public void close() {
        ConnectionManager.PubSubEntry entry = this.pubSubEntry;
        if (entry != null) {
            this.connectionManager.unsubscribe(entry, this.getName());
        }
    }
}

