/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.masterslave;

import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.AsyncCloseable;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.masterslave.SentinelTopologyRefreshConnections;
import io.lettuce.core.masterslave.Timeout;
import io.lettuce.core.protocol.LettuceCharsets;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SentinelTopologyRefresh
implements AsyncCloseable,
Closeable {
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance(SentinelTopologyRefresh.class);
    private static final StringCodec CODEC = new StringCodec(LettuceCharsets.ASCII);
    private static final Set<String> PROCESSING_CHANNELS = new HashSet<String>(Arrays.asList("failover-end", "failover-end-for-timeout"));
    private final Map<RedisURI, ConnectionFuture<StatefulRedisPubSubConnection<String, String>>> pubSubConnections = new ConcurrentHashMap<RedisURI, ConnectionFuture<StatefulRedisPubSubConnection<String, String>>>();
    private final RedisClient redisClient;
    private final List<RedisURI> sentinels;
    private final List<Runnable> refreshRunnables = new CopyOnWriteArrayList<Runnable>();
    private final RedisPubSubAdapter<String, String> adapter = new RedisPubSubAdapter<String, String>(){

        @Override
        public void message(String pattern, String channel, String message) {
            SentinelTopologyRefresh.this.processMessage(pattern, channel, message);
        }
    };
    private final PubSubMessageActionScheduler topologyRefresh;
    private final PubSubMessageActionScheduler sentinelReconnect;
    private final CompletableFuture<Void> closeFuture = new CompletableFuture();
    private volatile boolean closed = false;

    SentinelTopologyRefresh(RedisClient redisClient, String masterId, List<RedisURI> sentinels) {
        this.redisClient = redisClient;
        this.sentinels = LettuceLists.newList(sentinels);
        this.topologyRefresh = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(), new TopologyRefreshMessagePredicate(masterId));
        this.sentinelReconnect = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(), new SentinelReconnectMessagePredicate());
    }

    @Override
    public void close() {
        this.closeAsync().join();
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.closed) {
            return this.closeFuture;
        }
        this.closed = true;
        HashMap<RedisURI, ConnectionFuture<StatefulRedisPubSubConnection<String, String>>> connections = new HashMap<RedisURI, ConnectionFuture<StatefulRedisPubSubConnection<String, String>>>(this.pubSubConnections);
        ArrayList futures = new ArrayList();
        connections.forEach((k, f) -> {
            futures.add(f.exceptionally(t -> null).thenCompose(c -> {
                if (c == null) {
                    return CompletableFuture.completedFuture(null);
                }
                c.removeListener(this.adapter);
                return c.closeAsync();
            }).toCompletableFuture());
            this.pubSubConnections.remove(k);
        });
        Futures.allOf(futures).whenComplete((aVoid, throwable) -> {
            if (throwable != null) {
                this.closeFuture.completeExceptionally((Throwable)throwable);
            } else {
                this.closeFuture.complete(null);
            }
        });
        return this.closeFuture;
    }

    CompletionStage<Void> bind(Runnable runnable) {
        this.refreshRunnables.add(runnable);
        return this.initializeSentinels();
    }

    private CompletionStage<Void> initializeSentinels() {
        if (this.closed) {
            return this.closeFuture;
        }
        Duration timeout = this.getTimeout();
        List<ConnectionFuture<StatefulRedisPubSubConnection<String, String>>> connectionFutures = this.potentiallyConnectSentinels();
        if (connectionFutures.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.closed) {
            return this.closeAsync();
        }
        SentinelTopologyRefreshConnections collector = this.collectConnections(connectionFutures);
        CompletionStage completionStage = collector.getOrTimeout(timeout, (ScheduledExecutorService)this.redisClient.getResources().eventExecutorGroup());
        return completionStage.whenComplete((aVoid, throwable) -> {
            if (throwable != null) {
                this.closeAsync();
            }
        }).thenApply(noop -> null);
    }

    private List<ConnectionFuture<StatefulRedisPubSubConnection<String, String>>> potentiallyConnectSentinels() {
        ArrayList<ConnectionFuture<StatefulRedisPubSubConnection<String, String>>> connectionFutures = new ArrayList<ConnectionFuture<StatefulRedisPubSubConnection<String, String>>>();
        for (RedisURI sentinel : this.sentinels) {
            if (this.pubSubConnections.containsKey(sentinel)) continue;
            ConnectionFuture<StatefulRedisPubSubConnection<String, String>> future = this.redisClient.connectPubSubAsync(CODEC, sentinel);
            this.pubSubConnections.put(sentinel, future);
            future.whenComplete((connection, throwable) -> {
                if (throwable != null || this.closed) {
                    this.pubSubConnections.remove(sentinel);
                }
                if (this.closed) {
                    connection.closeAsync();
                }
            });
            connectionFutures.add(future);
        }
        return connectionFutures;
    }

    private SentinelTopologyRefreshConnections collectConnections(List<ConnectionFuture<StatefulRedisPubSubConnection<String, String>>> connectionFutures) {
        SentinelTopologyRefreshConnections collector = new SentinelTopologyRefreshConnections(connectionFutures.size());
        for (ConnectionFuture<StatefulRedisPubSubConnection<String, String>> connectionFuture : connectionFutures) {
            connectionFuture.thenCompose(connection -> {
                connection.addListener(this.adapter);
                return connection.async().psubscribe("*").thenApply(v -> connection).whenComplete((c, t) -> {
                    if (t != null) {
                        connection.closeAsync();
                    }
                });
            }).whenComplete((connection, throwable) -> {
                if (throwable != null) {
                    collector.accept((Throwable)throwable);
                } else {
                    collector.accept(connection);
                }
            });
        }
        return collector;
    }

    private Duration getTimeout() {
        for (RedisURI sentinel : this.sentinels) {
            if (this.pubSubConnections.containsKey(sentinel)) continue;
            return sentinel.getTimeout();
        }
        Iterator<RedisURI> iterator = this.sentinels.iterator();
        if (iterator.hasNext()) {
            RedisURI sentinel;
            sentinel = iterator.next();
            return sentinel.getTimeout();
        }
        return RedisURI.DEFAULT_TIMEOUT_DURATION;
    }

    private void processMessage(String pattern, String channel, String message) {
        this.topologyRefresh.processMessage(channel, message, () -> {
            LOG.debug("Received topology changed signal from Redis Sentinel ({}), scheduling topology update", (Object)channel);
            return () -> this.refreshRunnables.forEach(Runnable::run);
        });
        this.sentinelReconnect.processMessage(channel, message, () -> {
            LOG.debug("Received sentinel state changed signal from Redis Sentinel, scheduling sentinel reconnect attempts");
            return this::initializeSentinels;
        });
    }

    private static class SentinelReconnectMessagePredicate
    implements MessagePredicate {
        private SentinelReconnectMessagePredicate() {
        }

        @Override
        public boolean test(String channel, String message) {
            if (channel.equals("+sentinel")) {
                return true;
            }
            return (channel.equals("-odown") || channel.equals("-sdown")) && message.startsWith("sentinel ");
        }
    }

    private static class TopologyRefreshMessagePredicate
    implements MessagePredicate {
        private final String masterId;
        private Set<String> TOPOLOGY_CHANGE_CHANNELS = new HashSet<String>(Arrays.asList("+slave", "+sdown", "-sdown", "fix-slave-config", "+convert-to-slave", "+role-change"));

        TopologyRefreshMessagePredicate(String masterId) {
            this.masterId = masterId;
        }

        @Override
        public boolean test(String channel, String message) {
            if ((channel.equals("+elected-leader") || channel.equals("+reset-master")) && message.startsWith(String.format("master %s ", this.masterId))) {
                return true;
            }
            if (this.TOPOLOGY_CHANGE_CHANNELS.contains(channel) && message.contains(String.format("@ %s ", this.masterId))) {
                return true;
            }
            if (channel.equals("+switch-master") && message.startsWith(String.format("%s ", this.masterId))) {
                return true;
            }
            return PROCESSING_CHANNELS.contains(channel);
        }
    }

    static interface MessagePredicate
    extends BiPredicate<String, String> {
        @Override
        public boolean test(String var1, String var2);
    }

    static class TimedSemaphore {
        private final AtomicReference<Timeout> timeoutRef = new AtomicReference();
        private final int timeout = 5;
        private final TimeUnit timeUnit = TimeUnit.SECONDS;

        TimedSemaphore() {
        }

        protected void onEvent(Consumer<Timeout> timeoutConsumer) {
            Timeout existingTimeout = this.timeoutRef.get();
            if (existingTimeout != null && !existingTimeout.isExpired()) {
                return;
            }
            this.getClass();
            Timeout timeout = new Timeout(5L, this.timeUnit);
            boolean state = this.timeoutRef.compareAndSet(existingTimeout, timeout);
            if (state) {
                timeoutConsumer.accept(timeout);
            }
        }
    }

    private static class PubSubMessageActionScheduler {
        private final TimedSemaphore timedSemaphore = new TimedSemaphore();
        private final EventExecutorGroup eventExecutors;
        private final MessagePredicate filter;

        PubSubMessageActionScheduler(EventExecutorGroup eventExecutors, MessagePredicate filter) {
            this.eventExecutors = eventExecutors;
            this.filter = filter;
        }

        void processMessage(String channel, String message, Supplier<Runnable> runnableSupplier) {
            if (!this.processingAllowed(channel, message)) {
                return;
            }
            this.timedSemaphore.onEvent(timeout -> {
                Runnable runnable = (Runnable)runnableSupplier.get();
                if (timeout == null) {
                    this.eventExecutors.submit(runnable);
                } else {
                    this.eventExecutors.schedule(runnable, timeout.remaining(), TimeUnit.MILLISECONDS);
                }
            });
        }

        private boolean processingAllowed(String channel, String message) {
            if (this.eventExecutors.isShuttingDown()) {
                return false;
            }
            return this.filter.test(channel, message);
        }
    }
}

