/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.redis.channel;

import java.util.concurrent.Executor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.channel.ChannelUtils;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

public class SubscribableRedisChannel
extends AbstractMessageChannel
implements BroadcastCapableChannel,
SmartLifecycle {
    private final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    private final RedisConnectionFactory connectionFactory;
    private final RedisTemplate redisTemplate;
    private final String topicName;
    private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(true);
    private Executor taskExecutor = new SimpleAsyncTaskExecutor();
    private RedisSerializer<?> serializer = new StringRedisSerializer();
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private volatile Integer maxSubscribers;
    private volatile boolean initialized;

    public SubscribableRedisChannel(RedisConnectionFactory connectionFactory, String topicName) {
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' must not be null");
        Assert.hasText((String)topicName, (String)"'topicName' must not be empty");
        this.connectionFactory = connectionFactory;
        this.redisTemplate = new StringRedisTemplate(connectionFactory);
        this.topicName = topicName;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"'taskExecutor' must not be null");
        this.taskExecutor = taskExecutor;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"'messageConverter' must not be null");
        this.messageConverter = messageConverter;
    }

    public void setSerializer(RedisSerializer<?> serializer) {
        Assert.notNull(serializer, (String)"'serializer' must not be null");
        this.serializer = serializer;
    }

    public void setMaxSubscribers(int maxSubscribers) {
        this.maxSubscribers = maxSubscribers;
        this.dispatcher.setMaxSubscribers(maxSubscribers);
    }

    public boolean subscribe(MessageHandler handler) {
        return this.dispatcher.addHandler(handler);
    }

    public boolean unsubscribe(MessageHandler handler) {
        return this.dispatcher.removeHandler(handler);
    }

    protected boolean doSend(Message<?> message, long arg1) {
        Object value = this.messageConverter.fromMessage(message, Object.class);
        this.redisTemplate.convertAndSend(this.topicName, value);
        return true;
    }

    public void onInit() {
        if (this.initialized) {
            return;
        }
        super.onInit();
        if (this.maxSubscribers == null) {
            this.setMaxSubscribers((Integer)this.getIntegrationProperty("spring.integration.channels.maxBroadcastSubscribers", Integer.class));
        }
        if (this.messageConverter == null) {
            this.messageConverter = new SimpleMessageConverter();
        }
        BeanFactory beanFactory = this.getBeanFactory();
        if (this.messageConverter instanceof BeanFactoryAware) {
            ((BeanFactoryAware)this.messageConverter).setBeanFactory(beanFactory);
        }
        this.container.setConnectionFactory(this.connectionFactory);
        if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
            ErrorHandler errorHandler = ChannelUtils.getErrorHandler((BeanFactory)beanFactory);
            this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
        }
        this.container.setTaskExecutor(this.taskExecutor);
        MessageListenerAdapter adapter = new MessageListenerAdapter((Object)new MessageListenerDelegate());
        adapter.setSerializer(this.serializer);
        adapter.afterPropertiesSet();
        this.container.addMessageListener((MessageListener)adapter, (Topic)new ChannelTopic(this.topicName));
        this.container.afterPropertiesSet();
        this.dispatcher.setBeanFactory(beanFactory);
        this.initialized = true;
    }

    public boolean isAutoStartup() {
        return this.container.isAutoStartup();
    }

    public int getPhase() {
        return this.container.getPhase();
    }

    public boolean isRunning() {
        return this.container.isRunning();
    }

    public void start() {
        this.container.start();
    }

    public void stop() {
        this.container.stop();
    }

    public void stop(Runnable callback) {
        this.container.stop(callback);
    }

    public void destroy() {
        try {
            this.container.destroy();
        }
        catch (Exception ex) {
            throw new IllegalStateException("Cannot destroy " + this.container, ex);
        }
    }

    private class MessageListenerDelegate {
        MessageListenerDelegate() {
        }

        public void handleMessage(Object payload) {
            Message siMessage = SubscribableRedisChannel.this.messageConverter.toMessage(payload, null);
            try {
                SubscribableRedisChannel.this.dispatcher.dispatch(siMessage);
            }
            catch (MessageDispatchingException e) {
                String exceptionMessage = e.getMessage();
                throw new MessageDeliveryException(siMessage, (exceptionMessage == null ? ((Object)((Object)e)).getClass().getSimpleName() : exceptionMessage) + " for redis-channel '" + (StringUtils.hasText((String)SubscribableRedisChannel.this.topicName) ? SubscribableRedisChannel.this.topicName : "unknown") + "' (" + SubscribableRedisChannel.this.getFullChannelName() + ").", (Throwable)e);
            }
        }
    }
}

