/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.pubsub;

import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.redisson.PubSubPatternStatusListener;
import org.redisson.PubSubStatusListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisException;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.AsyncSemaphore;
import org.redisson.pubsub.CountDownLatchPubSub;
import org.redisson.pubsub.LockPubSub;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSubscribeService {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    // Supplies cluster entries, slot calculation and the service manager used for timers.
    private final ConnectionManager connectionManager;
    // Configuration taken from the connection manager's service manager in the constructor;
    // read for the subscription timeout and the retry attempts/interval.
    private final MasterSlaveServersConfig config;
    // Fixed pool of 50 semaphores; getSemaphore() picks one from a channel name's hash code,
    // so different channels may share the same semaphore.
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    // Acquired around the connection bookkeeping done in subscribeNoTimeout(), connect()
    // and the unsubscribe status callback.
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    // Channel name -> connection entries the channel is registered on.
    private final Map<ChannelName, Collection<PubSubConnectionEntry>> name2entry = new ConcurrentHashMap<ChannelName, Collection<PubSubConnectionEntry>>();
    // (channel name, master/slave entry) -> connection entry serving that channel.
    private final ConcurrentMap<PubSubKey, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<PubSubKey, PubSubConnectionEntry>();
    // Master/slave entry -> holder of connection entries that still have free capacity
    // (entries are added when tryAcquire() leaves a positive remainder or after a release).
    private final ConcurrentMap<MasterSlaveEntry, PubSubEntry> entry2PubSubConnection = new ConcurrentHashMap<MasterSlaveEntry, PubSubEntry>();
    // Pub/sub helpers bound to this service and exposed through the getters below.
    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
    private final LockPubSub lockPubSub = new LockPubSub(this);
    // When true, SSUBSCRIBE/SUNSUBSCRIBE are used in place of SUBSCRIBE/UNSUBSCRIBE by the
    // String-based subscribeNoTimeout() and by unsubscribeLocked(ChannelName).
    // NOTE(review): never assigned in the visible part of the file - presumably set elsewhere.
    private boolean shardingSupported = false;

    /**
     * Creates the service on top of the given connection manager.
     *
     * <p>The configuration is read from the manager's service manager, and every slot of
     * the per-channel semaphore pool is populated with its own {@code AsyncSemaphore(1)}.
     *
     * @param connectionManager connection manager backing all subscriptions
     */
    public PublishSubscribeService(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        this.config = connectionManager.getServiceManager().getConfig();
        // Fill the pool in index order, one fresh semaphore per slot.
        Arrays.setAll(this.locks, slot -> new AsyncSemaphore(1));
    }

    /**
     * @return the {@link LockPubSub} helper bound to this service
     */
    public LockPubSub getLockPubSub() {
        return lockPubSub;
    }

    /**
     * @return the {@link CountDownLatchPubSub} helper bound to this service
     */
    public CountDownLatchPubSub getCountDownLatchPubSub() {
        return countDownLatchPubSub;
    }

    /**
     * @return the {@link SemaphorePubSub} helper bound to this service
     */
    public SemaphorePubSub getSemaphorePubSub() {
        return semaphorePubSub;
    }

    /**
     * Returns the listener count reported by the first connection entry registered for
     * the channel.
     *
     * @param channelName channel to inspect
     * @return listener count of the first registered entry, or {@code 0} when the channel
     *         has no registered entries
     */
    public int countListeners(ChannelName channelName) {
        Collection<PubSubConnectionEntry> registered = this.name2entry.getOrDefault(channelName, Collections.emptySet());
        // Only the first entry is consulted; the loop exits on its first iteration.
        for (PubSubConnectionEntry first : registered) {
            return first.countListeners(channelName);
        }
        return 0;
    }

    /**
     * @param channelName channel to look up
     * @return {@code true} when the channel currently has a key in the channel registry
     */
    public boolean hasEntry(ChannelName channelName) {
        boolean registered = name2entry.containsKey(channelName);
        return registered;
    }

    /**
     * Subscribes the listeners to a channel pattern using {@code PSUBSCRIBE}.
     *
     * <p>When {@link #isMultiEntity(ChannelName)} is true, a subscription is started on every
     * entry returned by the connection manager. Each {@link PubSubPatternStatusListener} is
     * wrapped so that its {@code onStatus} is forwarded only by the call that brings the
     * shared counter (initialised to the number of entries) down to zero. Otherwise a single
     * subscription is made on the entry resolved for the pattern's slot.
     *
     * @param channelName pattern to subscribe to
     * @param codec       codec passed to the subscription
     * @param listeners   listeners to attach
     * @return future with the resulting connection entries; completed exceptionally with
     *         {@link RedisNodeNotFoundException} when no entry is known for the pattern
     */
    public CompletableFuture<Collection<PubSubConnectionEntry>> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?> ... listeners) {
        if (!this.isMultiEntity(channelName)) {
            MasterSlaveEntry target = this.getEntry(channelName);
            if (target != null) {
                CompletableFuture<PubSubConnectionEntry> single = this.subscribe(PubSubType.PSUBSCRIBE, codec, channelName, target, null, listeners);
                return single.thenApply(connEntry -> Collections.singletonList(connEntry));
            }
            CompletableFuture<Collection<PubSubConnectionEntry>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."));
            return failed;
        }

        Collection<MasterSlaveEntry> nodes = this.connectionManager.getEntrySet();
        final AtomicInteger statusCounter = new AtomicInteger(nodes.size());

        // Wrap status listeners so the caller sees a single status notification.
        RedisPubSubListener[] wrapped = new RedisPubSubListener[listeners.length];
        for (int i = 0; i < listeners.length; i++) {
            RedisPubSubListener<?> source = listeners[i];
            if (source instanceof PubSubPatternStatusListener) {
                wrapped[i] = new PubSubPatternStatusListener((PubSubPatternStatusListener)source){

                    @Override
                    public void onStatus(PubSubType type, CharSequence channel) {
                        if (statusCounter.decrementAndGet() == 0) {
                            super.onStatus(type, channel);
                        }
                    }
                };
            } else {
                wrapped[i] = source;
            }
        }

        List<CompletableFuture<PubSubConnectionEntry>> pending = new ArrayList<>();
        for (MasterSlaveEntry node : nodes) {
            pending.add(this.subscribe(PubSubType.PSUBSCRIBE, codec, channelName, node, null, wrapped));
        }
        CompletableFuture<Void> all = CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
        return all.thenApply(done -> pending.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
    }

    /**
     * Tells whether a channel must be subscribed on every cluster entry.
     *
     * @param channelName channel or pattern name
     * @return {@code true} only in cluster mode and only when the name starts with
     *         {@code __keyspace@} or {@code __keyevent@}
     */
    public boolean isMultiEntity(ChannelName channelName) {
        if (!this.connectionManager.isClusterMode()) {
            return false;
        }
        String name = channelName.toString();
        return name.startsWith("__keyspace@") || name.startsWith("__keyevent@");
    }

    /**
     * Subscribes the listeners to a channel with {@code SUBSCRIBE} on an explicitly chosen
     * entry and client entry.
     *
     * @param entry       master/slave entry to subscribe on
     * @param clientEntry client connections entry handed to the connection lookup
     * @param codec       codec passed to the subscription
     * @param channelName channel to subscribe to
     * @param listeners   listeners to attach
     * @return future with the connection entry serving the subscription
     */
    public CompletableFuture<PubSubConnectionEntry> subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        CompletableFuture<PubSubConnectionEntry> subscription =
                this.subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, clientEntry, listeners);
        return subscription;
    }

    /**
     * Subscribes the listeners to a channel using {@code SUBSCRIBE}.
     *
     * <p>When {@link #isMultiEntity(ChannelName)} is true, a subscription is started on every
     * entry returned by the connection manager. Each {@link PubSubStatusListener} is re-created
     * from its wrapped listener and name so that {@code onStatus} is forwarded only by the call
     * that brings the shared counter (initialised to the number of entries) down to zero.
     * Otherwise a single subscription is made on the entry resolved for the channel's slot.
     *
     * @param codec       codec passed to the subscription
     * @param channelName channel to subscribe to
     * @param listeners   listeners to attach
     * @return future with the resulting connection entries; completed exceptionally with
     *         {@link RedisNodeNotFoundException} when no entry is known for the channel
     */
    public CompletableFuture<List<PubSubConnectionEntry>> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        if (!this.isMultiEntity(channelName)) {
            MasterSlaveEntry target = this.getEntry(channelName);
            if (target != null) {
                CompletableFuture<PubSubConnectionEntry> single = this.subscribe(PubSubType.SUBSCRIBE, codec, channelName, target, null, listeners);
                return single.thenApply(connEntry -> Collections.singletonList(connEntry));
            }
            CompletableFuture<List<PubSubConnectionEntry>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."));
            return failed;
        }

        Collection<MasterSlaveEntry> nodes = this.connectionManager.getEntrySet();
        final AtomicInteger statusCounter = new AtomicInteger(nodes.size());

        // Wrap status listeners so the caller sees a single status notification.
        RedisPubSubListener[] wrapped = new RedisPubSubListener[listeners.length];
        for (int i = 0; i < listeners.length; i++) {
            RedisPubSubListener<?> source = listeners[i];
            if (source instanceof PubSubStatusListener) {
                wrapped[i] = new PubSubStatusListener(((PubSubStatusListener)source).getListener(), ((PubSubStatusListener)source).getName()){

                    @Override
                    public void onStatus(PubSubType type, CharSequence channel) {
                        if (statusCounter.decrementAndGet() == 0) {
                            super.onStatus(type, channel);
                        }
                    }
                };
            } else {
                wrapped[i] = source;
            }
        }

        List<CompletableFuture<PubSubConnectionEntry>> pending = new ArrayList<>();
        for (MasterSlaveEntry node : nodes) {
            pending.add(this.subscribe(PubSubType.SUBSCRIBE, codec, channelName, node, null, wrapped));
        }
        CompletableFuture<Void> all = CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
        return all.thenApply(done -> pending.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
    }

    /**
     * Subscribes the listeners to a sharded channel using {@code SSUBSCRIBE} on the entry
     * resolved for the channel's slot.
     *
     * @param codec       codec passed to the subscription
     * @param channelName sharded channel to subscribe to
     * @param listeners   listeners to attach
     * @return future with the connection entry; completed exceptionally with
     *         {@link RedisNodeNotFoundException} when no entry is known for the channel
     */
    public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        MasterSlaveEntry target = this.getEntry(channelName);
        if (target != null) {
            return this.subscribe(PubSubType.SSUBSCRIBE, codec, channelName, target, null, listeners);
        }
        CompletableFuture<PubSubConnectionEntry> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."));
        return failed;
    }

    /**
     * Subscribes the listeners on the given entry, bounded by the configured subscription
     * timeout.
     *
     * <p>The channel's semaphore is acquired first. A timer fails the returned promise with
     * {@link RedisTimeoutException} if the semaphore is not obtained within the subscription
     * timeout. Once the semaphore is obtained, the subscription is started and a second timer
     * is armed with whatever part of the timeout is left.
     *
     * @param type        subscribe command type to issue
     * @param codec       codec passed to the subscription
     * @param channelName channel or pattern name
     * @param entry       master/slave entry to subscribe on
     * @param clientEntry client connections entry handed to the connection lookup, may be {@code null}
     * @param listeners   listeners to attach
     * @return promise completed with the connection entry serving the subscription
     */
    private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener<?> ... listeners) {
        CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
        AsyncSemaphore lock = this.getSemaphore(channelName);
        int timeout = this.config.getSubscriptionTimeout();
        long start = System.nanoTime();
        // The limit comes from getSubscriptionTimeout(), so the hint names 'subscriptionTimeout'
        // (same wording as timeout(CompletableFuture, long)).
        Timeout lockTimeout = this.connectionManager.getServiceManager().newTimeout(t -> promise.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + timeout + "ms. Try to increase 'subscriptionTimeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters.")), timeout, TimeUnit.MILLISECONDS);
        lock.acquire().thenAccept(r -> {
            // The timer already fired (or the promise was completed elsewhere): hand the
            // semaphore straight back instead of subscribing.
            if (!lockTimeout.cancel() || promise.isDone()) {
                lock.release();
                return;
            }
            // Only the part of the timeout not spent waiting for the semaphore remains.
            long newTimeout = (long)timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            this.subscribeNoTimeout(codec, channelName, entry, clientEntry, promise, type, lock, new AtomicInteger(), listeners);
            this.timeout(promise, newTimeout);
        });
        return promise;
    }

    /**
     * Subscribes the listeners to the named channel without arming any timeout, using the
     * caller-supplied semaphore.
     *
     * <p>{@code SSUBSCRIBE} is used when sharding is supported, {@code SUBSCRIBE} otherwise.
     *
     * @param codec       codec passed to the subscription
     * @param channelName channel name as a plain string
     * @param semaphore   semaphore handed to the subscription
     * @param listeners   listeners to attach
     * @return promise with the connection entry; completed exceptionally with
     *         {@link RedisNodeNotFoundException} when no entry is known for the channel
     */
    CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        MasterSlaveEntry target = this.getEntry(new ChannelName(channelName));
        CompletableFuture<PubSubConnectionEntry> result = new CompletableFuture<>();
        if (target == null) {
            result.completeExceptionally(new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."));
            return result;
        }
        PubSubType type;
        if (this.shardingSupported) {
            type = PubSubType.SSUBSCRIBE;
        } else {
            type = PubSubType.SUBSCRIBE;
        }
        this.subscribeNoTimeout(codec, new ChannelName(channelName), target, null, result, type, semaphore, new AtomicInteger(), listeners);
        return result;
    }

    /**
     * Picks the semaphore for a channel from the fixed pool.
     *
     * @param channelName channel whose hash code selects the slot
     * @return semaphore stored at {@code |hashCode % poolSize|}
     */
    AsyncSemaphore getSemaphore(ChannelName channelName) {
        // The remainder lies strictly between -length and length, so abs() cannot overflow.
        int remainder = channelName.hashCode() % this.locks.length;
        int slot = Math.abs(remainder);
        return this.locks[slot];
    }

    /**
     * Arms a timeout on the promise using the configured subscription timeout.
     *
     * @param promise promise to fail when the timeout elapses
     */
    void timeout(CompletableFuture<?> promise) {
        int subscriptionTimeout = this.config.getSubscriptionTimeout();
        this.timeout(promise, subscriptionTimeout);
    }

    /**
     * Arms a timer that fails the promise with {@link RedisTimeoutException} after the given
     * delay; the timer is cancelled as soon as the promise completes for any reason.
     *
     * @param promise promise to guard
     * @param timeout delay in milliseconds
     */
    void timeout(CompletableFuture<?> promise, long timeout) {
        Timeout scheduled = this.connectionManager.getServiceManager().newTimeout(fired -> {
            promise.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + timeout + "ms. Try to increase 'subscriptionTimeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters."));
        }, timeout, TimeUnit.MILLISECONDS);
        promise.whenComplete((value, error) -> {
            scheduled.cancel();
        });
    }

    /**
     * Performs one retry step of a subscription.
     *
     * <p>When the attempt counter equals the configured retry attempts, the semaphore is
     * released and the promise is failed: with {@link RedisNodeNotFoundException} if the
     * channel still resolves to no entry, with {@link RedisTimeoutException} otherwise.
     * Otherwise the counter is incremented and either the subscription is started on the
     * resolved entry or, when no entry is known yet, this method is rescheduled after the
     * retry interval.
     */
    private void trySubscribe(Codec codec, ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?> ... listeners) {
        if (attempts.get() != this.config.getRetryAttempts()) {
            attempts.incrementAndGet();
            MasterSlaveEntry target = this.getEntry(channelName);
            if (target != null) {
                this.subscribeNoTimeout(codec, channelName, target, null, promise, type, lock, attempts, listeners);
            } else {
                // Node not known yet: try again after the retry interval.
                this.connectionManager.getServiceManager().newTimeout(tt -> this.trySubscribe(codec, channelName, promise, type, lock, attempts, listeners), this.config.getRetryInterval(), TimeUnit.MILLISECONDS);
            }
            return;
        }

        // Retry budget exhausted: give the semaphore back before reporting the failure.
        lock.release();
        Throwable failure;
        if (this.getEntry(channelName) == null) {
            failure = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
        } else {
            failure = new RedisTimeoutException("Unable to acquire connection for subscription after " + attempts.get() + " attempts. Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.");
        }
        promise.completeExceptionally(failure);
    }

    /**
     * Attaches the listeners to a channel on the given entry, reusing an existing pub/sub
     * connection when possible and opening a new one otherwise. No timeout is armed here.
     *
     * <p>Lookup order: (1) the connection entry already registered for (channel, entry);
     * (2) the head of the entry's queue of connections with free capacity, taken under
     * {@code freePubSubLock}; (3) a new connection via {@code connect(...)}.
     *
     * @param promise  completed by the downstream addListeners/subscribe/connect call
     * @param lock     per-channel semaphore already held by the caller; released here only
     *                 when the promise is already done, otherwise passed downstream
     * @param attempts retry counter forwarded to {@code connect(...)}
     */
    private void subscribeNoTimeout(Codec codec, ChannelName channelName, MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, CompletableFuture<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?> ... listeners) {
        // Fast path: the channel is already served by a connection on this entry.
        PubSubConnectionEntry connEntry = (PubSubConnectionEntry)this.name2PubSubConnection.get(new PubSubKey(channelName, entry));
        if (connEntry != null) {
            connEntry.addListeners(channelName, promise, type, lock, listeners);
            return;
        }
        this.freePubSubLock.acquire().thenAccept(c -> {
            // The promise may have been completed (e.g. timed out) while waiting for the lock.
            if (promise.isDone()) {
                lock.release();
                this.freePubSubLock.release();
                return;
            }
            // The default PubSubEntry is a throw-away instance; it is not stored in the map.
            PubSubEntry freePubSubConnections = this.entry2PubSubConnection.getOrDefault(entry, new PubSubEntry());
            PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
            if (freeEntry == null) {
                // No connection with spare capacity: open a new one.
                this.freePubSubLock.release();
                this.connect(codec, channelName, entry, clientEntry, promise, type, lock, attempts, listeners);
                return;
            }
            int remainFreeAmount = freeEntry.tryAcquire();
            // An entry sitting in the free queue is expected to have capacity left.
            // NOTE(review): this throw happens with both semaphores still held - confirm intended.
            if (remainFreeAmount == -1) {
                throw new IllegalStateException();
            }
            PubSubKey key = new PubSubKey(channelName, entry);
            PubSubConnectionEntry oldEntry = this.name2PubSubConnection.putIfAbsent(key, freeEntry);
            if (oldEntry != null) {
                // Another connection was registered for this key in the meantime: undo the
                // acquire and attach the listeners to that one instead.
                freeEntry.release();
                this.freePubSubLock.release();
                oldEntry.addListeners(channelName, promise, type, lock, listeners);
                return;
            }
            Collection coll = this.name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap()));
            coll.add(freeEntry);
            // The connection is now full: take it out of the free queue.
            if (remainFreeAmount == 0) {
                freePubSubConnections.getEntries().poll();
            }
            this.freePubSubLock.release();
            freeEntry.subscribe(codec, channelName, promise, type, lock, listeners);
        });
    }

    /**
     * Resolves the write entry responsible for the slot of the given channel.
     *
     * @param channelName channel whose name is hashed into a slot
     * @return the write entry for that slot; callers in this class treat {@code null} as
     *         "node not discovered yet"
     */
    private MasterSlaveEntry getEntry(ChannelName channelName) {
        return this.connectionManager.getWriteEntry(
                this.connectionManager.calcSlot(channelName.getName()));
    }

    /**
     * Requests a new pub/sub connection from the entry and subscribes the channel on it.
     *
     * <p>A timer armed with the retry interval falls back to {@code trySubscribe(...)} when
     * the connection is not ready by then (the pending future is cancelled) or has failed.
     *
     * @param msEntry     master/slave entry to obtain the connection from
     * @param clientEntry passed to {@code nextPubSubConnection}; may be {@code null}
     * @param promise     subscription promise; its failure is propagated to the connection future
     * @param lock        per-channel semaphore held by the caller, passed downstream
     * @param attempts    retry counter forwarded to {@code trySubscribe(...)}
     */
    private void connect(Codec codec, ChannelName channelName, MasterSlaveEntry msEntry, ClientConnectionsEntry clientEntry, CompletableFuture<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?> ... listeners) {
        CompletableFuture<RedisPubSubConnection> connFuture = msEntry.nextPubSubConnection(clientEntry);
        this.connectionManager.getServiceManager().newTimeout(t -> {
            // cancel() fails when the future is already complete; if it completed normally
            // there is nothing to retry.
            if (!connFuture.cancel(false) && !connFuture.isCompletedExceptionally()) {
                return;
            }
            this.trySubscribe(codec, channelName, promise, type, lock, attempts, listeners);
        }, this.config.getRetryInterval(), TimeUnit.MILLISECONDS);
        // If the subscription promise fails first, fail the connection future as well.
        promise.whenComplete((res, e) -> {
            if (e != null) {
                connFuture.completeExceptionally((Throwable)e);
            }
        });
        connFuture.thenAccept(conn -> this.freePubSubLock.acquire().thenAccept(c -> {
            PubSubConnectionEntry entry = new PubSubConnectionEntry((RedisPubSubConnection)conn, this.connectionManager, msEntry);
            int remainFreeAmount = entry.tryAcquire();
            PubSubKey key = new PubSubKey(channelName, msEntry);
            PubSubConnectionEntry oldEntry = this.name2PubSubConnection.putIfAbsent(key, entry);
            if (oldEntry != null) {
                // The channel was registered on another connection meanwhile: hand the new
                // connection back and attach the listeners to the existing entry.
                msEntry.returnPubSubConnection(entry.getConnection());
                this.freePubSubLock.release();
                oldEntry.addListeners(channelName, promise, type, lock, listeners);
                return;
            }
            Collection coll = this.name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap()));
            coll.add(entry);
            // Advertise the connection for reuse only while it still has spare capacity.
            if (remainFreeAmount > 0) {
                PubSubEntry psEntry = this.entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry());
                psEntry.getEntries().add(entry);
            }
            this.freePubSubLock.release();
            entry.subscribe(codec, channelName, promise, type, lock, listeners);
        }));
    }

    /**
     * Unsubscribes the channel from the first connection entry registered for it, using
     * {@code SUNSUBSCRIBE} when sharding is supported and {@code UNSUBSCRIBE} otherwise.
     * This method does not acquire the per-channel semaphore itself.
     *
     * @param channelName channel to unsubscribe
     * @return future completed when the unsubscribe has been processed; completed
     *         exceptionally with {@link RedisException} when the channel is not registered
     */
    CompletableFuture<Void> unsubscribeLocked(ChannelName channelName) {
        Collection<PubSubConnectionEntry> registered = this.name2entry.get(channelName);
        if (registered != null && !registered.isEmpty()) {
            PubSubType topicType = this.shardingSupported ? PubSubType.SUNSUBSCRIBE : PubSubType.UNSUBSCRIBE;
            return this.unsubscribeLocked(topicType, channelName, registered.iterator().next());
        }
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RedisException("Channel: " + channelName + " is not registered"));
        return failed;
    }

    /**
     * Removes the channel's registrations and issues the unsubscribe command on the given
     * connection entry. This method does not acquire the per-channel semaphore itself.
     *
     * @param topicType   unsubscribe command type to issue and to wait for
     * @param channelName channel being unsubscribed
     * @param ce          connection entry the channel is subscribed on
     * @return future completed (with {@code null}) after the matching status notification
     *         arrived and the connection entry was released under {@code freePubSubLock}
     */
    CompletableFuture<Void> unsubscribeLocked(final PubSubType topicType, final ChannelName channelName, final PubSubConnectionEntry ce) {
        // Drop the registrations first so new subscribers do not pick up this mapping.
        this.name2PubSubConnection.remove(new PubSubKey(channelName, ce.getEntry()));
        this.remove(channelName, ce);
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        BaseRedisPubSubListener listener = new BaseRedisPubSubListener(){

            @Override
            public void onStatus(PubSubType type, CharSequence channel) {
                // React only to the status that confirms this very unsubscribe.
                // NOTE(review): relies on CharSequence.equals(ChannelName) matching - confirm
                // the runtime type passed as 'channel'.
                if (type == topicType && channel.equals(channelName)) {
                    PublishSubscribeService.this.freePubSubLock.acquire().thenAccept(c -> {
                        PublishSubscribeService.this.release(ce);
                        PublishSubscribeService.this.freePubSubLock.release();
                        result.complete(null);
                    });
                }
            }
        };
        ce.unsubscribe(topicType, channelName, listener);
        return result;
    }

    /**
     * Removes the connection entry from the channel's registration set and drops the
     * channel key altogether once the set is empty.
     *
     * @param channelName channel whose registration is updated
     * @param entry       connection entry to remove
     */
    private void remove(ChannelName channelName, PubSubConnectionEntry entry) {
        Collection<PubSubConnectionEntry> registered = this.name2entry.get(channelName);
        if (registered != null) {
            registered.remove(entry);
            if (registered.isEmpty()) {
                this.name2entry.remove(channelName);
            }
        }
    }

    /**
     * Releases one subscription slot on the connection entry and updates the free-connection
     * bookkeeping.
     *
     * <p>If the entry reports itself free afterwards, it is removed from its owner's free
     * queue and the connection is returned to the owner. Otherwise the entry is added to the
     * owner's free queue unless it is already there.
     *
     * @param entry connection entry to release
     */
    private void release(PubSubConnectionEntry entry) {
        entry.release();
        if (!entry.isFree()) {
            // Still in use but with spare capacity: make sure it is offered for reuse.
            PubSubEntry reusable = this.entry2PubSubConnection.computeIfAbsent(entry.getEntry(), e -> new PubSubEntry());
            if (!reusable.getEntries().contains(entry)) {
                reusable.getEntries().add(entry);
            }
            return;
        }
        PubSubEntry known = this.entry2PubSubConnection.get(entry.getEntry());
        if (known != null) {
            known.getEntries().remove(entry);
        }
        entry.getEntry().returnPubSubConnection(entry.getConnection());
    }

    /**
     * Forgets everything registered for the given master/slave entry: its free-connection
     * holder and every connection entry belonging to it in the channel registry. Channels
     * left without any connection entry are removed from the registry.
     *
     * @param entry master/slave entry being removed
     */
    public void remove(MasterSlaveEntry entry) {
        this.entry2PubSubConnection.remove(entry);
        this.name2entry.values().removeIf(registered -> {
            registered.removeIf(connEntry -> connEntry.getEntry().equals(entry));
            // An emptied set tells removeIf to drop the channel key as well.
            return registered.isEmpty();
        });
    }

    /**
     * Unsubscribes the channel from the first connection entry registered for it.
     *
     * @param channelName channel to unsubscribe
     * @param topicType   unsubscribe command type to issue
     * @return future with the codec the channel was subscribed with; completed exceptionally
     *         with {@link RedisException} when the channel is not registered
     */
    public CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
        Collection<PubSubConnectionEntry> registered = this.name2entry.get(channelName);
        if (registered != null && !registered.isEmpty()) {
            return this.unsubscribe(channelName, registered.iterator().next(), topicType);
        }
        CompletableFuture<Codec> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RedisException("Channel: " + channelName + " is not registered"));
        return failed;
    }

    /**
     * Unsubscribes the channel from the given connection entry while holding the channel's
     * semaphore.
     *
     * <p>The codec is looked up in the connection's pattern, sharded or regular channel map
     * according to {@code topicType} before the unsubscribe is issued. The semaphore is
     * released once the unsubscribe has completed.
     *
     * @param channelName channel to unsubscribe
     * @param entry       connection entry the channel is subscribed on
     * @param topicType   unsubscribe command type to issue
     * @return future with the codec found for the channel (may be {@code null}); an
     *         already-completed {@code null} future when the service is shutting down
     */
    CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubConnectionEntry entry, PubSubType topicType) {
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            return CompletableFuture.completedFuture(null);
        }
        AsyncSemaphore lock = this.getSemaphore(channelName);
        CompletableFuture<Void> acquired = lock.acquire();
        return acquired.thenCompose(ignored -> {
            Codec entryCodec;
            if (topicType == PubSubType.PUNSUBSCRIBE) {
                entryCodec = entry.getConnection().getPatternChannels().get(channelName);
            } else if (topicType == PubSubType.SUNSUBSCRIBE) {
                entryCodec = entry.getConnection().getShardedChannels().get(channelName);
            } else {
                entryCodec = entry.getConnection().getChannels().get(channelName);
            }
            return this.unsubscribeLocked(topicType, channelName, entry).thenApply(done -> {
                lock.release();
                return entryCodec;
            });
        });
    }

    /**
     * Re-subscribes every channel whose name hashes to the given slot.
     *
     * <p>For each matching registration the channel is unsubscribed from its current
     * connection entry and subscribed again with the same listeners and the codec it was
     * registered with: regular channels via {@code SUBSCRIBE}, sharded channels via
     * {@code SSUBSCRIBE}, patterns via {@code PSUBSCRIBE}.
     *
     * @param slot slot whose channels must be re-attached
     */
    public void reattachPubSub(int slot) {
        this.name2PubSubConnection.entrySet().stream()
                .filter(e -> this.connectionManager.calcSlot(e.getKey().getChannelName().getName()) == slot)
                .forEach(e -> {
                    ChannelName channelName = e.getKey().getChannelName();
                    PubSubConnectionEntry pubSubEntry = e.getValue();

                    Codec codec = pubSubEntry.getConnection().getChannels().get(channelName);
                    if (codec != null) {
                        Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
                        this.unsubscribe(channelName, pubSubEntry, PubSubType.UNSUBSCRIBE);
                        this.subscribe(codec, channelName, listeners.toArray(new RedisPubSubListener[0]));
                    }

                    Codec shardedCodec = pubSubEntry.getConnection().getShardedChannels().get(channelName);
                    if (shardedCodec != null) {
                        Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
                        this.unsubscribe(channelName, pubSubEntry, PubSubType.SUNSUBSCRIBE);
                        // A sharded channel must come back as a sharded subscription with its own
                        // codec; the regular-channel codec is typically null in this branch.
                        this.ssubscribe(shardedCodec, channelName, listeners.toArray(new RedisPubSubListener[0]));
                    }

                    Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(channelName);
                    if (patternCodec != null) {
                        Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
                        this.unsubscribe(channelName, pubSubEntry, PubSubType.PUNSUBSCRIBE);
                        this.psubscribe(channelName, patternCodec, listeners.toArray(new RedisPubSubListener[0]));
                    }
                });
    }

    /**
     * Re-attaches the listeners of every regular, sharded and pattern channel held by the
     * given connection. Nothing happens when the connection's client maps to no entry.
     *
     * @param redisPubSubConnection connection whose subscriptions must be moved
     */
    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
        MasterSlaveEntry owner = this.connectionManager.getEntry(redisPubSubConnection.getRedisClient());
        if (owner != null) {
            this.reattachPubSubListeners(redisPubSubConnection.getChannels().keySet(), owner, PubSubType.UNSUBSCRIBE);
            this.reattachPubSubListeners(redisPubSubConnection.getShardedChannels().keySet(), owner, PubSubType.SUNSUBSCRIBE);
            this.reattachPubSubListeners(redisPubSubConnection.getPatternChannels().keySet(), owner, PubSubType.PUNSUBSCRIBE);
        }
    }

    /**
     * Unsubscribes each listed channel from the connection entry registered for
     * (channel, {@code en}) and, when the channel had listeners and the unsubscribe yielded
     * a codec, re-subscribes those listeners with the command matching {@code topicType}.
     *
     * @param channels  channel names to process
     * @param en        master/slave entry the channels are currently registered on
     * @param topicType unsubscribe command type; also selects the re-subscribe flavour
     */
    private void reattachPubSubListeners(Set<ChannelName> channels, MasterSlaveEntry en, PubSubType topicType) {
        for (ChannelName channelName : channels) {
            PubSubConnectionEntry current = this.name2PubSubConnection.get(new PubSubKey(channelName, en));
            if (current != null) {
                Queue<RedisPubSubListener<?>> attached = current.getListeners(channelName);
                // The unsubscribe is issued even when there is nothing to re-attach.
                CompletableFuture<Codec> codecFuture = this.unsubscribe(channelName, current, topicType);
                if (!attached.isEmpty()) {
                    codecFuture.whenComplete((codec, error) -> {
                        if (codec != null) {
                            if (topicType == PubSubType.PUNSUBSCRIBE) {
                                this.psubscribe(en, channelName, attached, codec);
                            } else if (topicType == PubSubType.SUNSUBSCRIBE) {
                                this.ssubscribe(channelName, attached, codec);
                            } else {
                                this.subscribe(channelName, attached, codec);
                            }
                        }
                    });
                }
            }
        }
    }

    /**
     * Re-subscribes the listeners to a regular channel, retrying every second until it
     * succeeds.
     *
     * <p>For multi-entity channels the target is the first entry that has no registration
     * for the channel yet; when no target entry is available the public
     * {@code subscribe(Codec, ChannelName, ...)} is used instead.
     *
     * @param channelName    channel to re-subscribe
     * @param listeners      listeners to attach
     * @param subscribeCodec codec the channel was previously subscribed with
     */
    private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        MasterSlaveEntry target = this.getEntry(channelName);
        if (this.isMultiEntity(channelName)) {
            target = this.connectionManager.getEntrySet().stream()
                    .filter(candidate -> !this.name2PubSubConnection.containsKey(new PubSubKey(channelName, candidate)))
                    .findFirst()
                    .orElse(null);
        }
        CompletableFuture<PubSubConnectionEntry> resubscribed;
        if (target != null) {
            resubscribed = this.subscribe(PubSubType.SUBSCRIBE, subscribeCodec, channelName, target, null, listeners.toArray(new RedisPubSubListener[0]));
        } else {
            resubscribed = this.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[0]))
                    .thenApply(entries -> entries.iterator().next());
        }
        resubscribed.whenComplete((connEntry, error) -> {
            if (error == null) {
                log.info("listeners of '{}' channel have been resubscribed to '{}'", (Object)channelName, connEntry);
                return;
            }
            this.connectionManager.getServiceManager().newTimeout(task -> this.subscribe(channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
        });
    }

    /**
     * Re-subscribes the listeners to a sharded channel, retrying every second until it
     * succeeds.
     *
     * @param channelName    sharded channel to re-subscribe
     * @param listeners      listeners to attach
     * @param subscribeCodec codec the channel was previously subscribed with
     */
    private void ssubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        this.ssubscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[0]))
                .whenComplete((connEntry, error) -> {
                    if (error == null) {
                        log.info("listeners of '{}' channel have been resubscribed to '{}'", (Object)channelName, connEntry);
                        return;
                    }
                    this.connectionManager.getServiceManager().newTimeout(task -> this.ssubscribe(channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
                });
    }

    /**
     * Re-subscribes the listeners to a channel pattern, retrying every second until it
     * succeeds.
     *
     * <p>For multi-entity patterns the target is the first entry that has no registration
     * for the pattern and is not {@code oldEntry}. When no target entry is available the
     * whole operation is rescheduled.
     *
     * @param oldEntry       entry the pattern was previously subscribed on
     * @param channelName    pattern to re-subscribe
     * @param listeners      listeners to attach
     * @param subscribeCodec codec the pattern was previously subscribed with
     */
    private void psubscribe(MasterSlaveEntry oldEntry, ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        MasterSlaveEntry target = this.getEntry(channelName);
        if (this.isMultiEntity(channelName)) {
            target = this.connectionManager.getEntrySet().stream()
                    .filter(candidate -> !this.name2PubSubConnection.containsKey(new PubSubKey(channelName, candidate)) && candidate != oldEntry)
                    .findFirst()
                    .orElse(null);
        }
        if (target == null) {
            this.connectionManager.getServiceManager().newTimeout(task -> this.psubscribe(oldEntry, channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
            return;
        }
        this.subscribe(PubSubType.PSUBSCRIBE, subscribeCodec, channelName, target, null, listeners.toArray(new RedisPubSubListener[0]))
                .whenComplete((connEntry, error) -> {
                    if (error == null) {
                        log.info("listeners of '{}' channel-pattern have been resubscribed to '{}'", (Object)channelName, connEntry);
                        return;
                    }
                    this.connectionManager.getServiceManager().newTimeout(task -> this.psubscribe(oldEntry, channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
                });
    }

    /**
     * Removes the given listener instance from the channel on every connection entry
     * registered for it.
     *
     * @param type        unsubscribe command type used when an entry is left without listeners
     * @param channelName channel the listener is attached to
     * @param listener    listener to remove
     * @return future completed when the removal has been processed
     */
    public CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, EventListener listener) {
        Consumer<PubSubConnectionEntry> remover = connEntry -> connEntry.removeListener(channelName, listener);
        return this.removeListenerAsync(type, channelName, remover);
    }

    /**
     * Removes the listeners with the given ids from the channel on every connection entry
     * registered for it.
     *
     * @param type        unsubscribe command type used when an entry is left without listeners
     * @param channelName channel the listeners are attached to
     * @param listenerIds ids of the listeners to remove
     * @return future completed when the removal has been processed
     */
    public CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Integer ... listenerIds) {
        return this.removeListenerAsync(type, channelName, (PubSubConnectionEntry entry) -> {
            // Each id is unboxed to int, as in the int-based removeListener call.
            for (int id : listenerIds) {
                entry.removeListener(channelName, id);
            }
        });
    }

    /**
     * Applies {@code consumer} to every connection entry registered for the channel while
     * holding the channel's semaphore, and unsubscribes the channel from each entry that is
     * left without listeners.
     *
     * <p>Waiting for the semaphore is bounded by the subscription timeout. The guarding timer
     * is cancelled as soon as the wait ends, so it does not linger in the timer after the
     * semaphore has been obtained.
     *
     * @param type        unsubscribe command type to issue for entries left without listeners
     * @param channelName channel being modified
     * @param consumer    removal action applied to each registered connection entry
     * @return future completed when all resulting unsubscribes finished (failures of the
     *         individual unsubscribes are ignored); completed exceptionally with
     *         {@link RedisTimeoutException} when the semaphore is not obtained in time;
     *         already completed when the channel has no registration
     */
    private CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Consumer<PubSubConnectionEntry> consumer) {
        if (!this.name2entry.containsKey(channelName)) {
            return CompletableFuture.completedFuture(null);
        }
        AsyncSemaphore semaphore = this.getSemaphore(channelName);
        CompletableFuture<Void> sf = semaphore.acquire();
        int timeout = this.config.getSubscriptionTimeout();
        Timeout acquireTimeout = this.connectionManager.getServiceManager().newTimeout(t -> sf.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic")), timeout, TimeUnit.MILLISECONDS);
        // Same pattern as timeout(CompletableFuture, long): drop the timer once it is moot.
        sf.whenComplete((r, e) -> acquireTimeout.cancel());
        return sf.thenCompose(res -> {
            Collection<PubSubConnectionEntry> entries = this.name2entry.get(channelName);
            if (entries == null || entries.isEmpty()) {
                semaphore.release();
                return CompletableFuture.completedFuture(null);
            }
            List<CompletableFuture<Void>> futures = new ArrayList<>(entries.size());
            for (PubSubConnectionEntry entry : entries) {
                consumer.accept(entry);
                CompletableFuture<Void> f;
                if (!entry.hasListeners(channelName)) {
                    // Best effort: a failed unsubscribe must not fail the whole removal.
                    f = this.unsubscribeLocked(type, channelName, entry).exceptionally(ex -> null);
                } else {
                    f = CompletableFuture.completedFuture(null);
                }
                futures.add(f);
            }
            CompletableFuture<Void> ff = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            return ff.whenComplete((v, e) -> semaphore.release());
        });
    }

    /**
     * Unsubscribes every connection entry that still has listeners for the
     * given channel.
     *
     * <p>The work runs after the channel semaphore has been acquired. A timer
     * fails the acquisition with a {@link RedisTimeoutException} once the
     * configured subscription timeout elapses. The semaphore is released on
     * every path, including when starting an unsubscribe throws
     * synchronously; in that case the release is deferred until the
     * unsubscribe operations already started have finished, and the returned
     * future then fails with the thrown exception.
     *
     * @param type        subscription type handed to {@code unsubscribeLocked}
     * @param channelName channel whose listeners are removed
     * @return future completed once all triggered unsubscribe operations have
     *         finished; it fails if any of them fails, if the semaphore could
     *         not be acquired in time, or if starting an unsubscribe threw
     */
    public CompletableFuture<Void> removeAllListenersAsync(PubSubType type, ChannelName channelName) {
        if (!this.name2entry.containsKey(channelName)) {
            return CompletableFuture.completedFuture(null);
        }
        AsyncSemaphore semaphore = this.getSemaphore(channelName);
        CompletableFuture<Void> sf = semaphore.acquire();
        int timeout = this.config.getSubscriptionTimeout();
        this.connectionManager.getServiceManager().newTimeout(t -> sf.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic")), timeout, TimeUnit.MILLISECONDS);
        return sf.thenCompose(r -> {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            Throwable failure = null;
            try {
                Collection<PubSubConnectionEntry> entries = this.name2entry.getOrDefault(channelName, Collections.emptySet());
                for (PubSubConnectionEntry entry : entries) {
                    if (entry.hasListeners(channelName)) {
                        pending.add(this.unsubscribeLocked(type, channelName, entry));
                    }
                }
            } catch (Throwable ex) {
                // Remember the failure; the semaphore still has to be released below.
                failure = ex;
            }
            Throwable syncFailure = failure;
            // allOf over an empty array is already complete, so the release is immediate
            // when nothing had to be unsubscribed.
            CompletableFuture<Void> released = CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                    .whenComplete((res, e) -> semaphore.release());
            if (syncFailure == null) {
                return released;
            }
            CompletableFuture<Void> failed = new CompletableFuture<>();
            released.whenComplete((res, e) -> failed.completeExceptionally(syncFailure));
            return failed;
        });
    }

    /**
     * Stores the sharding-support flag.
     *
     * @param value new value of the flag
     */
    public void setShardingSupported(boolean value) {
        shardingSupported = value;
    }

    /**
     * Reports the current sharding-support flag.
     *
     * @return the stored flag value
     */
    public boolean isShardingSupported() {
        return shardingSupported;
    }

    /**
     * Returns the name of the publish command to use.
     *
     * @return the name of {@code SPUBLISH} when the sharding-support flag is
     *         set, otherwise the name of {@code PUBLISH}
     */
    public String getPublishCommand() {
        return this.shardingSupported
                ? RedisCommands.SPUBLISH.getName()
                : RedisCommands.PUBLISH.getName();
    }

    /**
     * Renders the two connection-tracking fields for diagnostics.
     *
     * @return textual form containing {@code name2PubSubConnection} and
     *         {@code entry2PubSubConnection}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PublishSubscribeService [name2PubSubConnection=");
        text.append(Objects.toString(this.name2PubSubConnection));
        text.append(", entry2PubSubConnection=");
        text.append(Objects.toString(this.entry2PubSubConnection));
        text.append("]");
        return text.toString();
    }

    /**
     * Immutable pair of a channel name and a master/slave entry. Equality and
     * hash code are derived from both components, so instances can serve as
     * map keys.
     */
    public static class PubSubKey {
        private final ChannelName channelName;
        private final MasterSlaveEntry entry;

        /**
         * @param channelName channel component of the key
         * @param entry       master/slave entry component of the key
         */
        public PubSubKey(ChannelName channelName, MasterSlaveEntry entry) {
            this.channelName = channelName;
            this.entry = entry;
        }

        /** @return the channel component of this key */
        public ChannelName getChannelName() {
            return channelName;
        }

        /** @return the master/slave entry component of this key */
        public MasterSlaveEntry getEntry() {
            return entry;
        }

        /**
         * Two keys are equal when they are of exactly the same class and both
         * components are equal.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            PubSubKey other = (PubSubKey) o;
            boolean sameChannel = Objects.equals(channelName, other.channelName);
            return sameChannel && Objects.equals(entry, other.entry);
        }

        /** Hash code combining both components, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hash(channelName, entry);
        }

        @Override
        public String toString() {
            String channelText = Objects.toString(channelName);
            String entryText = Objects.toString(entry);
            return "PubSubKey{channelName=" + channelText + ", entry=" + entryText + '}';
        }
    }

    /**
     * Holder for a queue of pub/sub connection entries.
     */
    public static class PubSubEntry {
        // Backed by a ConcurrentLinkedQueue; starts out empty.
        Queue<PubSubConnectionEntry> entries = new ConcurrentLinkedQueue<>();

        /** @return the queue instance held by this object (not a copy) */
        public Queue<PubSubConnectionEntry> getEntries() {
            return entries;
        }
    }
}

