/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.engine.runtime;

import io.continual.iam.identity.Identity;
import io.continual.metrics.MetricsCatalog;
import io.continual.metrics.impl.StdMetricsCatalog;
import io.continual.services.Service;
import io.continual.services.SimpleService;
import io.continual.services.processor.engine.library.util.SimpleStreamProcessingContext;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.engine.runtime.SerialNumberGenerator;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExprDataSourceStack;
import io.continual.util.data.exprEval.SpecialFnsDataSource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Engine
extends SimpleService
implements Service {
    // The program whose sources and sinks this engine runs.
    private final Program fProgram;
    // One worker thread per program source, keyed by the (de-duplicated) thread name.
    private final HashMap<String, ExecThread> fThreads;
    // Daemon thread that periodically writes the engine metrics to metricsLog.
    private final MetricsDumpThread fMetricsDumper;
    // Serial number generator; its use is not visible in this decompiled view
    // (it is only exposed through the synthetic accessor access$700).
    private final SerialNumberGenerator fSnGen;
    // Free-form key/value data managed through setUserData/getUserData/removeUserData
    // and consulted first during expression evaluation.
    private final HashMap<String, String> fUserData;
    // Identity handed to each stream context; null when the Engine(Program) constructor is used.
    private final Identity fIdentity;
    // Expression data sources, in order: user data, process environment, special functions.
    private final ExprDataSourceStack fExprEvalStack;
    // Root metrics catalog; each worker thread reports into its own sub-catalog.
    private final MetricsCatalog fEngineMetrics;
    // General engine logger.
    private static final Logger log = LoggerFactory.getLogger(Engine.class);
    // Dedicated logger that receives the periodic JSON metrics dump.
    private static final Logger metricsLog = LoggerFactory.getLogger((String)"continualProcessorEngineMetrics");

    /**
     * Builds an engine for the given program with no operating identity
     * (delegates to {@link #Engine(Identity, Program)} with a null identity).
     *
     * @param p the program to run
     */
    public Engine(Program p) {
        this(null, p);
    }

    /**
     * Builds an engine for the given program. No threads are started here; one
     * worker thread is prepared per program source and started later by
     * {@link #onStartRequested()}.
     *
     * @param ii the identity passed to each stream context; may be null
     * @param p the program whose sources and sinks are to be run
     */
    public Engine(Identity ii, Program p) {
        this.fIdentity = ii;
        this.fProgram = p;
        // Explicit type arguments instead of raw HashMap, which compiled only with unchecked warnings.
        this.fThreads = new HashMap<String, ExecThread>();
        this.fSnGen = new SerialNumberGenerator();
        this.fUserData = new HashMap<String, String>();
        this.fEngineMetrics = new StdMetricsCatalog.Builder().build();

        // Expression lookups consult, in order: user data, the process environment, special functions.
        this.fExprEvalStack = new ExprDataSourceStack(new ExprDataSource[]{new ExprDataSource(){

            public Object eval(String label) {
                return Engine.this.getUserData(label);
            }
        }, new ExprDataSource(){

            public Object eval(String label) {
                return System.getenv().get(label);
            }
        }, new SpecialFnsDataSource()});
        this.fMetricsDumper = new MetricsDumpThread();

        // The ExecThread constructor reads fEngineMetrics, fIdentity and fExprEvalStack,
        // so the worker threads must be created only after those fields are assigned.
        TreeSet<String> names = new TreeSet<String>();
        for (Map.Entry<String, Source> src : this.fProgram.getSources().entrySet()) {
            String threadName = Engine.getName(names, src.getKey());
            this.fThreads.put(threadName, new ExecThread(threadName, src.getKey(), src.getValue()));
        }
    }

    /**
     * Reports whether any source worker thread is still alive.
     *
     * @return true if at least one worker thread is alive, false otherwise
     */
    public synchronized boolean isRunning() {
        boolean anyAlive = false;
        final Iterator<ExecThread> workers = fThreads.values().iterator();
        // Stop scanning as soon as one live worker is found.
        while (!anyAlive && workers.hasNext()) {
            anyAlive = workers.next().isAlive();
        }
        return anyAlive;
    }

    /**
     * Starts the engine and blocks the calling thread, polling every 100 ms,
     * until no worker thread is alive or the caller is interrupted. In either
     * case {@link #waitForCompletion()} is then invoked to close the program's
     * sources and sinks.
     *
     * <p>If the wait is interrupted, the interrupt status of the calling thread
     * is restored before this method returns so that callers can observe it.
     *
     * @throws Service.FailedToStart if the service fails to start
     */
    public void startAndWait() throws Service.FailedToStart {
        boolean interrupted = false;
        try {
            this.start();
            while (this.isRunning()) {
                Thread.sleep(100L);
            }
        }
        catch (InterruptedException e) {
            // Remember the interrupt rather than swallowing it; report through the logger, not stdout.
            interrupted = true;
            log.info("Interrupted while waiting for processing to finish; exiting...");
        }
        try {
            this.waitForCompletion();
        }
        finally {
            // Restore the flag only after cleanup so that closing sources/sinks
            // is not itself disturbed by a pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes every source of the program and then every sink. An
     * {@link IOException} from one close is logged (with its stack trace) and
     * does not prevent the remaining sources and sinks from being closed.
     *
     * <p>Note: this method does not itself block on the worker threads; callers
     * such as {@link #startAndWait()} wait for them before calling it.
     */
    public void waitForCompletion() {
        for (Source src : this.fProgram.getSources().values()) {
            try {
                src.close();
            }
            catch (IOException e) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                log.warn("Problem closing source: {}", e.getMessage(), e);
            }
        }
        for (Sink sink : this.fProgram.getSinks().values()) {
            try {
                sink.close();
            }
            catch (IOException e) {
                log.warn("Problem closing sink: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Looks up the stream processing context of the worker thread registered
     * under the given name.
     *
     * @param sourceName the name the worker thread was registered under
     * @return the worker's stream context, or null if no worker has that name
     */
    public StreamProcessingContext getStreamContextFor(String sourceName) {
        final ExecThread worker = fThreads.get(sourceName);
        return worker == null ? null : worker.getStreamContext();
    }

    /**
     * Stores a user data value under the given key, replacing any previous value.
     *
     * @param key the user data key
     * @param value the value to store
     * @return this engine, to allow call chaining
     */
    public Engine setUserData(String key, String value) {
        fUserData.put(key, value);
        return this;
    }

    /**
     * Fetches the user data value stored under the given key.
     *
     * @param key the user data key
     * @return the stored value, or null if the key has no value
     */
    public String getUserData(String key) {
        final String stored = fUserData.get(key);
        return stored;
    }

    /**
     * Discards the user data value stored under the given key, if any.
     *
     * @param key the user data key to remove
     */
    public void removeUserData(String key) {
        fUserData.remove(key);
    }

    /**
     * Start hook: launches the metrics dumper, initializes every sink of the
     * program, and then starts one worker thread per source, in that order.
     *
     * @throws Service.FailedToStart declared by the start hook's signature
     */
    protected void onStartRequested() throws Service.FailedToStart {
        fMetricsDumper.start();

        // Sinks are initialized before any worker thread is started.
        final Iterator<? extends Sink> sinks = fProgram.getSinks().values().iterator();
        while (sinks.hasNext()) {
            sinks.next().init();
        }

        final Iterator<ExecThread> workers = fThreads.values().iterator();
        while (workers.hasNext()) {
            workers.next().start();
        }
    }

    /**
     * Stop hook: closes the source of every worker thread. A failure to close
     * one source is logged and the remaining sources are still closed.
     */
    protected void onStopRequested() {
        log.info("Stopping processing engine...");
        final Iterator<ExecThread> workers = fThreads.values().iterator();
        while (workers.hasNext()) {
            final ExecThread worker = workers.next();
            try {
                worker.getSource().close();
            }
            catch (IOException closeFailure) {
                log.warn("Problem closing source {}: {}", worker.getSourceName(), closeFailure.getMessage());
            }
        }
    }

    /**
     * Reserves a unique name in {@code used}. The requested name is taken as-is
     * when free; otherwise the first free name of the form
     * {@code requested-2}, {@code requested-3}, ... is taken.
     *
     * @param used the set of names already taken; the chosen name is added to it
     * @param requested the preferred name
     * @return the name that was reserved
     */
    private static String getName(Set<String> used, String requested) {
        String name = requested;
        // Set.add returns false when the name is already taken, so keep trying suffixes.
        for (int suffix = 2; !used.add(name); suffix++) {
            name = requested + "-" + suffix;
        }
        return name;
    }

    // Compiler-generated (synthetic) accessor surfaced by the decompiler: returns the
    // private fProgram field of the given engine. No caller is visible in this view;
    // presumably used by the ExecThread.run() body that failed to decompile — TODO confirm.
    static /* synthetic */ Program access$600(Engine x0) {
        return x0.fProgram;
    }

    // Compiler-generated (synthetic) accessor surfaced by the decompiler: returns the
    // private fSnGen field of the given engine. No caller is visible in this view;
    // presumably used by the ExecThread.run() body that failed to decompile — TODO confirm.
    static /* synthetic */ SerialNumberGenerator access$700(Engine x0) {
        return x0.fSnGen;
    }

    /**
     * Worker thread bound to a single program source. It holds the source, the
     * name the source is registered under in the program, a per-thread metrics
     * sub-catalog, and the stream processing context built for the source.
     *
     * NOTE(review): the body of run() was lost in decompilation, so the actual
     * processing loop is not visible here.
     */
    private class ExecThread
    extends Thread {
        // Name of the source as registered in the program (may differ from the thread name).
        private final String fSrcName;
        // The source this thread is bound to.
        private final Source fSource;
        // Sub-catalog of the engine metrics, named after the thread.
        private final MetricsCatalog fThreadMetrics;
        // Stream context built once in the constructor for this source.
        private final StreamProcessingContext fStreamContext;

        /**
         * Creates the (not yet started) worker for one source.
         *
         * @param threadName unique name used for the Java thread ("ExecThread " prefix is added) and the metrics sub-catalog
         * @param srcName the source's name in the program
         * @param s the source to bind to
         */
        public ExecThread(String threadName, String srcName, Source s) {
            super("ExecThread " + threadName);
            this.fSrcName = srcName;
            this.fSource = s;
            this.fThreadMetrics = Engine.this.fEngineMetrics.getSubCatalog(threadName);
            // The context carries the source, the engine identity, the engine's expression
            // data-source stack, the engine logger, and this thread's metrics sub-catalog.
            this.fStreamContext = SimpleStreamProcessingContext.builder().withSource(s).operatedBy(Engine.this.fIdentity).evaluatingAgainst((ExprDataSource)Engine.this.fExprEvalStack).loggingTo(log).reportMetricsTo(this.fThreadMetrics).build();
        }

        /**
         * @return the name of the source as registered in the program
         */
        public String getSourceName() {
            return this.fSrcName;
        }

        /**
         * @return the source this thread is bound to
         */
        public Source getSource() {
            return this.fSource;
        }

        /**
         * @return the stream processing context built for this thread's source
         */
        public StreamProcessingContext getStreamContext() {
            return this.fStreamContext;
        }

        /*
         * Exception decompiling
         *
         * NOTE(review): the original method body could not be recovered by the
         * decompiler. As written here, starting this thread throws
         * IllegalStateException immediately; the real implementation must be
         * restored from the original source or class file.
         */
        @Override
        public void run() {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }
    }

    /**
     * Daemon thread that, every five seconds, writes the engine metrics as JSON
     * text to the metrics logger. It exits after the first dump at which no
     * worker thread is alive, or when it is interrupted.
     */
    private class MetricsDumpThread
    extends Thread {
        public MetricsDumpThread() {
            super("processor metrics dumper");
            // Daemon so that this thread never keeps the JVM alive on its own.
            this.setDaemon(true);
        }

        /**
         * Sleeps, dumps the metrics, and repeats while any worker thread is alive.
         * At least one dump is always attempted after the first sleep.
         */
        @Override
        public void run() {
            try {
                do {
                    Thread.sleep(5000L);
                    metricsLog.info(Engine.this.fEngineMetrics.toJson().toString());
                } while (this.anyWorkerAlive());
                log.info("Metrics dump thread exiting.");
            }
            catch (InterruptedException e) {
                log.warn("Metrics dumper interrupted: ", e);
                // Preserve the interrupt status for anything inspecting this thread afterwards.
                Thread.currentThread().interrupt();
            }
            catch (Throwable e) {
                // Last-resort boundary: a metrics failure must not escape the thread unlogged.
                log.warn("Metrics dumper terminated: ", e);
            }
        }

        /**
         * @return true if at least one of the engine's worker threads is alive
         */
        private boolean anyWorkerAlive() {
            for (ExecThread worker : Engine.this.fThreads.values()) {
                if (worker.isAlive()) {
                    return true;
                }
            }
            return false;
        }
    }
}

