/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.plugins.spark.streaming;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FilenameUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.plugins.AbstractStreamPlugin;
import org.springframework.xd.dirt.plugins.spark.streaming.LocalMessageBusHolder;
import org.springframework.xd.dirt.plugins.spark.streaming.MessageBusReceiver;
import org.springframework.xd.dirt.plugins.spark.streaming.MessageBusSender;
import org.springframework.xd.dirt.plugins.stream.ModuleTypeConversionSupport;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.Module;
import org.springframework.xd.module.core.SimpleModule;
import org.springframework.xd.spark.streaming.SparkConfig;
import org.springframework.xd.spark.streaming.SparkMessageSender;
import org.springframework.xd.spark.streaming.SparkStreamingSupport;
import org.springframework.xd.spark.streaming.java.Processor;
import org.springframework.xd.spark.streaming.scala.ModuleExecutor;

public class SparkStreamingPlugin
extends AbstractStreamPlugin {
    private static final Logger logger = LoggerFactory.getLogger(SparkStreamingPlugin.class);
    private static final String REDIS_CONNECTION_PROPERTY_PREFIX = "spring.redis";
    private static final String RABBIT_CONNECTION_PROPERTY_PREFIX = "spring.rabbitmq";
    private static final String MESSAGE_BUS_PROPERTY_PREFIX = "xd.messagebus.";
    private PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
    private Map<Module, JavaStreamingContext> streamingContexts = new HashMap<Module, JavaStreamingContext>();

    @Autowired
    public SparkStreamingPlugin(MessageBus messageBus) {
        super(messageBus);
    }

    @Override
    public boolean supports(Module module) {
        String moduleExecutionFramework = module.getProperties().getProperty("moduleExecutionFramework");
        return "spark".equals(moduleExecutionFramework);
    }

    @Override
    public void postProcessModule(Module module) {
        SparkStreamingSupport processor;
        Properties sparkConfigs;
        ConfigurableApplicationContext moduleContext = module.getApplicationContext();
        ConfigurableEnvironment env = moduleContext.getEnvironment();
        String transport = env.getProperty("XD_TRANSPORT");
        Properties messageBusProperties = this.getMessageBusProperties(module);
        Properties inboundModuleProperties = this.extractConsumerProducerProperties(module)[0];
        Properties outboundModuleProperties = this.extractConsumerProducerProperties(module)[1];
        String defaultStorageLevel = env.getProperty("spark.storageLevel");
        StorageLevel configuredStorageLevel = StorageLevel.fromString((String)(StringUtils.hasText((String)defaultStorageLevel) ? defaultStorageLevel : "MEMORY_ONLY"));
        String storageLevelFromModule = module.getProperties().getProperty("storageLevel");
        StorageLevel storageLevel = StringUtils.hasText((String)storageLevelFromModule) ? StorageLevel.fromString((String)storageLevelFromModule) : configuredStorageLevel;
        MessageBusReceiver receiver = null;
        if (transport.equals("local")) {
            sparkConfigs = null;
            try {
                processor = (SparkStreamingSupport)module.getComponent(SparkStreamingSupport.class);
                Assert.notNull((Object)processor, (String)"Problem getting the spark streaming module. Is the module context active?");
                sparkConfigs = this.getSparkModuleProperties(processor);
            }
            catch (NoSuchBeanDefinitionException e) {
                throw new IllegalStateException("Either java or scala module should be present.");
            }
            String sparkMasterUrl = env.getProperty("spark.master");
            if (sparkConfigs != null && StringUtils.hasText((String)sparkConfigs.getProperty("spark.master"))) {
                sparkMasterUrl = sparkConfigs.getProperty("spark.master");
            }
            Assert.notNull((Object)sparkMasterUrl, (String)"Spark Master URL must be set.");
            if (!sparkMasterUrl.startsWith("local")) {
                throw new IllegalStateException("Spark cluster mode must be 'local' for 'local' XD transport.");
            }
            LocalMessageBusHolder messageBusHolder = new LocalMessageBusHolder();
            LocalMessageBusHolder.set((MessageBus)module.getComponent(MessageBus.class));
            receiver = new MessageBusReceiver(messageBusHolder, storageLevel, messageBusProperties, inboundModuleProperties, ModuleTypeConversionSupport.getInputMimeType(module));
            if (module.getType().equals((Object)ModuleType.processor)) {
                MessageBusSender sender = new MessageBusSender(messageBusHolder, this.getOutputChannelName(module), this.buildTapChannelName(module), messageBusProperties, outboundModuleProperties, ModuleTypeConversionSupport.getOutputMimeType(module), module.getProperties());
                ConfigurableListableBeanFactory beanFactory = module.getApplicationContext().getBeanFactory();
                beanFactory.registerSingleton("messageBusSender", (Object)sender);
            }
        } else {
            receiver = new MessageBusReceiver(storageLevel, messageBusProperties, inboundModuleProperties, ModuleTypeConversionSupport.getInputMimeType(module));
            if (module.getType().equals((Object)ModuleType.processor)) {
                ConfigurableListableBeanFactory beanFactory = module.getApplicationContext().getBeanFactory();
                MessageBusSender sender = new MessageBusSender(this.getOutputChannelName(module), this.buildTapChannelName(module), messageBusProperties, outboundModuleProperties, ModuleTypeConversionSupport.getOutputMimeType(module), module.getProperties());
                beanFactory.registerSingleton("messageBusSender", (Object)sender);
            }
        }
        this.registerMessageBusReceiver(receiver, module);
        try {
            processor = (SparkStreamingSupport)module.getComponent(SparkStreamingSupport.class);
            Assert.notNull((Object)processor, (String)"Problem getting the spark streaming module. Is the module context active?");
            sparkConfigs = this.getSparkModuleProperties(processor);
            this.startSparkStreamingContext(sparkConfigs, processor, module);
        }
        catch (NoSuchBeanDefinitionException e) {
            throw new IllegalStateException("Either java or scala module should be present.");
        }
    }

    @Override
    public void beforeShutdown(Module module) {
        super.beforeShutdown(module);
        logger.info("stopping SparkDriver");
        try {
            try {
                this.streamingContexts.get(module).stop(true, false);
            }
            catch (Exception e) {
                logger.warn("Error while stopping streaming context " + e);
            }
        }
        catch (Exception e) {
            logger.warn("Exception when stopping the spark module " + e);
        }
    }

    private Properties getMessageBusProperties(Module module) {
        ConfigurableEnvironment env = module.getApplicationContext().getEnvironment();
        Properties busProperties = new Properties();
        busProperties.put("XD_TRANSPORT", env.getProperty("XD_TRANSPORT"));
        for (PropertySource p : env.getPropertySources()) {
            if (!(p instanceof EnumerablePropertySource)) continue;
            for (String name : ((EnumerablePropertySource)p).getPropertyNames()) {
                if (!name.startsWith(REDIS_CONNECTION_PROPERTY_PREFIX) && !name.startsWith(RABBIT_CONNECTION_PROPERTY_PREFIX) && !name.startsWith(MESSAGE_BUS_PROPERTY_PREFIX)) continue;
                busProperties.put(name, env.getProperty(name));
            }
        }
        return busProperties;
    }

    private void registerMessageBusReceiver(MessageBusReceiver receiver, Module module) {
        receiver.setInputChannelName(this.getInputChannelName(module));
        ConfigurableListableBeanFactory beanFactory = module.getApplicationContext().getBeanFactory();
        beanFactory.registerSingleton("messageBusReceiver", (Object)receiver);
    }

    private Properties getSparkModuleProperties(SparkStreamingSupport processor) {
        Method[] methods;
        Properties sparkConfigs = new Properties();
        for (Method method : methods = processor.getClass().getDeclaredMethods()) {
            SparkConfig sparkConfig = method.getAnnotation(SparkConfig.class);
            if (sparkConfig == null) continue;
            try {
                if (method.getReturnType().equals(Properties.class)) {
                    sparkConfigs.putAll((Map<?, ?>)((Properties)method.invoke((Object)processor, new Object[0])));
                    continue;
                }
                logger.warn("@SparkConfig annotated method should return java.util.Properties type. Ignoring the method " + method.getName());
            }
            catch (InvocationTargetException ise) {
            }
            catch (IllegalAccessException ise) {
                // empty catch block
            }
        }
        return sparkConfigs;
    }

    private void startSparkStreamingContext(Properties sparkConfigs, final SparkStreamingSupport sparkStreamingSupport, final Module module) {
        final Receiver receiver = (Receiver)module.getComponent(Receiver.class);
        Environment env = this.getApplicationContext().getEnvironment();
        String masterURL = env.getProperty("spark.master", "spark://localhost:7077");
        SparkConf sparkConf = this.setupSparkConf(module, masterURL, sparkConfigs);
        String batchInterval = env.getProperty("batchInterval", env.getProperty("spark.streaming.batchInterval", "2000"));
        SparkStreamingListener streamingListener = new SparkStreamingListener();
        final SparkMessageSender sender = module.getType() == ModuleType.processor ? (SparkMessageSender)module.getComponent(SparkMessageSender.class) : null;
        final StreamingContext streamingContext = new StreamingContext(sparkConf, new Duration(Long.valueOf(batchInterval).longValue()));
        streamingContext.addStreamingListener((StreamingListener)streamingListener);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                try {
                    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(streamingContext);
                    SparkStreamingPlugin.this.streamingContexts.put(module, javaStreamingContext);
                    JavaReceiverInputDStream javaInputDStream = javaStreamingContext.receiverStream(receiver);
                    if (sparkStreamingSupport instanceof Processor) {
                        new org.springframework.xd.spark.streaming.java.ModuleExecutor().execute(javaInputDStream, (Processor)sparkStreamingSupport, sender);
                    }
                    if (sparkStreamingSupport instanceof org.springframework.xd.spark.streaming.scala.Processor) {
                        ReceiverInputDStream receiverInput = javaInputDStream.receiverInputDStream();
                        new ModuleExecutor().execute(receiverInput, (org.springframework.xd.spark.streaming.scala.Processor)sparkStreamingSupport, sender);
                    }
                    javaStreamingContext.start();
                    javaStreamingContext.awaitTermination();
                }
                catch (Exception e) {
                    throw new IllegalStateException("Exception when running Spark Streaming application.", e);
                }
            }
        });
        try {
            boolean started = streamingListener.receiverStartLatch.await(30L, TimeUnit.SECONDS);
            if (!started) {
                logger.warn("Deployment timed out when deploying Spark Streaming module " + sparkStreamingSupport);
            }
            if (!streamingListener.receiverStartSuccess.get()) {
                throw new IllegalStateException("Failed to start Spark Streaming Receiver");
            }
        }
        catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
    }

    private SparkConf setupSparkConf(Module module, String masterURL, Properties sparkConfigs) {
        SparkConf sparkConf = new SparkConf().set("spark.ui.port", String.valueOf(SocketUtils.findAvailableTcpPort())).set("spark.cores.max", "3").setMaster(masterURL).setAppName(module.getDescriptor().getGroup() + "-" + module.getDescriptor().getModuleLabel());
        if (sparkConfigs != null) {
            for (String property : sparkConfigs.stringPropertyNames()) {
                sparkConf.set(property, sparkConfigs.getProperty(property));
            }
        }
        ArrayList<String> sparkJars = new ArrayList<String>();
        try {
            String jarsFromConf = sparkConf.get("spark.jars");
            if (StringUtils.hasText((String)jarsFromConf)) {
                sparkJars.addAll(Arrays.asList(jarsFromConf.split("\\s*,\\s*")));
            }
        }
        catch (NoSuchElementException e) {
            // empty catch block
        }
        sparkJars.addAll(this.getApplicationJars(module));
        sparkConf.setJars(sparkJars.toArray(new String[sparkJars.size()]));
        return sparkConf;
    }

    private List<String> getApplicationJars(Module module) {
        URL[] urls;
        URLClassLoader classLoader = (URLClassLoader)((SimpleModule)module).getClassLoader();
        ArrayList<String> jars = new ArrayList<String>();
        for (URL url : classLoader.getURLs()) {
            String file = url.getFile().split("\\!", 2)[0];
            if (!file.endsWith(".jar")) continue;
            jars.add(file);
        }
        Environment env = this.getApplicationContext().getEnvironment();
        String jarsLocation = env.resolvePlaceholders("file:${XD_HOME}/lib/messagebus/${XD_TRANSPORT}/*.jar");
        try {
            Resource[] resources;
            for (Resource resource : resources = this.resolver.getResources(jarsLocation)) {
                URL url = resource.getURL();
                jars.add(url.getFile());
            }
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
        URLClassLoader parentClassLoader = (URLClassLoader)classLoader.getParent();
        for (URL url : urls = parentClassLoader.getURLs()) {
            String file = FilenameUtils.getName((String)url.getFile());
            String fileToAdd = url.getFile().split("\\!", 2)[0];
            if (!file.endsWith(".jar") || !file.contains("spark") && !file.contains("spring-xd-") && !file.contains("spring-core") && !file.contains("spring-integration-core") && !file.contains("spring-beans") && !file.contains("spring-context") && !file.contains("spring-boot") && !file.contains("spring-aop") && !file.contains("spring-expression") && !file.contains("spring-messaging") && !file.contains("spring-retry") && !file.contains("spring-tx") && !file.contains("spring-data-commons") && !file.contains("spring-data-redis") && !file.contains("commons-pool") && !file.contains("jedis") && !file.contains("kryo") && !file.contains("gs-collections")) continue;
            jars.add(fileToAdd);
        }
        return jars;
    }

    private static class SparkStreamingListener
    implements StreamingListener {
        private final CountDownLatch receiverStartLatch = new CountDownLatch(1);
        private final AtomicBoolean receiverStartSuccess = new AtomicBoolean();

        private SparkStreamingListener() {
        }

        public void onReceiverStarted(StreamingListenerReceiverStarted started) {
            logger.info("Spark streaming receiver started " + started.receiverInfo());
            this.receiverStartSuccess.set(true);
            this.receiverStartLatch.countDown();
        }

        public void onReceiverError(StreamingListenerReceiverError receiverError) {
            logger.info("Error starting spark streaming receiver " + receiverError.receiverInfo());
            this.receiverStartSuccess.set(false);
            this.receiverStartLatch.countDown();
        }

        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
            logger.info("Spark streaming receiver stopped " + receiverStopped.receiverInfo());
        }

        public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
        }

        public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
        }

        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        }
    }
}

