/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.failover;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisAsyncCommandsImpl;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisConnectionStateListener;
import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.RedisURI;
import io.lettuce.core.annotations.Experimental;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.push.PushListener;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.failover.DatabaseFactory;
import io.lettuce.core.failover.MultiDbFutureSyncInvocationHandler;
import io.lettuce.core.failover.RedisDatabaseDeferredCompletion;
import io.lettuce.core.failover.RedisDatabaseImpl;
import io.lettuce.core.failover.api.CircuitBreakerStateChangeEvent;
import io.lettuce.core.failover.api.DatabaseConfig;
import io.lettuce.core.failover.api.ImmutableRedisURI;
import io.lettuce.core.failover.api.MultiDbOptions;
import io.lettuce.core.failover.api.RedisNoHealthyDatabaseException;
import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection;
import io.lettuce.core.failover.event.AllDatabasesUnhealthyEvent;
import io.lettuce.core.failover.event.DatabaseSwitchEvent;
import io.lettuce.core.failover.event.SwitchReason;
import io.lettuce.core.failover.health.HealthStatusChangeEvent;
import io.lettuce.core.failover.health.HealthStatusManager;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.Closeable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Experimental
class StatefulRedisMultiDbConnectionImpl<C extends StatefulRedisConnection<K, V>, K, V>
implements StatefulRedisMultiDbConnection<K, V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(StatefulRedisMultiDbConnectionImpl.class);
    protected final Map<RedisURI, RedisDatabaseImpl<C>> databases;
    protected final HealthStatusManager healthStatusManager;
    protected volatile RedisDatabaseImpl<C> current;
    protected final RedisCommands<K, V> sync;
    protected final RedisAsyncCommandsImpl<K, V> async;
    protected final RedisReactiveCommandsImpl<K, V> reactive;
    protected final RedisCodec<K, V> codec;
    protected final Set<PushListener> pushListeners = ConcurrentHashMap.newKeySet();
    protected final Set<RedisConnectionStateListener> connectionStateListeners = ConcurrentHashMap.newKeySet();
    protected final DatabaseFactory<C, K, V> connectionFactory;
    private final ReadWriteLock multiDbLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.multiDbLock.readLock();
    private final Lock writeLock = this.multiDbLock.writeLock();
    private final ClientResources clientResources;
    private final MultiDbOptions multiDbOptions;
    private final RedisDatabaseDeferredCompletion<C> completion;
    private final Set<Consumer<Closeable>> onCloseListeners = ConcurrentHashMap.newKeySet();
    private final ScheduledFuture<?> failbackTask;
    private final FailoverRetryState failoverRetryState = new FailoverRetryState();
    private static final int MAX_FAILOVER_RECURSION = 10;

    public StatefulRedisMultiDbConnectionImpl(RedisDatabaseImpl<C> initialDatabase, Map<RedisURI, RedisDatabaseImpl<C>> connections, ClientResources resources, RedisCodec<K, V> codec, DatabaseFactory<C, K, V> connectionFactory, HealthStatusManager healthStatusManager, RedisDatabaseDeferredCompletion<C> completion) {
        this(initialDatabase, connections, resources, codec, connectionFactory, healthStatusManager, completion, MultiDbOptions.create());
    }

    public StatefulRedisMultiDbConnectionImpl(RedisDatabaseImpl<C> initialDatabase, Map<RedisURI, RedisDatabaseImpl<C>> connections, ClientResources resources, RedisCodec<K, V> codec, DatabaseFactory<C, K, V> connectionFactory, HealthStatusManager healthStatusManager, RedisDatabaseDeferredCompletion<C> completion, MultiDbOptions multiDbOptions) {
        if (connections == null || connections.isEmpty()) {
            throw new IllegalArgumentException("connections must not be empty");
        }
        LettuceAssert.notNull((Object)healthStatusManager, "healthStatusManager must not be null");
        LettuceAssert.notNull((Object)multiDbOptions, "multiDbOptions must not be null");
        this.databases = new ConcurrentHashMap<RedisURI, RedisDatabaseImpl<C>>(connections);
        this.clientResources = resources;
        this.codec = codec;
        this.connectionFactory = connectionFactory;
        this.healthStatusManager = healthStatusManager;
        this.multiDbOptions = multiDbOptions;
        this.current = initialDatabase == null ? this.getNextHealthyDatabase(null) : initialDatabase;
        LettuceAssert.notNull(this.current, "InitialDatabase must not be null");
        this.databases.values().forEach(db -> db.getCircuitBreaker().addListener(this::onCircuitBreakerStateChange));
        this.databases.values().forEach(db -> healthStatusManager.registerListener(db.getRedisURI(), this::onHealthStatusChange));
        RedisDatabaseImpl<C> instance = this.current;
        if (!instance.isHealthy()) {
            this.failoverFrom(instance, SwitchReason.FORCED);
            if (!this.current.isHealthy()) {
                this.databases.values().forEach(db -> db.getCircuitBreaker().removeListener(this::onCircuitBreakerStateChange));
                this.databases.values().forEach(db -> healthStatusManager.unregisterListener(db.getRedisURI(), this::onHealthStatusChange));
                throw new IllegalStateException("No healthy database available");
            }
        }
        this.async = this.newRedisAsyncCommandsImpl();
        this.sync = this.newRedisSyncCommandsImpl();
        this.reactive = this.newRedisReactiveCommandsImpl();
        this.completion = completion;
        if (completion != null) {
            completion.whenComplete(this::onDatabaseCompletion);
        }
        if (multiDbOptions.isFailbackSupported()) {
            Duration failbackInterval = multiDbOptions.getFailbackCheckInterval();
            this.failbackTask = resources.eventExecutorGroup().scheduleAtFixedRate(this::periodicFailbackCheck, failbackInterval.toMillis(), failbackInterval.toMillis(), TimeUnit.MILLISECONDS);
        } else {
            this.failbackTask = null;
        }
    }

    private void onDatabaseCompletion(RedisDatabaseImpl<C> db, Throwable e) {
        if (db != null) {
            this.doByExclusiveLock(() -> this.databases.putIfAbsent(db.getRedisURI(), db));
            logger.info("Async database connection completed successfully for {}", (Object)db.getRedisURI());
        } else if (e != null) {
            logger.error("Async database connection failed: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) {
        RedisDatabaseImpl database;
        if (logger.isInfoEnabled()) {
            logger.info("Circuit breaker id {} status changed from {} to {}", new Object[]{event.getCircuitBreaker().getId(), event.getPreviousState(), event.getNewState()});
        }
        if (logger.isDebugEnabled() && (database = (RedisDatabaseImpl)this.databases.values().stream().filter(db -> db.getCircuitBreaker() == event.getCircuitBreaker()).findAny().orElse(null)) != null) {
            logger.debug("Circuit breaker {} running for {} changed state from {} to {}\nCurrent database at the moment is {}", new Object[]{event.getCircuitBreaker().getId(), database.getId(), event.getPreviousState(), event.getNewState(), this.current.getId()});
        }
        RedisDatabaseImpl<C> fromDb = this.current;
        if (!event.getNewState().isClosed() && event.getCircuitBreaker() == fromDb.getCircuitBreaker()) {
            if (logger.isInfoEnabled()) {
                logger.info("Circuit breaker {} running for {} changed state from {} to {}", new Object[]{event.getCircuitBreaker().getId(), fromDb.getId(), event.getPreviousState(), event.getNewState()});
            }
            fromDb.startGracePeriod(this.multiDbOptions.getGracePeriod());
            this.failoverFrom(fromDb, SwitchReason.CIRCUIT_BREAKER);
        }
    }

    private void onHealthStatusChange(HealthStatusChangeEvent event) {
        RedisDatabaseImpl<C> database;
        if (logger.isInfoEnabled()) {
            logger.info("Health status changed for {} from {} to {}", new Object[]{event.getEndpoint(), event.getOldStatus(), event.getNewStatus()});
        }
        if ((database = this.databases.get(event.getEndpoint())) == null) {
            logger.warn("Health status changed for unknown database: {}", (Object)event.getEndpoint());
            return;
        }
        RedisDatabaseImpl<C> fromDb = this.current;
        if (!event.getNewStatus().isHealthy() && fromDb == database) {
            logger.info("Current database {} became unhealthy, initiating failover", (Object)database.getId());
            fromDb.startGracePeriod(this.multiDbOptions.getGracePeriod());
            this.failoverFrom(fromDb, SwitchReason.HEALTH_CHECK);
        }
    }

    private void failoverFrom(RedisDatabaseImpl<C> fromDb, SwitchReason reason) {
        this.failoverFromRecursive(fromDb, reason, 0);
    }

    private void failoverFromRecursive(RedisDatabaseImpl<C> fromDb, SwitchReason reason, int recursionAttempt) {
        if (10 <= recursionAttempt++) {
            logger.warn("Max failover attempts ({}) reached, staying on current database {}", (Object)10, (Object)this.current.getId());
            return;
        }
        RedisDatabaseImpl<C> selectedDatabase = this.getNextHealthyDatabase(fromDb);
        if (selectedDatabase != null) {
            if (logger.isInfoEnabled()) {
                logger.info("Initiating failover from {} to {} (attempt: {})", new Object[]{StatefulRedisMultiDbConnectionImpl.getDatabaseId(fromDb), selectedDatabase.getId(), recursionAttempt});
            }
            if (this.safeSwitch(selectedDatabase, true, reason)) {
                this.failoverRetryState.resetAttempts();
                if (logger.isInfoEnabled()) {
                    logger.info("Failover successful from {} to {}", (Object)StatefulRedisMultiDbConnectionImpl.getDatabaseId(fromDb), (Object)selectedDatabase.getId());
                }
                if (!selectedDatabase.isHealthy()) {
                    logger.warn("Database {} became unhealthy during failover, attempting cascading failover", (Object)selectedDatabase.getId());
                    this.failoverFromRecursive(selectedDatabase, reason, recursionAttempt);
                }
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Failover attempt from {} to {} has failed, retrying...", (Object)StatefulRedisMultiDbConnectionImpl.getDatabaseId(fromDb), (Object)selectedDatabase.getId());
                }
                this.failoverFromRecursive(fromDb, reason, recursionAttempt);
            }
        } else {
            this.handleNoHealthyDatabaseFound(reason);
        }
    }

    private static String getDatabaseId(RedisDatabaseImpl<?> database) {
        return database == null ? "N/A" : database.getId();
    }

    private void handleNoHealthyDatabaseFound(SwitchReason reason) {
        this.failoverRetryState.tryScheduleRetry(() -> this.failoverFrom(null, reason), this.multiDbOptions.getDelayInBetweenFailoverAttempts(), (ScheduledExecutorService)this.clientResources.eventExecutorGroup());
    }

    private void publishAllDatabasesUnhealthyEvent(int failedAttempts) {
        this.clientResources.eventBus().publish(new AllDatabasesUnhealthyEvent(failedAttempts, this.databases.values().stream().map(db -> new ImmutableRedisURI(db.getRedisURI())).collect(Collectors.toList()), this));
    }

    private RedisDatabaseImpl<C> getNextHealthyDatabase(RedisDatabaseImpl<C> dbToExclude) {
        return this.databases.values().stream().filter(RedisDatabaseImpl::isHealthy).filter(DatabasePredicates.isNot(dbToExclude)).max(DatabaseComparators.byWeight).orElse(null);
    }

    private void periodicFailbackCheck() {
        try {
            RedisDatabaseImpl<C> currentDb = this.current;
            if (currentDb == null) {
                return;
            }
            RedisDatabaseImpl highestWeightedHealthy = this.databases.values().stream().filter(RedisDatabaseImpl::isHealthy).max(DatabaseComparators.byWeight).orElse(null);
            if (highestWeightedHealthy != null && highestWeightedHealthy != currentDb && highestWeightedHealthy.getWeight() > currentDb.getWeight()) {
                logger.info("Failback check: Found higher-priority healthy database {} (weight: {}) than current {} (weight: {})", new Object[]{highestWeightedHealthy.getId(), Float.valueOf(highestWeightedHealthy.getWeight()), currentDb.getId(), Float.valueOf(currentDb.getWeight())});
                this.failoverFrom(currentDb, SwitchReason.FAILBACK);
            }
        }
        catch (Exception e) {
            logger.error("Error during periodic failback check", (Throwable)e);
        }
    }

    @Override
    public RedisAsyncCommands<K, V> async() {
        return this.async;
    }

    protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
        return (RedisCommands)this.syncHandler(this.async(), RedisCommands.class, RedisClusterCommands.class);
    }

    protected <T> T syncHandler(Object asyncApi, Class<?> ... interfaces) {
        MultiDbFutureSyncInvocationHandler h = new MultiDbFutureSyncInvocationHandler(this, asyncApi, interfaces);
        return (T)Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, (InvocationHandler)h);
    }

    protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
        return new RedisAsyncCommandsImpl<K, V>(this, this.codec, () -> this.getOptions().getJsonParser().get());
    }

    @Override
    public RedisReactiveCommands<K, V> reactive() {
        return this.reactive;
    }

    protected RedisReactiveCommandsImpl<K, V> newRedisReactiveCommandsImpl() {
        return new RedisReactiveCommandsImpl<K, V>(this, this.codec, () -> this.getOptions().getJsonParser().get());
    }

    @Override
    public RedisCommands<K, V> sync() {
        return this.sync;
    }

    @Override
    public void addListener(RedisConnectionStateListener listener) {
        this.doBySharedLock(() -> {
            this.connectionStateListeners.add(listener);
            this.current.getConnection().addListener(listener);
        });
    }

    @Override
    public void removeListener(RedisConnectionStateListener listener) {
        this.doBySharedLock(() -> {
            this.connectionStateListeners.remove(listener);
            this.current.getConnection().removeListener(listener);
        });
    }

    @Override
    public void setTimeout(Duration timeout) {
        this.databases.values().forEach(db -> db.getConnection().setTimeout(timeout));
    }

    @Override
    public Duration getTimeout() {
        return this.current.getConnection().getTimeout();
    }

    @Override
    public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
        if (!this.current.isHealthy() && this.hasNoHealthyDb()) {
            command.completeExceptionally(new RedisNoHealthyDatabaseException("No healthy database available!"));
            return command;
        }
        return this.current.getConnection().dispatch(command);
    }

    @Override
    public Collection<RedisCommand<K, V, ?>> dispatch(Collection<? extends RedisCommand<K, V, ?>> commands) {
        if (!this.current.isHealthy() && this.hasNoHealthyDb()) {
            commands.forEach(c -> c.completeExceptionally(new RedisNoHealthyDatabaseException("No healthy database available!")));
            return commands;
        }
        return this.current.getConnection().dispatch(commands);
    }

    private boolean hasNoHealthyDb() {
        return this.databases.values().stream().noneMatch(RedisDatabaseImpl::isHealthy);
    }

    @Override
    public void close() {
        this.closeAsync().join();
    }

    protected void registerAsCloseable(Collection<Closeable> registry) {
        registry.add(this);
        this.onCloseListeners.add(resource -> registry.remove(resource));
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.failbackTask != null) {
            this.failbackTask.cancel(false);
        }
        Stream<CompletableFuture> asyncCloseStream = this.databases.values().stream().map(RedisDatabaseImpl::closeAsync);
        CompletableFuture<Void> closeAllFuture = CompletableFuture.allOf((CompletableFuture[])asyncCloseStream.toArray(CompletableFuture[]::new));
        CompletableFuture<Object> deferredsFuture = this.completion != null ? this.completion.closeAsync() : CompletableFuture.completedFuture(null);
        return ((CompletableFuture)((CompletableFuture)closeAllFuture.whenComplete((v, t) -> this.healthStatusManager.close())).thenCompose(v -> deferredsFuture)).whenComplete((v, t) -> this.onCloseListeners.forEach(c -> c.accept(this)));
    }

    @Override
    public boolean isOpen() {
        return this.current.getConnection().isOpen();
    }

    @Override
    public ClientOptions getOptions() {
        return this.current.getConnection().getOptions();
    }

    @Override
    public ClientResources getResources() {
        return this.clientResources;
    }

    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        this.databases.values().forEach(db -> db.getConnection().setAutoFlushCommands(autoFlush));
    }

    @Override
    public void flushCommands() {
        this.current.getConnection().flushCommands();
    }

    @Override
    public boolean isMulti() {
        return this.current.getConnection().isMulti();
    }

    @Override
    public void addListener(PushListener listener) {
        this.doBySharedLock(() -> {
            this.pushListeners.add(listener);
            this.current.getConnection().addListener(listener);
        });
    }

    @Override
    public void removeListener(PushListener listener) {
        this.doBySharedLock(() -> {
            this.pushListeners.remove(listener);
            this.current.getConnection().removeListener(listener);
        });
    }

    @Override
    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    @Override
    public RedisURI getCurrentEndpoint() {
        return this.current.getRedisURI();
    }

    @Override
    public Iterable<RedisURI> getEndpoints() {
        return this.databases.keySet();
    }

    @Override
    public void switchTo(RedisURI redisURI) {
        RedisDatabaseImpl<C> target = this.databases.get(redisURI);
        if (target == null) {
            throw new IllegalArgumentException("Unknown endpoint: " + redisURI);
        }
        if (this.safeSwitch(target, false, SwitchReason.FORCED)) {
            if (!target.isHealthyIgnoreGracePeriod()) {
                this.failoverFrom(target, SwitchReason.FORCED);
                throw new IllegalStateException("Failed to switch to database " + target.getId() + " - target is unhealthy");
            }
        } else {
            throw new IllegalStateException("Failed to switch to database " + target.getId());
        }
    }

    boolean safeSwitch(RedisDatabaseImpl<?> database, boolean internalCall, SwitchReason reason) {
        if (database == null) {
            throw new IllegalArgumentException("Target database to switch to can not be null.");
        }
        logger.info("Initiated safe switching to database {}", (Object)database.getId());
        SwitchContext switchContext = new SwitchContext();
        this.doByExclusiveLock(() -> {
            RedisDatabaseImpl fromDb = this.current;
            RedisDatabaseImpl toDb = this.databases.get(database.getRedisURI());
            if (!this.verifySwitch(database, fromDb, toDb, internalCall)) {
                return;
            }
            switchContext.fromUri = fromDb.getRedisURI();
            switchContext.toUri = toDb.getRedisURI();
            switchContext.switched = true;
            if (fromDb == toDb) {
                return;
            }
            this.current = toDb;
            logger.info("Switched to database {}", (Object)toDb.getId());
            this.connectionStateListeners.forEach(listener -> {
                toDb.getConnection().addListener((RedisConnectionStateListener)listener);
                fromDb.getConnection().removeListener((RedisConnectionStateListener)listener);
                if (logger.isDebugEnabled()) {
                    logger.debug("Moved connection state listener {} from {} to {}", new Object[]{listener, fromDb.getId(), toDb.getId()});
                }
            });
            this.pushListeners.forEach(listener -> {
                toDb.getConnection().addListener((PushListener)listener);
                fromDb.getConnection().removeListener((PushListener)listener);
                if (logger.isDebugEnabled()) {
                    logger.debug("Moved push listener {} from {} to {}", new Object[]{listener, fromDb.getId(), toDb.getId()});
                }
            });
            fromDb.getDatabaseEndpoint().handOverCommandQueue(toDb.getDatabaseEndpoint());
            this.doOnSwitch(fromDb, toDb);
        });
        if (switchContext.switched && !switchContext.toUri.equals(switchContext.fromUri)) {
            this.publishSwitchEvent(reason, switchContext.fromUri, switchContext.toUri);
        }
        return switchContext.switched;
    }

    protected void doOnSwitch(RedisDatabaseImpl<C> fromDb, RedisDatabaseImpl<C> toDb) {
    }

    protected void publishSwitchEvent(SwitchReason reason, RedisURI fromUri, RedisURI toUri) {
        this.clientResources.eventBus().publish(new DatabaseSwitchEvent(reason, new ImmutableRedisURI(fromUri), new ImmutableRedisURI(toUri), this));
    }

    private boolean verifySwitch(RedisDatabaseImpl<?> target, RedisDatabaseImpl<C> fromDb, RedisDatabaseImpl<C> toDb, boolean internalCall) {
        boolean canSwitchTo;
        if (fromDb == null || toDb == null) {
            if (internalCall) {
                logger.info("Failed to switch to database {} - source or destination endpoint not found", (Object)target.getId());
                return false;
            }
            throw new UnsupportedOperationException("Unable to switch between endpoints - the driver was not able to locate the source or destination endpoint.");
        }
        if (target != toDb) {
            if (internalCall) {
                logger.error("Same URI with different database, this should never happen in the driver. Requested database: {}, found database: {} , endpoint: {}", new Object[]{target.getId(), toDb.getId(), toDb.getRedisURI()});
                return false;
            }
            throw new IllegalStateException("Same URI with different database, this should never happen in the driver. Requested database: " + target.getId() + ", found database: " + toDb.getId() + " , endpoint: " + toDb.getRedisURI());
        }
        boolean bl = canSwitchTo = internalCall ? toDb.isHealthy() : toDb.isHealthyIgnoreGracePeriod();
        if (!canSwitchTo) {
            if (internalCall) {
                logger.info("Requested database ({}) is unhealthy or circuit breaker is open. Skipping switch request.", (Object)toDb.getId());
                return false;
            }
            throw new IllegalStateException("Requested database (" + toDb.getId() + ") is unhealthy or circuit breaker is open. Skipping switch request.");
        }
        return true;
    }

    protected void doBySharedLock(Runnable operation) {
        this.readLock.lock();
        try {
            operation.run();
        }
        finally {
            this.readLock.unlock();
        }
    }

    protected void doByExclusiveLock(Runnable operation) {
        this.writeLock.lock();
        try {
            operation.run();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public RedisDatabaseImpl<C> getCurrentDatabase() {
        return this.current;
    }

    @Override
    public RedisDatabaseImpl<C> getDatabase(RedisURI redisURI) {
        RedisDatabaseImpl<C> database = this.databases.get(redisURI);
        if (database == null) {
            throw new IllegalArgumentException("Unknown endpoint: " + redisURI);
        }
        return database;
    }

    @Override
    public boolean isHealthy(RedisURI endpoint) {
        RedisDatabaseImpl<C> database = this.databases.get(endpoint);
        if (database == null) {
            throw new IllegalArgumentException("Unknown endpoint: " + endpoint);
        }
        return database.isHealthy();
    }

    @Override
    public void addDatabase(RedisURI redisURI, float weight) {
        this.addDatabase(DatabaseConfig.builder(redisURI).weight(weight).build());
    }

    @Override
    public void addDatabase(DatabaseConfig databaseConfig) {
        LettuceAssert.notNull((Object)databaseConfig, "DatabaseConfig must not be null");
        if (this.connectionFactory == null) {
            throw new UnsupportedOperationException("Adding databases dynamically is not supported. Connection was created without a DatabaseFactory.");
        }
        RedisURI redisURI = databaseConfig.getRedisURI();
        this.doByExclusiveLock(() -> {
            if (this.databases.containsKey(redisURI)) {
                throw new IllegalArgumentException("Database already exists: " + redisURI);
            }
            this.healthStatusManager.registerListener(redisURI, this::onHealthStatusChange);
            CompletableFuture<RedisDatabaseImpl<C>> databaseFuture = this.connectionFactory.createDatabaseAsync(databaseConfig, this.healthStatusManager);
            try {
                RedisDatabaseImpl<C> database = databaseFuture.get();
                this.databases.put(redisURI, database);
                database.getCircuitBreaker().addListener(this::onCircuitBreakerStateChange);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw RedisConnectionException.create(e);
            }
            catch (Exception e) {
                throw RedisConnectionException.create(Exceptions.unwrap(e));
            }
        });
    }

    @Override
    public void removeDatabase(RedisURI redisURI) {
        LettuceAssert.notNull((Object)redisURI, "RedisURI must not be null");
        this.doByExclusiveLock(() -> {
            RedisDatabaseImpl<C> database = null;
            database = this.databases.get(redisURI);
            if (database == null) {
                throw new IllegalArgumentException("Database not found: " + redisURI);
            }
            if (this.current.getRedisURI().equals(redisURI)) {
                throw new UnsupportedOperationException("Cannot remove the currently active database: " + this.current.getId());
            }
            this.healthStatusManager.unregisterListener(redisURI, this::onHealthStatusChange);
            this.healthStatusManager.remove(redisURI);
            this.databases.remove(redisURI);
            database.closeAsync().join();
        });
    }

    private class FailoverRetryState {
        private final AtomicInteger attempts = new AtomicInteger(0);
        private final AtomicReference<Runnable> scheduledTask = new AtomicReference();

        private FailoverRetryState() {
        }

        void tryScheduleRetry(Runnable retryAction, Duration delay, ScheduledExecutorService executor) {
            Runnable task = () -> {
                this.scheduledTask.set(null);
                retryAction.run();
            };
            if (this.scheduledTask.compareAndSet(null, task)) {
                try {
                    int attemptCount = this.attempts.incrementAndGet();
                    StatefulRedisMultiDbConnectionImpl.this.publishAllDatabasesUnhealthyEvent(attemptCount);
                    executor.schedule(task, delay.toMillis(), TimeUnit.MILLISECONDS);
                }
                catch (RuntimeException e) {
                    logger.error("Failed to schedule failover retry task", (Throwable)e);
                    this.scheduledTask.compareAndSet(task, null);
                }
            }
        }

        void resetAttempts() {
            this.attempts.set(0);
        }
    }

    static class SwitchContext {
        boolean switched;
        RedisURI fromUri;
        RedisURI toUri;

        SwitchContext() {
        }
    }

    static class DatabasePredicates {
        DatabasePredicates() {
        }

        public static Predicate<RedisDatabaseImpl<?>> isNot(RedisDatabaseImpl<?> dbInstance) {
            return db -> !db.equals(dbInstance);
        }
    }

    static class DatabaseComparators {
        public static final Comparator<RedisDatabaseImpl<?>> byWeight = Comparator.comparingDouble(RedisDatabaseImpl::getWeight);

        DatabaseComparators() {
        }
    }
}

