/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.connection;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PlatformDependent;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.redisson.Version;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionEventsHub;
import org.redisson.connection.ConnectionInitializer;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.IdleConnectionWatcher;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.connection.RedisClientEntry;
import org.redisson.connection.SingleEntry;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.TransferListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ConnectionManager} for a single master with optional slaves.
 *
 * <p>The manager owns (or borrows, when supplied through {@link Config}) the Netty
 * event loop group and the executor, owns the {@link HashedWheelTimer}, keeps a
 * slot-to-{@link MasterSlaveEntry} table and multiplexes pub/sub subscriptions over
 * a pool of {@link PubSubConnectionEntry} instances.
 *
 * <p>In this class every slot of {@link #singleSlotRange} maps to the same entry and
 * {@link #calcSlot(String)} always returns the first slot.
 */
public class MasterSlaveConnectionManager
implements ConnectionManager {
    /**
     * Inert {@link Timeout} handed out by {@link #newTimeout(TimerTask, long, TimeUnit)}
     * when the timer refuses the task with an {@link IllegalStateException}.
     */
    private final Timeout dummyTimeout = new Timeout() {

        @Override
        public Timer timer() {
            return null;
        }

        @Override
        public TimerTask task() {
            return null;
        }

        @Override
        public boolean isExpired() {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean cancel() {
            return false;
        }
    };
    /** Number of slots; {@link #singleSlotRange} spans {@code 0..MAX_SLOT - 1}. */
    public static final int MAX_SLOT = 16384;
    /** The one slot range served by this manager. */
    protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT - 1);
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Created by {@link #initTimer(MasterSlaveServersConfig)}; {@code null} until then. */
    private HashedWheelTimer timer;
    protected Codec codec;
    protected EventLoopGroup group;
    protected ConnectionInitializer connectListener = new DefaultConnectionListener();
    protected Class<? extends SocketChannel> socketChannelClass;
    /** Channel (or pattern) name to the pub/sub connection entry currently serving it. */
    protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
    /** Pub/sub entries that still have free subscription slots; guarded by {@link #freePubSubLock}. */
    protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
    protected MasterSlaveServersConfig config;
    /** Slot number to entry. The same entry may be stored under many slots. */
    private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
    private final RPromise<Boolean> shutdownPromise;
    private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
    private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap());
    private IdleConnectionWatcher connectionWatcher;
    private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
    /** Striped per-channel locks; see {@link #getSemaphore(String)}. */
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    private final ExecutorService executor;
    /** Serializes access to {@link #freePubSubConnections} while a subscription picks or opens a connection. */
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    /** {@code true} when the event loop group was supplied by the caller and must not be shut down here. */
    private final boolean sharedEventLoopGroup;
    /** {@code true} when the executor was supplied by the caller and must not be shut down here. */
    private final boolean sharedExecutor;
    private final CommandSyncService commandExecutor;

    /**
     * Creates a fully initialized manager: threads, timer and the master/slave entry.
     *
     * @param cfg    master/slave settings used for the timer and the entry
     * @param config general settings (event loop, executor, codec)
     */
    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
        this(config);
        try {
            initTimer(cfg);
        } catch (RuntimeException e) {
            // Mirror init(): do not leave the group/executor created by this(config) behind.
            stopThreads();
            throw e;
        }
        init(cfg);
    }

    /**
     * Creates the thread infrastructure only. The timer and the entries are not
     * initialized; callers are expected to invoke {@link #initTimer} and {@link #init}
     * themselves.
     *
     * @param cfg general settings (event loop, executor, codec)
     */
    public MasterSlaveConnectionManager(Config cfg) {
        for (int i = 0; i < locks.length; ++i) {
            locks[i] = new AsyncSemaphore(1);
        }
        Version.logVersion();
        // A caller-supplied event loop group wins; otherwise create one matching the transport.
        if (cfg.isUseLinuxNativeEpoll()) {
            this.group = cfg.getEventLoopGroup() == null ? new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")) : cfg.getEventLoopGroup();
            this.socketChannelClass = EpollSocketChannel.class;
        } else {
            this.group = cfg.getEventLoopGroup() == null ? new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")) : cfg.getEventLoopGroup();
            this.socketChannelClass = NioSocketChannel.class;
        }
        if (cfg.getExecutor() == null) {
            // Default pool size is twice the CPU count unless a thread count is configured.
            int threads = Runtime.getRuntime().availableProcessors() * 2;
            if (cfg.getThreads() != 0) {
                threads = cfg.getThreads();
            }
            this.executor = Executors.newFixedThreadPool(threads, new DefaultThreadFactory("redisson"));
        } else {
            this.executor = cfg.getExecutor();
        }
        this.codec = cfg.getCodec();
        this.shutdownPromise = this.newPromise();
        this.sharedEventLoopGroup = cfg.getEventLoopGroup() != null;
        this.sharedExecutor = cfg.getExecutor() != null;
        this.commandExecutor = new CommandSyncService(this);
    }

    /** @return always {@code false} for this manager */
    @Override
    public boolean isClusterMode() {
        return false;
    }

    @Override
    public CommandSyncService getCommandExecutor() {
        return commandExecutor;
    }

    /** @return the idle connection watcher, or {@code null} before {@link #init} has run */
    @Override
    public IdleConnectionWatcher getConnectionWatcher() {
        return connectionWatcher;
    }

    /** @return the configuration passed to {@link #init}, or {@code null} before it has run */
    @Override
    public MasterSlaveServersConfig getConfig() {
        return config;
    }

    @Override
    public Codec getCodec() {
        return codec;
    }

    /** @return a fresh set of the distinct entries currently registered under any slot */
    @Override
    public Set<MasterSlaveEntry> getEntrySet() {
        return new HashSet<MasterSlaveEntry>(entries.values());
    }

    /**
     * Stores the configuration, creates the idle connection watcher and sets up the entry.
     * If entry setup fails with a runtime exception the threads are stopped before rethrowing.
     *
     * @param config master/slave settings
     */
    protected void init(MasterSlaveServersConfig config) {
        this.config = config;
        this.connectionWatcher = new IdleConnectionWatcher(this, config);
        try {
            initEntry(config);
        } catch (RuntimeException e) {
            stopThreads();
            throw e;
        }
    }

    /**
     * Creates the {@link HashedWheelTimer} with a tick duration derived from the smallest of
     * the retry interval, the timeout and the reconnection timeout.
     *
     * @param config source of the three timeouts (milliseconds)
     * @throws IllegalStateException if the timer's {@code leak} field cannot be cleared
     */
    protected void initTimer(MasterSlaveServersConfig config) {
        int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
        Arrays.sort(timeouts);
        int minTimeout = timeouts[0];

        int tickDuration;
        if (minTimeout % 100 != 0) {
            tickDuration = minTimeout % 100 / 2;
        } else if (minTimeout == 100) {
            tickDuration = 50;
        } else {
            tickDuration = 100;
        }
        // The formula yields 0 (e.g. 101 -> 1 / 2) or a negative value for some inputs;
        // HashedWheelTimer rejects a non-positive tick, so clamp to 1 ms.
        tickDuration = Math.max(1, tickDuration);

        this.timer = new HashedWheelTimer(Executors.defaultThreadFactory(), tickDuration, TimeUnit.MILLISECONDS, 1024);
        try {
            // NOTE(review): presumably this disables Netty's resource-leak tracking for the timer - confirm.
            Field leakField = HashedWheelTimer.class.getDeclaredField("leak");
            leakField.setAccessible(true);
            leakField.set(this.timer, null);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public ConnectionInitializer getConnectListener() {
        return connectListener;
    }

    /**
     * Builds the single entry and registers it under every slot of {@link #singleSlotRange}.
     * With {@link ReadMode#MASTER} a {@link SingleEntry} is used; otherwise the entry comes
     * from {@link #createMasterSlaveEntry}. Blocks until the master connection is set up.
     *
     * @param config master/slave settings
     */
    protected void initEntry(MasterSlaveServersConfig config) {
        MasterSlaveEntry entry;
        HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
        slots.add(singleSlotRange);
        if (config.getReadMode() == ReadMode.MASTER) {
            entry = new SingleEntry(slots, this, config);
            RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
            f.syncUninterruptibly();
        } else {
            entry = createMasterSlaveEntry(config, slots);
        }
        for (int slot = singleSlotRange.getStartSlot(); slot <= singleSlotRange.getEndSlot(); ++slot) {
            addEntry(slot, entry);
        }
    }

    /**
     * Creates a {@link MasterSlaveEntry}, waits for its slave balancer to initialize and
     * then for the master connection to be set up.
     *
     * @param config master/slave settings
     * @param slots  slot ranges the entry is responsible for
     * @return the initialized entry
     */
    protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet<ClusterSlotRange> slots) {
        MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
        List<RFuture<Void>> fs = entry.initSlaveBalancer(Collections.emptySet());
        for (RFuture<Void> future : fs) {
            future.syncUninterruptibly();
        }
        RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
        f.syncUninterruptibly();
        return entry;
    }

    /**
     * Copies the settings shared by all master/slave style configurations into a new
     * {@link MasterSlaveServersConfig}.
     *
     * @param cfg configuration to copy from
     * @return a new configuration carrying the copied values
     */
    protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {
        MasterSlaveServersConfig c = new MasterSlaveServersConfig();
        c.setRetryInterval(cfg.getRetryInterval());
        c.setRetryAttempts(cfg.getRetryAttempts());
        c.setTimeout(cfg.getTimeout());
        c.setPingTimeout(cfg.getPingTimeout());
        c.setLoadBalancer(cfg.getLoadBalancer());
        c.setPassword(cfg.getPassword());
        c.setClientName(cfg.getClientName());
        c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
        c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
        c.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
        c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
        c.setConnectTimeout(cfg.getConnectTimeout());
        c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
        c.setFailedAttempts(cfg.getFailedAttempts());
        c.setReconnectionTimeout(cfg.getReconnectionTimeout());
        c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize());
        c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize());
        c.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
        c.setReadMode(cfg.getReadMode());
        c.setSubscriptionMode(cfg.getSubscriptionMode());
        return c;
    }

    /**
     * Creates a client using the configured connect timeout and a command timeout of
     * {@code retryInterval * retryAttempts}, and records it in {@link #getClients()}.
     */
    @Override
    public RedisClient createClient(NodeType type, String host, int port) {
        RedisClient client = createClient(host, port, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
        clients.add(new RedisClientEntry(client, commandExecutor, type));
        return client;
    }

    /** Removes the client from {@link #getClients()} and shuts it down asynchronously. */
    @Override
    public void shutdownAsync(RedisClient client) {
        clients.remove(new RedisClientEntry(client, commandExecutor, null));
        client.shutdownAsync();
    }

    /** Creates a client on this manager's timer, executor and event loop; it is not tracked. */
    @Override
    public RedisClient createClient(String host, int port, int timeout, int commandTimeout) {
        return new RedisClient(timer, executor, group, socketChannelClass, host, port, timeout, commandTimeout);
    }

    /** @return the first slot of {@link #singleSlotRange}, regardless of {@code key} */
    @Override
    public int calcSlot(String key) {
        return singleSlotRange.getStartSlot();
    }

    /** @return the entry serving {@code channelName}, or {@code null} if there is none */
    @Override
    public PubSubConnectionEntry getPubSubEntry(String channelName) {
        return name2PubSubConnection.get(channelName);
    }

    /**
     * Pattern-subscribes after acquiring the per-channel semaphore.
     *
     * @return a future completed with the entry serving the subscription
     */
    @Override
    public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?> listener) {
        final AsyncSemaphore lock = getSemaphore(channelName);
        final RPromise<PubSubConnectionEntry> result = newPromise();
        lock.acquire(new Runnable() {

            @Override
            public void run() {
                RFuture<PubSubConnectionEntry> future = MasterSlaveConnectionManager.this.psubscribe(channelName, codec, listener, lock);
                future.addListener(new TransferListener(result));
            }
        });
        return result;
    }

    /**
     * Pattern-subscribes assuming {@code semaphore} is already held by the caller; the
     * semaphore is released once the subscription attempt finishes.
     */
    @Override
    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
        RPromise<PubSubConnectionEntry> promise = newPromise();
        subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore);
        return promise;
    }

    /**
     * Subscribes after acquiring the per-channel semaphore.
     *
     * @return a future completed with the entry serving the subscription
     */
    @Override
    public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener) {
        final AsyncSemaphore lock = getSemaphore(channelName);
        final RPromise<PubSubConnectionEntry> result = newPromise();
        lock.acquire(new Runnable() {

            @Override
            public void run() {
                RFuture<PubSubConnectionEntry> future = MasterSlaveConnectionManager.this.subscribe(codec, channelName, listener, lock);
                future.addListener(new TransferListener(result));
            }
        });
        return result;
    }

    /**
     * Subscribes assuming {@code semaphore} is already held by the caller; the semaphore
     * is released once the subscription attempt finishes.
     */
    @Override
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
        RPromise<PubSubConnectionEntry> promise = newPromise();
        subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
        return promise;
    }

    /** @return the striped semaphore selected by the hash of {@code channelName} */
    @Override
    public AsyncSemaphore getSemaphore(String channelName) {
        // The remainder lies in (-length, length), so Math.abs cannot overflow here.
        return locks[Math.abs(channelName.hashCode() % locks.length)];
    }

    /**
     * Registers {@code listener} on {@code entry} and, once the entry's subscribe future for
     * the channel completes, releases {@code lock} and completes {@code promise} with the entry.
     */
    private void attachListener(final PubSubConnectionEntry entry, String channelName, RedisPubSubListener<?> listener, PubSubType type, final RPromise<PubSubConnectionEntry> promise, final AsyncSemaphore lock) {
        entry.addListener(channelName, listener);
        entry.getSubscribeFuture(channelName, type).addListener((GenericFutureListener<Future<Void>>)new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                lock.release();
                promise.trySuccess(entry);
            }
        });
    }

    /** Issues PSUBSCRIBE or SUBSCRIBE on {@code entry}, depending on {@code type}. */
    private void sendSubscribe(PubSubConnectionEntry entry, Codec codec, String channelName, PubSubType type) {
        if (PubSubType.PSUBSCRIBE == type) {
            entry.psubscribe(codec, channelName);
        } else {
            entry.subscribe(codec, channelName);
        }
    }

    /**
     * Core subscription routine. Reuses the entry already serving the channel if any;
     * otherwise, under {@link #freePubSubLock}, takes a slot on a free entry or opens a
     * new connection via {@link #connect}.
     *
     * @param lock per-channel semaphore held by the caller; released when the attempt finishes
     */
    private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            attachListener(connEntry, channelName, listener, type, promise, lock);
            return;
        }
        freePubSubLock.acquire(new Runnable() {

            @Override
            public void run() {
                if (promise.isDone()) {
                    // Every other path out of this runnable releases freePubSubLock exactly once;
                    // without this release later subscriptions would wait on it indefinitely.
                    // NOTE(review): the per-channel lock is left untouched here - confirm whoever
                    // completed the promise early is responsible for releasing it.
                    MasterSlaveConnectionManager.this.freePubSubLock.release();
                    return;
                }
                PubSubConnectionEntry freeEntry = MasterSlaveConnectionManager.this.freePubSubConnections.peek();
                if (freeEntry == null) {
                    // connect() releases freePubSubLock once the connection attempt finishes.
                    MasterSlaveConnectionManager.this.connect(codec, channelName, listener, promise, type, lock);
                    return;
                }
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    // NOTE(review): thrown while freePubSubLock and the channel lock are still held.
                    throw new IllegalStateException();
                }
                PubSubConnectionEntry oldEntry = MasterSlaveConnectionManager.this.name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    // Another entry already serves this channel: give the slot back and reuse it.
                    freeEntry.release();
                    MasterSlaveConnectionManager.this.freePubSubLock.release();
                    MasterSlaveConnectionManager.this.attachListener(oldEntry, channelName, listener, type, promise, lock);
                    return;
                }
                if (remainFreeAmount == 0) {
                    // The entry just used its last slot, so it is no longer "free".
                    MasterSlaveConnectionManager.this.freePubSubConnections.poll();
                }
                MasterSlaveConnectionManager.this.freePubSubLock.release();
                MasterSlaveConnectionManager.this.attachListener(freeEntry, channelName, listener, type, promise, lock);
                MasterSlaveConnectionManager.this.sendSubscribe(freeEntry, codec, channelName, type);
            }
        });
    }

    /**
     * Opens a new pub/sub connection for the channel and subscribes on it. Must be called
     * with {@link #freePubSubLock} held; the lock is released when the connection attempt
     * completes, successfully or not.
     */
    private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
        final int slot = calcSlot(channelName);
        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
        connFuture.addListener(new FutureListener<RedisPubSubConnection>() {

            @Override
            public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
                if (!future.isSuccess()) {
                    MasterSlaveConnectionManager.this.freePubSubLock.release();
                    lock.release();
                    promise.tryFailure(future.cause());
                    return;
                }
                RedisPubSubConnection conn = future.getNow();
                PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, MasterSlaveConnectionManager.this.config.getSubscriptionsPerConnection());
                int remainFreeAmount = entry.tryAcquire();
                PubSubConnectionEntry oldEntry = MasterSlaveConnectionManager.this.name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    // Lost the race: return the fresh connection and reuse the registered entry.
                    MasterSlaveConnectionManager.this.releaseSubscribeConnection(slot, entry);
                    MasterSlaveConnectionManager.this.freePubSubLock.release();
                    MasterSlaveConnectionManager.this.attachListener(oldEntry, channelName, listener, type, promise, lock);
                    return;
                }
                // Only advertise the entry as free while it still has slots; an exhausted entry
                // in the queue would make the next subscriber's tryAcquire() return -1.
                if (remainFreeAmount > 0) {
                    MasterSlaveConnectionManager.this.freePubSubConnections.add(entry);
                }
                MasterSlaveConnectionManager.this.freePubSubLock.release();
                MasterSlaveConnectionManager.this.attachListener(entry, channelName, listener, type, promise, lock);
                MasterSlaveConnectionManager.this.sendSubscribe(entry, codec, channelName, type);
            }
        });
    }

    /**
     * Unsubscribes from a channel. {@code lock} is released immediately when the channel is
     * unknown, otherwise when the UNSUBSCRIBE status for the channel arrives.
     *
     * @return the codec the channel was subscribed with, or {@code null} if it was not subscribed
     */
    @Override
    public Codec unsubscribe(final String channelName, final AsyncSemaphore lock) {
        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            lock.release();
            return null;
        }
        Codec entryCodec = entry.getConnection().getChannels().get(channelName);
        entry.unsubscribe(channelName, new BaseRedisPubSubListener() {

            @Override
            public boolean onStatus(PubSubType type, String channel) {
                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                    // release() == 1 means the entry went from full to having one free slot.
                    if (entry.release() == 1) {
                        MasterSlaveConnectionManager.this.freePubSubConnections.add(entry);
                    }
                    lock.release();
                    return true;
                }
                return false;
            }
        });
        return entryCodec;
    }

    /**
     * Unsubscribes from a channel and removes its entry from the free queue.
     *
     * @param temporaryDown when {@code true} the returned future completes only after the
     *                      UNSUBSCRIBE status arrives; otherwise it is already completed
     * @return a future with the channel's codec, or {@code null} if the channel was not subscribed
     */
    @Override
    public RFuture<Codec> unsubscribe(final String channelName, boolean temporaryDown) {
        PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            return null;
        }
        freePubSubConnections.remove(entry);
        final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
        if (temporaryDown) {
            final RPromise<Codec> result = newPromise();
            entry.unsubscribe(channelName, new BaseRedisPubSubListener() {

                @Override
                public boolean onStatus(PubSubType type, String channel) {
                    if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                        result.trySuccess(entryCodec);
                        return true;
                    }
                    return false;
                }
            });
            return result;
        }
        entry.unsubscribe(channelName, null);
        return newSucceededFuture(entryCodec);
    }

    /**
     * Pattern counterpart of {@link #unsubscribe(String, AsyncSemaphore)}.
     *
     * @return the codec the pattern was subscribed with, or {@code null} if it was not subscribed
     */
    @Override
    public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) {
        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            lock.release();
            return null;
        }
        Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
        entry.punsubscribe(channelName, new BaseRedisPubSubListener() {

            @Override
            public boolean onStatus(PubSubType type, String channel) {
                if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
                    if (entry.release() == 1) {
                        MasterSlaveConnectionManager.this.freePubSubConnections.add(entry);
                    }
                    lock.release();
                    return true;
                }
                return false;
            }
        });
        return entryCodec;
    }

    /**
     * Pattern counterpart of {@link #unsubscribe(String, boolean)}.
     *
     * @param temporaryDown when {@code true} the returned future completes only after the
     *                      PUNSUBSCRIBE status arrives; otherwise it is already completed
     * @return a future with the pattern's codec, or {@code null} if the pattern was not subscribed
     */
    @Override
    public RFuture<Codec> punsubscribe(final String channelName, boolean temporaryDown) {
        PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            return null;
        }
        freePubSubConnections.remove(entry);
        // Pattern subscriptions live in the pattern-channel map, as in punsubscribe(String, AsyncSemaphore).
        final Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
        if (temporaryDown) {
            final RPromise<Codec> result = newPromise();
            entry.punsubscribe(channelName, new BaseRedisPubSubListener() {

                @Override
                public boolean onStatus(PubSubType type, String channel) {
                    if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
                        result.trySuccess(entryCodec);
                        return true;
                    }
                    return false;
                }
            });
            return result;
        }
        entry.punsubscribe(channelName, null);
        return newSucceededFuture(entryCodec);
    }

    /** @return the entry whose client address equals {@code addr}, or {@code null} */
    @Override
    public MasterSlaveEntry getEntry(InetSocketAddress addr) {
        for (Map.Entry<Integer, MasterSlaveEntry> entry : entries.entrySet()) {
            if (entry.getValue().getClient().getAddr().equals(addr)) {
                return entry.getValue();
            }
        }
        return null;
    }

    /** @return the entry registered under {@code slot}, or {@code null} */
    @Override
    public MasterSlaveEntry getEntry(int slot) {
        return entries.get(slot);
    }

    /** Marks a slave as down on the entry owning the first slot of {@code slotRange}. */
    protected void slaveDown(ClusterSlotRange slotRange, String host, int port, ClientConnectionsEntry.FreezeReason freezeReason) {
        getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason);
    }

    /** Switches the master of the entry registered under {@code slot}. */
    protected void changeMaster(int slot, String host, int port) {
        getEntry(slot).changeMaster(host, port);
    }

    protected void addEntry(Integer slot, MasterSlaveEntry entry) {
        entries.put(slot, entry);
    }

    /** @return the entry previously registered under {@code slot}, or {@code null} */
    protected MasterSlaveEntry removeMaster(Integer slot) {
        return entries.remove(slot);
    }

    /**
     * Obtains a write connection from the entry named by {@code source}.
     *
     * @throws RedisNodeNotFoundException if no entry can be resolved for {@code source}
     */
    @Override
    public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
        MasterSlaveEntry entry = source.getEntry();
        if (entry == null) {
            entry = getEntry(source);
        }
        return entry.connectionWriteOp(command);
    }

    /** Builds the exception reported when {@code source} cannot be mapped to an entry. */
    private static RedisNodeNotFoundException nodeNotFound(NodeSource source) {
        return new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
    }

    /**
     * Resolves the entry for {@code source}: by address when a redirect is present,
     * otherwise by slot.
     *
     * @throws RedisNodeNotFoundException if no matching entry is registered
     */
    private MasterSlaveEntry getEntry(NodeSource source) {
        MasterSlaveEntry e;
        if (source.getRedirect() != null) {
            e = getEntry(source.getAddr());
        } else {
            e = getEntry(source.getSlot());
        }
        if (e == null) {
            throw nodeNotFound(source);
        }
        return e;
    }

    /**
     * Obtains a read connection. When {@code source} carries an address, the entry is looked
     * up by that address (first as a master client, then among the slaves) and the read is
     * directed to that address; otherwise the entry comes from the source or its slot.
     *
     * @throws RedisNodeNotFoundException if no entry can be resolved for {@code source}
     */
    @Override
    public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
        MasterSlaveEntry entry = source.getEntry();
        if (entry == null && source.getSlot() != null) {
            entry = getEntry(source.getSlot());
        }
        if (source.getAddr() != null) {
            entry = getEntry(source.getAddr());
            if (entry == null) {
                for (MasterSlaveEntry e : getEntrySet()) {
                    if (e.hasSlave(source.getAddr())) {
                        entry = e;
                        break;
                    }
                }
            }
            if (entry == null) {
                throw nodeNotFound(source);
            }
            return entry.connectionReadOp(command, source.getAddr());
        }
        if (entry == null) {
            throw nodeNotFound(source);
        }
        return entry.connectionReadOp(command);
    }

    RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
        return getEntry(slot).nextPubSubConnection();
    }

    protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry entry) {
        getEntry(slot).returnPubSubConnection(entry);
    }

    /** Returns a write connection to the entry named by {@code source}. */
    @Override
    public void releaseWrite(NodeSource source, RedisConnection connection) {
        MasterSlaveEntry entry = source.getEntry();
        if (entry == null) {
            entry = getEntry(source);
        }
        entry.releaseWrite(connection);
    }

    /** Returns a read connection to the entry named by {@code source}. */
    @Override
    public void releaseRead(NodeSource source, RedisConnection connection) {
        MasterSlaveEntry entry = source.getEntry();
        if (entry == null) {
            entry = getEntry(source);
        }
        entry.releaseRead(connection);
    }

    /** Shuts down with a 2 second quiet period and a 15 second timeout. */
    @Override
    public void shutdown() {
        shutdown(2L, 15L, TimeUnit.SECONDS);
    }

    /**
     * Closes the shutdown latch, completes the shutdown promise, waits for the latch, shuts
     * down every distinct entry and then stops the threads this manager owns. A caller-supplied
     * executor or event loop group is left running.
     *
     * @param quietPeriod quiet period passed to the event loop group
     * @param timeout     maximum wait for the executor and the event loop group
     * @param unit        unit of {@code quietPeriod} and {@code timeout}
     */
    @Override
    public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
        shutdownLatch.close();
        shutdownPromise.trySuccess(true);
        shutdownLatch.awaitUninterruptibly();
        // The slot table stores the same entry under many slots; shut each one down once.
        for (MasterSlaveEntry entry : getEntrySet()) {
            entry.shutdown();
        }
        if (!sharedExecutor) {
            executor.shutdown();
            try {
                executor.awaitTermination(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (!sharedEventLoopGroup) {
            group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly();
        }
        // The timer is absent when initTimer() was never called on this instance.
        if (timer != null) {
            timer.stop();
        }
    }

    /** @return {@code true} once the shutdown latch has been closed */
    @Override
    public boolean isShuttingDown() {
        return shutdownLatch.isClosed();
    }

    /** @return whether the event loop group reports itself terminated */
    @Override
    public boolean isShutdown() {
        return group.isTerminated();
    }

    /** @return a read-only view of the clients created through {@link #createClient(NodeType, String, int)} */
    @Override
    public Collection<RedisClientEntry> getClients() {
        return Collections.unmodifiableCollection(clients);
    }

    @Override
    public <R> RPromise<R> newPromise() {
        return new RedissonPromise<R>();
    }

    @Override
    public <R> RFuture<R> newSucceededFuture(R value) {
        return RedissonPromise.newSucceededFuture(value);
    }

    @Override
    public <R> RFuture<R> newFailedFuture(Throwable cause) {
        return RedissonPromise.newFailedFuture(cause);
    }

    @Override
    public EventLoopGroup getGroup() {
        return group;
    }

    /**
     * Schedules {@code task} on the timer.
     *
     * @return the timer's timeout handle, or an inert handle if the timer rejects the task
     *         with an {@link IllegalStateException}
     */
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        try {
            return timer.newTimeout(task, delay, unit);
        } catch (IllegalStateException e) {
            return dummyTimeout;
        }
    }

    @Override
    public InfinitySemaphoreLatch getShutdownLatch() {
        return shutdownLatch;
    }

    @Override
    public RFuture<Boolean> getShutdownPromise() {
        return shutdownPromise;
    }

    @Override
    public ConnectionEventsHub getConnectionEventsHub() {
        return connectionEventsHub;
    }

    /**
     * Stops the timer and the threads owned by this manager; used to clean up after a failed
     * initialization. As in {@link #shutdown(long, long, TimeUnit)}, a caller-supplied executor
     * or event loop group is left running.
     */
    protected void stopThreads() {
        if (timer != null) {
            timer.stop();
        }
        if (!sharedExecutor) {
            executor.shutdown();
            try {
                executor.awaitTermination(15L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (!sharedEventLoopGroup) {
            group.shutdownGracefully().syncUninterruptibly();
        }
    }

    @Override
    public ExecutorService getExecutor() {
        return executor;
    }

    /** @return always {@code null} in this implementation */
    @Override
    public URL getLastClusterNode() {
        return null;
    }
}

