/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Process;
import stream.app.ComputeGraph;
import stream.io.Sink;
import stream.runtime.Monitor;
import stream.runtime.ProcessListener;

/**
 * Process listener that keeps track of which processes are currently running
 * and which sinks each of them can reach in the compute graph.
 *
 * <p>For every started (non-monitor) process the set of reachable sinks is
 * recorded. When a process finishes, its entry is dropped and, for each of its
 * sinks, the number of remaining running processes that still reach that sink
 * is logged.
 *
 * <p>NOTE(review): the counters are atomic, but {@code runningProcesses} and
 * {@code processOutlets} are plain, unsynchronized collections. Confirm that
 * listener callbacks are never delivered concurrently.
 */
public class Supervisor
implements ProcessListener {
    static Logger log = LoggerFactory.getLogger(Supervisor.class);

    /** Processes that reported a start and have not yet reported completion. */
    List<Process> runningProcesses = new ArrayList<Process>();
    /** Number of currently running processes as counted by the callbacks. */
    final AtomicInteger running = new AtomicInteger(0);
    /** Number of error notifications received. */
    final AtomicInteger errors = new AtomicInteger(0);
    /** Number of completion notifications received. */
    final AtomicInteger finished = new AtomicInteger(0);
    /** Graph that is queried for the targets of a node. */
    ComputeGraph dependencies;
    /** Sinks reachable from each running process, captured when it started. */
    Map<Process, Set<Sink>> processOutlets = new HashMap<Process, Set<Sink>>();

    /**
     * Creates a supervisor operating on the given compute graph.
     *
     * @param graph graph used to look up targets and from which finished
     *              processes are removed
     */
    public Supervisor(ComputeGraph graph) {
        this.dependencies = graph;
    }

    /**
     * Registers a started process and records the sinks it can reach.
     * Monitors are only logged; they are neither counted nor registered.
     *
     * @param p the process that has started
     */
    @Override
    public void processStarted(Process p) {
        if (p instanceof Monitor) {
            log.info("Monitor #{} started", p);
            return;
        }
        log.debug("Process  #{}  started.", p);
        int run = this.running.incrementAndGet();
        this.runningProcesses.add(p);
        Set<Sink> sinks = this.collectSinks(p);
        // Both placeholders need an argument: the process and its sink count.
        log.debug("   process #{} is writing to {} sinks", p, sinks.size());
        this.processOutlets.put(p, sinks);
        log.debug("{} processes running.", run);
    }

    /**
     * Counts an error notification. The process and the exception are not
     * inspected.
     *
     * @param p the process that reported the error
     * @param e the reported exception
     */
    @Override
    public void processError(Process p, Exception e) {
        this.errors.incrementAndGet();
    }

    /**
     * Unregisters a finished process, logs how many running processes still
     * reach each of its sinks and removes the process from the graph.
     *
     * <p>NOTE(review): {@link #processStarted(Process)} ignores monitors, but
     * this method does not, so a finishing monitor decrements the running
     * counter without having incremented it. Confirm whether monitors ever
     * report completion.
     *
     * @param p the process that has finished
     */
    @Override
    public void processFinished(Process p) {
        int run = this.running.decrementAndGet();
        this.finished.incrementAndGet();
        this.runningProcesses.remove(p);
        log.debug("Process  #{}  finished.", p);

        // A process that was never registered (e.g. a monitor) has no entry;
        // treat it as having no sinks instead of failing on a null set.
        Set<Sink> outlets = this.processOutlets.remove(p);
        if (outlets == null) {
            outlets = new HashSet<Sink>();
        }
        log.debug("   process has {} outgoing targets: {}", outlets.size(), outlets);

        for (Sink sink : outlets) {
            int refCount = this.countReferences(sink);
            if (refCount == 0) {
                // NOTE(review): the message announces closing, but nothing is
                // closed here - confirm whether the sink should be closed.
                log.debug("Reference count of {} is 0, closing sink!", sink);
            } else {
                log.debug("Reference count for {} is: {}", sink, refCount);
            }
        }

        if (log.isTraceEnabled()) {
            this.printTargets(p, 0);
        }
        this.dependencies.remove((Object)p);
        log.debug("{} processes running.", run);
    }

    /** Counts the registered processes whose recorded sinks contain {@code sink}. */
    private int countReferences(Sink sink) {
        int refCount = 0;
        for (Set<Sink> sinks : this.processOutlets.values()) {
            if (sinks.contains(sink)) {
                ++refCount;
            }
        }
        return refCount;
    }

    /**
     * Logs, at debug level, the targets reachable from {@code src}, indenting
     * each level by two additional spaces. Sinks are printed but not descended
     * into.
     *
     * @param src   node whose targets are printed
     * @param depth current nesting depth, used for indentation
     */
    public void printTargets(Object src, int depth) {
        StringBuilder indent = new StringBuilder();
        for (int i = 0; i < depth; ++i) {
            indent.append("  ");
        }
        String prefix = indent.toString();
        Set<?> outs = this.dependencies.getTargets(src);
        for (Object out : outs) {
            log.debug("{} {}", prefix, out);
            // Skip only this sink and keep printing the remaining targets,
            // matching how collectSinks treats sinks.
            if (out instanceof Sink) {
                continue;
            }
            this.printTargets(out, depth + 1);
        }
    }

    /**
     * Logs the currently registered processes and returns the running count.
     *
     * @return the value of the running counter
     */
    public int processesRunning() {
        log.debug("Active processes: {}", this.runningProcesses);
        return this.running.get();
    }

    /**
     * Collects the sinks reachable from {@code p} by following graph targets.
     * A target that is a sink is added to the result and not followed further;
     * any other target is searched recursively.
     *
     * @param p node at which the search starts
     * @return a new set with every sink found
     */
    public Set<Sink> collectSinks(Object p) {
        Set<Sink> sinks = new HashSet<Sink>();
        Set<?> outs = this.dependencies.getTargets(p);
        for (Object out : outs) {
            if (out instanceof Sink) {
                sinks.add((Sink)out);
            } else {
                sinks.addAll(this.collectSinks(out));
            }
        }
        return sinks;
    }
}

