package io.continual.services.processor.engine.library.sources;

import io.continual.builder.Builder;
import io.continual.resources.ResourceLoader;
import io.continual.services.ServiceContainer;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.util.data.json.JsonVisitor;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: input_file:io/continual/services/processor/engine/library/sources/CsvSource.class */
public class CsvSource extends BasicSource {
    private final String fResource;
    private final String fFieldDelim;
    private final List<String> fFieldList;
    private final String fLineNumberToField;
    private final HashMap<String, String> fFieldMap;
    private boolean fInited;
    private InputStream fStream;
    private CSVParser fParser;
    private Iterator<CSVRecord> fIterator;
    private int fLineNumber;

    public CsvSource(JSONObject jSONObject) throws Builder.BuildFailure {
        this((ServiceContainer) null, jSONObject);
    }

    public CsvSource(String str, String str2) throws Builder.BuildFailure {
        super(str);
        this.fInited = false;
        this.fStream = null;
        this.fParser = null;
        this.fIterator = null;
        try {
            this.fResource = str2;
            this.fFieldDelim = null;
            this.fFieldList = null;
            this.fLineNumberToField = null;
            this.fFieldMap = new HashMap<>();
        } catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    public CsvSource(ServiceContainer serviceContainer, JSONObject jSONObject) throws Builder.BuildFailure {
        super(jSONObject);
        this.fInited = false;
        this.fStream = null;
        this.fParser = null;
        this.fIterator = null;
        try {
            this.fResource = jSONObject.optString("data", "");
            this.fFieldDelim = jSONObject.optString("fieldDelimiter", null);
            this.fFieldList = JsonVisitor.arrayToList(jSONObject.optJSONArray("fields"));
            this.fLineNumberToField = jSONObject.optString("lineNumberTo", null);
            this.fFieldMap = new HashMap<>();
            JsonVisitor.forEachElement(jSONObject.optJSONObject("fieldMap"), new JsonVisitor.ObjectVisitor<String, JSONException>() { // from class: io.continual.services.processor.engine.library.sources.CsvSource.1
                public boolean visit(String str, String str2) throws JSONException {
                    CsvSource.this.fFieldMap.put(str, str2);
                    return true;
                }
            });
        } catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    @Override // io.continual.services.processor.engine.library.sources.BasicSource, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.fParser != null) {
            this.fParser.close();
        }
        this.fParser = null;
        this.fInited = true;
        super.close();
    }

    public CsvSource setResource(InputStream inputStream) {
        this.fStream = inputStream;
        return this;
    }

    @Override // io.continual.services.processor.engine.library.sources.BasicSource, io.continual.services.processor.engine.model.Source
    public boolean isEof() {
        return this.fInited && this.fParser == null;
    }

    @Override // io.continual.services.processor.engine.library.sources.BasicSource
    protected MessageAndRouting internalGetNextMessage(StreamProcessingContext streamProcessingContext, long j, TimeUnit timeUnit) throws IOException {
        init(streamProcessingContext);
        if (isEof()) {
            return null;
        }
        if (this.fIterator.hasNext()) {
            this.fLineNumber++;
            return buildMessage(this.fIterator.next());
        }
        this.fParser.close();
        this.fParser = null;
        return null;
    }

    private void init(StreamProcessingContext streamProcessingContext) throws IOException {
        if (this.fInited) {
            return;
        }
        this.fInited = true;
        if (this.fStream == null) {
            this.fStream = ResourceLoader.load(streamProcessingContext.evalExpression(this.fResource));
        }
        if (this.fStream == null) {
            throw new IOException("Unable to load resource: " + streamProcessingContext.evalExpression(this.fResource) + " (" + this.fResource + ")");
        }
        CSVFormat cSVFormat = CSVFormat.DEFAULT;
        if (this.fFieldDelim != null && this.fFieldDelim.equals("\t")) {
            cSVFormat = CSVFormat.TDF;
        }
        this.fParser = new CSVParser(new InputStreamReader(this.fStream), (this.fFieldList == null || this.fFieldList.size() <= 0) ? cSVFormat.withFirstRecordAsHeader() : cSVFormat.withHeader((String[]) this.fFieldList.toArray(new String[this.fFieldList.size()])));
        this.fIterator = this.fParser.iterator();
        this.fLineNumber = 1;
    }

    private MessageAndRouting buildMessage(CSVRecord cSVRecord) {
        JSONObject jSONObject = new JSONObject();
        for (String str : this.fParser.getHeaderMap().keySet()) {
            String str2 = this.fFieldMap.get(str);
            jSONObject.put(str2 != null ? str2 : str, cSVRecord.get(str));
        }
        if (this.fLineNumberToField != null) {
            jSONObject.put(this.fLineNumberToField, this.fLineNumber);
        }
        return makeDefRoutingMessage(new Message(jSONObject));
    }
}
