/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.plugins.spark.streaming;

import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.plugins.spark.streaming.LocalMessageBusHolder;
import org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration;
import org.springframework.xd.dirt.plugins.spark.streaming.SparkStreamingChannel;
import org.springframework.xd.spark.streaming.SparkMessageSender;

/**
 * {@link SparkMessageSender} that publishes messages to a {@link MessageBus} through an
 * internal {@link SparkStreamingChannel} bound as a producer under {@code outputChannelName}.
 *
 * <p>The message bus is taken from the supplied {@link LocalMessageBusHolder} when one is given;
 * otherwise an application context is created from {@code messageBusProperties} on
 * {@link #start()} and closed again on {@link #stop()}.
 *
 * <p>When the module property {@code enableTap} is {@code "true"} (case-insensitive), a
 * {@link WireTap} is added to the output channel and bound as a pub/sub producer under
 * {@code tapChannelName}.
 *
 * <p>All lifecycle methods and {@link #send(Message)} are {@code synchronized} on this instance.
 */
class MessageBusSender
extends SparkMessageSender {
    private static final Logger logger = LoggerFactory.getLogger(MessageBusSender.class);
    private static final String OUTPUT = "output";
    private static final String ENABLE_TAP_PROP = "enableTap";

    private final String outputChannelName;
    private final LocalMessageBusHolder messageBusHolder;
    private final Properties messageBusProperties;
    private final Properties moduleProducerProperties;
    private final Properties moduleProperties;
    private MessageBus messageBus;
    /** Only non-null while this sender owns a context it created itself in {@link #start()}. */
    private ConfigurableApplicationContext applicationContext;
    private boolean running = false;
    private final MimeType contentType;
    private final SparkStreamingChannel outputChannel;
    private final String tapChannelName;

    /**
     * Creates a sender without a message bus holder; the bus is obtained from an application
     * context created from {@code messageBusProperties} when the sender is started.
     *
     * @param outputChannelName name under which the output channel is bound as a producer
     * @param tapChannelName name under which the tap channel is bound when tapping is enabled
     * @param messageBusProperties properties used to create the message bus application context
     * @param moduleProducerProperties properties passed to the producer binding
     * @param contentType content type used to configure the output channel's message converter;
     *        may be {@code null}, in which case no converter is configured
     * @param moduleProperties module properties; consulted for the {@code enableTap} flag
     */
    public MessageBusSender(String outputChannelName, String tapChannelName, Properties messageBusProperties, Properties moduleProducerProperties, MimeType contentType, Properties moduleProperties) {
        this(null, outputChannelName, tapChannelName, messageBusProperties, moduleProducerProperties, contentType, moduleProperties);
    }

    /**
     * Creates a sender that obtains its message bus from {@code messageBusHolder} when one is
     * supplied.
     *
     * @param messageBusHolder holder providing the message bus; may be {@code null}, in which
     *        case an application context is created from {@code messageBusProperties} on start
     * @param outputChannelName name under which the output channel is bound as a producer
     * @param tapChannelName name under which the tap channel is bound when tapping is enabled
     * @param messageBusProperties properties used to create the message bus application context
     * @param moduleProducerProperties properties passed to the producer binding
     * @param contentType content type used to configure the output channel's message converter;
     *        may be {@code null}, in which case no converter is configured
     * @param moduleProperties module properties; consulted for the {@code enableTap} flag
     */
    public MessageBusSender(LocalMessageBusHolder messageBusHolder, String outputChannelName, String tapChannelName, Properties messageBusProperties, Properties moduleProducerProperties, MimeType contentType, Properties moduleProperties) {
        this.messageBusHolder = messageBusHolder;
        this.outputChannelName = outputChannelName;
        this.tapChannelName = tapChannelName;
        this.messageBusProperties = messageBusProperties;
        this.moduleProducerProperties = moduleProducerProperties;
        this.moduleProperties = moduleProperties;
        this.contentType = contentType;
        this.outputChannel = new SparkStreamingChannel();
    }

    /**
     * Resolves the message bus, binds the output channel as a producer and, if enabled, sets up
     * the tap channel. Does nothing when the sender is already running.
     *
     * <p>If any step fails, the partially initialised state is rolled back (message bus reference
     * cleared, any context created here closed) before the failure propagates, so that a later
     * {@code start()} performs the full binding again instead of silently skipping it.
     */
    public synchronized void start() {
        if (this.isRunning()) {
            return;
        }
        this.outputChannel.setBeanName(OUTPUT);
        logger.info("starting MessageBusSender");
        if (this.messageBus == null) {
            boolean producerBound = false;
            boolean started = false;
            try {
                this.messageBus = this.resolveMessageBus();
                if (this.contentType != null) {
                    this.outputChannel.configureMessageConverter(this.contentType);
                }
                this.messageBus.bindProducer(this.outputChannelName, this.outputChannel, this.moduleProducerProperties);
                producerBound = true;
                if (this.isTapEnabled()) {
                    this.addTapChannel();
                }
                started = true;
            } finally {
                if (!started) {
                    this.rollbackFailedStart(producerBound);
                }
            }
        }
        this.running = true;
    }

    /**
     * Returns the message bus from the holder when one was supplied; otherwise creates an
     * application context from the message bus properties and looks the bus up in it.
     */
    private MessageBus resolveMessageBus() {
        if (this.messageBusHolder != null) {
            return this.messageBusHolder.get();
        }
        // Assign the field before the lookup so a failing getBean() still lets the caller close it.
        this.applicationContext = MessageBusConfiguration.createApplicationContext(this.messageBusProperties);
        return this.applicationContext.getBean(MessageBus.class);
    }

    /**
     * Best-effort cleanup after a failed {@link #start()}. Secondary failures are logged rather
     * than thrown so they cannot mask the original exception.
     *
     * @param producerBound whether the output channel had already been bound as a producer
     */
    private void rollbackFailedStart(boolean producerBound) {
        MessageBus bus = this.messageBus;
        this.messageBus = null;
        if (producerBound && bus != null) {
            try {
                bus.unbindProducer(this.outputChannelName, this.outputChannel);
            } catch (RuntimeException e) {
                logger.warn("failed to unbind producer while rolling back MessageBusSender start", e);
            }
        }
        try {
            this.closeApplicationContext();
        } catch (RuntimeException e) {
            logger.warn("failed to close application context while rolling back MessageBusSender start", e);
        }
    }

    /**
     * Tells whether the {@code enableTap} module property is set to {@code "true"}
     * (case-insensitive). Missing module properties or a missing entry mean "disabled".
     */
    private boolean isTapEnabled() {
        return this.moduleProperties != null
                && Boolean.parseBoolean(this.moduleProperties.getProperty(ENABLE_TAP_PROP));
    }

    /**
     * Creates a bridge channel, binds it as a pub/sub producer under the tap channel name and
     * wire-taps the output channel into it.
     */
    private void addTapChannel() {
        Assert.notNull(this.outputChannel, "Output channel can not be null.");
        Assert.notNull(this.messageBus, "MessageBus can not be null.");
        logger.info("creating and binding tap channel for {}", this.tapChannelName);
        DirectChannel tapChannel = new DirectChannel();
        tapChannel.setBeanName(this.tapChannelName + ".tap.bridge");
        this.messageBus.bindPubSubProducer(this.tapChannelName, tapChannel, null);
        this.outputChannel.addInterceptor(new WireTap(tapChannel));
    }

    /**
     * Sends the message to the output channel.
     *
     * @param message the message to send
     */
    public synchronized void send(Message message) {
        this.outputChannel.send(message);
    }

    /**
     * Unbinds the output producer, stops any wire taps on the output channel, unbinds the tap
     * producers when tapping is enabled, and closes the application context if this sender
     * created one.
     *
     * <p>The state reset and context close happen in a {@code finally} block so that a failure
     * while unbinding does not leak the context or leave the sender marked as running.
     */
    public synchronized void stop() {
        try {
            if (this.isRunning() && this.messageBus != null) {
                logger.info("stopping MessageBusSender");
                this.messageBus.unbindProducer(this.outputChannelName, this.outputChannel);
                for (ChannelInterceptor interceptor : this.outputChannel.getChannelInterceptors()) {
                    if (interceptor instanceof WireTap) {
                        ((WireTap) interceptor).stop();
                    }
                }
                if (this.isTapEnabled()) {
                    this.messageBus.unbindProducers(this.tapChannelName);
                }
            }
        } finally {
            // Reset flags before closing so they are correct even if close() itself throws.
            this.messageBus = null;
            this.running = false;
            this.closeApplicationContext();
        }
    }

    /** Closes and forgets the application context created by this sender, if there is one. */
    private void closeApplicationContext() {
        ConfigurableApplicationContext context = this.applicationContext;
        this.applicationContext = null;
        if (context != null) {
            context.close();
        }
    }

    /**
     * @return {@code true} once {@link #start()} has completed and until {@link #stop()} is called
     */
    public synchronized boolean isRunning() {
        return this.running;
    }
}

