/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.engine.runtime;

import io.continual.services.Service;
import io.continual.services.SimpleService;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Pipeline;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.engine.runtime.SerialNumberGenerator;
import io.continual.services.processor.service.ProcessingService;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExprDataSourceStack;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.JsonEval;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Engine
extends SimpleService
implements Service {
    private final Program fProgram;
    private final HashMap<String, ExecThread> fThreads;
    private final SerialNumberGenerator fSnGen;
    private final HashMap<String, String> fUserData;
    private final ExprDataSourceStack fExprEvalStack;
    private static final Logger log = LoggerFactory.getLogger(Engine.class);

    public Engine(Program p) {
        this.fProgram = p;
        this.fThreads = new HashMap();
        this.fSnGen = new SerialNumberGenerator();
        this.fUserData = new HashMap();
        for (Map.Entry<String, Source> src : this.fProgram.getSources().entrySet()) {
            this.fThreads.put(src.getKey(), new ExecThread(src.getKey(), src.getValue()));
        }
        this.fExprEvalStack = new ExprDataSourceStack(new ExprDataSource[]{new ExprDataSource(){

            public Object eval(String label) {
                return Engine.this.getUserData(label);
            }
        }, new ExprDataSource(){

            public Object eval(String label) {
                return System.getenv().get(label);
            }
        }});
    }

    public synchronized boolean isRunning() {
        for (ExecThread t : this.fThreads.values()) {
            if (!t.isAlive()) continue;
            return true;
        }
        return false;
    }

    public void startAndWait() throws Service.FailedToStart {
        try {
            this.start();
            while (this.isRunning()) {
                Thread.sleep(100L);
            }
        }
        catch (InterruptedException e1) {
            System.out.println("exiting...");
        }
        this.waitForCompletion();
    }

    public void waitForCompletion() {
        for (Source src : this.fProgram.getSources().values()) {
            try {
                src.close();
            }
            catch (IOException e) {
                log.warn("Problem closing source: " + e.getMessage());
            }
        }
        for (Sink sink : this.fProgram.getSinks().values()) {
            try {
                sink.close();
            }
            catch (IOException e) {
                log.warn("Problem closing sink: " + e.getMessage());
            }
        }
    }

    public StreamProcessingContext getStreamContextFor(String sourceName) {
        ExecThread thread = this.fThreads.get(sourceName);
        if (thread != null) {
            return thread.getStreamContext();
        }
        return null;
    }

    public Engine setUserData(String key, String value) {
        this.fUserData.put(key, value);
        return this;
    }

    public String getUserData(String key) {
        return this.fUserData.get(key);
    }

    public void removeUserData(String key) {
        this.fUserData.remove(key);
    }

    protected void onStartRequested() throws Service.FailedToStart {
        for (Sink sink : this.fProgram.getSinks().values()) {
            sink.init();
        }
        for (ExecThread t : this.fThreads.values()) {
            t.start();
        }
    }

    protected void onStopRequested() {
        log.info("Stopping processing engine...");
        for (ExecThread t : this.fThreads.values()) {
            try {
                t.getSource().close();
            }
            catch (IOException e) {
                log.warn("Problem closing source {}: {}", (Object)t.getSourceName(), (Object)e.getMessage());
            }
        }
    }

    private class StdProcessingContext
    implements MessageProcessingContext {
        private final StreamProcessingContext fSpc;
        private final String fId;
        private final Message fMsg;
        private boolean fHaltRequested = false;

        public StdProcessingContext(StreamProcessingContext spc, Message msg) {
            this.fSpc = spc;
            this.fMsg = msg;
            this.fId = Engine.this.fSnGen.getNext();
        }

        @Override
        public StreamProcessingContext getStreamProcessingContext() {
            return this.fSpc;
        }

        @Override
        public Message getMessage() {
            return this.fMsg;
        }

        @Override
        public String getId() {
            return this.fId;
        }

        @Override
        public Source getSource(String sinkName) {
            return Engine.this.fProgram.getSources().get(sinkName);
        }

        @Override
        public Sink getSink(String sinkName) {
            return Engine.this.fProgram.getSinks().get(sinkName);
        }

        @Override
        public boolean shouldContinue() {
            return !this.fHaltRequested && !this.fSpc.failed();
        }

        @Override
        public void stopProcessing() {
            this.fHaltRequested = true;
        }

        @Override
        public void stopProcessing(String warningText) {
            this.fHaltRequested = true;
            this.warn(warningText);
        }

        @Override
        public void warn(String warningText) {
            log.warn("msg #{}: {}", (Object)this.fId, (Object)warningText);
        }

        @Override
        public <T> T evalExpression(String expression, Class<T> targetClass) {
            ExprDataSource eds = new ExprDataSource(){

                public Object eval(String label) {
                    return JsonEval.eval((JSONObject)StdProcessingContext.this.fMsg.accessRawJson(), (String)label);
                }
            };
            String asString = ExpressionEvaluator.evaluateText((String)expression, (ExprDataSource[])new ExprDataSource[]{eds, Engine.this.fExprEvalStack});
            if (targetClass.equals(String.class)) {
                return (T)asString;
            }
            if (targetClass.equals(Long.class)) {
                return (T)new Long(Long.parseLong(asString));
            }
            if (targetClass.equals(Integer.class)) {
                return (T)new Integer(Integer.parseInt(asString));
            }
            if (targetClass.equals(Double.class)) {
                return (T)new Double(Double.parseDouble(asString));
            }
            throw new IllegalArgumentException("Can't eval to " + targetClass.getName());
        }
    }

    public class StdStreamProcessingContext
    implements StreamProcessingContext {
        private final Source fSource;
        private final HashMap<String, Object> fObjects;
        private boolean fFailed;

        public StdStreamProcessingContext(Source src) {
            this.fSource = src;
            this.fFailed = false;
            this.fObjects = new HashMap();
        }

        @Override
        public void warn(String warningText) {
            log.warn("stream: {}", (Object)warningText);
        }

        @Override
        public void fail(String warningText) {
            this.warn(warningText);
            this.fFailed = true;
        }

        @Override
        public boolean failed() {
            return this.fFailed;
        }

        @Override
        public StreamProcessingContext addNamedObject(String name, Object o) {
            this.fObjects.put(name, o);
            return this;
        }

        @Override
        public Object getNamedObject(String name) {
            return this.fObjects.get(name);
        }

        @Override
        public <T> T getNamedObject(String name, Class<T> clazz) throws ClassCastException {
            Object o = this.getNamedObject(name);
            if (o == null) {
                return null;
            }
            if (!clazz.isInstance(o)) {
                throw new ClassCastException("Object " + name + " is not a " + clazz.getName());
            }
            return (T)o;
        }

        @Override
        public <T> T getReqdNamedObject(String name, Class<T> clazz) throws StreamProcessingContext.NoSuitableObjectException {
            try {
                T obj = this.getNamedObject(name, clazz);
                if (obj == null) {
                    throw new StreamProcessingContext.NoSuitableObjectException("No object named " + name + ".");
                }
                return obj;
            }
            catch (ClassCastException x) {
                throw new StreamProcessingContext.NoSuitableObjectException(x);
            }
        }

        @Override
        public StreamProcessingContext removeNamedObject(String name) {
            this.fObjects.remove(name);
            return this;
        }

        @Override
        public boolean setFlag(String flagName) {
            boolean result = this.checkFlag(flagName);
            this.addNamedObject(flagName, Boolean.TRUE);
            return result;
        }

        @Override
        public boolean checkFlag(String flagName) {
            Boolean val = this.getNamedObject(flagName, Boolean.class);
            if (val == null) {
                return false;
            }
            return val;
        }

        @Override
        public boolean clearFlag(String flagName) {
            boolean result = this.checkFlag(flagName);
            this.removeNamedObject(flagName);
            return result;
        }

        @Override
        public void requeue(MessageAndRouting mr) {
            this.fSource.requeue(mr);
        }

        @Override
        public String evalExpression(String expression) {
            return ExpressionEvaluator.evaluateText((String)expression, (ExprDataSource[])new ExprDataSource[]{Engine.this.fExprEvalStack});
        }
    }

    private class ExecThread
    extends Thread {
        private final String fName;
        private final Source fSource;
        private final StreamProcessingContext fStreamContext;

        public ExecThread(String name, Source s) {
            this(name, s, engine.new StdStreamProcessingContext(s));
        }

        public ExecThread(String name, Source s, StreamProcessingContext spc) {
            super("ExecThread " + name);
            this.fName = name;
            this.fSource = s;
            this.fStreamContext = spc;
        }

        public String getSourceName() {
            return this.fName;
        }

        public Source getSource() {
            return this.fSource;
        }

        public StreamProcessingContext getStreamContext() {
            return this.fStreamContext;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Could not resolve type clashes
         * Loose catch block
         */
        @Override
        public void run() {
            try {
                for (Map.Entry entry : Engine.this.fProgram.getServicesFor(this.fName).entrySet()) {
                    this.fStreamContext.addNamedObject(entry.getKey(), entry.getValue());
                }
                for (Map.Entry entry : Engine.this.fProgram.getServicesFor(this.fName).entrySet()) {
                    entry.getValue().startBackgroundProcessing();
                }
                log.info("Source " + this.fName + ": START");
                while (!this.fSource.isEof() && !this.fStreamContext.failed()) {
                    MessageAndRouting msgAndRoute = this.fSource.getNextMessage(this.fStreamContext, 500L, TimeUnit.MILLISECONDS);
                    if (msgAndRoute == null) continue;
                    Pipeline pl = Engine.this.fProgram.getPipeline(msgAndRoute.getPipelineName());
                    if (pl == null) {
                        log.info("No pipeline {} for source \"{}\", ignored.", (Object)msgAndRoute.getPipelineName(), (Object)this.fName);
                    } else {
                        StdProcessingContext pc = new StdProcessingContext(this.fStreamContext, msgAndRoute.getMessage());
                        pl.process(pc);
                    }
                    this.fSource.markComplete(this.fStreamContext, msgAndRoute);
                }
                if (this.fSource.isEof()) {
                    log.info("Source " + this.fName + ": EOF");
                } else {
                    log.warn("Processing stopped.");
                }
            }
            catch (IOException e) {
                log.warn("Error on source {}: {}", (Object)this.fName, (Object)e.getMessage());
            }
            catch (InterruptedException e2) {
                log.info("Source {} interrupted.", (Object)this.fName);
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                for (Map.Entry<String, ProcessingService> entry : Engine.this.fProgram.getServicesFor(this.fName).entrySet()) {
                    entry.getValue().stopBackgroundProcessing();
                }
            }
            for (Map.Entry entry : Engine.this.fProgram.getServicesFor(this.fName).entrySet()) {
                ((ProcessingService)entry.getValue()).stopBackgroundProcessing();
            }
        }
    }
}

