/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.integration.bus.local;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.xd.dirt.integration.bus.AbstractBusPropertiesAccessor;
import org.springframework.xd.dirt.integration.bus.Binding;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;

/**
 * Message bus that connects module channels through shared, named Spring
 * Integration channels which are looked up or created on demand.
 *
 * <p>Names starting with {@code queue:} or {@code job:} are backed by a
 * {@link QueueChannel}; pub/sub bindings are backed by a
 * {@link PublishSubscribeChannel} dispatching on this bus's executor; every
 * other name uses the direct channel provider inherited from
 * {@link MessageBusSupport}. Request/reply bindings use a pair of
 * {@link ExecutorChannel}s ({@code requestor.<name>} and
 * {@code replier.<name>}), each with its own thread pool.
 */
public class LocalMessageBus extends MessageBusSupport {
    private static final int DEFAULT_EXECUTOR_CORE_POOL_SIZE = 0;
    private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 200;
    private static final int DEFAULT_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE;
    private static final int DEFAULT_EXECUTOR_KEEPALIVE_SECONDS = 60;
    private static final int DEFAULT_REQ_REPLY_CONCURRENCY = 1;

    /** Prefix of the shared channel carrying requests for a request/reply binding. */
    private static final String REQUESTOR_PREFIX = "requestor.";

    /** Prefix of the shared channel carrying replies for a request/reply binding. */
    private static final String REPLIER_PREFIX = "replier.";

    /** Properties accepted by request/reply bindings: the standard consumer set plus {@code concurrency}. */
    protected static final Set<Object> CONSUMER_REQUEST_REPLY_PROPERTIES = new MessageBusSupport.SetBuilder()
            .addAll(CONSUMER_STANDARD_PROPERTIES)
            .add("concurrency")
            .build();

    private volatile PollerMetadata poller;

    /** Request/reply channels keyed by prefixed channel name; guarded by {@code this}. */
    private final Map<String, ExecutorChannel> requestReplyChannels = new HashMap<String, ExecutorChannel>();

    /** Executor used by pub/sub channels; configured and started in {@link #onInit()}. */
    private final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    private volatile int executorCorePoolSize = DEFAULT_EXECUTOR_CORE_POOL_SIZE;
    private volatile int executorMaxPoolSize = DEFAULT_EXECUTOR_MAX_POOL_SIZE;
    private volatile int executorQueueSize = DEFAULT_EXECUTOR_QUEUE_SIZE;
    private volatile int executorKeepAliveSeconds = DEFAULT_EXECUTOR_KEEPALIVE_SECONDS;
    private volatile int queueSize = Integer.MAX_VALUE;

    /**
     * Request/reply executors keyed by the same prefixed channel name as
     * {@link #requestReplyChannels}, so the requestor and replier executors of
     * one binding name never overwrite each other.
     */
    private final Map<String, ThreadPoolTaskExecutor> reqRepExecutors = new ConcurrentHashMap<String, ThreadPoolTaskExecutor>();

    /** Creates queue-backed shared channels bounded by the configured queue size. */
    private final MessageBusSupport.SharedChannelProvider<QueueChannel> queueChannelProvider =
            new MessageBusSupport.SharedChannelProvider<QueueChannel>(QueueChannel.class) {

        protected QueueChannel createSharedChannel(String name) {
            return new QueueChannel(LocalMessageBus.this.queueSize);
        }
    };

    /** Creates pub/sub shared channels that dispatch on the bus executor and ignore subscriber failures. */
    private final MessageBusSupport.SharedChannelProvider<PublishSubscribeChannel> pubsubChannelProvider =
            new MessageBusSupport.SharedChannelProvider<PublishSubscribeChannel>(PublishSubscribeChannel.class) {

        protected PublishSubscribeChannel createSharedChannel(String name) {
            PublishSubscribeChannel pubSub = new PublishSubscribeChannel(LocalMessageBus.this.executor);
            pubSub.setIgnoreFailures(true);
            return pubSub;
        }
    };

