/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.pubsub;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PlatformDependent;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSubscribeService {
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    private final ConnectionManager connectionManager;
    private final MasterSlaveServersConfig config;
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    protected final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
    protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();

    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        this.connectionManager = connectionManager;
        this.config = config;
        for (int i = 0; i < this.locks.length; ++i) {
            this.locks[i] = new AsyncSemaphore(1);
        }
    }

    public PubSubConnectionEntry getPubSubEntry(ChannelName channelName) {
        return (PubSubConnectionEntry)this.name2PubSubConnection.get(channelName);
    }

    public RFuture<PubSubConnectionEntry> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?> ... listeners) {
        return this.subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
    }

    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        RedissonPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
        this.subscribe(codec, new ChannelName(channelName), promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
        return promise;
    }

    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        return this.subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
    }

    private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, final RedisPubSubListener<?> ... listeners) {
        final AsyncSemaphore lock = this.getSemaphore(channelName);
        lock.acquire(new Runnable(){

            @Override
            public void run() {
                if (promise.isDone()) {
                    lock.release();
                    return;
                }
                final RedissonPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
                promise.addListener(new FutureListener<PubSubConnectionEntry>(){

                    @Override
                    public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
                        if (!future.isSuccess()) {
                            result.tryFailure(future.cause());
                        }
                    }
                });
                result.addListener(new FutureListener<PubSubConnectionEntry>(){

                    @Override
                    public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
                        if (!future.isSuccess()) {
                            promise.tryFailure(future.cause());
                            return;
                        }
                        promise.trySuccess(result.getNow());
                    }
                });
                PublishSubscribeService.this.subscribe(codec, channelName, result, type, lock, listeners);
            }
        });
        return promise;
    }

    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        RedissonPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
        this.subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
        return promise;
    }

    public AsyncSemaphore getSemaphore(ChannelName channelName) {
        return this.locks[Math.abs(channelName.hashCode() % this.locks.length)];
    }

    private void subscribe(final Codec codec, final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?> ... listeners) {
        PubSubConnectionEntry connEntry = (PubSubConnectionEntry)this.name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            this.subscribe(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
        this.freePubSubLock.acquire(new Runnable(){

            @Override
            public void run() {
                if (promise.isDone()) {
                    lock.release();
                    PublishSubscribeService.this.freePubSubLock.release();
                    return;
                }
                PubSubConnectionEntry freeEntry = PublishSubscribeService.this.freePubSubConnections.peek();
                if (freeEntry == null) {
                    PublishSubscribeService.this.connect(codec, channelName, promise, type, lock, listeners);
                    return;
                }
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    throw new IllegalStateException();
                }
                PubSubConnectionEntry oldEntry = PublishSubscribeService.this.name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    freeEntry.release();
                    PublishSubscribeService.this.freePubSubLock.release();
                    PublishSubscribeService.this.subscribe(channelName, promise, type, lock, oldEntry, listeners);
                    return;
                }
                if (remainFreeAmount == 0) {
                    PublishSubscribeService.this.freePubSubConnections.poll();
                }
                PublishSubscribeService.this.freePubSubLock.release();
                PublishSubscribeService.this.subscribe(channelName, promise, type, lock, freeEntry, listeners);
                if (PubSubType.PSUBSCRIBE == type) {
                    freeEntry.psubscribe(codec, channelName);
                } else {
                    freeEntry.subscribe(codec, channelName);
                }
            }
        });
    }

    private void subscribe(final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry, final RedisPubSubListener<?> ... listeners) {
        for (RedisPubSubListener<?> listener : listeners) {
            connEntry.addListener(channelName, listener);
        }
        SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type);
        final Future<Void> subscribeFuture = listener.getSuccessFuture();
        subscribeFuture.addListener((GenericFutureListener<Future<Void>>)new FutureListener<Void>(){

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                if (!promise.trySuccess(connEntry)) {
                    for (RedisPubSubListener listener : listeners) {
                        connEntry.removeListener(channelName, listener);
                    }
                    if (!connEntry.hasListeners(channelName)) {
                        PublishSubscribeService.this.unsubscribe(channelName, lock);
                    } else {
                        lock.release();
                    }
                } else {
                    lock.release();
                }
            }
        });
        this.connectionManager.newTimeout(new TimerTask(){

            @Override
            public void run(Timeout timeout) throws Exception {
                if (promise.tryFailure(new RedisTimeoutException())) {
                    subscribeFuture.cancel(false);
                }
            }
        }, this.config.getRetryInterval(), TimeUnit.MILLISECONDS);
    }

    private void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
        MasterSlaveEntry entry = this.connectionManager.getEntry(slot);
        if (entry == null) {
            log.error("Node for slot: " + slot + " can't be found");
        } else {
            entry.returnPubSubConnection(pubSubEntry);
        }
    }

    private RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
        MasterSlaveEntry entry = this.connectionManager.getEntry(slot);
        if (entry == null) {
            RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet");
            return RedissonPromise.newFailedFuture(ex);
        }
        return entry.nextPubSubConnection();
    }

    private void connect(final Codec codec, final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?> ... listeners) {
        final int slot = this.connectionManager.calcSlot(channelName.getName());
        final RFuture<RedisPubSubConnection> connFuture = this.nextPubSubConnection(slot);
        promise.addListener(new FutureListener<PubSubConnectionEntry>(){

            @Override
            public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
                if (!future.isSuccess()) {
                    ((RPromise)connFuture).tryFailure(future.cause());
                }
            }
        });
        connFuture.addListener(new FutureListener<RedisPubSubConnection>(){

            @Override
            public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
                if (!future.isSuccess()) {
                    PublishSubscribeService.this.freePubSubLock.release();
                    lock.release();
                    promise.tryFailure(future.cause());
                    return;
                }
                RedisPubSubConnection conn = future.getNow();
                PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, PublishSubscribeService.this.config.getSubscriptionsPerConnection());
                entry.tryAcquire();
                PubSubConnectionEntry oldEntry = PublishSubscribeService.this.name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    PublishSubscribeService.this.releaseSubscribeConnection(slot, entry);
                    PublishSubscribeService.this.freePubSubLock.release();
                    PublishSubscribeService.this.subscribe(channelName, promise, type, lock, oldEntry, listeners);
                    return;
                }
                PublishSubscribeService.this.freePubSubConnections.add(entry);
                PublishSubscribeService.this.freePubSubLock.release();
                PublishSubscribeService.this.subscribe(channelName, promise, type, lock, entry, listeners);
                if (PubSubType.PSUBSCRIBE == type) {
                    entry.psubscribe(codec, channelName);
                } else {
                    entry.subscribe(codec, channelName);
                }
            }
        });
    }

    public RFuture<Void> unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
        final PubSubConnectionEntry entry = (PubSubConnectionEntry)this.name2PubSubConnection.remove(channelName);
        if (entry == null || this.connectionManager.isShuttingDown()) {
            lock.release();
            return RedissonPromise.newSucceededFuture(null);
        }
        final RedissonPromise<Void> result = new RedissonPromise<Void>();
        entry.unsubscribe(channelName, new BaseRedisPubSubListener(){

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                    if (entry.release() == 1) {
                        PublishSubscribeService.this.freePubSubConnections.add(entry);
                    }
                    lock.release();
                    result.trySuccess(null);
                    return true;
                }
                return false;
            }
        });
        return result;
    }

    public RFuture<Codec> unsubscribe(final ChannelName channelName, final PubSubType topicType) {
        if (this.connectionManager.isShuttingDown()) {
            return RedissonPromise.newSucceededFuture(null);
        }
        final RedissonPromise<Codec> result = new RedissonPromise<Codec>();
        final AsyncSemaphore lock = this.getSemaphore(channelName);
        lock.acquire(new Runnable(){

            @Override
            public void run() {
                final PubSubConnectionEntry entry = (PubSubConnectionEntry)PublishSubscribeService.this.name2PubSubConnection.remove(channelName);
                if (entry == null) {
                    lock.release();
                    result.trySuccess(null);
                    return;
                }
                PublishSubscribeService.this.freePubSubLock.acquire(new Runnable(){

                    @Override
                    public void run() {
                        PublishSubscribeService.this.freePubSubConnections.remove(entry);
                        PublishSubscribeService.this.freePubSubLock.release();
                        final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
                        BaseRedisPubSubListener listener = new BaseRedisPubSubListener(){

                            @Override
                            public boolean onStatus(PubSubType type, CharSequence channel) {
                                if (type == topicType && channel.equals(channelName)) {
                                    lock.release();
                                    result.trySuccess(entryCodec);
                                    return true;
                                }
                                return false;
                            }
                        };
                        if (topicType == PubSubType.PUNSUBSCRIBE) {
                            entry.punsubscribe(channelName, listener);
                        } else {
                            entry.unsubscribe(channelName, listener);
                        }
                    }
                });
            }
        });
        return result;
    }

    public void punsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
        final PubSubConnectionEntry entry = (PubSubConnectionEntry)this.name2PubSubConnection.remove(channelName);
        if (entry == null) {
            lock.release();
            return;
        }
        entry.punsubscribe(channelName, new BaseRedisPubSubListener(){

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
                    if (entry.release() == 1) {
                        PublishSubscribeService.this.freePubSubConnections.add(entry);
                    }
                    lock.release();
                    return true;
                }
                return false;
            }
        });
    }

    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
        Collection<RedisPubSubListener<?>> listeners;
        PubSubConnectionEntry pubSubEntry;
        for (ChannelName channelName : redisPubSubConnection.getChannels().keySet()) {
            pubSubEntry = this.getPubSubEntry(channelName);
            listeners = pubSubEntry.getListeners(channelName);
            this.reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE);
        }
        for (ChannelName channelName : redisPubSubConnection.getPatternChannels().keySet()) {
            pubSubEntry = this.getPubSubEntry(channelName);
            listeners = pubSubEntry.getListeners(channelName);
            this.reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE);
        }
    }

    private void reattachPubSubListeners(final ChannelName channelName, final Collection<RedisPubSubListener<?>> listeners, final PubSubType topicType) {
        RFuture<Codec> subscribeCodec = this.unsubscribe(channelName, topicType);
        if (listeners.isEmpty()) {
            return;
        }
        subscribeCodec.addListener(new FutureListener<Codec>(){

            @Override
            public void operationComplete(Future<Codec> future) throws Exception {
                if (future.get() == null) {
                    return;
                }
                Codec subscribeCodec = (Codec)future.get();
                if (topicType == PubSubType.PUNSUBSCRIBE) {
                    PublishSubscribeService.this.psubscribe(channelName, listeners, subscribeCodec);
                } else {
                    PublishSubscribeService.this.subscribe(channelName, listeners, subscribeCodec);
                }
            }
        });
    }

    private void subscribe(final ChannelName channelName, final Collection<RedisPubSubListener<?>> listeners, final Codec subscribeCodec) {
        RFuture<PubSubConnectionEntry> subscribeFuture = this.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
        subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>(){

            @Override
            public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
                if (!future.isSuccess()) {
                    PublishSubscribeService.this.subscribe(channelName, listeners, subscribeCodec);
                    return;
                }
                log.info("listeners of '{}' channel to '{}' have been resubscribed", (Object)channelName, (Object)future.getNow().getConnection().getRedisClient());
            }
        });
    }

    private void psubscribe(final ChannelName channelName, final Collection<RedisPubSubListener<?>> listeners, final Codec subscribeCodec) {
        RFuture<PubSubConnectionEntry> subscribeFuture = this.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
        subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>(){

            @Override
            public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
                if (!future.isSuccess()) {
                    PublishSubscribeService.this.psubscribe(channelName, listeners, subscribeCodec);
                    return;
                }
                log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", (Object)channelName, (Object)future.getNow().getConnection().getRedisClient());
            }
        });
    }
}

