package io.continual.services.processor.engine.runtime;

import io.continual.services.Service;
import io.continual.services.SimpleService;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Pipeline;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.service.ProcessingService;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExprDataSourceStack;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.JsonEval;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine.class */
public class Engine extends SimpleService implements Service {
    private final Program fProgram;
    private final HashMap<String, ExecThread> fThreads = new HashMap<>();
    private final SerialNumberGenerator fSnGen = new SerialNumberGenerator();
    private final HashMap<String, String> fUserData = new HashMap<>();
    private final ExprDataSourceStack fExprEvalStack;
    private static final Logger log = LoggerFactory.getLogger(Engine.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine$ExecThread.class */
    public class ExecThread extends Thread {
        private final String fName;
        private final Source fSource;
        private final StreamProcessingContext fStreamContext;

        public ExecThread(Engine engine, String str, Source source) {
            this(str, source, new StdStreamProcessingContext(source));
        }

        public ExecThread(String str, Source source, StreamProcessingContext streamProcessingContext) {
            super("ExecThread " + str);
            this.fName = str;
            this.fSource = source;
            this.fStreamContext = streamProcessingContext;
        }

        public String getSourceName() {
            return this.fName;
        }

        public Source getSource() {
            return this.fSource;
        }

        public StreamProcessingContext getStreamContext() {
            return this.fStreamContext;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    for (Map.Entry<String, ProcessingService> entry : Engine.this.fProgram.getServicesFor(this.fName).entrySet()) {
                        this.fStreamContext.addNamedObject(entry.getKey(), entry.getValue());
                    }
                    Iterator<Map.Entry<String, ProcessingService>> it = Engine.this.fProgram.getServicesFor(this.fName).entrySet().iterator();
                    while (it.hasNext()) {
                        it.next().getValue().startBackgroundProcessing();
                    }
                    Engine.log.info("Source " + this.fName + ": START");
                    while (!this.fSource.isEof() && !this.fStreamContext.failed()) {
                        MessageAndRouting nextMessage = this.fSource.getNextMessage(this.fStreamContext, 500L, TimeUnit.MILLISECONDS);
                        if (nextMessage != null) {
                            Pipeline pipeline = Engine.this.fProgram.getPipeline(nextMessage.getPipelineName());
                            if (pipeline == null) {
                                Engine.log.info("No pipeline {} for source \"{}\", ignored.", nextMessage.getPipelineName(), this.fName);
                            } else {
                                pipeline.process(new StdProcessingContext(this.fStreamContext, nextMessage.getMessage()));
                            }
                            this.fSource.markComplete(this.fStreamContext, nextMessage);
                        }
                    }
                    if (this.fSource.isEof()) {
                        Engine.log.info("Source " + this.fName + ": EOF");
                    } else {
                        Engine.log.warn("Processing stopped.");
                    }
                    Iterator<Map.Entry<String, ProcessingService>> it2 = Engine.this.fProgram.getServicesFor(this.fName).entrySet().iterator();
                    while (it2.hasNext()) {
                        it2.next().getValue().stopBackgroundProcessing();
                    }
                } catch (IOException e) {
                    Engine.log.warn("Error on source {}: {}", this.fName, e.getMessage());
                    Iterator<Map.Entry<String, ProcessingService>> it3 = Engine.this.fProgram.getServicesFor(this.fName).entrySet().iterator();
                    while (it3.hasNext()) {
                        it3.next().getValue().stopBackgroundProcessing();
                    }
                } catch (InterruptedException e2) {
                    Engine.log.info("Source {} interrupted.", this.fName);
                    Iterator<Map.Entry<String, ProcessingService>> it4 = Engine.this.fProgram.getServicesFor(this.fName).entrySet().iterator();
                    while (it4.hasNext()) {
                        it4.next().getValue().stopBackgroundProcessing();
                    }
                }
            } catch (Throwable th) {
                Iterator<Map.Entry<String, ProcessingService>> it5 = Engine.this.fProgram.getServicesFor(this.fName).entrySet().iterator();
                while (it5.hasNext()) {
                    it5.next().getValue().stopBackgroundProcessing();
                }
                throw th;
            }
        }
    }

    /* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine$StdProcessingContext.class */
    private class StdProcessingContext implements MessageProcessingContext {
        private final StreamProcessingContext fSpc;
        private final String fId;
        private final Message fMsg;
        private boolean fHaltRequested = false;

        public StdProcessingContext(StreamProcessingContext streamProcessingContext, Message message) {
            this.fSpc = streamProcessingContext;
            this.fMsg = message;
            this.fId = Engine.this.fSnGen.getNext();
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public StreamProcessingContext getStreamProcessingContext() {
            return this.fSpc;
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public Message getMessage() {
            return this.fMsg;
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public String getId() {
            return this.fId;
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public Source getSource(String str) {
            return Engine.this.fProgram.getSources().get(str);
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public Sink getSink(String str) {
            return Engine.this.fProgram.getSinks().get(str);
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public boolean shouldContinue() {
            return (this.fHaltRequested || this.fSpc.failed()) ? false : true;
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public void stopProcessing() {
            this.fHaltRequested = true;
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public void stopProcessing(String str) {
            this.fHaltRequested = true;
            warn(str);
        }

        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public void warn(String str) {
            Engine.log.warn("msg #{}: {}", this.fId, str);
        }

        /* JADX WARN: Type inference failed for: r0v2, types: [T, java.lang.String] */
        @Override // io.continual.services.processor.engine.model.MessageProcessingContext
        public <T> T evalExpression(String str, Class<T> cls) {
            ?? r0 = (T) ExpressionEvaluator.evaluateText(str, new ExprDataSource[]{new ExprDataSource() { // from class: io.continual.services.processor.engine.runtime.Engine.StdProcessingContext.1
                public Object eval(String str2) {
                    return JsonEval.eval(StdProcessingContext.this.fMsg.accessRawJson(), str2);
                }
            }, Engine.this.fExprEvalStack});
            if (cls.equals(String.class)) {
                return r0;
            }
            if (cls.equals(Long.class)) {
                return (T) new Long(Long.parseLong(r0));
            }
            if (cls.equals(Integer.class)) {
                return (T) new Integer(Integer.parseInt(r0));
            }
            if (cls.equals(Double.class)) {
                return (T) new Double(Double.parseDouble(r0));
            }
            throw new IllegalArgumentException("Can't eval to " + cls.getName());
        }
    }

    /* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine$StdStreamProcessingContext.class */
    /**
     * Stream-level context for one {@link Source}: holds a failure flag and a
     * name-to-object registry, which is also used to store boolean flags.
     */
    public class StdStreamProcessingContext implements StreamProcessingContext {
        private final Source fSource;
        private boolean fFailed = false;
        private final HashMap<String, Object> fObjects = new HashMap<>();

        /** @param source the source that requeued messages are handed back to */
        public StdStreamProcessingContext(Source source) {
            fSource = source;
        }

        /** Logs the text as a stream-level warning. */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public void warn(String str) {
            log.warn("stream: {}", str);
        }

        /** Logs the text as a warning, then marks the stream as failed. */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public void fail(String str) {
            warn(str);
            fFailed = true;
        }

        /** @return true once {@link #fail(String)} has been called */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public boolean failed() {
            return fFailed;
        }

        /** Stores the object under the name, replacing any previous entry. */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public StreamProcessingContext addNamedObject(String str, Object obj) {
            fObjects.put(str, obj);
            return this;
        }

        /** @return the object stored under the name, or null */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public Object getNamedObject(String str) {
            return fObjects.get(str);
        }

        /**
         * @return the object stored under the name, or null when there is none
         * @throws ClassCastException when an object is stored but is not an instance of {@code cls}
         */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public <T> T getNamedObject(String str, Class<T> cls) throws ClassCastException {
            final Object stored = getNamedObject(str);
            if (stored == null) {
                return null;
            }
            if (!cls.isInstance(stored)) {
                throw new ClassCastException("Object " + str + " is not a " + cls.getName());
            }
            return cls.cast(stored);
        }

        /**
         * Like {@link #getNamedObject(String, Class)}, but a missing or wrongly typed
         * object is reported as a {@code NoSuitableObjectException}.
         */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public <T> T getReqdNamedObject(String str, Class<T> cls) throws StreamProcessingContext.NoSuitableObjectException {
            final T stored;
            try {
                stored = getNamedObject(str, cls);
            } catch (ClassCastException e) {
                throw new StreamProcessingContext.NoSuitableObjectException(e);
            }
            if (stored != null) {
                return stored;
            }
            throw new StreamProcessingContext.NoSuitableObjectException("No object named " + str + ".");
        }

        /** Drops whatever is stored under the name. */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public StreamProcessingContext removeNamedObject(String str) {
            fObjects.remove(str);
            return this;
        }

        /**
         * Sets the flag by storing {@code Boolean.TRUE} under its name.
         *
         * @return the flag's value before this call
         */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public boolean setFlag(String str) {
            final boolean previous = checkFlag(str);
            addNamedObject(str, Boolean.TRUE);
            return previous;
        }

        /** @return true only when a true Boolean is stored under the name */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public boolean checkFlag(String str) {
            final Boolean flag = getNamedObject(str, Boolean.class);
            return flag != null && flag.booleanValue();
        }

        /**
         * Clears the flag by removing the entry stored under its name.
         *
         * @return the flag's value before this call
         */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public boolean clearFlag(String str) {
            final boolean previous = checkFlag(str);
            removeNamedObject(str);
            return previous;
        }

        /** Hands the message back to this context's source for requeueing. */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public void requeue(MessageAndRouting messageAndRouting) {
            fSource.requeue(messageAndRouting);
        }

        /** Evaluates the expression text against the engine's expression data source stack. */
        @Override // io.continual.services.processor.engine.model.StreamProcessingContext
        public String evalExpression(String str) {
            final ExprDataSource[] dataSources = new ExprDataSource[]{Engine.this.fExprEvalStack};
            return ExpressionEvaluator.evaluateText(str, dataSources);
        }
    }

    /**
     * Builds an engine for the program: creates one (not yet started) worker
     * thread per source and sets up the expression data source stack, which lists
     * the engine's user data first and the process environment second.
     *
     * @param program the program to run
     */
    public Engine(Program program) {
        this.fProgram = program;

        // one worker per declared source, keyed by source name
        for (Map.Entry<String, Source> sourceEntry : this.fProgram.getSources().entrySet()) {
            final String sourceName = sourceEntry.getKey();
            final ExecThread worker = new ExecThread(this, sourceName, sourceEntry.getValue());
            this.fThreads.put(sourceName, worker);
        }

        final ExprDataSource userDataLookup = new ExprDataSource() { // from class: io.continual.services.processor.engine.runtime.Engine.1
            public Object eval(String key) {
                return Engine.this.getUserData(key);
            }
        };
        final ExprDataSource environmentLookup = new ExprDataSource() { // from class: io.continual.services.processor.engine.runtime.Engine.2
            public Object eval(String key) {
                return System.getenv().get(key);
            }
        };
        this.fExprEvalStack = new ExprDataSourceStack(new ExprDataSource[]{userDataLookup, environmentLookup});
    }

    /**
     * Reports whether any source worker thread is still alive.
     *
     * @return true if at least one worker thread is alive
     */
    public synchronized boolean isRunning() {
        for (ExecThread worker : this.fThreads.values()) {
            if (worker.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Starts the engine, blocks until no worker thread is alive (or the calling
     * thread is interrupted), then closes the program's sources and sinks.
     *
     * <p>If the wait is interrupted, the calling thread's interrupt status is
     * restored before this method returns so callers can still observe it.
     *
     * @throws Service.FailedToStart if the engine could not be started; sources and
     *         sinks are not closed in that case
     */
    public void startAndWait() throws Service.FailedToStart {
        boolean interrupted = false;
        try {
            start();
            while (isRunning()) {
                Thread.sleep(100L);
            }
        } catch (InterruptedException e) {
            System.out.println("exiting...");
            interrupted = true;
        }
        try {
            waitForCompletion();
        } finally {
            // restore the flag only after the close pass so closing runs with a clear interrupt status
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes every source of the program, then every sink. A failure to close
     * one of them is logged as a warning and does not stop the remaining closes.
     */
    public void waitForCompletion() {
        for (Source source : this.fProgram.getSources().values()) {
            try {
                source.close();
            } catch (IOException closeFailure) {
                log.warn("Problem closing source: " + closeFailure.getMessage());
            }
        }

        for (Sink sink : this.fProgram.getSinks().values()) {
            try {
                sink.close();
            } catch (IOException closeFailure) {
                log.warn("Problem closing sink: " + closeFailure.getMessage());
            }
        }
    }

    /**
     * Returns the stream context of the worker thread registered for a source.
     *
     * @param str the source name
     * @return the worker's stream context, or null when no worker is registered under that name
     */
    public StreamProcessingContext getStreamContextFor(String str) {
        final ExecThread worker = this.fThreads.get(str);
        return worker == null ? null : worker.getStreamContext();
    }

    /**
     * Stores a user data value under the given key, replacing any previous value.
     *
     * @param str the key
     * @param str2 the value
     */
    public void setUserData(String str, String str2) {
        fUserData.put(str, str2);
    }

    /**
     * Looks up a user data value.
     *
     * @param str the key
     * @return the value stored under the key, or null when the user data map has none
     */
    public String getUserData(String str) {
        final String value = fUserData.get(str);
        return value;
    }

    /**
     * Removes the user data entry stored under the given key, if any.
     *
     * @param str the key
     */
    public void removeUserData(String str) {
        fUserData.remove(str);
    }

    /**
     * Initializes every sink of the program, then starts every source worker
     * thread. All sinks are initialized before the first thread is started.
     */
    protected void onStartRequested() throws Service.FailedToStart {
        for (Sink sink : this.fProgram.getSinks().values()) {
            sink.init();
        }
        for (ExecThread worker : this.fThreads.values()) {
            worker.start();
        }
    }

    /**
     * Closes the source of every worker thread. A failure to close one source is
     * logged as a warning together with the source name, and the remaining
     * sources are still closed.
     */
    protected void onStopRequested() {
        log.info("Stopping processing engine...");
        final Iterator<ExecThread> workers = this.fThreads.values().iterator();
        while (workers.hasNext()) {
            final ExecThread worker = workers.next();
            try {
                worker.getSource().close();
            } catch (IOException closeFailure) {
                log.warn("Problem closing source {}: {}", worker.getSourceName(), closeFailure.getMessage());
            }
        }
    }
}