    /**
     * Sets the poller applied to bridges whose source channel is pollable.
     *
     * @param poller the poller metadata to use
     */
    public void setPoller(PollerMetadata poller) {
        this.poller = poller;
    }

    /**
     * Sets the capacity used for queue channels created after this call.
     *
     * @param queueSize the queue channel capacity
     */
    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    /**
     * Sets the core pool size applied to the bus executor in {@link #onInit()}.
     *
     * @param executorCorePoolSize the core pool size
     */
    public void setExecutorCorePoolSize(int executorCorePoolSize) {
        this.executorCorePoolSize = executorCorePoolSize;
    }

    /**
     * Sets the maximum pool size applied to the bus executor in {@link #onInit()}.
     *
     * @param executorMaxPoolSize the maximum pool size
     */
    public void setExecutorMaxPoolSize(int executorMaxPoolSize) {
        this.executorMaxPoolSize = executorMaxPoolSize;
    }

    /**
     * Sets the task queue capacity applied to the bus executor in {@link #onInit()}.
     *
     * @param executorQueueSize the queue capacity
     */
    public void setExecutorQueueSize(int executorQueueSize) {
        this.executorQueueSize = executorQueueSize;
    }

    /**
     * Sets the keep-alive time applied to the bus executor in {@link #onInit()}.
     *
     * @param executorKeepAliveSeconds the keep-alive time in seconds
     */
    public void setExecutorKeepAliveSeconds(int executorKeepAliveSeconds) {
        this.executorKeepAliveSeconds = executorKeepAliveSeconds;
    }

    /**
     * Applies the configured sizing to the bus executor and starts it.
     */
    protected void onInit() {
        this.executor.setCorePoolSize(this.executorCorePoolSize);
        this.executor.setMaxPoolSize(this.executorMaxPoolSize);
        this.executor.setQueueCapacity(this.executorQueueSize);
        this.executor.setKeepAliveSeconds(this.executorKeepAliveSeconds);
        this.executor.setThreadNamePrefix("xd.localbus-");
        this.executor.initialize();
    }

    /**
     * Binds a dynamically created producer channel for {@code name}.
     *
     * @param name the shared channel name
     * @param properties binding properties
     * @return the channel produced by the superclass's dynamic binding
     */
    public MessageChannel bindDynamicProducer(String name, Properties properties) {
        return this.doBindDynamicProducer(name, "dynamic.output.to." + name, properties);
    }

    /**
     * Binds a dynamically created pub/sub producer channel for {@code name}.
     *
     * @param name the shared channel name
     * @param properties binding properties
     * @return the channel produced by the superclass's dynamic binding
     */
    public MessageChannel bindDynamicPubSubProducer(String name, Properties properties) {
        return this.doBindDynamicPubSubProducer(name, "dynamic.output.to." + name, properties);
    }

    /**
     * Selects the shared channel provider for a point-to-point binding.
     *
     * @param name the shared channel name
     * @return the queue provider for {@code queue:} / {@code job:} names, the
     *         inherited direct provider otherwise
     */
    private MessageBusSupport.SharedChannelProvider<?> getChannelProvider(String name) {
        if (name.startsWith("queue:") || name.startsWith("job:")) {
            return this.queueChannelProvider;
        }
        return this.directChannelProvider;
    }

    /**
     * Bridges the shared channel for {@code name} to a module's input channel.
     *
     * @param name the shared channel name
     * @param moduleInputChannel the module channel that receives messages
     * @param properties binding properties, validated against the standard consumer set
     */
    public void bindConsumer(String name, MessageChannel moduleInputChannel, Properties properties) {
        this.validateConsumerProperties(name, properties, CONSUMER_STANDARD_PROPERTIES);
        this.doRegisterConsumer(name, moduleInputChannel, this.getChannelProvider(name), properties);
    }

