/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.listener;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.ConnectionUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;

public class RedisMessageListenerContainer
implements InitializingBean,
DisposableBean,
BeanNameAware,
SmartLifecycle {
    /** Logger bound to the runtime class of this container (subclasses log under their own name). */
    protected final Log logger = LogFactory.getLog(this.getClass());
    /** Thread name prefix used by the default task executor when no bean name has been set. */
    public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class) + "-";
    /** Default pause, in milliseconds, before the subscription is restarted after a connection failure. */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    /** Default upper bound, in milliseconds, for waiting until a subscription has been registered. */
    public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000L;
    /** Maximum time, in milliseconds, {@code start()} waits on {@link #monitor} for the subscription task. */
    private long initWait = TimeUnit.SECONDS.toMillis(5L);
    /** Executor running the subscription task; falls back to {@link #taskExecutor} in {@code afterPropertiesSet()}. */
    @Nullable
    private Executor subscriptionExecutor;
    /** Executor on which individual listener invocations are dispatched. */
    @Nullable
    private Executor taskExecutor;
    /** Factory from which the subscription connection is obtained. */
    @Nullable
    private RedisConnectionFactory connectionFactory;
    /** Bean name, used as thread name prefix for the default executor. */
    @Nullable
    private String beanName;
    /** Optional handler receiving exceptions thrown by listeners. */
    @Nullable
    private ErrorHandler errorHandler;
    /** Monitor used to hand over between {@code start()} and the subscription task. */
    private final Object monitor = new Object();
    /** Whether the container has been started and not yet stopped. */
    private volatile boolean running = false;
    /** Set by {@code afterPropertiesSet()}, cleared by {@code destroy()}. */
    private volatile boolean initialized = false;
    /** Whether the subscription task has been handed to the executor and not been cancelled. */
    private volatile boolean listening = false;
    /** Whether {@link #taskExecutor} was created by this container and must be destroyed by it. */
    private volatile boolean manageExecutor = false;
    /** Serialized pattern -> listeners registered for that pattern. */
    private final Map<ByteArrayWrapper, Collection<MessageListener>> patternMapping = new ConcurrentHashMap<ByteArrayWrapper, Collection<MessageListener>>();
    /** Serialized channel name -> listeners registered for that channel. */
    private final Map<ByteArrayWrapper, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<ByteArrayWrapper, Collection<MessageListener>>();
    /** Listener -> all topics it is registered for (reverse index of the two mappings above). */
    private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<MessageListener, Set<Topic>>();
    /** The single subscription task instance reused across restarts. */
    private final SubscriptionTask subscriptionTask = new SubscriptionTask();
    /** Serializer turning topic names into the byte arrays sent to Redis. */
    private volatile RedisSerializer<String> serializer = RedisSerializer.string();
    /** Pause, in milliseconds, before a subscription restart; initialised from the public default constant. */
    private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
    /** Maximum wait, in milliseconds, for subscription registration; initialised from the public default constant. */
    private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;

    /**
     * Finishes configuration: installs a default task executor when none was
     * supplied (and remembers that this container owns it), lets the
     * subscription executor default to the task executor, and marks the
     * container as initialized.
     */
    public void afterPropertiesSet() {
        boolean needsDefaultExecutor = this.taskExecutor == null;
        if (needsDefaultExecutor) {
            // the container created it, so destroy() is responsible for disposing it
            this.manageExecutor = true;
            this.taskExecutor = createDefaultTaskExecutor();
        }

        if (this.subscriptionExecutor == null) {
            this.subscriptionExecutor = this.taskExecutor;
        }

        this.initialized = true;
    }

    /**
     * Creates the executor used when none was configured.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose thread name prefix is
     *         the bean name followed by {@code "-"}, or
     *         {@link #DEFAULT_THREAD_NAME_PREFIX} when no bean name is set
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String prefix = DEFAULT_THREAD_NAME_PREFIX;
        if (this.beanName != null) {
            prefix = this.beanName + "-";
        }
        return new SimpleAsyncTaskExecutor(prefix);
    }

    /**
     * Marks the container as no longer initialized, stops it, and disposes
     * the task executor if it was created by this container and implements
     * {@link DisposableBean}.
     *
     * @throws Exception if destroying the internally managed executor fails
     */
    public void destroy() throws Exception {
        this.initialized = false;
        stop();

        boolean ownsDisposableExecutor = this.manageExecutor && this.taskExecutor instanceof DisposableBean;
        if (!ownsDisposableExecutor) {
            return;
        }

        ((DisposableBean) this.taskExecutor).destroy();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Stopped internally-managed task executor");
        }
    }

    /**
     * Reports whether this lifecycle component wants to be started automatically.
     *
     * @return always {@code true}
     */
    public boolean isAutoStartup() {
        final boolean autoStartup = true;
        return autoStartup;
    }

    /**
     * Stops the container synchronously via {@link #stop()} and then runs the
     * given callback on the calling thread.
     *
     * @param callback invoked once {@link #stop()} has returned
     */
    public void stop(Runnable callback) {
        stop();
        // stop() is synchronous, so completion can be signalled right away
        callback.run();
    }

    /**
     * Returns the lifecycle phase of this container.
     *
     * @return {@link Integer#MAX_VALUE}, i.e. the largest possible phase value
     */
    public int getPhase() {
        final int phase = Integer.MAX_VALUE;
        return phase;
    }

    /**
     * Reports whether the container has been started and not stopped since.
     *
     * @return the current value of the volatile {@code running} flag
     */
    public boolean isRunning() {
        boolean currentlyRunning = this.running;
        return currentlyRunning;
    }

    /**
     * Starts the container if it is not running yet.
     *
     * <p>While holding the container monitor, listening is triggered via
     * {@code lazyListen()}; if that actually started the subscription task,
     * the caller waits up to {@code initWait} milliseconds to be notified by
     * the task. If the wait is interrupted, the interrupt flag is restored,
     * the container is flagged as not running again and the method returns
     * without logging.
     */
    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;

        synchronized (this.monitor) {
            lazyListen();
            if (this.listening) {
                try {
                    // the subscription task notifies this monitor once it is set up
                    this.monitor.wait(this.initWait);
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    this.running = false;
                    return;
                }
            }
        }

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Started RedisMessageListenerContainer");
        }
    }

    /**
     * Stops the container: when it is running, clears the running flag and
     * cancels the subscription task. The debug message is logged whether or
     * not the container was running.
     */
    public void stop() {
        boolean wasRunning = isRunning();
        if (wasRunning) {
            this.running = false;
            this.subscriptionTask.cancel();
        }

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Stopped RedisMessageListenerContainer");
        }
    }

    /**
     * Processes a single message for a single listener by delegating to
     * {@link #executeListener(MessageListener, Message, byte[])}.
     *
     * @param listener the listener to invoke
     * @param message  the received message
     * @param pattern  the source handed to the listener (see {@code dispatchMessage})
     */
    protected void processMessage(MessageListener listener, Message message, byte[] pattern) {
        executeListener(listener, message, pattern);
    }

    /**
     * Invokes the listener and routes anything it throws to
     * {@link #handleListenerException(Throwable)} instead of propagating it.
     *
     * @param listener the listener to invoke
     * @param message  the received message
     * @param pattern  the source passed on to {@code onMessage}
     */
    protected void executeListener(MessageListener listener, Message message, byte[] pattern) {
        try {
            listener.onMessage(message, pattern);
        }
        catch (Throwable failure) {
            // deliberately broad: a failing listener must not escape this method
            handleListenerException(failure);
        }
    }

    /**
     * Reports whether the container is active, i.e. has been initialized by
     * {@code afterPropertiesSet()} and not yet destroyed.
     *
     * @return the current value of the {@code initialized} flag
     */
    public final boolean isActive() {
        boolean active = this.initialized;
        return active;
    }

    /**
     * Handles an exception thrown by a listener. While the container is
     * active the error handler is invoked; after shutdown the exception is
     * only logged at debug level.
     *
     * @param ex the exception raised during listener execution
     */
    protected void handleListenerException(Throwable ex) {
        if (!isActive()) {
            this.logger.debug("Listener exception after container shutdown", ex);
            return;
        }
        invokeErrorHandler(ex);
    }

    /**
     * Passes the exception to the configured {@link ErrorHandler}; when none
     * is set, logs it at warn level (if enabled).
     *
     * @param ex the exception to report
     */
    protected void invokeErrorHandler(Throwable ex) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(ex);
            return;
        }

        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
        }
    }

    /**
     * Returns the connection factory used to obtain the subscription connection.
     *
     * @return the configured factory, or {@code null} if none has been set
     */
    @Nullable
    public RedisConnectionFactory getConnectionFactory() {
        RedisConnectionFactory factory = this.connectionFactory;
        return factory;
    }

    /**
     * Sets the connection factory used to obtain the subscription connection.
     *
     * @param connectionFactory the factory to use; must not be {@code null}
     *                          (checked with {@code Assert.notNull})
     */
    public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
        Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
        this.connectionFactory = connectionFactory;
    }

    /**
     * Stores the bean name; it is used as thread name prefix by
     * {@code createDefaultTaskExecutor()}.
     *
     * @param name the name of this bean
     */
    public void setBeanName(String name) {
        beanName = name;
    }

    /**
     * Sets the executor on which listener invocations are dispatched. When
     * left unset, a default executor is created in {@code afterPropertiesSet()}.
     *
     * @param taskExecutor the executor to dispatch listener calls on
     */
    public void setTaskExecutor(Executor taskExecutor) {
        Executor configured = taskExecutor;
        this.taskExecutor = configured;
    }

    /**
     * Sets the executor that runs the subscription task. When left unset,
     * the task executor is used (see {@code afterPropertiesSet()}).
     *
     * @param subscriptionExecutor the executor to run the subscription task on
     */
    public void setSubscriptionExecutor(Executor subscriptionExecutor) {
        Executor configured = subscriptionExecutor;
        this.subscriptionExecutor = configured;
    }

    /**
     * Sets the serializer that converts topic names into the byte arrays
     * used as channel/pattern keys. Defaults to {@code RedisSerializer.string()}.
     *
     * @param serializer the topic name serializer
     */
    public void setTopicSerializer(RedisSerializer<String> serializer) {
        RedisSerializer<String> configured = serializer;
        this.serializer = configured;
    }

    /**
     * Sets the handler that receives exceptions thrown by listeners. Without
     * one, such exceptions are logged at warn level by {@code invokeErrorHandler}.
     *
     * @param errorHandler the handler to notify about listener failures
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        ErrorHandler configured = errorHandler;
        this.errorHandler = configured;
    }

    /**
     * Replaces all registered listeners with the given ones by delegating to
     * {@code initMapping}, which clears the existing registrations first.
     *
     * @param listeners map of listener to the topics it should receive
     */
    public void setMessageListeners(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
        initMapping(listeners);
    }

    /**
     * Registers the listener for the given topics and then triggers
     * listening in case the subscription has not been started yet.
     *
     * @param listener the listener to register
     * @param topics   the topics to register it for
     */
    public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
        addListener(listener, topics);
        // no-op unless the container is running and not listening yet
        lazyListen();
    }

    /**
     * Registers the listener for a single topic.
     *
     * @param listener the listener to register
     * @param topic    the topic to register it for
     */
    public void addMessageListener(MessageListener listener, Topic topic) {
        Collection<Topic> singleTopic = Collections.singleton(topic);
        addMessageListener(listener, singleTopic);
    }

    /**
     * Removes the listener from the given topics by delegating to
     * {@code removeListener}.
     *
     * @param listener the listener to remove
     * @param topics   the topics to remove it from
     */
    public void removeMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
        removeListener(listener, topics);
    }

    /**
     * Removes the listener from a single topic.
     *
     * @param listener the listener to remove
     * @param topic    the topic to remove it from
     */
    public void removeMessageListener(MessageListener listener, Topic topic) {
        Collection<Topic> singleTopic = Collections.singleton(topic);
        removeMessageListener(listener, singleTopic);
    }

    /**
     * Removes the listener from every topic it is registered for, by passing
     * an empty topic collection to the collection-based overload.
     *
     * @param listener the listener to remove
     */
    public void removeMessageListener(MessageListener listener) {
        Collection<Topic> noTopics = Collections.emptySet();
        removeMessageListener(listener, noTopics);
    }

    /**
     * Replaces all listener registrations.
     *
     * <p>Cancels the subscription task when the container is running, clears
     * the pattern, channel and listener indexes, registers every entry of
     * {@code listeners} and finally calls {@code start()} when the container
     * has been initialized.
     *
     * @param listeners map of listener to topics; may be {@code null} or empty
     */
    private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
        if (isRunning()) {
            this.subscriptionTask.cancel();
        }

        this.patternMapping.clear();
        this.channelMapping.clear();
        this.listenerTopics.clear();

        boolean hasListeners = !CollectionUtils.isEmpty(listeners);
        if (hasListeners) {
            for (Map.Entry<? extends MessageListener, Collection<? extends Topic>> registration : listeners.entrySet()) {
                MessageListener listener = registration.getKey();
                Collection<? extends Topic> topics = registration.getValue();
                addListener(listener, topics);
            }
        }

        if (this.initialized) {
            start();
        }
    }

    /**
     * Hands the subscription task to the subscription executor, but only if
     * the container is running, is not listening yet, and at least one
     * channel or pattern is registered. The check for registered topics and
     * the switch of the {@code listening} flag happen under the container
     * monitor. Outside the monitor, a debug message reports whether listening
     * was started or postponed.
     */
    private void lazyListen() {
        boolean debugEnabled = this.logger.isDebugEnabled();
        if (!isRunning() || this.listening) {
            return;
        }

        boolean started = false;
        synchronized (this.monitor) {
            // re-check under the monitor; another thread may have started listening meanwhile
            if (!this.listening && (this.channelMapping.size() > 0 || this.patternMapping.size() > 0)) {
                this.subscriptionExecutor.execute(this.subscriptionTask);
                this.listening = true;
                started = true;
            }
        }

        if (!debugEnabled) {
            return;
        }
        String outcome = started
                ? "Started listening for Redis messages"
                : "Postpone listening for Redis messages until actual listeners are added";
        this.logger.debug(outcome);
    }

    /**
     * Registers {@code listener} for every topic in {@code topics}.
     *
     * <p>The listener's topic set and the per-channel / per-pattern listener
     * sets are created on demand with {@code computeIfAbsent}, so concurrent
     * registrations for the same key share one set instead of overwriting
     * each other. When the container is already listening, the newly
     * collected channels and patterns are also subscribed on the live
     * subscription.
     *
     * @param listener the listener to register; must not be {@code null}
     * @param topics   the topics to register it for; must not be empty
     * @throws IllegalArgumentException if a topic is neither a
     *         {@link ChannelTopic} nor a {@link PatternTopic} (or an
     *         assertion on the arguments fails)
     */
    private void addListener(MessageListener listener, Collection<? extends Topic> topics) {
        Assert.notNull(listener, "a valid listener is required");
        Assert.notEmpty(topics, "at least one topic is required");

        List<byte[]> channels = new ArrayList<byte[]>(topics.size());
        List<byte[]> patterns = new ArrayList<byte[]>(topics.size());
        boolean trace = this.logger.isTraceEnabled();

        // atomic get-or-create; a plain get/put pair could drop a concurrently created set
        this.listenerTopics.computeIfAbsent(listener, key -> new CopyOnWriteArraySet<Topic>()).addAll(topics);

        for (Topic topic : topics) {
            ByteArrayWrapper holder = new ByteArrayWrapper(this.serializer.serialize(topic.getTopic()));
            if (topic instanceof ChannelTopic) {
                this.channelMapping.computeIfAbsent(holder, key -> new CopyOnWriteArraySet<MessageListener>()).add(listener);
                channels.add(holder.getArray());
                if (trace) {
                    this.logger.trace("Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'");
                }
            } else if (topic instanceof PatternTopic) {
                this.patternMapping.computeIfAbsent(holder, key -> new CopyOnWriteArraySet<MessageListener>()).add(listener);
                patterns.add(holder.getArray());
                if (trace) {
                    this.logger.trace("Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'");
                }
            } else {
                throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'");
            }
        }

        if (this.listening) {
            // the subscribe methods ignore empty arrays, so both can be called unconditionally
            this.subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][]));
            this.subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][]));
        }
    }

    /**
     * Removes listener registrations.
     *
     * <ul>
     * <li>{@code listener == null} and no topics: the subscription task is
     * cancelled and nothing else is changed.</li>
     * <li>no topics: the listener is removed from every topic it was
     * registered for (nothing happens if it was not registered).</li>
     * <li>otherwise: the listener (or, when {@code listener} is
     * {@code null}, every listener) is removed from the given topics.</li>
     * </ul>
     *
     * <p>Topics that are neither {@link ChannelTopic} nor {@link PatternTopic}
     * are ignored. Afterwards the subscription is cancelled when no listener
     * is left; otherwise, if the container is listening, channels and
     * patterns without remaining listeners are unsubscribed.
     *
     * @param listener the listener to remove, or {@code null} for all listeners
     * @param topics   the topics to remove from; may be {@code null} or empty
     */
    private void removeListener(MessageListener listener, Collection<? extends Topic> topics) {
        boolean trace = this.logger.isTraceEnabled();
        if (listener == null && CollectionUtils.isEmpty(topics)) {
            this.subscriptionTask.cancel();
            return;
        }

        List<byte[]> channelsToRemove = new ArrayList<byte[]>();
        List<byte[]> patternsToRemove = new ArrayList<byte[]>();

        if (CollectionUtils.isEmpty(topics)) {
            // no explicit topics: fall back to everything the listener is registered for
            Set<Topic> registered = this.listenerTopics.remove(listener);
            if (registered == null) {
                return;
            }
            topics = registered;
        }

        for (Topic topic : topics) {
            ByteArrayWrapper holder = new ByteArrayWrapper(this.serializer.serialize(topic.getTopic()));
            if (topic instanceof ChannelTopic) {
                this.remove(listener, topic, holder, this.channelMapping, channelsToRemove);
                if (trace) {
                    String msg = listener != null ? "listener '" + listener + "'" : "all listeners";
                    this.logger.trace("Removing " + msg + " from channel '" + topic.getTopic() + "'");
                }
            } else if (topic instanceof PatternTopic) {
                this.remove(listener, topic, holder, this.patternMapping, patternsToRemove);
                if (trace) {
                    String msg = listener != null ? "listener '" + listener + "'" : "all listeners";
                    this.logger.trace("Removing " + msg + " from pattern '" + topic.getTopic() + "'");
                }
            }
        }

        if (this.listenerTopics.isEmpty()) {
            this.subscriptionTask.cancel();
        } else if (this.listening) {
            // the unsubscribe methods ignore empty arrays, so both can be called unconditionally
            this.subscriptionTask.unsubscribeChannel(channelsToRemove.toArray(new byte[channelsToRemove.size()][]));
            this.subscriptionTask.unsubscribePattern(patternsToRemove.toArray(new byte[patternsToRemove.size()][]));
        }
    }

    /**
     * Removes one listener (or, when {@code listener} is {@code null}, all
     * listeners) from a single topic of the given mapping and keeps the
     * {@code listenerTopics} reverse index in sync.
     *
     * <p>When the topic ends up without listeners, or all listeners were
     * requested to be removed, the topic is dropped from {@code mapping} and
     * its raw bytes are appended to {@code topicToRemove}. Nothing happens
     * when the topic is not present in {@code mapping}.
     *
     * @param listener      the listener to remove, or {@code null} for all
     * @param topic         the topic being removed from
     * @param holder        the serialized topic used as mapping key
     * @param mapping       the channel or pattern mapping to update
     * @param topicToRemove collects topics that no longer have listeners
     */
    private void remove(MessageListener listener, Topic topic, ByteArrayWrapper holder, Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
        Collection<MessageListener> registered = mapping.get(holder);
        if (registered == null) {
            return;
        }

        Collection<MessageListener> affected;
        if (listener == null) {
            affected = registered;
        } else {
            registered.remove(listener);
            affected = Collections.singletonList(listener);
        }

        for (MessageListener candidate : affected) {
            Set<Topic> remaining = this.listenerTopics.get(candidate);
            if (remaining != null) {
                remaining.remove(topic);
            }
            // drop listeners that have no topic left (or never had an entry)
            if (CollectionUtils.isEmpty(remaining)) {
                this.listenerTopics.remove(candidate);
            }
        }

        boolean topicUnused = listener == null || registered.isEmpty();
        if (topicUnused) {
            mapping.remove(holder);
            topicToRemove.add(holder.getArray());
        }
    }

    /**
     * Reacts to an exception that escaped the subscription task.
     *
     * <p>Always clears the {@code listening} flag and closes the subscription
     * connection. A {@link RedisConnectionFailureException} leads, while the
     * container is running, to a logged error, a pause of the recovery
     * interval and a new listen attempt. Any other exception is logged as an
     * error and not retried.
     *
     * @param ex the exception raised by the subscription task
     */
    protected void handleSubscriptionException(Throwable ex) {
        this.listening = false;
        this.subscriptionTask.closeConnection();

        if (!(ex instanceof RedisConnectionFailureException)) {
            this.logger.error("SubscriptionTask aborted with exception:", ex);
            return;
        }

        if (isRunning()) {
            this.logger.error("Connection failure occurred. Restarting subscription task after " + this.recoveryInterval + " ms");
            sleepBeforeRecoveryAttempt();
            lazyListen();
        }
    }

    /**
     * Sleeps for the recovery interval when it is positive. An interrupt
     * ends the sleep early, is logged at debug level and the interrupt flag
     * is restored.
     */
    protected void sleepBeforeRecoveryAttempt() {
        if (this.recoveryInterval <= 0L) {
            return;
        }

        try {
            Thread.sleep(this.recoveryInterval);
        }
        catch (InterruptedException interrupted) {
            this.logger.debug("Thread interrupted while sleeping the recovery interval");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits one task per listener to the task executor; each task calls
     * {@code processMessage} for that listener.
     *
     * <p>The source handed to the listeners is a clone of {@code pattern}
     * when a pattern is given, otherwise the channel of the message.
     *
     * @param listeners the listeners to notify
     * @param message   the received message
     * @param pattern   the matched pattern, or {@code null} for a channel message
     */
    private void dispatchMessage(Collection<MessageListener> listeners, Message message, byte[] pattern) {
        final byte[] source;
        if (pattern == null) {
            source = message.getChannel();
        } else {
            source = pattern.clone();
        }

        for (MessageListener target : listeners) {
            Runnable delivery = () -> this.processMessage(target, message, source);
            this.taskExecutor.execute(delivery);
        }
    }

    /**
     * Sets the pause before the subscription is restarted after a connection
     * failure. Values of zero or less disable the pause
     * (see {@code sleepBeforeRecoveryAttempt()}).
     *
     * @param recoveryInterval the interval in milliseconds
     */
    public void setRecoveryInterval(long recoveryInterval) {
        long intervalMillis = recoveryInterval;
        this.recoveryInterval = intervalMillis;
    }

    /**
     * Returns how long the subscription task waits for the subscription to
     * be registered on asynchronous connections.
     *
     * @return the maximum waiting time in milliseconds
     */
    public long getMaxSubscriptionRegistrationWaitingTime() {
        long waitingTimeMillis = this.maxSubscriptionRegistrationWaitingTime;
        return waitingTimeMillis;
    }

    /**
     * Sets how long the subscription task waits for the subscription to be
     * registered on asynchronous connections.
     *
     * @param maxSubscriptionRegistrationWaitingTime the maximum waiting time in milliseconds
     */
    public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) {
        long waitingTimeMillis = maxSubscriptionRegistrationWaitingTime;
        this.maxSubscriptionRegistrationWaitingTime = waitingTimeMillis;
    }

    /**
     * A boolean check that can be evaluated repeatedly, used by
     * {@code SpinBarrier.waitFor} as its completion criterion.
     */
    private static interface Condition {
        /**
         * Evaluates the condition.
         *
         * @return {@code true} when the condition currently holds
         */
        boolean passes();
    }

    /**
     * Polling helper that blocks the calling thread until a {@link Condition}
     * holds or a timeout elapses.
     */
    private static abstract class SpinBarrier {
        /** Pause between two evaluations of the condition, in milliseconds. */
        private static final long POLL_INTERVAL_MILLIS = 100L;

        private SpinBarrier() {
        }

        /**
         * Polls {@code condition} every 100 ms until it passes or
         * {@code timeout} milliseconds have elapsed.
         *
         * @param condition the condition to wait for
         * @param timeout   the maximum time to wait, in milliseconds
         * @return {@code true} if the condition passed in time; {@code false}
         *         on timeout or when the thread was interrupted (the
         *         interrupt flag is restored in that case)
         */
        static boolean waitFor(Condition condition, long timeout) {
            long startTime = System.currentTimeMillis();
            try {
                while (!SpinBarrier.timedOut(startTime, timeout)) {
                    if (condition.passes()) {
                        return true;
                    }
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return false;
        }

        /**
         * Tells whether more than {@code timeout} milliseconds have passed
         * since {@code startTime}.
         *
         * <p>Compares the elapsed time against the timeout instead of
         * computing {@code startTime + timeout}: that sum overflows for very
         * large timeouts (e.g. {@code Long.MAX_VALUE}), which would report a
         * timeout before the condition was ever checked.
         */
        private static boolean timedOut(long startTime, long timeout) {
            return System.currentTimeMillis() - startTime > timeout;
        }
    }

    /**
     * Listener registered on the Redis subscription. It looks up the
     * container's listeners for the incoming message and hands them to
     * {@code dispatchMessage}.
     */
    private class DispatchMessageListener
    implements MessageListener {
        private DispatchMessageListener() {
        }

        /**
         * Routes a message to the registered listeners. A non-empty
         * {@code pattern} selects listeners from the pattern mapping;
         * otherwise the message's channel is looked up in the channel
         * mapping and {@code null} is passed on as pattern. Messages without
         * registered listeners are dropped.
         *
         * @param message the received message
         * @param pattern the matched pattern, possibly {@code null} or empty
         */
        @Override
        public void onMessage(Message message, @Nullable byte[] pattern) {
            boolean patternMatch = pattern != null && pattern.length > 0;

            Collection<MessageListener> targets;
            if (patternMatch) {
                targets = RedisMessageListenerContainer.this.patternMapping.get(new ByteArrayWrapper(pattern));
            } else {
                targets = RedisMessageListenerContainer.this.channelMapping.get(new ByteArrayWrapper(message.getChannel()));
            }

            if (CollectionUtils.isEmpty(targets)) {
                return;
            }
            // an empty pattern is normalised to null for the listeners
            RedisMessageListenerContainer.this.dispatchMessage(targets, message, patternMatch ? pattern : null);
        }
    }

    private class SubscriptionTask
    implements SchedulingAwareRunnable {
        // Connection currently used for the subscription; assigned in run()
        // and reset to null by closeConnection().
        @Nullable
        private volatile RedisConnection connection;
        // True while run() is executing; in the visible code it is only read
        // and written inside synchronized (localMonitor) blocks.
        private boolean subscriptionTaskRunning = false;
        // Guards subscriptionTaskRunning and serialises operations on the
        // active Subscription.
        private final Object localMonitor = new Object();
        // Maximum time, in milliseconds, cancel() waits for run() to finish.
        private long subscriptionWait = TimeUnit.SECONDS.toMillis(5L);

        // Private: instances are only created by the enclosing container.
        private SubscriptionTask() {
        }

        /**
         * Declares this task as long-lived to scheduling-aware executors.
         *
         * @return always {@code true}
         */
        public boolean isLongLived() {
            final boolean longLived = true;
            return longLived;
        }

        /**
         * Obtains a connection and performs the subscription.
         *
         * <p>The task flags itself as running under {@code localMonitor},
         * fetches a connection and refuses to continue if that connection is
         * already subscribed. For non-async connection factories the thread
         * waiting in {@code start()} is notified before subscribing
         * (presumably because subscribing blocks on such connections -
         * confirm against the connection implementation). For async factories
         * the task first waits, up to the configured registration waiting
         * time, for the subscription to become present and notifies
         * afterwards. Any throwable is passed to
         * {@code handleSubscriptionException}; in all cases the running flag
         * is cleared and a thread waiting on {@code localMonitor} is notified.
         */
        public void run() {
            synchronized (this.localMonitor) {
                this.subscriptionTaskRunning = true;
            }

            try {
                this.connection = RedisMessageListenerContainer.this.connectionFactory.getConnection();
                if (this.connection.isSubscribed()) {
                    throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
                }

                boolean async = ConnectionUtils.isAsync(RedisMessageListenerContainer.this.connectionFactory);
                if (!async) {
                    synchronized (RedisMessageListenerContainer.this.monitor) {
                        RedisMessageListenerContainer.this.monitor.notify();
                    }
                }

                SubscriptionPresentCondition subscriptionPresent = this.eventuallyPerformSubscription();

                if (async) {
                    SpinBarrier.waitFor(subscriptionPresent, RedisMessageListenerContainer.this.getMaxSubscriptionRegistrationWaitingTime());
                    synchronized (RedisMessageListenerContainer.this.monitor) {
                        RedisMessageListenerContainer.this.monitor.notify();
                    }
                }
            }
            catch (Throwable failure) {
                RedisMessageListenerContainer.this.handleSubscriptionException(failure);
            }
            finally {
                synchronized (this.localMonitor) {
                    this.subscriptionTaskRunning = false;
                    // wakes a cancel() call waiting for this task to end
                    this.localMonitor.notify();
                }
            }
        }

        /**
         * Subscribes the connection according to the registered topics.
         *
         * <p>Without channels, only the patterns are subscribed. With
         * channels, the channels are subscribed; if patterns exist as well, a
         * {@code PatternSubscriptionTask} is submitted to the subscription
         * executor beforehand to add them once the channel subscription is
         * in place.
         *
         * @return the condition that tells when the expected subscription is present
         */
        private SubscriptionPresentCondition eventuallyPerformSubscription() {
            boolean noChannels = RedisMessageListenerContainer.this.channelMapping.isEmpty();
            if (noChannels) {
                SubscriptionPresentCondition patternsOnly = new PatternSubscriptionPresentCondition();
                this.connection.pSubscribe(new DispatchMessageListener(), this.unwrap(RedisMessageListenerContainer.this.patternMapping.keySet()));
                return patternsOnly;
            }

            SubscriptionPresentCondition expected;
            if (RedisMessageListenerContainer.this.patternMapping.isEmpty()) {
                expected = new SubscriptionPresentCondition();
            } else {
                // must be submitted before subscribe(); see the hedged note on run() about blocking
                RedisMessageListenerContainer.this.subscriptionExecutor.execute(new PatternSubscriptionTask());
                expected = new PatternSubscriptionPresentCondition();
            }
            this.connection.subscribe(new DispatchMessageListener(), this.unwrap(RedisMessageListenerContainer.this.channelMapping.keySet()));
            return expected;
        }

        /**
         * Extracts the raw byte arrays from the given wrappers.
         *
         * <p>The arrays are collected into a list first rather than into an
         * array sized from {@code holders.size()}: callers pass live key sets
         * of concurrent maps, whose size may change during iteration. A
         * pre-sized array could then overflow or keep trailing {@code null}
         * entries.
         *
         * @param holders the wrapped topic names; may be {@code null} or empty
         * @return the unwrapped arrays in iteration order, never {@code null}
         */
        private byte[][] unwrap(Collection<ByteArrayWrapper> holders) {
            if (CollectionUtils.isEmpty(holders)) {
                return new byte[0][];
            }
            List<byte[]> unwrapped = new ArrayList<byte[]>(holders.size());
            for (ByteArrayWrapper arrayHolder : holders) {
                unwrapped.add(arrayHolder.getArray());
            }
            return unwrapped.toArray(new byte[0][]);
        }

        /**
         * Cancels the active subscription.
         *
         * <p>Does nothing unless the container is listening and a connection
         * exists. Otherwise the {@code listening} flag is cleared and, if the
         * connection has a subscription, that subscription is closed under
         * {@code localMonitor}. The method then waits up to
         * {@code subscriptionWait} milliseconds for {@code run()} to finish
         * and closes the connection only if the task has stopped; if it is
         * still running a warning is logged instead.
         */
        void cancel() {
            if (!RedisMessageListenerContainer.this.listening || this.connection == null) {
                return;
            }
            RedisMessageListenerContainer.this.listening = false;
            if (RedisMessageListenerContainer.this.logger.isTraceEnabled()) {
                RedisMessageListenerContainer.this.logger.trace("Cancelling Redis subscription...");
            }

            Subscription active = this.connection.getSubscription();
            if (active == null) {
                return;
            }

            synchronized (this.localMonitor) {
                if (RedisMessageListenerContainer.this.logger.isTraceEnabled()) {
                    RedisMessageListenerContainer.this.logger.trace("Unsubscribing from all channels");
                }
                try {
                    active.close();
                }
                catch (Exception closeFailure) {
                    RedisMessageListenerContainer.this.logger.warn("Unable to unsubscribe from subscriptions", closeFailure);
                }

                if (this.subscriptionTaskRunning) {
                    try {
                        // run() notifies localMonitor in its finally block
                        this.localMonitor.wait(this.subscriptionWait);
                    }
                    catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }

                if (this.subscriptionTaskRunning) {
                    RedisMessageListenerContainer.this.logger.warn("Unable to close connection. Subscription task still running");
                } else {
                    this.closeConnection();
                }
            }
        }

        /**
         * Closes the subscription connection, if there is one, and resets the
         * field to {@code null}. A failure while closing is logged as a
         * warning and otherwise ignored.
         */
        void closeConnection() {
            if (this.connection == null) {
                return;
            }

            RedisMessageListenerContainer.this.logger.trace("Closing connection");
            try {
                this.connection.close();
            }
            catch (Exception closeFailure) {
                RedisMessageListenerContainer.this.logger.warn("Error closing subscription connection", closeFailure);
            }
            this.connection = null;
        }

        /**
         * Adds the given channels to the active subscription. Ignored when no
         * channels are given, no connection exists, or the connection has no
         * subscription.
         *
         * @param channels the serialized channel names to subscribe to
         */
        void subscribeChannel(byte[] ... channels) {
            if (channels == null || channels.length == 0 || this.connection == null) {
                return;
            }
            synchronized (this.localMonitor) {
                Subscription active = this.connection.getSubscription();
                if (active != null) {
                    active.subscribe(channels);
                }
            }
        }

        /**
         * Adds the given patterns to the active subscription. Ignored when no
         * patterns are given, no connection exists, or the connection has no
         * subscription.
         *
         * @param patterns the serialized patterns to subscribe to
         */
        void subscribePattern(byte[] ... patterns) {
            if (patterns == null || patterns.length == 0 || this.connection == null) {
                return;
            }
            synchronized (this.localMonitor) {
                Subscription active = this.connection.getSubscription();
                if (active != null) {
                    active.pSubscribe(patterns);
                }
            }
        }

        /**
         * Removes the given channels from the active subscription. Ignored
         * when no channels are given, no connection exists, or the connection
         * has no subscription.
         *
         * @param channels the serialized channel names to unsubscribe from
         */
        void unsubscribeChannel(byte[] ... channels) {
            if (channels == null || channels.length == 0 || this.connection == null) {
                return;
            }
            synchronized (this.localMonitor) {
                Subscription active = this.connection.getSubscription();
                if (active != null) {
                    active.unsubscribe(channels);
                }
            }
        }

        /**
         * Removes the given patterns from the active subscription. Ignored
         * when no patterns are given, no connection exists, or the connection
         * has no subscription.
         *
         * @param patterns the serialized patterns to unsubscribe from
         */
        void unsubscribePattern(byte[] ... patterns) {
            if (patterns == null || patterns.length == 0 || this.connection == null) {
                return;
            }
            synchronized (this.localMonitor) {
                Subscription active = this.connection.getSubscription();
                if (active != null) {
                    active.pUnsubscribe(patterns);
                }
            }
        }

        /**
         * Condition that holds once the connection is subscribed and its
         * subscription reports at least one pattern.
         */
        private class PatternSubscriptionPresentCondition
        extends SubscriptionPresentCondition {
            private PatternSubscriptionPresentCondition() {
            }

            /**
             * @return {@code true} when the connection is subscribed and the
             *         subscription's pattern collection is not empty
             */
            @Override
            public boolean passes() {
                if (!super.passes()) {
                    return false;
                }
                return !CollectionUtils.isEmpty(SubscriptionTask.this.connection.getSubscription().getPatterns());
            }
        }

        /**
         * Condition that holds once the task's connection reports being
         * subscribed.
         */
        private class SubscriptionPresentCondition
        implements Condition {
            private SubscriptionPresentCondition() {
            }

            /**
             * @return the result of {@code isSubscribed()} on the task's connection
             */
            @Override
            public boolean passes() {
                boolean subscribed = SubscriptionTask.this.connection.isSubscribed();
                return subscribed;
            }
        }

        /**
         * Short-lived helper task that adds the registered patterns to the
         * subscription once the connection reports being subscribed. It is
         * submitted by {@code eventuallyPerformSubscription()} when both
         * channels and patterns are registered.
         */
        private class PatternSubscriptionTask
        implements SchedulingAwareRunnable {
            /** Pause between two checks of the subscription state, in milliseconds. */
            private static final long WAIT = 500L;
            /** Maximum number of checks before the task gives up. */
            private static final int ROUNDS = 3;

            private PatternSubscriptionTask() {
            }

            /**
             * @return always {@code false}; the task ends after at most
             *         {@code ROUNDS} checks
             */
            public boolean isLongLived() {
                return false;
            }

            /**
             * Checks up to {@code ROUNDS} times whether the connection is
             * subscribed and, as soon as it is, subscribes to all registered
             * patterns and ends.
             *
             * <p>The pause between checks is taken outside
             * {@code localMonitor}, so cancel() and the (un)subscribe methods
             * are not blocked while this task sleeps. An interrupt ends the
             * task early with the interrupt flag restored.
             */
            public void run() {
                for (int round = 0; round < ROUNDS; round++) {
                    // read the volatile field once per round; closeConnection() may null it concurrently
                    RedisConnection current = SubscriptionTask.this.connection;
                    if (current == null) {
                        continue;
                    }

                    synchronized (SubscriptionTask.this.localMonitor) {
                        if (current.isSubscribed()) {
                            Subscription active = current.getSubscription();
                            if (active != null) {
                                active.pSubscribe(SubscriptionTask.this.unwrap(RedisMessageListenerContainer.this.patternMapping.keySet()));
                                return;
                            }
                        }
                    }

                    try {
                        Thread.sleep(WAIT);
                    }
                    catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}

