/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.plugins.spark.streaming;

import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.receiver.BlockGenerator;
import org.apache.spark.streaming.receiver.BlockGeneratorListener;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;
import org.springframework.xd.dirt.integration.bus.BusUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;
import org.springframework.xd.dirt.plugins.spark.streaming.LocalMessageBusHolder;
import org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration;
import org.springframework.xd.dirt.plugins.spark.streaming.SparkStreamingChannel;
import scala.collection.mutable.ArrayBuffer;

/**
 * Spark streaming receiver that consumes messages from a {@link MessageBus}.
 *
 * <p>On start it binds an internal channel as a consumer of the configured
 * channel name. Every message delivered to that channel has its payload handed
 * to a {@link BlockGenerator} and its headers appended to an internal FIFO
 * queue. When the generator pushes a block, the block is stored and one queued
 * header per block element is passed to
 * {@code MessageBusSupport.doManualAck(...)}.
 *
 * <p>The message bus is taken from the {@link LocalMessageBusHolder} when one
 * was supplied; otherwise a dedicated application context is created from the
 * message bus properties and closed again in {@link #onStop()}.
 */
class MessageBusReceiver
extends Receiver {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MessageBusReceiver.class);

    /** Bus the consumer is bound to; resolved in {@link #onStart()}. */
    private MessageBus messageBus;

    /** Only set when this receiver created its own context (no holder supplied). */
    private ConfigurableApplicationContext applicationContext;

    /** Name of the bus channel to consume from; see {@link #setInputChannelName(String)}. */
    private String channelName;

    private final LocalMessageBusHolder messageBusHolder;
    private final Properties messageBusProperties;
    private final Properties moduleConsumerProperties;
    private final MimeType contentType;

    /** Created and started in {@link #onStart()}; {@code null} until then. */
    private BlockGenerator blockGenerator = null;

    /**
     * Headers of messages handed to the block generator, in submission order.
     * Entries are removed in {@code onPushBlock}, one per stored element.
     */
    private final LinkedBlockingQueue<MessageHeaders> headersList = new LinkedBlockingQueue<MessageHeaders>();

    /**
     * Creates a receiver without a message bus holder; the bus is then obtained
     * from an application context built in {@link #onStart()}.
     *
     * @param storageLevel storage level passed to the {@code Receiver} superclass
     * @param messageBusProperties properties used to create the message bus application context
     * @param moduleConsumerProperties properties passed when binding the consumer
     * @param contentType content type used to configure the channel's message converter; may be {@code null}
     */
    public MessageBusReceiver(StorageLevel storageLevel, Properties messageBusProperties, Properties moduleConsumerProperties, MimeType contentType) {
        this(null, storageLevel, messageBusProperties, moduleConsumerProperties, contentType);
    }

    /**
     * Creates a receiver.
     *
     * @param messageBusHolder holder supplying the message bus; when {@code null} a bus is
     *        looked up from an application context created in {@link #onStart()}
     * @param storageLevel storage level passed to the {@code Receiver} superclass
     * @param messageBusProperties properties used to create the message bus application context
     * @param moduleConsumerProperties properties passed when binding the consumer
     * @param contentType content type used to configure the channel's message converter; may be {@code null}
     */
    public MessageBusReceiver(LocalMessageBusHolder messageBusHolder, StorageLevel storageLevel, Properties messageBusProperties, Properties moduleConsumerProperties, MimeType contentType) {
        super(storageLevel);
        this.messageBusHolder = messageBusHolder;
        this.messageBusProperties = messageBusProperties;
        this.moduleConsumerProperties = moduleConsumerProperties;
        this.contentType = contentType;
    }

    /**
     * Sets the name of the bus channel this receiver consumes from.
     *
     * @param channelName the channel name used for binding and unbinding
     */
    public void setInputChannelName(String channelName) {
        this.channelName = channelName;
    }

    /**
     * Starts the block generator, resolves the message bus and binds the
     * internal channel as a consumer (pub/sub or regular, as decided by
     * {@code BusUtils.isChannelPubSub}).
     */
    public void onStart() {
        logger.info("starting MessageBusReceiver");
        MessageStoringChannel messageStoringChannel = new MessageStoringChannel();
        // NOTE(review): the literal 0 is presumably the receiver id expected by
        // BlockGenerator - confirm against the Spark version in use.
        this.blockGenerator = new BlockGenerator(new GeneratedBlockHandler(), 0, new SparkConf());
        this.blockGenerator.start();
        if (this.contentType != null) {
            messageStoringChannel.configureMessageConverter(this.contentType);
        }
        if (this.messageBusHolder != null) {
            this.messageBus = this.messageBusHolder.get();
        } else {
            this.applicationContext = MessageBusConfiguration.createApplicationContext(this.messageBusProperties);
            this.messageBus = this.applicationContext.getBean(MessageBus.class);
        }
        if (BusUtils.isChannelPubSub(this.channelName)) {
            this.messageBus.bindPubSubConsumer(this.channelName, messageStoringChannel, this.moduleConsumerProperties);
        } else {
            this.messageBus.bindConsumer(this.channelName, messageStoringChannel, this.moduleConsumerProperties);
        }
    }

    /**
     * Stops the block generator, unbinds the consumers for the channel and
     * closes the application context if this receiver created one.
     *
     * <p>Each resource is released even when releasing an earlier one throws,
     * and resources that were never created (for example because
     * {@link #onStart()} did not run or failed part-way) are skipped.
     */
    public void onStop() {
        logger.info("stopping MessageBusReceiver");
        try {
            if (this.blockGenerator != null) {
                this.blockGenerator.stop();
            }
        }
        finally {
            try {
                if (this.messageBus != null) {
                    this.messageBus.unbindConsumers(this.channelName);
                }
            }
            finally {
                if (this.applicationContext != null) {
                    this.applicationContext.close();
                }
            }
        }
    }

    /**
     * Channel bound to the message bus. Each message sent to it is forwarded
     * to the enclosing receiver's block generator and its headers are queued
     * for later acknowledgement.
     */
    private class MessageStoringChannel
    extends SparkStreamingChannel {
        private static final long serialVersionUID = 1L;
        private static final String INPUT = "input";

        public MessageStoringChannel() {
            this.setBeanName(INPUT);
        }

        /**
         * Queues the message headers and hands the payload to the block
         * generator. Synchronized so that the queue order matches the order in
         * which payloads are added to the generator.
         *
         * @param message the message to store
         * @param timeout not used by this implementation
         * @return {@code true} when the message was handed to the block generator;
         *         {@code false} when the thread was interrupted before the headers
         *         could be queued, in which case the payload is not added
         */
        protected synchronized boolean doSend(Message<?> message, long timeout) {
            MessageHeaders headers = message.getHeaders();
            try {
                MessageBusReceiver.this.headersList.put(headers);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                // Adding the payload without its headers would leave the header
                // queue one entry short of the data buffer, so report the send
                // as failed instead.
                return false;
            }
            MessageBusReceiver.this.blockGenerator.addDataWithCallback(message.getPayload(), headers);
            return true;
        }
    }

    /**
     * Listener for block generator events: stores pushed blocks through the
     * enclosing receiver and acknowledges the corresponding messages.
     */
    private class GeneratedBlockHandler
    implements BlockGeneratorListener {
        private GeneratedBlockHandler() {
        }

        public void onAddData(Object data, Object metadata) {
            logger.debug("Adding data to block generator buffer");
        }

        /** Forwards the error to the enclosing receiver's {@code reportError}. */
        public void onError(String data, Throwable t) {
            MessageBusReceiver.this.reportError(data, t);
        }

        /**
         * Stores the block, then removes one queued header per element of the
         * block and passes the collected headers to {@code doManualAck}.
         *
         * <p>If the thread is interrupted while waiting for a header, the
         * interrupt flag is restored and only the headers collected so far are
         * acknowledged.
         */
        public void onPushBlock(StreamBlockId streamBlockId, ArrayBuffer<?> dataBuffer) {
            MessageBusReceiver.this.store(dataBuffer);
            LinkedList<MessageHeaders> headersListToAck = new LinkedList<MessageHeaders>();
            int blockSize = dataBuffer.size();
            for (int i = 0; i < blockSize; ++i) {
                try {
                    headersListToAck.add(MessageBusReceiver.this.headersList.take());
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    // With the interrupt flag set every further take() would
                    // fail immediately, so stop collecting.
                    break;
                }
            }
            ((MessageBusSupport)MessageBusReceiver.this.messageBus).doManualAck(headersListToAck);
        }

        public void onGenerateBlock(StreamBlockId blockId) {
            logger.debug("Generated block {}", blockId);
        }
    }
}

