/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.engine.library.util;

import io.continual.metrics.MetricsCatalog;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.engine.runtime.SerialNumberGenerator;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExprDataSourceStack;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.JsonEval;
import org.json.JSONObject;

/**
 * A {@link MessageProcessingContext} for a single {@link Message}, backed by a
 * {@link StreamProcessingContext}, a {@link Program} that supplies named sources
 * and sinks, and an {@link ExprDataSource} used during expression evaluation.
 *
 * <p>Instances are created through {@link #builder()}. Each instance takes its id
 * from the builder's {@link SerialNumberGenerator} at construction time.
 */
public class SimpleMessageProcessingContext
implements MessageProcessingContext {
    private final StreamProcessingContext fSpc;
    private final String fId;
    private final Message fMsg;
    private final ExprDataSource fEvalStack;
    private final Program fProgram;
    private boolean fHaltRequested = false;

    /**
     * Creates a new builder populated with default settings.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * @return the stream processing context supplied to the builder
     */
    @Override
    public StreamProcessingContext getStreamProcessingContext() {
        return this.fSpc;
    }

    /**
     * @return the message this context was built for
     */
    @Override
    public Message getMessage() {
        return this.fMsg;
    }

    /**
     * @return the id drawn from the serial number generator when this context was built
     */
    @Override
    public String getId() {
        return this.fId;
    }

    /**
     * Looks up a source by name in the program supplied to the builder.
     *
     * @param sinkName the name of the source to look up (despite the parameter's name)
     * @return whatever the program's source map holds for that name
     */
    @Override
    public Source getSource(String sinkName) {
        return this.fProgram.getSources().get(sinkName);
    }

    /**
     * Looks up a sink by name in the program supplied to the builder.
     *
     * @param sinkName the name of the sink to look up
     * @return whatever the program's sink map holds for that name
     */
    @Override
    public Sink getSink(String sinkName) {
        return this.fProgram.getSinks().get(sinkName);
    }

    /**
     * @return {@code true} while no halt has been requested on this context and the
     *         stream processing context does not report failure
     */
    @Override
    public boolean shouldContinue() {
        return !this.fHaltRequested && !this.fSpc.failed();
    }

    /**
     * Requests a halt; {@link #shouldContinue()} returns {@code false} afterwards.
     */
    @Override
    public void stopProcessing() {
        this.fHaltRequested = true;
    }

    /**
     * Requests a halt and then reports the given text through {@link #warn(String)}.
     *
     * @param warningText the warning to report
     */
    @Override
    public void stopProcessing(String warningText) {
        this.fHaltRequested = true;
        this.warn(warningText);
    }

    /**
     * Forwards a warning to the stream processing context, prefixed with this
     * message's id.
     *
     * @param warningText the warning to report
     */
    @Override
    public void warn(String warningText) {
        this.fSpc.warn("msg #" + this.fId + ": " + warningText);
    }

    /**
     * Evaluates an expression to text and converts the text to the requested type.
     *
     * <p>The evaluator is given two data sources, in this order: one that resolves
     * labels against the message's raw JSON, then the data source supplied to the
     * builder.
     *
     * @param expression  the expression text to evaluate
     * @param targetClass one of {@code String.class}, {@code Long.class},
     *                    {@code Integer.class} or {@code Double.class}
     * @param <T>         the result type
     * @return the evaluated text itself for {@code String}, otherwise the text
     *         parsed as the requested numeric type
     * @throws NumberFormatException    if a numeric type was requested and the
     *                                  evaluated text cannot be parsed as one
     * @throws IllegalArgumentException if {@code targetClass} is not supported
     */
    @Override
    public <T> T evalExpression(String expression, Class<T> targetClass) {
        ExprDataSource messageSource = new ExprDataSource(){

            @Override
            public Object eval(String label) {
                return JsonEval.eval((JSONObject)SimpleMessageProcessingContext.this.fMsg.accessRawJson(), label);
            }
        };
        String asString = ExpressionEvaluator.evaluateText(expression, new ExprDataSource[]{messageSource, this.fEvalStack});

        // Class.cast keeps each conversion type-checked without unchecked (T) casts,
        // and valueOf replaces the deprecated boxed-primitive constructors.
        if (targetClass.equals(String.class)) {
            return targetClass.cast(asString);
        }
        if (targetClass.equals(Long.class)) {
            return targetClass.cast(Long.valueOf(asString));
        }
        if (targetClass.equals(Integer.class)) {
            return targetClass.cast(Integer.valueOf(asString));
        }
        if (targetClass.equals(Double.class)) {
            return targetClass.cast(Double.valueOf(asString));
        }
        throw new IllegalArgumentException("Can't eval to " + targetClass.getName());
    }

    /**
     * @return the {@code "messageProcessing"} sub-catalog of the stream processing
     *         context's metrics
     */
    @Override
    public MetricsCatalog getMetrics() {
        return this.fSpc.getMetrics().getSubCatalog("messageProcessing");
    }

    /**
     * Copies the builder's settings and draws the next serial number as this
     * context's id.
     */
    private SimpleMessageProcessingContext(Builder b, Message msg) {
        this.fSpc = b.fStreamProcContext;
        this.fMsg = msg;
        this.fId = b.fSng.getNext();
        this.fEvalStack = b.fEvalStack;
        this.fProgram = b.fSrcSinkProg;
    }

    /**
     * Fluent builder for {@link SimpleMessageProcessingContext}.
     *
     * <p>Defaults: no stream processing context ({@code null}), a new
     * {@link SerialNumberGenerator}, an empty {@link ExprDataSourceStack}, and a new
     * {@link Program}. Because the stream processing context defaults to
     * {@code null}, a context built without {@link #usingContext} will throw a
     * {@link NullPointerException} from any method that dereferences it (for
     * example {@code shouldContinue()}, {@code warn()} or {@code getMetrics()}).
     */
    public static class Builder {
        private StreamProcessingContext fStreamProcContext = null;
        private SerialNumberGenerator fSng = new SerialNumberGenerator();
        private ExprDataSource fEvalStack = new ExprDataSourceStack(new ExprDataSource[0]);
        private Program fSrcSinkProg = new Program();

        /**
         * Builds a context for the given message using the current settings. Each
         * call draws a new id from the configured serial number generator.
         *
         * @param msg the message the new context wraps
         * @return the new context
         */
        public SimpleMessageProcessingContext build(Message msg) {
            return new SimpleMessageProcessingContext(this, msg);
        }

        /**
         * Sets the stream processing context the built context delegates to.
         *
         * @param s the stream processing context
         * @return this builder
         */
        public Builder usingContext(StreamProcessingContext s) {
            this.fStreamProcContext = s;
            return this;
        }

        /**
         * Sets the generator that message ids are drawn from.
         *
         * @param sng the serial number generator
         * @return this builder
         */
        public Builder serialNumbersFrom(SerialNumberGenerator sng) {
            this.fSng = sng;
            return this;
        }

        /**
         * Sets the data source passed to the expression evaluator after the
         * message-backed source.
         *
         * @param eval the data source
         * @return this builder
         */
        public Builder evaluatingAgainst(ExprDataSource eval) {
            this.fEvalStack = eval;
            return this;
        }

        /**
         * Sets the program whose sources and sinks are exposed by the built context.
         *
         * @param prog the program
         * @return this builder
         */
        public Builder sourcesAndSinksFrom(Program prog) {
            this.fSrcSinkProg = prog;
            return this;
        }
    }
}

