/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.engine.library.sources;

import io.continual.builder.Builder;
import io.continual.services.ServiceContainer;
import io.continual.services.processor.engine.library.sources.BasicSource;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;

/**
 * A message source that is fed programmatically: callers push JSON objects in
 * via {@link #submit(JSONObject)} and the processing engine pulls them back out,
 * in submission order, through {@link #internalGetNextMessage}.
 *
 * <p>All access to the pending queue is guarded by this object's monitor, which is
 * also used to wake a consumer that is waiting for a message to arrive.
 */
public class JsonObjectStreamSource extends BasicSource {
    /** Messages submitted but not yet handed out; oldest at index 0. */
    private final ArrayList<JSONObject> fPending = new ArrayList<>();

    /** Number of upcoming submissions that are still to be silently dropped. */
    private int fSkip;

    /**
     * Builds the source from its configuration.
     *
     * @param sc the service container (not used by this class)
     * @param config source configuration; the optional integer {@code "skip"}
     *        (default 0) is the number of initial submissions to discard
     * @throws Builder.BuildFailure declared for the builder contract
     */
    public JsonObjectStreamSource(ServiceContainer sc, JSONObject config) throws Builder.BuildFailure {
        super(config);
        this.fSkip = config.optInt("skip", 0);
    }

    /**
     * Queues a message for delivery and wakes one waiting consumer. While the
     * configured skip count is still positive the message is dropped instead
     * and the count is decremented.
     *
     * @param msg the JSON payload to deliver
     * @throws IllegalStateException if {@link #isEof()} already reports end of stream
     */
    public synchronized void submit(JSONObject msg) {
        if (this.isEof()) {
            throw new IllegalStateException("Added JSON msg after close.");
        }
        if (this.fSkip > 0) {
            --this.fSkip;
            return;
        }
        this.fPending.add(msg);
        this.notify();
    }

    /**
     * Reports end of stream only when no submitted messages remain queued and
     * the superclass also reports end of stream.
     */
    @Override
    public synchronized boolean isEof() {
        return this.fPending.isEmpty() && super.isEof();
    }

    /**
     * Returns the oldest pending message, waiting up to the given timeout for
     * one to be submitted when the queue is empty.
     *
     * @param spc the stream processing context (not used by this class)
     * @param timeUnit the maximum time to wait, expressed in {@code units}; a
     *        zero or negative value means "do not wait"
     * @param units the unit of {@code timeUnit}
     * @return the next message with default routing, or {@code null} if none
     *         became available before the wait ended
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    protected synchronized MessageAndRouting internalGetNextMessage(StreamProcessingContext spc, long timeUnit, TimeUnit units) throws IOException, InterruptedException {
        if (this.fPending.isEmpty()) {
            // TimeUnit.timedWait skips the wait entirely for timeouts <= 0 and keeps
            // the sub-millisecond remainder. A plain Object.wait(0) would instead
            // block until notified, and a negative value would throw.
            units.timedWait(this, timeUnit);
        }
        if (this.fPending.isEmpty()) {
            // Timed out (or woke spuriously) with nothing queued.
            return null;
        }
        return this.makeDefRoutingMessage(new Message(this.fPending.remove(0)));
    }
}