    /**
     * Bridges the pub/sub shared channel for {@code name} to a module's input channel.
     *
     * @param name the shared channel name
     * @param moduleInputChannel the module channel that receives messages
     * @param properties binding properties, validated against the standard consumer set
     */
    public void bindPubSubConsumer(String name, MessageChannel moduleInputChannel, Properties properties) {
        this.validateConsumerProperties(name, properties, CONSUMER_STANDARD_PROPERTIES);
        this.doRegisterConsumer(name, moduleInputChannel, this.pubsubChannelProvider, properties);
    }

    /**
     * Looks up (or creates) the shared channel and bridges it into the module.
     * The bridge is named {@code inbound.<shared channel component name>}.
     */
    private void doRegisterConsumer(String name, MessageChannel moduleInputChannel,
            MessageBusSupport.SharedChannelProvider<?> channelProvider, Properties properties) {
        Assert.hasText(name, "a valid name is required to register an inbound channel");
        Assert.notNull(moduleInputChannel, "channel must not be null");
        MessageChannel registeredChannel = channelProvider.lookupOrCreateSharedChannel(name);
        String bridgeName = "inbound." + ((NamedComponent) registeredChannel).getComponentName();
        this.bridge(name, registeredChannel, moduleInputChannel, bridgeName, new LocalBusPropertiesAccessor(properties));
    }

    /**
     * Bridges a module's output channel to the shared channel for {@code name}.
     *
     * @param name the shared channel name
     * @param moduleOutputChannel the module channel that emits messages
     * @param properties binding properties, validated against the standard producer set
     */
    public void bindProducer(String name, MessageChannel moduleOutputChannel, Properties properties) {
        // NOTE(review): producer properties are checked with the consumer
        // validator; confirm whether MessageBusSupport offers a producer-specific one.
        this.validateConsumerProperties(name, properties, PRODUCER_STANDARD_PROPERTIES);
        this.doRegisterProducer(name, moduleOutputChannel, this.getChannelProvider(name), properties);
    }

    /**
     * Bridges a module's output channel to the pub/sub shared channel for {@code name}.
     *
     * @param name the shared channel name
     * @param moduleOutputChannel the module channel that emits messages
     * @param properties binding properties, validated against the standard producer set
     */
    public void bindPubSubProducer(String name, MessageChannel moduleOutputChannel, Properties properties) {
        // NOTE(review): same validator question as bindProducer.
        this.validateConsumerProperties(name, properties, PRODUCER_STANDARD_PROPERTIES);
        this.doRegisterProducer(name, moduleOutputChannel, this.pubsubChannelProvider, properties);
    }

    /**
     * Looks up (or creates) the shared channel and bridges the module into it.
     * The bridge is named {@code outbound.<shared channel component name>}.
     */
    private void doRegisterProducer(String name, MessageChannel moduleOutputChannel,
            MessageBusSupport.SharedChannelProvider<?> channelProvider, Properties properties) {
        Assert.hasText(name, "a valid name is required to register an outbound channel");
        Assert.notNull(moduleOutputChannel, "channel must not be null");
        MessageChannel registeredChannel = channelProvider.lookupOrCreateSharedChannel(name);
        String bridgeName = "outbound." + ((NamedComponent) registeredChannel).getComponentName();
        this.bridge(name, moduleOutputChannel, registeredChannel, bridgeName, new LocalBusPropertiesAccessor(properties));
    }

    /**
     * Binds the requesting side of a request/reply pair: messages sent to
     * {@code requests} are forwarded to the shared request channel, and
     * messages arriving on the shared reply channel are forwarded to
     * {@code replies}.
     *
     * @param name the binding name shared with the replier
     * @param requests the module's request channel; must be a {@link SubscribableChannel}
     * @param replies the module's reply channel
     * @param properties binding properties, validated against {@link #CONSUMER_REQUEST_REPLY_PROPERTIES}
     */
    public void bindRequestor(String name, MessageChannel requests, final MessageChannel replies, Properties properties) {
        this.validateConsumerProperties(name, properties, CONSUMER_REQUEST_REPLY_PROPERTIES);
        ExecutorChannel requestChannel = this.findOrCreateRequestReplyChannel(name, REQUESTOR_PREFIX, properties);
        Assert.isInstanceOf(SubscribableChannel.class, requests);
        ((SubscribableChannel) requests).subscribe(forwardTo(requestChannel));
        ExecutorChannel replyChannel = this.findOrCreateRequestReplyChannel(name, REPLIER_PREFIX, properties);
        replyChannel.subscribe(forwardTo(replies));
    }

