/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime;

import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.Data;
import stream.Process;
import stream.ProcessContext;
import stream.app.ComputeGraph;
import stream.container.IContainer;
import stream.data.DataFactory;
import stream.io.Queue;
import stream.io.Sink;
import stream.io.Source;
import stream.runtime.ApplicationContext;
import stream.runtime.ContainerContext;
import stream.runtime.ContainerController;
import stream.runtime.DefaultNamingService;
import stream.runtime.DependencyInjection;
import stream.runtime.ElementHandler;
import stream.runtime.LifeCycle;
import stream.runtime.ProcessContextImpl;
import stream.runtime.ProcessThread;
import stream.runtime.ServiceReference;
import stream.runtime.Supervisor;
import stream.runtime.rpc.RMINamingService;
import stream.runtime.setup.ObjectCreator;
import stream.runtime.setup.factory.ObjectFactory;
import stream.runtime.setup.factory.ProcessorFactory;
import stream.runtime.setup.handler.ContainerRefElementHandler;
import stream.runtime.setup.handler.DocumentHandler;
import stream.runtime.setup.handler.LibrariesElementHandler;
import stream.runtime.setup.handler.MonitorElementHandler;
import stream.runtime.setup.handler.ProcessElementHandler;
import stream.runtime.setup.handler.PropertiesHandler;
import stream.runtime.setup.handler.QueueElementHandler;
import stream.runtime.setup.handler.ServiceElementHandler;
import stream.runtime.setup.handler.SinkElementHandler;
import stream.runtime.setup.handler.StreamElementHandler;
import stream.runtime.setup.handler.SystemPropertiesHandler;
import stream.runtime.shutdown.AbstractShutdownCondition;
import stream.runtime.shutdown.LocalShutdownCondition;
import stream.runtime.shutdown.ServerShutdownCondition;
import stream.service.NamingService;
import stream.service.Service;
import stream.util.Variables;
import stream.util.XIncluder;
import stream.util.XMLUtils;

