/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.connection;

import io.netty.resolver.AddressResolver;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.CountableListener;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentinelConnectionManager
extends MasterSlaveConnectionManager {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
    private final AtomicReference<String> currentMaster = new AtomicReference();
    private final Set<String> slaves = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap());
    private final Set<URI> disconnectedSlaves = new HashSet<URI>();
    private String masterName;
    private ScheduledFuture<?> monitorFuture;
    private AddressResolver<InetSocketAddress> sentinelResolver;

    /**
     * Builds the manager by asking the configured sentinels for the current topology.
     *
     * <p>The sentinel addresses from {@code cfg} are tried in order. For the first sentinel
     * whose connection is active, the master address, the slave list and the list of other
     * sentinels are read, and the loop stops. If no sentinel yields a master, the threads
     * are stopped and a {@link RedisConnectionException} is thrown.
     *
     * <p>Decompiler (CFR) warning kept from the original source: "Removed try catching
     * itself - possible behaviour change."
     *
     * @param cfg sentinel-specific settings (master name, sentinel addresses, scan interval)
     * @param config the general configuration passed to the superclass
     * @throws IllegalArgumentException if {@code cfg} has no master name
     * @throws RedisConnectionException if the master could not be determined from any sentinel
     */
    public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
        super(config);
        if (cfg.getMasterName() == null) {
            throw new IllegalArgumentException("masterName parameter is not defined!");
        }
        this.masterName = cfg.getMasterName();
        this.config = this.create(cfg);
        this.initTimer(this.config);
        this.sentinelResolver = this.resolverGroup.getResolver(this.getGroup().next());
        for (URI addr : cfg.getSentinelAddresses()) {
            // Temporary client used only for this bootstrap query; it is shut down in the finally block.
            RedisClient client = this.createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts(), null);
            try {
                RedisConnection connection = client.connect();
                // An inactive connection is skipped; the next sentinel address is tried.
                if (!connection.isActive()) continue;
                // The reply is read as [host, port] of the current master.
                List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
                String masterHost = this.createAddress(master.get(0), master.get(1));
                this.config.setMasterAddress(masterHost);
                this.currentMaster.set(masterHost);
                this.log.info("master: {} added", (Object)masterHost);
                // NOTE(review): the master address is also stored in the slaves set - confirm this is intentional.
                this.slaves.add(masterHost);
                List sentinelSlaves = (List)connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
                for (Map map : sentinelSlaves) {
                    if (map.isEmpty()) continue;
                    String ip = (String)map.get("ip");
                    String string = (String)map.get("port");
                    String flags = (String)map.get("flags");
                    String host = this.createAddress(ip, string);
                    this.config.addSlaveAddress(host);
                    this.slaves.add(host);
                    this.log.debug("slave {} state: {}", (Object)host, (Object)map);
                    this.log.info("slave: {} added", (Object)host);
                    // Slaves flagged s_down or disconnected are still registered above, but are also
                    // remembered in disconnectedSlaves, which createMasterSlaveEntry passes on.
                    if (!flags.contains("s_down") && !flags.contains("disconnected")) continue;
                    URI uri = URIBuilder.create(host);
                    this.disconnectedSlaves.add(uri);
                    this.log.warn("slave: {} is down", (Object)host);
                }
                List sentinelSentinels = (List)connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
                ArrayList<RFuture<Void>> connectionFutures = new ArrayList<RFuture<Void>>(sentinelSentinels.size());
                for (Map map : sentinelSentinels) {
                    if (map.isEmpty()) continue;
                    String ip = (String)map.get("ip");
                    String port = (String)map.get("port");
                    String host = this.createAddress(ip, port);
                    URI sentinelAddr = URIBuilder.create(host);
                    RFuture<Void> future = this.registerSentinel(sentinelAddr, this.config);
                    connectionFutures.add(future);
                }
                // Bounded wait (connect timeout) on each registration future collected above.
                for (RFuture rFuture : connectionFutures) {
                    rFuture.awaitUninterruptibly(this.config.getConnectTimeout());
                }
                // Topology obtained from this sentinel; the remaining addresses are not queried.
                break;
            }
            catch (RedisConnectionException e) {
                // Only connection failures are tolerated here; the loop moves on to the next address.
                this.log.warn("Can't connect to sentinel server. {}", (Object)e.getMessage());
            }
            finally {
                client.shutdownAsync();
            }
        }
        // currentMaster is only set inside the loop, so null means no sentinel answered.
        if (this.currentMaster.get() == null) {
            this.stopThreads();
            throw new RedisConnectionException("Can't connect to servers!");
        }
        this.initSingleEntry();
        this.scheduleChangeCheck(cfg, null);
    }

    /**
     * Starts the periodic sentinel DNS check unless DNS monitoring is switched off,
     * which is signalled by a monitoring interval of {@code -1}.
     *
     * @param masterHost not read by this implementation
     */
    @Override
    protected void startDNSMonitoring(RedisClient masterHost) {
        boolean monitoringEnabled = this.config.getDnsMonitoringInterval() != -1L;
        if (monitoringEnabled) {
            this.scheduleSentinelDNSCheck();
        }
    }

    /**
     * Schedules one DNS check of all known sentinels after the configured DNS monitoring
     * interval and stores the scheduled task in {@code monitorFuture}.
     *
     * <p>When the task runs it resolves the host name of every sentinel client in a snapshot
     * of the {@code sentinels} map. For each resolved address that matches no known sentinel
     * (by host address and port) a new sentinel is registered. If none of the resolved
     * addresses matches the client's own address, the client is removed from the map and
     * shut down. Once every lookup has completed, successfully or not, the next check is
     * scheduled.
     *
     * <p>If the snapshot is empty there is no lookup whose completion could trigger the
     * next round, so the check is rescheduled directly to keep the monitoring alive.
     */
    protected void scheduleSentinelDNSCheck() {
        this.monitorFuture = this.group.schedule(new Runnable(){

            @Override
            public void run() {
                List<RedisClient> snapshot = new ArrayList<RedisClient>(SentinelConnectionManager.this.sentinels.values());
                if (snapshot.isEmpty()) {
                    // Nothing to resolve: without this the counter below would start at 0,
                    // no listener would ever fire and the periodic check would stop for good.
                    SentinelConnectionManager.this.scheduleSentinelDNSCheck();
                    return;
                }
                final AtomicInteger sentinelsCounter = new AtomicInteger(snapshot.size());
                // Shared by all lookups; the last one to complete schedules the next round.
                FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>(){

                    @Override
                    public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
                        if (sentinelsCounter.decrementAndGet() == 0) {
                            SentinelConnectionManager.this.scheduleSentinelDNSCheck();
                        }
                    }
                };
                for (final RedisClient client : snapshot) {
                    Future<List<InetSocketAddress>> allNodes = SentinelConnectionManager.this.sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort()));
                    allNodes.addListener(new FutureListener<List<InetSocketAddress>>(){

                        @Override
                        public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
                            if (!future.isSuccess()) {
                                SentinelConnectionManager.this.log.error("Unable to resolve " + client.getAddr().getHostName(), future.cause());
                                return;
                            }
                            boolean clientFound = false;
                            for (InetSocketAddress addr : future.getNow()) {
                                boolean found = false;
                                for (RedisClient currentSentinel : SentinelConnectionManager.this.sentinels.values()) {
                                    if (!currentSentinel.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress()) || currentSentinel.getAddr().getPort() != addr.getPort()) continue;
                                    found = true;
                                    break;
                                }
                                if (!found) {
                                    // The host name now resolves to an address no known sentinel uses.
                                    URI uri = SentinelConnectionManager.this.convert(addr.getAddress().getHostAddress(), "" + addr.getPort());
                                    SentinelConnectionManager.this.registerSentinel(uri, SentinelConnectionManager.this.getConfig());
                                }
                                if (!client.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress()) || client.getAddr().getPort() != addr.getPort()) continue;
                                clientFound = true;
                            }
                            if (!clientFound) {
                                // The client's own address is no longer among the resolved ones.
                                String addr = client.getAddr().getAddress().getHostAddress() + ":" + client.getAddr().getPort();
                                RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(addr);
                                if (sentinel != null) {
                                    sentinel.shutdownAsync();
                                    SentinelConnectionManager.this.log.warn("sentinel: {} has down", (Object)addr);
                                }
                            }
                        }
                    });
                    allNodes.addListener(commonListener);
                }
            }
        }, this.config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the next topology check after the scan interval from {@code cfg} and
     * stores the scheduled task in {@code monitorFuture}.
     *
     * @param cfg sentinel configuration supplying the scan interval
     * @param iterator sentinels still to be tried, or {@code null} to start over with a
     *        fresh iterator over all currently known sentinels
     */
    private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator) {
        Runnable stateCheck = new Runnable(){

            @Override
            public void run() {
                Iterator<RedisClient> remaining = iterator != null
                        ? iterator
                        : SentinelConnectionManager.this.sentinels.values().iterator();
                // Each round starts with an empty "last failure" holder.
                SentinelConnectionManager.this.checkState(cfg, remaining, new AtomicReference<Throwable>());
            }
        };
        long delayMillis = cfg.getScanInterval();
        this.monitorFuture = this.group.schedule(stateCheck, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tries to connect to the next sentinel from {@code iterator} and, on success, hands
     * the connection to {@link #updateState}.
     *
     * <p>If the iterator is exhausted, the last recorded failure is logged and a new check
     * over all sentinels is scheduled. If the shutdown latch cannot be acquired nothing
     * happens. A failed connection attempt records its cause, releases the latch and moves
     * on to the next sentinel.
     *
     * @param cfg sentinel configuration
     * @param iterator sentinels that have not been tried yet in this round
     * @param lastException holder for the most recent connection failure
     */
    protected void checkState(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator, final AtomicReference<Throwable> lastException) {
        boolean exhausted = !iterator.hasNext();
        if (exhausted) {
            this.log.error("Can't update cluster state", lastException.get());
            this.scheduleChangeCheck(cfg, null);
            return;
        }
        boolean latchAcquired = this.getShutdownLatch().acquire();
        if (!latchAcquired) {
            return;
        }
        RedisClient sentinel = iterator.next();
        RFuture<RedisConnection> connecting = this.connectToNode(null, null, sentinel, null);
        FutureListener<RedisConnection> onConnected = new FutureListener<RedisConnection>(){

            @Override
            public void operationComplete(Future<RedisConnection> future) throws Exception {
                if (future.isSuccess()) {
                    SentinelConnectionManager.this.updateState(cfg, future.getNow(), iterator);
                    return;
                }
                // Remember why this sentinel failed, then try the next one.
                lastException.set(future.cause());
                SentinelConnectionManager.this.getShutdownLatch().release();
                SentinelConnectionManager.this.checkState(cfg, iterator, lastException);
            }
        };
        connecting.addListener(onConnected);
    }

    /**
     * Refreshes the known topology through an established sentinel connection.
     *
     * <p>Up to three SENTINEL commands are issued asynchronously: the master address,
     * the slave list (skipped when {@code config.checkSkipSlavesInit()} is true) and the
     * list of other sentinels. When the last of them completes, the shutdown latch is
     * released and the next check is scheduled. After any failure the check continues with
     * the remaining sentinels in {@code iterator}; otherwise it starts over with all of them.
     *
     * <p>The shared listener records a failure <em>before</em> checking whether it was the
     * last outstanding command, so a failure of the final command is taken into account
     * when choosing how to reschedule.
     *
     * @param cfg sentinel configuration supplying the master name
     * @param connection connection to the sentinel being queried; closed on the first failure
     * @param iterator sentinels not yet tried in the current round
     */
    protected void updateState(final SentinelServersConfig cfg, final RedisConnection connection, final Iterator<RedisClient> iterator) {
        // Master + sentinels are always queried; the slaves query adds one more below.
        final AtomicInteger commands = new AtomicInteger(2);
        FutureListener<Object> commonListener = new FutureListener<Object>(){
            private final AtomicBoolean failed = new AtomicBoolean();

            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                // Record the failure first so the decision below also sees it when this
                // is the last outstanding command.
                if (!future.isSuccess() && this.failed.compareAndSet(false, true)) {
                    SentinelConnectionManager.this.log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), future.cause());
                    SentinelConnectionManager.this.closeNodeConnection(connection);
                }
                if (commands.decrementAndGet() == 0) {
                    SentinelConnectionManager.this.getShutdownLatch().release();
                    if (this.failed.get()) {
                        SentinelConnectionManager.this.scheduleChangeCheck(cfg, iterator);
                    } else {
                        SentinelConnectionManager.this.scheduleChangeCheck(cfg, null);
                    }
                }
            }
        };
        RFuture<List<String>> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
        masterFuture.addListener(new FutureListener<List<String>>(){

            @Override
            public void operationComplete(Future<List<String>> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                List<String> master = future.getNow();
                String current = SentinelConnectionManager.this.currentMaster.get();
                String newMaster = SentinelConnectionManager.this.createAddress(master.get(0), master.get(1));
                // compareAndSet ensures only one caller performs the switch for a given change.
                if (!newMaster.equals(current) && SentinelConnectionManager.this.currentMaster.compareAndSet(current, newMaster)) {
                    SentinelConnectionManager.this.changeMaster(SentinelConnectionManager.this.singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
                }
            }
        });
        masterFuture.addListener(commonListener);
        if (!this.config.checkSkipSlavesInit()) {
            RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
            commands.incrementAndGet();
            slavesFuture.addListener(new FutureListener<List<Map<String, String>>>(){

                @Override
                public void operationComplete(Future<List<Map<String, String>>> future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }
                    List<Map<String, String>> slavesMap = future.getNow();
                    final Set<String> currentSlaves = new HashSet<String>(slavesMap.size());
                    List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
                    for (Map<String, String> map : slavesMap) {
                        if (map.isEmpty()) continue;
                        String ip = map.get("ip");
                        String port = map.get("port");
                        String flags = map.get("flags");
                        String masterHost = map.get("master-host");
                        String masterPort = map.get("master-port");
                        if (flags.contains("s_down") || flags.contains("disconnected")) {
                            SentinelConnectionManager.this.slaveDown(ip, port);
                            continue;
                        }
                        if (!SentinelConnectionManager.this.isUseSameMaster(ip, port, masterHost, masterPort)) continue;
                        String slaveAddr = SentinelConnectionManager.this.createAddress(ip, port);
                        currentSlaves.add(slaveAddr);
                        RFuture<Void> slaveFuture = SentinelConnectionManager.this.addSlave(ip, port, slaveAddr);
                        futures.add(slaveFuture);
                    }
                    CountableListener<Void> listener = new CountableListener<Void>(){

                        @Override
                        protected void onSuccess(Void value) {
                            // Everything tracked locally but absent from the sentinel's list is stale.
                            Set<String> removedSlaves = new HashSet<String>(SentinelConnectionManager.this.slaves);
                            removedSlaves.removeAll(currentSlaves);
                            for (String slave : removedSlaves) {
                                SentinelConnectionManager.this.slaves.remove(slave);
                                // Split on the LAST colon: createAddress wraps IPv6 hosts in
                                // brackets, and those hosts contain colons themselves.
                                String hostPort = slave.replace("redis://", "");
                                int portSeparator = hostPort.lastIndexOf(':');
                                if (portSeparator < 0) {
                                    continue;
                                }
                                String host = hostPort.substring(0, portSeparator);
                                if (host.startsWith("[") && host.endsWith("]")) {
                                    // slaveDown re-adds the brackets via convert/createAddress.
                                    host = host.substring(1, host.length() - 1);
                                }
                                SentinelConnectionManager.this.slaveDown(host, hostPort.substring(portSeparator + 1));
                            }
                        }
                    };
                    listener.setCounter(futures.size());
                    for (RFuture<Void> slaveFuture : futures) {
                        slaveFuture.addListener(listener);
                    }
                }
            });
            slavesFuture.addListener(commonListener);
        }
        RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
        sentinelsFuture.addListener(new FutureListener<List<Map<String, String>>>(){

            @Override
            public void operationComplete(Future<List<Map<String, String>>> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                List<Map<String, String>> list = future.getNow();
                for (Map<String, String> map : list) {
                    if (map.isEmpty()) continue;
                    String ip = map.get("ip");
                    String port = map.get("port");
                    URI sentinelAddr = SentinelConnectionManager.this.convert(ip, port);
                    SentinelConnectionManager.this.registerSentinel(sentinelAddr, SentinelConnectionManager.this.getConfig());
                }
            }
        });
        sentinelsFuture.addListener(commonListener);
    }

    /**
     * Builds a {@code redis://host:port} address string. A host containing a colon
     * is wrapped in square brackets.
     *
     * @param host host name or IP address
     * @param port port, appended via string concatenation
     * @return the address string
     */
    private String createAddress(String host, Object port) {
        String hostPart = host.contains(":") ? "[" + host + "]" : host;
        return "redis://" + hostPart + ":" + port;
    }

    /**
     * Creates the master/slave entry and blocks until every future returned by its slave
     * balancer initialisation has completed. The slaves recorded in
     * {@code disconnectedSlaves} are passed to that initialisation.
     *
     * @param config master/slave configuration for the entry
     * @param slots slot ranges served by the entry
     * @return the initialised entry
     */
    @Override
    protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet<ClusterSlotRange> slots) {
        MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
        Iterator<RFuture<Void>> pending = entry.initSlaveBalancer(this.disconnectedSlaves).iterator();
        while (pending.hasNext()) {
            pending.next().syncUninterruptibly();
        }
        return entry;
    }

    /**
     * Registers a sentinel under the key {@code host:port} and subscribes to its
     * {@code +switch-master}, {@code +sdown}, {@code -sdown}, {@code +slave} and
     * {@code +sentinel} channels.
     *
     * <p>If the sentinel is already registered, an already-succeeded future is returned.
     * Otherwise the returned future completes once the pub/sub connection has been
     * established and the subscription requested, or fails with the connection error.
     * This lets a caller that waits on the future actually wait for the connection attempt.
     *
     * @param addr sentinel address
     * @param c configuration supplying connect timeout and retry settings
     * @return future reflecting the outcome of the sentinel connection attempt
     */
    private RFuture<Void> registerSentinel(final URI addr, final MasterSlaveServersConfig c) {
        String key = addr.getHost() + ":" + addr.getPort();
        RedisClient client = (RedisClient)this.sentinels.get(key);
        if (client != null) {
            return RedissonPromise.newSucceededFuture(null);
        }
        client = this.createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts(), null);
        RedisClient oldClient = this.sentinels.putIfAbsent(key, client);
        if (oldClient != null) {
            // Another thread registered this sentinel first; dispose of the client that
            // was just created and will never be used.
            client.shutdownAsync();
            return RedissonPromise.newSucceededFuture(null);
        }
        final RedissonPromise<Void> result = new RedissonPromise<Void>();
        RFuture<RedisPubSubConnection> pubsubFuture = client.connectPubSubAsync();
        pubsubFuture.addListener(new FutureListener<RedisPubSubConnection>(){

            @Override
            public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
                if (!future.isSuccess()) {
                    SentinelConnectionManager.this.log.warn("Can't connect to sentinel: {}", (Object)addr);
                    result.tryFailure(future.cause());
                    return;
                }
                RedisPubSubConnection pubsub = future.getNow();
                pubsub.addListener(new BaseRedisPubSubListener(){

                    @Override
                    public void onMessage(String channel, Object msg) {
                        SentinelConnectionManager.this.log.debug("message {} from {}", msg, (Object)channel);
                        if ("+sentinel".equals(channel)) {
                            SentinelConnectionManager.this.onSentinelAdded((String)msg, c);
                        }
                        if ("+slave".equals(channel)) {
                            SentinelConnectionManager.this.onSlaveAdded(addr, (String)msg);
                        }
                        if ("+sdown".equals(channel)) {
                            SentinelConnectionManager.this.onNodeDown(addr, (String)msg);
                        }
                        if ("-sdown".equals(channel)) {
                            SentinelConnectionManager.this.onNodeUp(addr, (String)msg);
                        }
                        if ("+switch-master".equals(channel)) {
                            SentinelConnectionManager.this.onMasterChange(addr, (String)msg);
                        }
                    }

                    @Override
                    public boolean onStatus(PubSubType type, String channel) {
                        if (type == PubSubType.SUBSCRIBE) {
                            SentinelConnectionManager.this.log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
                        }
                        return true;
                    }
                });
                pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
                SentinelConnectionManager.this.log.info("sentinel: {}:{} added", (Object)addr.getHost(), (Object)addr.getPort());
                result.trySuccess(null);
            }
        });
        return result;
    }

    /**
     * Handles a {@code +sentinel} notification by registering the announced sentinel.
     *
     * <p>The message is split on spaces. Only messages whose first token is
     * {@code sentinel} are processed, with the IP and port taken from the third and
     * fourth token. A sentinel message with fewer than four tokens is logged and ignored
     * instead of failing with an index error.
     *
     * @param msg raw notification payload
     * @param c configuration used when registering the new sentinel
     */
    protected void onSentinelAdded(String msg, MasterSlaveServersConfig c) {
        String[] parts = msg.split(" ");
        if (!"sentinel".equals(parts[0])) {
            return;
        }
        if (parts.length <= 3) {
            this.log.warn("onSentinelAdded. Invalid message: {}", (Object)msg);
            return;
        }
        String ip = parts[2];
        String port = parts[3];
        URI uri = this.convert(ip, port);
        this.registerSentinel(uri, c);
    }

    /**
     * Handles a {@code +slave} notification by adding the announced slave, provided it
     * replicates from the currently known master.
     *
     * <p>The message is split on spaces. The slave IP and port are read from tokens 2 and 3
     * and the slave's master host and port from tokens 6 and 7, so at least eight tokens
     * are required. Anything shorter, or not starting with {@code slave}, is logged as
     * invalid.
     *
     * @param addr address of the sentinel that sent the notification (used for logging)
     * @param msg raw notification payload
     */
    protected void onSlaveAdded(URI addr, String msg) {
        String[] parts = msg.split(" ");
        // isUseSameMaster(parts) reads indices 6 and 7, so require all eight tokens up front.
        if (parts.length > 7 && "slave".equals(parts[0])) {
            String ip = parts[2];
            String port = parts[3];
            if (!this.isUseSameMaster(parts)) {
                return;
            }
            String slaveAddr = this.createAddress(ip, port);
            this.addSlave(ip, port, slaveAddr);
        } else {
            this.log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
        }
    }

    /**
     * Adds a slave to the single master/slave entry, or marks an already known slave as up.
     *
     * <p>If {@code slaveAddr} is new to the {@code slaves} set and slave initialisation is
     * not skipped, the slave is added to the entry asynchronously. On failure the address
     * is removed from the set again and the returned future fails. On success the returned
     * future always completes, whether or not the slave could be unfrozen at that moment,
     * so that callers counting completions are not left waiting forever.
     *
     * <p>Otherwise, if the entry already has the slave, it is marked as up and the returned
     * future completes immediately.
     *
     * @param ip slave IP address
     * @param port slave port
     * @param slaveAddr full slave address as tracked in the {@code slaves} set
     * @return future completed when the slave has been processed
     */
    protected RFuture<Void> addSlave(final String ip, final String port, final String slaveAddr) {
        final RedissonPromise<Void> result = new RedissonPromise<Void>();
        final MasterSlaveEntry entry = this.getEntry(this.singleSlotRange.getStartSlot());
        final URI uri = this.convert(ip, port);
        if (this.slaves.add(slaveAddr) && !this.config.checkSkipSlavesInit()) {
            RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr));
            future.addListener(new FutureListener<Void>(){

                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    if (!future.isSuccess()) {
                        // Forget the address so a later notification can retry the add.
                        SentinelConnectionManager.this.slaves.remove(slaveAddr);
                        result.tryFailure(future.cause());
                        SentinelConnectionManager.this.log.error("Can't add slave: " + slaveAddr, future.cause());
                        return;
                    }
                    if (entry.isSlaveUnfreezed(uri) || entry.slaveUp(uri, ClientConnectionsEntry.FreezeReason.MANAGER)) {
                        String hostPort = ip + ":" + port;
                        SentinelConnectionManager.this.log.info("slave: {} added", (Object)hostPort);
                    }
                    // The add itself succeeded; always complete the promise.
                    result.trySuccess(null);
                }
            });
        } else {
            if (entry.hasSlave(uri)) {
                this.slaveUp(ip, port);
            }
            result.trySuccess(null);
        }
        return result;
    }

    /**
     * Converts an IP/port pair into a {@link URI} by first building the
     * {@code redis://} address string.
     *
     * @param ip host or IP address
     * @param port port
     * @return the resulting URI
     */
    protected URI convert(String ip, String port) {
        return URIBuilder.create(this.createAddress(ip, port));
    }

    /**
     * Handles a {@code +sdown} notification.
     *
     * <p>The message is split on spaces and the node type is read from the first token,
     * the IP and port from the third and fourth. A slave is marked as down. A sentinel is
     * removed from the {@code sentinels} map and shut down. Any other node type,
     * including {@code master}, triggers no action here. Messages with fewer than four
     * tokens are logged as invalid.
     *
     * @param sentinelAddr address of the sentinel that sent the notification (used for logging)
     * @param msg raw notification payload
     */
    private void onNodeDown(URI sentinelAddr, String msg) {
        String[] parts = msg.split(" ");
        if (parts.length <= 3) {
            this.log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
            return;
        }
        String nodeType = parts[0];
        String ip = parts[2];
        String port = parts[3];
        if ("slave".equals(nodeType)) {
            this.slaveDown(ip, port);
        } else if ("sentinel".equals(nodeType)) {
            String addr = ip + ":" + port;
            RedisClient sentinel = (RedisClient)this.sentinels.remove(addr);
            if (sentinel != null) {
                sentinel.shutdownAsync();
                this.log.warn("sentinel: {} has down", (Object)addr);
            }
        }
        // A "master" +sdown is intentionally a no-op: the original branch only read the
        // ip/port into unused locals.
    }

    /**
     * Marks a slave as down and logs a warning.
     *
     * <p>When slave initialisation is skipped only the warning is logged. Otherwise the
     * slave is reported as down to the single master/slave entry, and the warning is
     * logged only if the entry reports that the state changed.
     *
     * @param ip slave IP address
     * @param port slave port
     */
    private void slaveDown(String ip, String port) {
        boolean reportDown = this.config.checkSkipSlavesInit();
        if (!reportDown) {
            MasterSlaveEntry entry = this.getEntry(this.singleSlotRange.getStartSlot());
            URI slaveUri = this.convert(ip, port);
            reportDown = entry.slaveDown(slaveUri, ClientConnectionsEntry.FreezeReason.MANAGER);
        }
        if (reportDown) {
            this.log.warn("slave: {}:{} has down", (Object)ip, (Object)port);
        }
    }

    /**
     * Convenience overload for a tokenised sentinel message: tokens 2 and 3 are the slave
     * IP and port, tokens 6 and 7 the host and port of the master it replicates from.
     *
     * @param parts tokenised message; must contain at least eight tokens
     * @return whether the slave's master equals the currently known master
     */
    private boolean isUseSameMaster(String[] parts) {
        String slaveIp = parts[2];
        String slavePort = parts[3];
        String slaveMasterHost = parts[6];
        String slaveMasterPort = parts[7];
        return this.isUseSameMaster(slaveIp, slavePort, slaveMasterHost, slaveMasterPort);
    }

    /**
     * Checks whether a slave replicates from the master currently tracked in
     * {@code currentMaster}. A mismatch is logged as a warning.
     *
     * @param slaveIp slave IP address (used for logging)
     * @param slavePort slave port (used for logging)
     * @param slaveMasterHost host of the master the slave reports
     * @param slaveMasterPort port of the master the slave reports
     * @return {@code true} if the reported master equals the current master
     */
    protected boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) {
        String master = this.currentMaster.get();
        String slaveMaster = this.createAddress(slaveMasterHost, slaveMasterPort);
        if (master.equals(slaveMaster)) {
            return true;
        }
        this.log.warn("Skipped slave up {} for master {} differs from current {}", slaveIp + ":" + slavePort, slaveMaster, master);
        return false;
    }

    /**
     * Handles a {@code -sdown} notification.
     *
     * <p>The message is split on spaces. For a slave that replicates from the current
     * master, the slave is marked as up; this needs tokens 6 and 7 (the slave's master
     * host and port), so slave messages with fewer than eight tokens are logged as invalid.
     * For a master, the single entry is unfrozen if it is frozen and its client address
     * matches. Other node types, and messages with fewer than four tokens, are logged as
     * invalid.
     *
     * @param addr address of the sentinel that sent the notification (used for logging)
     * @param msg raw notification payload
     */
    private void onNodeUp(URI addr, String msg) {
        String[] parts = msg.split(" ");
        if (parts.length <= 3) {
            // Too short to carry a node type plus ip/port; previously dropped silently.
            this.log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
            return;
        }
        if ("slave".equals(parts[0])) {
            // isUseSameMaster(parts) reads indices 6 and 7.
            if (parts.length <= 7) {
                this.log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
                return;
            }
            if (!this.isUseSameMaster(parts)) {
                return;
            }
            this.slaveUp(parts[2], parts[3]);
        } else if ("master".equals(parts[0])) {
            String ip = parts[2];
            String port = parts[3];
            URI uri = this.convert(ip, port);
            MasterSlaveEntry entry = this.getEntry(this.singleSlotRange.getStartSlot());
            if (entry.isFreezed() && URIBuilder.compare(entry.getClient().getAddr(), uri)) {
                entry.unfreeze();
                String masterAddr = ip + ":" + port;
                this.log.info("master: {} has up", (Object)masterAddr);
            }
        } else {
            this.log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
        }
    }

    /**
     * Marks a slave as up and logs it.
     *
     * <p>When slave initialisation is skipped only the log entry is written. Otherwise the
     * slave is reported as up to the single master/slave entry, and the log entry is
     * written only if the entry reports that the state changed.
     *
     * @param ip slave IP address
     * @param port slave port
     */
    private void slaveUp(String ip, String port) {
        boolean reportUp = this.config.checkSkipSlavesInit();
        if (!reportUp) {
            URI slaveUri = this.convert(ip, port);
            reportUp = this.getEntry(this.singleSlotRange.getStartSlot()).slaveUp(slaveUri, ClientConnectionsEntry.FreezeReason.MANAGER);
        }
        if (reportUp) {
            String slaveAddr = ip + ":" + port;
            this.log.info("slave: {} has up", (Object)slaveAddr);
        }
    }

    /**
     * Handles a {@code +switch-master} notification.
     *
     * <p>The message is split on spaces. The first token is compared with the monitored
     * master name, and the new master's IP and port are read from tokens 3 and 4, so at
     * least five tokens are required. If the new address differs from the current one and
     * the switch is won via compare-and-set, the master of the single entry is changed.
     * Messages with fewer than five tokens are logged as invalid.
     *
     * @param addr address of the sentinel that sent the notification (used for logging)
     * @param msg raw notification payload
     */
    private void onMasterChange(URI addr, String msg) {
        String[] parts = msg.split(" ");
        // parts[4] is read below, so five tokens are needed (the old check allowed four).
        if (parts.length > 4) {
            if (this.masterName.equals(parts[0])) {
                String ip = parts[3];
                String port = parts[4];
                String current = this.currentMaster.get();
                String newMaster = this.createAddress(ip, port);
                if (!newMaster.equals(current) && this.currentMaster.compareAndSet(current, newMaster)) {
                    this.changeMaster(this.singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
                }
            }
        } else {
            this.log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
        }
    }

    /**
     * Builds the master/slave configuration via the superclass and copies the database
     * setting from the sentinel configuration into it.
     *
     * @param cfg base configuration; cast to {@link SentinelServersConfig} to read the database
     * @return the resulting master/slave configuration
     */
    @Override
    protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {
        MasterSlaveServersConfig masterSlaveConfig = super.create(cfg);
        SentinelServersConfig sentinelConfig = (SentinelServersConfig)cfg;
        masterSlaveConfig.setDatabase(sentinelConfig.getDatabase());
        return masterSlaveConfig;
    }

    /**
     * Cancels the scheduled monitoring task, shuts down every sentinel client and then
     * delegates to the superclass shutdown.
     *
     * <p>All client shutdowns are started first and only then awaited. Note that
     * {@code monitorFuture} is written by both {@code scheduleSentinelDNSCheck} and
     * {@code scheduleChangeCheck}, so only the most recently stored task is cancelled here.
     */
    @Override
    public void shutdown() {
        this.monitorFuture.cancel(true);
        List<RFuture<Void>> pendingShutdowns = new ArrayList<RFuture<Void>>();
        Iterator<RedisClient> clients = this.sentinels.values().iterator();
        while (clients.hasNext()) {
            pendingShutdowns.add(clients.next().shutdownAsync());
        }
        for (RFuture<Void> pending : pendingShutdowns) {
            pending.syncUninterruptibly();
        }
        super.shutdown();
    }
}