    /**
     * Binds the replying side of a request/reply pair: messages arriving on
     * the shared request channel are forwarded to {@code requests}, and
     * messages sent to {@code replies} are forwarded to the shared reply
     * channel.
     *
     * @param name the binding name shared with the requestor
     * @param requests the module's request channel
     * @param replies the module's reply channel; must be a {@link SubscribableChannel}
     * @param properties binding properties, validated against {@link #CONSUMER_REQUEST_REPLY_PROPERTIES}
     */
    public void bindReplier(String name, final MessageChannel requests, MessageChannel replies, Properties properties) {
        this.validateConsumerProperties(name, properties, CONSUMER_REQUEST_REPLY_PROPERTIES);
        ExecutorChannel requestChannel = this.findOrCreateRequestReplyChannel(name, REQUESTOR_PREFIX, properties);
        requestChannel.subscribe(forwardTo(requests));
        Assert.isInstanceOf(SubscribableChannel.class, replies);
        ExecutorChannel replyChannel = this.findOrCreateRequestReplyChannel(name, REPLIER_PREFIX, properties);
        ((SubscribableChannel) replies).subscribe(forwardTo(replyChannel));
    }

    /**
     * Creates a handler that sends every message it receives to {@code target}.
     */
    private static MessageHandler forwardTo(final MessageChannel target) {
        return new MessageHandler() {

            public void handleMessage(Message<?> message) throws MessagingException {
                target.send(message);
            }
        };
    }

    /**
     * Returns the request/reply channel registered as {@code prefix + name},
     * creating it together with a dedicated executor on first use.
     *
     * @param name the binding name
     * @param prefix the channel role prefix
     * @param properties binding properties supplying the executor concurrency
     * @return the existing or newly created channel
     */
    private synchronized ExecutorChannel findOrCreateRequestReplyChannel(String name, String prefix, Properties properties) {
        String channelName = prefix + name;
        ExecutorChannel channel = this.requestReplyChannels.get(channelName);
        if (channel == null) {
            ThreadPoolTaskExecutor channelExecutor = this.createRequestReplyExecutor(name, properties);
            channel = new ExecutorChannel(channelExecutor);
            channel.setBeanFactory(this.getBeanFactory());
            this.requestReplyChannels.put(channelName, channel);
            // Keyed by channelName, not name: keying by name let the second
            // executor of a pair replace the first, which then never shut down.
            this.reqRepExecutors.put(channelName, channelExecutor);
        }
        return channel;
    }

    /**
     * Builds and starts the executor backing one request/reply channel, sized
     * by the {@code concurrency} property (default 1).
     */
    private ThreadPoolTaskExecutor createRequestReplyExecutor(String name, Properties properties) {
        ThreadPoolTaskExecutor channelExecutor = new ThreadPoolTaskExecutor();
        int concurrency = new LocalBusPropertiesAccessor(properties).getConcurrency(DEFAULT_REQ_REPLY_CONCURRENCY);
        channelExecutor.setCorePoolSize(concurrency);
        channelExecutor.setThreadNamePrefix("xd.localBus." + name + "-");
        channelExecutor.initialize();
        return channelExecutor;
    }