public class ProcessContainer
implements IContainer,
Runnable {
    // Class-wide logger; assigned in the static initializer of this class.
    static Logger log;
    // All containers that registered themselves (constructor / execute()); iterated by the JVM shutdown hook.
    static final List<ProcessContainer> container;
    // Guards shutdown() against running twice; cleared by shutdown() and by execute() once the shutdown condition is met.
    private Boolean runShutdownHook = true;
    // Factory used to read element attributes and to create configured objects (e.g. a custom naming service).
    protected final ObjectFactory objectFactory = ObjectFactory.newInstance();
    // Processor factory built on top of objectFactory; handed to the Monitor and Process element handlers.
    protected final ProcessorFactory processorFactory = new ProcessorFactory(this.objectFactory);
    // Compute graph of this container; exposed through computeGraph().
    protected final ComputeGraph depGraph = new ComputeGraph();
    // Passed to every handler during init() and resolved afterwards via injectDependencies(...).
    protected final DependencyInjection dependencyInjection = new DependencyInjection();
    // Container name: the root element's "id" attribute if present, otherwise the short local host name or a random UUID.
    protected String name = null;
    // Context created at the end of the constructor from application id, name and naming service.
    protected final ContainerContext context;
    // Registered sources by id (insertion-ordered).
    protected final Map<String, Source> streams = new LinkedHashMap<String, Source>();
    // Registered sinks by id (insertion-ordered).
    protected final Map<String, Sink> sinks = new LinkedHashMap<String, Sink>();
    // Queues registered as external listeners; dataArrived(...) writes into them.
    protected final Map<String, Queue> listeners = new LinkedHashMap<String, Queue>();
    // Processes started by execute(); returned as-is by getProcesses().
    protected final List<Process> processes = new ArrayList<Process>();
    // Per-process contexts; entries missing at execute() time are created there.
    protected final Map<Process, ProcessContext> processContexts = new LinkedHashMap<Process, ProcessContext>();
    // NOTE(review): nothing visible in this file adds to this list (execute() uses a local of the same name) - confirm whether it is still needed.
    protected final List<ProcessThread> worker = new ArrayList<ProcessThread>();
    // Service references; returned as-is by getServiceRefs().
    protected final List<ServiceReference> serviceRefs = new ArrayList<ServiceReference>();
    // Element handlers keyed by a handler name such as "Stream" or "Process"; all are consulted for every child element in init().
    protected final Map<String, ElementHandler> elementHandler = new HashMap<String, ElementHandler>();
    // Handlers applied to the whole document in init() before the child elements are processed.
    protected final List<DocumentHandler> documentHandler = new ArrayList<DocumentHandler>();
    // Naming service chosen in the constructor (custom class, RMI, or local default).
    protected NamingService namingService = null;
    // Life-cycle objects initialised by execute() and finished by shutdown().
    protected final List<LifeCycle> lifeCyleObjects = new ArrayList<LifeCycle>();
    // Selects the shutdown condition in execute(): ServerShutdownCondition when true, LocalShutdownCondition otherwise.
    boolean server = true;
    // Result of the last execute() call made through run(), in milliseconds.
    protected long runtime;
    // Wall-clock time (ms) at which execute() was last entered.
    protected Long startTime = 0L;
    // Variables supplied to the constructor; passed to document and element handlers.
    protected Variables containerVariables = new Variables();
    // Set by the supervisor when a process reports an error; rethrown by execute().
    private Exception failFastReason = null;
    // Class names of optional object-creator extensions probed by the static initializer.
    static final String[] extensions;
    // Element handlers discovered by the static initializer, keyed by ElementHandler.getKey().
    static final Map<String, ElementHandler> autoHandlers;

    /**
     * Parses the XML document located at the given URL using a namespace-aware
     * DOM parser.
     *
     * <p>The stream opened on the URL is always closed before this method
     * returns, whether parsing succeeds or fails.
     *
     * <p>NOTE(review): the parser runs with the JDK default settings, i.e. DTDs
     * and external entities are not disabled. Only feed it trusted documents.
     *
     * @param url location of the XML document; must not be {@code null}
     * @return the parsed DOM document
     * @throws Exception if the URL cannot be opened or the content is not
     *         well-formed XML
     */
    public static Document parseDocument(URL url) throws Exception {
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true);
        DocumentBuilder db = dbf.newDocumentBuilder();
        java.io.InputStream in = url.openStream();
        try {
            return db.parse(in);
        }
        finally {
            // Release the file handle / connection even if parsing failed.
            in.close();
        }
    }

    /**
     * Creates a container from the XML document at the given URL, without
     * custom element handlers and without extra variables.
     *
     * @param url location of the container XML
     * @throws Exception if the document cannot be parsed or the container cannot be set up
     */
    public ProcessContainer(URL url) throws Exception {
        this(url, null);
    }

    /**
     * Creates a container from the XML document at the given URL.
     *
     * @param url location of the container XML
     * @param customElementHandler additional element handlers, may be {@code null}
     * @throws Exception if the document cannot be parsed or the container cannot be set up
     */
    public ProcessContainer(URL url, Map<String, ElementHandler> customElementHandler) throws Exception {
        this(ProcessContainer.parseDocument(url), customElementHandler, null);
    }

    /**
     * Creates a container from the XML document at the given URL.
     *
     * @param url location of the container XML
     * @param customElementHandler additional element handlers, may be {@code null}
     * @param vars variables added to the container variables, may be {@code null}
     * @throws Exception if the document cannot be parsed or the container cannot be set up
     */
    public ProcessContainer(URL url, Map<String, ElementHandler> customElementHandler, Map<String, String> vars) throws Exception {
        this(ProcessContainer.parseDocument(url), customElementHandler, vars);
    }

    /**
     * Creates a container from an already parsed XML document.
     *
     * <p>The constructor registers the built-in document and element handlers
     * (custom handlers are added last and therefore replace built-in ones with
     * the same key), runs the {@link XIncluder} over the document, determines
     * name, address and port of the container, selects a naming service and
     * finally processes the document via {@code init(Document)}.
     *
     * <p>The system properties {@code container.address} and
     * {@code container.port}, when set, take precedence over the
     * {@code address} and {@code port} attributes of the root element.
     *
     * @param doc the container definition; its root element must be named
     *        {@code application} or {@code container} (case-insensitive)
     * @param customElementHandler additional element handlers, may be {@code null}
     * @param variables variables added to the container variables, may be {@code null}
     * @throws Exception if the root element is of the wrong kind, the port is
     *         not a number, the address cannot be resolved, the naming service
     *         cannot be instantiated, or processing the document fails
     */
    public ProcessContainer(Document doc, Map<String, ElementHandler> customElementHandler, Map<String, String> variables) throws Exception {
        if (variables != null) {
            this.containerVariables.addVariables(variables);
        }
        container.add(this);

        LibrariesElementHandler libHandler = new LibrariesElementHandler(this.objectFactory);
        this.documentHandler.add(new PropertiesHandler());
        this.documentHandler.add(new SystemPropertiesHandler());
        this.documentHandler.add(libHandler);

        for (Map.Entry<String, ElementHandler> auto : autoHandlers.entrySet()) {
            log.debug("Adding auto-discovered handler for element '{}': {}", auto.getKey(), auto.getValue());
            this.elementHandler.put(auto.getKey(), auto.getValue());
        }
        this.elementHandler.put("Container-Ref", new ContainerRefElementHandler(this.objectFactory));
        this.elementHandler.put("Stream", new StreamElementHandler(this.objectFactory));
        this.elementHandler.put("Sink", new SinkElementHandler());
        this.elementHandler.put("Queue", new QueueElementHandler());
        this.elementHandler.put("Monitor", new MonitorElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Process", new ProcessElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Service", new ServiceElementHandler(this.objectFactory));
        this.elementHandler.put("Libs", libHandler);
        if (customElementHandler != null) {
            this.elementHandler.putAll(customElementHandler);
        }

        XIncluder includer = new XIncluder();
        Variables v = new Variables(this.containerVariables);
        doc = includer.perform(doc, v);
        // Serialising the whole document is only worth it if it is actually logged.
        if (log.isDebugEnabled()) {
            log.debug(XMLUtils.toString(doc));
        }
        log.debug("XML created and preprocessed.");

        Element root = doc.getDocumentElement();
        Map<String, String> attr = this.objectFactory.getAttributes(root);

        String applicationId;
        if (root.hasAttribute("id")) {
            applicationId = root.getAttribute("id");
            this.name = applicationId;
        } else {
            applicationId = UUID.randomUUID().toString();
        }

        // System properties override the attributes of the root element.
        if (System.getProperty("container.address") != null) {
            attr.put("address", System.getProperty("container.address"));
        }
        if (System.getProperty("container.port") != null) {
            attr.put("port", System.getProperty("container.port"));
        }

        try {
            // A missing "server" attribute parses to false.
            this.server = Boolean.parseBoolean(attr.get("server"));
        }
        catch (Exception e) {
            this.server = true;
        }

        if (!root.getNodeName().equalsIgnoreCase("application") && !root.getNodeName().equalsIgnoreCase("container")) {
            throw new Exception("Expecting root element to be 'container'!");
        }

        // Default name is the short local host name; a random UUID if that cannot be determined.
        String host = "localhost";
        try {
            host = InetAddress.getLocalHost().getHostAddress();
            this.name = InetAddress.getLocalHost().getHostName();
            if (this.name.indexOf(".") > 0) {
                this.name = this.name.substring(0, this.name.indexOf("."));
            }
        }
        catch (Exception e) {
            this.name = UUID.randomUUID().toString();
        }
        log.debug("Default hostname is: {}", host);

        String address = attr.get("address");
        if (address != null && !address.trim().isEmpty()) {
            host = InetAddress.getByName(address.trim()).getHostAddress();
            log.debug("Container address will be {}", host);
        }

        Integer port = 0;
        String portValue = attr.get("port");
        if (portValue != null && !portValue.trim().isEmpty()) {
            // Parse the trimmed value: the emptiness check above trims as well.
            port = Integer.valueOf(portValue.trim());
            log.debug("Container port will be {}", port);
        }

        // An explicit id always wins over the host-derived name.
        if (root.hasAttribute("id")) {
            this.name = root.getAttribute("id");
        }

        String nsClass = root.getAttribute("namingService");
        try {
            if (nsClass != null && !nsClass.trim().isEmpty()) {
                this.namingService = (NamingService)this.objectFactory.create(nsClass, attr, ObjectFactory.createConfigDocument(root));
            }
        }
        catch (Exception e) {
            log.error("Faild to instantiate naming service '{}': {}", nsClass, e.getMessage());
            // Keep the original exception as cause so the root problem stays visible.
            throw new Exception("Faild to instantiate naming service '" + nsClass + "': " + e.getMessage(), e);
        }

        if (this.namingService == null) {
            if (attr.containsKey("address")) {
                log.debug("Creating RMI naming-service...");
                System.setProperty("java.rmi.server.hostname", host);
                this.namingService = new RMINamingService(this.name, host, port, true);
            } else {
                log.debug("No address specified, using local naming-service. Container will not be able to reference other containers!");
                this.namingService = new DefaultNamingService();
            }
        }
        if (this.namingService instanceof LifeCycle) {
            this.lifeCyleObjects.add((LifeCycle)this.namingService);
        }
        log.debug("Using naming-service {}", this.namingService);

        this.context = new ContainerContext(applicationId + "@" + System.currentTimeMillis(), this.name, this.namingService);
        this.init(doc);
    }

    /**
     * @return the compute graph of this container (the internal instance, not a copy)
     */
    public ComputeGraph computeGraph() {
        return this.depGraph;
    }

    /**
     * @return a new insertion-ordered set holding the currently registered sources
     */
    public Set<Source> getStreams() {
        return new LinkedHashSet<Source>(this.streams.values());
    }

    /**
     * @return the name of this container
     */
    public String getName() {
        return this.name;
    }

    /**
     * Replaces the name of this container. The container context created in
     * the constructor is not updated by this call.
     *
     * @param name the new name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the context created for this container in the constructor
     */
    public ContainerContext getContext() {
        return this.context;
    }

    /**
     * @return the internal, mutable list of processes; changes affect this container
     */
    public List<Process> getProcesses() {
        return this.processes;
    }

    /**
     * @return the internal, mutable list of service references; changes affect this container
     */
    public List<ServiceReference> getServiceRefs() {
        return this.serviceRefs;
    }

    /**
     * Adds a life-cycle object to this container unless it is already
     * registered. Registered objects are initialised by {@code execute()} and
     * finished by {@code shutdown()}.
     *
     * @param lc the life-cycle object to register
     */
    public void register(LifeCycle lc) {
        boolean alreadyKnown = this.lifeCyleObjects.contains(lc);
        if (alreadyKnown) {
            return;
        }
        log.debug("Registering new life-cycle object {}", (Object)lc);
        this.lifeCyleObjects.add(lc);
    }

    /**
     * Processes the (already preprocessed) container document.
     *
     * <p>Steps, in order: register the packages listed in the root element's
     * comma-separated {@code import} attribute with the object factory, run all
     * document handlers, publish context properties and container variables to
     * the object factory, optionally install the default {@link DataFactory}
     * named by the {@code container.datafactory} property, let every element
     * handler that accepts a child element handle it, resolve the collected
     * dependencies, store the XML under the context property {@code xml} and
     * log the resulting graph.
     *
     * @param doc the container document
     * @throws Exception if a handler fails, the data factory class cannot be
     *         loaded or instantiated, or dependency injection fails
     */
    private void init(Document doc) throws Exception {
        Element root = doc.getDocumentElement();

        String imports = root.getAttribute("import");
        if (imports != null) {
            for (String pkg : imports.split(",")) {
                String trimmed = pkg.trim();
                if (!trimmed.isEmpty()) {
                    this.objectFactory.addPackage(trimmed);
                }
            }
        }

        for (DocumentHandler handle : this.documentHandler) {
            handle.handle(this, doc, this.containerVariables, this.dependencyInjection);
        }
        this.objectFactory.addVariables(this.context.getProperties());
        this.objectFactory.addVariables((Map)this.containerVariables);

        String dataFactoryName = this.context.getProperties().get("container.datafactory");
        if (dataFactoryName != null) {
            log.debug("Using {} as default DataFactory for this container...", dataFactoryName);
            Class<?> dataFactoryClass = Class.forName(dataFactoryName);
            DataFactory.setDefaultDataFactory((DataFactory)dataFactoryClass.newInstance());
        }

        NodeList children = root.getChildNodes();
        for (int i = 0; i < children.getLength(); ++i) {
            Node node = children.item(i);
            if (node.getNodeType() != Node.ELEMENT_NODE) {
                continue;
            }
            Element element = (Element)node;
            // Every handler that claims the element gets to handle it, not just the first one.
            for (ElementHandler handler : this.elementHandler.values()) {
                if (handler.handlesElement(element)) {
                    handler.handleElement(this, element, this.containerVariables, this.dependencyInjection);
                }
            }
        }

        try {
            this.dependencyInjection.injectDependencies(this.depGraph, this.namingService);
        }
        catch (Exception e) {
            // Report through the logger (with stack trace) instead of stderr, then propagate.
            log.error("Failed to inject dependencies", e);
            throw e;
        }

        this.context.setProperty("xml", XMLUtils.toString(doc));
        this.drawGraph();
        log.info("ProcessContainer is initialized and ready to start:{}", this.toString());
    }

    /**
     * Writes a textual overview of the compute graph to the info log: every
     * source that is a {@link Source} together with its targets, all root
     * sources, and every non-referenced queue together with its sources.
     */
    private void drawGraph() {
        ComputeGraph graph = this.computeGraph();

        log.info("######## Sources ########");
        for (Object candidate : graph.getSources()) {
            if (candidate instanceof Source) {
                String header = "########" + candidate.toString() + "########";
                log.info(header);
                for (Object target : graph.getTargets(candidate)) {
                    String line = "\t==> " + target.toString();
                    log.info(line);
                }
            }
        }

        log.info("######## RootSources ########");
        for (Object rootSource : graph.getRootSources()) {
            String line = rootSource.toString();
            log.info(line);
        }

        log.info("######## NonRefSinks ########");
        for (Object queue : graph.getNonRefQueues()) {
            String header = "########" + queue.toString() + "########";
            log.info(header);
            for (Object feeder : graph.getSourcesFor(queue)) {
                String line = "\t==> " + feeder.toString();
                log.info(line);
            }
        }
    }

    /**
     * Registers a queue under the given id as both a stream (source) and a
     * sink. When {@code externalListener} is set, the queue is additionally
     * recorded as a listener so that {@code dataArrived(id, ...)} writes into it.
     *
     * @param id the id under which the queue is registered
     * @param queue the queue to register
     * @param externalListener whether the queue should also receive externally arriving items
     * @throws Exception declared by the signature; nothing in the visible body throws a checked exception
     */
    public void registerQueue(String id, Queue queue, boolean externalListener) throws Exception {
        log.debug("A new queue '{}' is registered for id '{}'", (Object)queue, (Object)id);
        if (externalListener) {
            this.listeners.put(id, queue);
        }
        this.registerStream(id, (Source)queue);
        this.registerSink(id, (Sink)queue);
    }

    /**
     * Registers a sink under the given id, replacing any sink previously
     * registered with that id.
     *
     * @param id the sink id
     * @param sink the sink to register
     */
    public void registerSink(String id, Sink sink) {
        this.sinks.put(id, sink);
    }

    /**
     * Registers a source under the given id, replacing any source previously
     * registered with that id.
     *
     * @param id the stream id
     * @param stream the source to register
     */
    public void registerStream(String id, Source stream) {
        this.streams.put(id, stream);
    }

    /**
     * Runs {@link #execute()} and stores its result in {@code runtime}.
     *
     * @throws RuntimeException wrapping any exception thrown by {@code execute()}
     */
    @Override
    public void run() {
        try {
            this.runtime = this.execute();
        }
        catch (Exception e) {
            // Report through the logger (with stack trace) rather than stderr.
            log.error("Container execution failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the container and blocks until its shutdown condition is met.
     *
     * <p>Steps, in order: register the container controller (as {@code .ctrl})
     * and all services of the compute graph with the naming service, initialise
     * all sources, sinks and life-cycle objects, start one {@link ProcessThread}
     * per process, wait for the shutdown condition
     * ({@link ServerShutdownCondition} in server mode,
     * {@link LocalShutdownCondition} otherwise) and finally wait until the
     * supervisor reports no running processes.
     *
     * <p>If a process signals an error, the container is shut down and the
     * error is rethrown from this method once the shutdown condition is met.
     *
     * <p>If the calling thread is interrupted while waiting for the remaining
     * processes, waiting continues and the interrupt status is restored before
     * this method returns.
     *
     * @return the time in milliseconds between starting the processes and the
     *         shutdown condition being met
     * @throws Exception if the container is not in server mode and has neither
     *         streams nor listeners, if initialisation of a component fails,
     *         or if a process signalled an error
     */
    public long execute() throws Exception {
        if (!container.contains(this)) {
            container.add(this);
        }
        this.startTime = System.currentTimeMillis();

        ContainerController controller = new ContainerController(this);
        log.debug("Registering container-controller {}", controller);
        this.namingService.register(".ctrl", (Service)controller);

        // Iterate entries with explicit casts so the loop type-checks regardless of
        // how services() is parameterised.
        Map<?, ?> services = this.computeGraph().services();
        for (Map.Entry<?, ?> service : services.entrySet()) {
            String serviceName = (String)service.getKey();
            log.info("Registering service '{}' => {}", serviceName, service.getValue());
            this.namingService.register(serviceName, (Service)service.getValue());
        }

        if (!this.server && this.streams.isEmpty() && this.listeners.isEmpty()) {
            throw new Exception("No data-stream defined!");
        }
        log.debug("Need to handle {} sources: {}", this.streams.size(), this.streams.keySet());
        log.debug("Experiment contains {} stream processes", this.processes.size());

        log.debug("Initializing all DataStreams...");
        if (this.streams.isEmpty()) {
            log.debug("No dataStreams to initialize");
        }
        for (Map.Entry<String, Source> stream : this.streams.entrySet()) {
            log.debug("Initializing stream '{}'", stream.getKey());
            stream.getValue().init();
        }

        log.debug("Initializing all Sinks...");
        for (Map.Entry<String, Sink> sink : this.sinks.entrySet()) {
            log.debug("Initializing sink '{}'", sink.getKey());
            sink.getValue().init();
        }

        log.info("Initializing life-cycle objects...");
        for (LifeCycle lc : this.lifeCyleObjects) {
            log.info("Initializing life-cycle for {}", lc);
            lc.init((ApplicationContext)this.context);
        }

        // Fail fast: the first process error triggers a shutdown of the whole container.
        Supervisor supervisor = new Supervisor(this.computeGraph()){

            @Override
            public void processError(Process p, Exception e) {
                super.processError(p, e);
                log.error("Process {} signaled an error: {}", p, e.getMessage());
                log.debug("Forcing fail-fast shutdown of application...");
                ProcessContainer.this.failFastReason = e;
                ProcessContainer.this.shutdown();
            }
        };

        log.info("Creating {} active processes...", this.processes.size());
        long start = System.currentTimeMillis();
        for (Process spu : this.processes) {
            ProcessContext ctx = this.processContexts.get(spu);
            if (ctx == null) {
                ctx = new ProcessContextImpl("process:" + spu.hashCode(), this.context);
                this.processContexts.put(spu, ctx);
            }
            // NOTE(review): the thread is created with the container context, not with ctx - confirm this is intended.
            ProcessThread thread = new ProcessThread(spu, this.context);
            thread.addListener(supervisor);
            log.debug("Initializing stream-process [{}]", spu);
            thread.init();
            log.debug("Starting stream-process [{}]", spu);
            thread.start();
            log.debug("Stream-process started.");
        }

        log.debug("Waiting for container to finish...");
        AbstractShutdownCondition con = this.server ? new ServerShutdownCondition() : new LocalShutdownCondition();
        log.debug("Waiting for shutdown-condition...");
        con.waitForCondition(this.depGraph);
        log.debug("Shutdown-condition met, container finished.");
        // The container ended on its own; shutdown() becomes a no-op from here on.
        this.runShutdownHook = false;

        long end = System.currentTimeMillis();
        log.trace("Running processes: {}", this.processes);
        log.debug("ProcessContainer finished all processes after {} ms", end - start);
        if (this.failFastReason != null) {
            throw this.failFastReason;
        }

        boolean interrupted = false;
        while (supervisor.processesRunning() > 0) {
            log.info("{} processes still running...", supervisor.processesRunning());
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Keep waiting, but remember the interrupt so callers can still observe it.
                interrupted = true;
                log.debug("Interrupted while waiting for processes to finish; continuing to wait.");
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return end - start;
    }

    /**
     * @return the container variables (the internal instance, not a copy)
     */
    public Variables getVariables() {
        return this.containerVariables;
    }

    /**
     * @return the ids of all queues registered as external listeners; this is
     *         the key-set view of the internal map, so it reflects later registrations
     */
    public Set<String> getStreamListenerNames() {
        return this.listeners.keySet();
    }

    /**
     * @return the object factory used by this container
     */
    public ObjectFactory getObjectFactory() {
        return this.objectFactory;
    }

    /**
     * Hands an externally arriving item to the listener queue registered under
     * the given key. A missing listener is logged as a warning; a failure while
     * writing is logged as an error. Neither case throws.
     *
     * @param key the id of the listener queue
     * @param item the item to write into the queue
     */
    public void dataArrived(String key, Data item) {
        if (!this.listeners.containsKey(key)) {
            log.warn("No listener defined for {}", (Object)key);
            return;
        }

        log.debug("Adding item {} into queue {}", (Object)item, (Object)key);
        try {
            Queue target = this.listeners.get(key);
            target.write(item);
        }
        catch (Exception e) {
            log.error("Failed to inject arriving data item into queue {}: {}", (Object)key, (Object)e.getMessage());
        }
    }

    /**
     * Shuts this container down. Only the first call does any work; later
     * calls, and calls made after {@code execute()} has seen its shutdown
     * condition, return immediately.
     *
     * <p>Every root source is removed from the compute graph and the life-cycle
     * objects returned by that removal are finished. Afterwards all remaining
     * registered life-cycle objects are finished and unregistered. A failure to
     * finish one object is logged and does not stop the others.
     *
     * <p>NOTE(review): this file was recovered by a decompiler, which reported
     * "Removed try catching itself - possible behaviour change" for this
     * method; compare against the original source before relying on the exact
     * exception flow.
     */
    public void shutdown() {
        // Lock on the container itself. The boxed Boolean flag must not serve as the monitor:
        // Boolean.TRUE/FALSE are JVM-wide shared instances and the field is reassigned below.
        synchronized (this) {
            if (!this.runShutdownHook.booleanValue()) {
                return;
            }
            this.runShutdownHook = false;
        }

        log.debug("Graph has {} source elements", this.depGraph.getRootSources().size());
        for (Object source : this.depGraph.getRootSources()) {
            log.debug("Removing element {} from compute-graph", source);
            List<?> removed = this.depGraph.remove(source);
            for (Object candidate : removed) {
                LifeCycle lf = (LifeCycle)candidate;
                try {
                    log.debug("Finishing LifeCycle object {}", lf);
                    lf.finish();
                }
                catch (Exception e) {
                    log.error("Failed to end LifeCycle object {}: {}", lf, e.getMessage());
                    if (log.isDebugEnabled()) {
                        log.debug("Stack trace of failed LifeCycle shutdown", e);
                    }
                }
                finally {
                    // Already handled here; keep it out of the second pass below.
                    this.lifeCyleObjects.remove(lf);
                }
            }
        }

        Iterator<LifeCycle> it = this.lifeCyleObjects.iterator();
        while (it.hasNext()) {
            LifeCycle lc = it.next();
            try {
                log.info("Finishing life-cycle object {}", lc);
                lc.finish();
            }
            catch (Exception e) {
                log.error("Failed to end life-cycle object {}: {}", lc, e.getMessage());
                if (log.isDebugEnabled()) {
                    log.debug("Stack trace of failed life-cycle shutdown", e);
                }
            }
            finally {
                it.remove();
            }
        }
    }

    /**
     * Associates a context with a process, replacing any context previously
     * stored for it. {@code execute()} only creates a context for processes
     * that have none.
     *
     * @param process the process
     * @param ctx the context to associate with the process
     */
    public void setProcessContext(Process process, ProcessContext ctx) {
        this.processContexts.put(process, ctx);
    }

    /**
     * @return the naming service selected for this container in the constructor
     */
    public NamingService getNamingService() {
        return this.namingService;
    }

    // Class initialisation: sets up the logger and the container registry, installs the JVM
    // shutdown hook, and probes the classpath for optional extensions and element handlers.
    // The statement order matters: the logger and the registry must exist before the hook
    // and the probing code use them.
    static {
        String[] handlerClasses;
        log = LoggerFactory.getLogger(ProcessContainer.class);
        container = new ArrayList<ProcessContainer>();
        log.debug("Adding container shutdown-hook");
        // Hook that calls shutdown() on every registered container when the JVM exits.
        Thread t = new Thread(){

            @Override
            public void run() {
                // The hook can be switched off with -Dcontainer.shutdown-hook=disabled (checked at exit time).
                if ("disabled".equalsIgnoreCase(System.getProperty("container.shutdown-hook"))) {
                    log.warn("Shutdown-hook disabled...");
                    return;
                }
                log.debug("Running shutdown-hook...");
                // NOTE(review): the registry is a plain ArrayList iterated without synchronisation - confirm no container is created during JVM exit.
                for (ProcessContainer pc : container) {
                    log.debug("Sending shutdown signal to {}", (Object)pc);
                    pc.shutdown();
                }
            }
        };
        t.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(t);
        // Optional object-creator extensions: registered when the class can be loaded and instantiated,
        // otherwise skipped with a debug message.
        extensions = new String[]{"stream.moa.MoaObjectFactory", "stream.script.JavaScriptProcessorFactory"};
        autoHandlers = new LinkedHashMap<String, ElementHandler>();
        for (String ext : extensions) {
            try {
                Class<?> clazz = Class.forName(ext);
                ObjectCreator creator = (ObjectCreator)clazz.newInstance();
                ObjectFactory.registerObjectCreator(creator);
                log.debug("Registered extension {}", (Object)ext);
            }
            catch (Exception e) {
                log.debug("Failed to register extension '{}': {}", (Object)ext, (Object)e.getMessage());
                if (!log.isTraceEnabled()) continue;
                e.printStackTrace();
            }
        }
        // Optional element handlers: stored under their own key and added to every container by the constructor.
        for (String handlerClass : handlerClasses = new String[]{"streams.esper.EsperEngineElementHandler"}) {
            try {
                Class<?> clazz = Class.forName(handlerClass);
                ElementHandler handler = (ElementHandler)clazz.newInstance();
                String key = handler.getKey();
                autoHandlers.put(key, handler);
            }
            catch (Exception e) {
                log.debug("Failed to register handler {}", (Object)handlerClass);
                if (!log.isTraceEnabled()) continue;
                e.printStackTrace();
            }
        }
    }
}