    /**
     * Removes the request/reply channels registered for {@code name} and shuts
     * down their executors. When no request channel was registered under that
     * name the call is delegated to the superclass as an ordinary producer
     * unbinding.
     *
     * @param name the binding name
     * @param channel the module channel being unbound
     */
    public void unbindProducer(String name, MessageChannel channel) {
        String replierKey = REPLIER_PREFIX + name;
        String requestorKey = REQUESTOR_PREFIX + name;
        MessageChannel requestChannel;
        // Same monitor as findOrCreateRequestReplyChannel: the map is a plain HashMap.
        synchronized (this) {
            this.requestReplyChannels.remove(replierKey);
            requestChannel = this.requestReplyChannels.remove(requestorKey);
        }
        try {
            if (requestChannel == null) {
                super.unbindProducer(name, channel);
            }
        }
        finally {
            // Release threads even if the superclass unbinding fails.
            this.shutdownRequestReplyExecutor(replierKey);
            this.shutdownRequestReplyExecutor(requestorKey);
        }
    }

    /**
     * Shuts down and forgets the executor registered for the given channel name, if any.
     */
    private void shutdownRequestReplyExecutor(String channelName) {
        ThreadPoolTaskExecutor channelExecutor = this.reqRepExecutors.remove(channelName);
        if (channelExecutor != null) {
            channelExecutor.shutdown();
        }
    }

    /**
     * Bridges {@code from} to {@code to} without any MIME type restriction.
     *
     * @see #bridge(String, MessageChannel, MessageChannel, String, Collection, LocalBusPropertiesAccessor)
     */
    protected BridgeHandler bridge(String name, MessageChannel from, MessageChannel to, String bridgeName, LocalBusPropertiesAccessor properties) {
        return this.bridge(name, from, to, bridgeName, null, properties);
    }

    /**
     * Creates a pass-through bridge from {@code from} to {@code to}, registers
     * it as a binding and starts it. A {@code bridgeName} beginning with
     * {@code inbound.} yields a consumer binding; any other name yields a
     * producer binding.
     *
     * @param name the binding name
     * @param from the channel messages are taken from; a poller is attached when it is pollable
     * @param to the channel messages are delivered to
     * @param bridgeName bean/component name for the bridge and its endpoint
     * @param acceptedMimeTypes not used by this implementation
     * @param properties binding properties recorded on the binding
     * @return the bridge handler
     * @throws IllegalStateException if the endpoint cannot be initialised or the binding cannot be started
     */
    protected BridgeHandler bridge(String name, MessageChannel from, MessageChannel to, String bridgeName, Collection<MimeType> acceptedMimeTypes, LocalBusPropertiesAccessor properties) {
        boolean isInbound = bridgeName.startsWith("inbound.");
        BridgeHandler handler = new BridgeHandler() {

            protected boolean shouldCopyRequestHeaders() {
                return false;
            }

            protected Object handleRequestMessage(Message<?> requestMessage) {
                return requestMessage;
            }
        };
        handler.setBeanFactory(this.getBeanFactory());
        handler.setOutputChannel(to);
        handler.setBeanName(bridgeName);
        handler.afterPropertiesSet();

        ConsumerEndpointFactoryBean cefb = new ConsumerEndpointFactoryBean();
        cefb.setBeanName(bridgeName);
        cefb.setInputChannel(from);
        cefb.setHandler(handler);
        cefb.setBeanFactory(this.getBeanFactory());
        if (from instanceof PollableChannel) {
            cefb.setPollerMetadata(this.poller);
        }
        try {
            cefb.afterPropertiesSet();
            AbstractEndpoint endpoint = cefb.getObject();
            endpoint.setComponentName(handler.getComponentName());
            Binding binding = isInbound
                    ? Binding.forConsumer(name, endpoint, to, properties)
                    : Binding.forProducer(name, from, endpoint, properties);
            this.addBinding(binding);
            binding.start();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return handler;
    }

    /**
     * Looks up a bean by name and required type in this bus's application context.
     *
     * @param name the bean name
     * @param requiredType the type the bean must match
     * @return the matching bean
     */
    protected <T> T getBean(String name, Class<T> requiredType) {
        return this.getApplicationContext().getBean(name, requiredType);
    }

    /** Property accessor for local bus bindings; adds nothing to the base accessor. */
    private static class LocalBusPropertiesAccessor
    extends AbstractBusPropertiesAccessor {
        public LocalBusPropertiesAccessor(Properties properties) {
            super(properties);
        }
    }
}

