/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.engine.library.sources;

import io.continual.services.processor.engine.library.sources.BasicSource;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.util.time.Clock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;

/**
 * A source that buffers messages in an in-memory pending list and hands them
 * out one at a time, oldest first.
 *
 * <p>Subclasses supply new messages by overriding {@link #reload()}; every
 * readiness check appends whatever {@code reload()} returns to the pending
 * list. This class performs no synchronization of its own.
 */
public abstract class QueuingSource extends BasicSource {
    /** Upper bound, in milliseconds, on a single sleep while polling for messages. */
    private static final long POLL_INTERVAL_MS = 10L;

    /** Messages loaded but not yet delivered, in arrival order. */
    private final ArrayList<MessageAndRouting> fPending = new ArrayList<MessageAndRouting>();

    /**
     * Pulls any newly available messages into the pending list and reports
     * whether at least one message is waiting.
     *
     * @param spc the stream processing context (not used by this implementation)
     * @return {@code true} if the pending list is non-empty after reloading
     * @throws IOException declared for overriding implementations
     */
    public boolean hasMessagesReady(StreamProcessingContext spc) throws IOException {
        this.reloadPending();
        return !this.fPending.isEmpty();
    }

    /**
     * Waits up to the given time for a message to become available and returns it.
     *
     * @param spc the stream processing context
     * @param timeUnit the maximum time to wait, expressed in {@code units}
     * @param units the unit of {@code timeUnit}
     * @return the next pending message, or {@code null} if the source reports
     *         end-of-file or no message became ready before the wait expired
     * @throws IOException if a readiness check fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Override
    protected MessageAndRouting internalGetNextMessage(StreamProcessingContext spc, long timeUnit, TimeUnit units) throws IOException, InterruptedException {
        // Convert the caller's wait FROM `units` TO milliseconds. The previous
        // units.convert(timeUnit, MILLISECONDS) did the reverse, so e.g. a
        // 5 second wait collapsed to zero.
        final long waitMs = Math.max(0L, units.toMillis(timeUnit));

        // NOTE(review): assumes Clock.now() is in epoch milliseconds -- confirm.
        final long startMs = Clock.now();

        // Saturate instead of overflowing so a very large wait does not wrap
        // into the past and expire immediately.
        final long expireAtMs = (startMs > 0L && waitMs > Long.MAX_VALUE - startMs)
            ? Long.MAX_VALUE
            : startMs + waitMs;

        while (!this.isEof() && !this.hasMessagesReady(spc)) {
            final long remainingMs = expireAtMs - Clock.now();
            if (remainingMs <= 0L) {
                break;
            }
            // Never sleep past the deadline.
            Thread.sleep(Math.min(POLL_INTERVAL_MS, remainingMs));
        }

        if (!this.isEof() && this.hasMessagesReady(spc)) {
            return this.getNextPendingMessage();
        }
        return null;
    }

    /**
     * Creates a queuing source with the given default pipeline name.
     *
     * @param defaultPipelineName passed through to the {@code BasicSource} constructor
     */
    protected QueuingSource(String defaultPipelineName) {
        super(defaultPipelineName);
    }

    /**
     * Creates a queuing source from configuration, using the string value of
     * the {@code "pipeline"} key as the default pipeline name.
     *
     * @param config the configuration object; must contain a {@code "pipeline"} string
     */
    protected QueuingSource(JSONObject config) {
        this(config.getString("pipeline"));
    }

    /**
     * Removes and returns the oldest pending message.
     *
     * @return the message at the head of the pending list
     * @throws IndexOutOfBoundsException if the pending list is empty
     */
    protected MessageAndRouting getNextPendingMessage() {
        return this.fPending.remove(0);
    }

    /**
     * Supplies newly available messages. The default implementation supplies
     * none; subclasses override this to feed the pending list.
     *
     * @return the messages to append to the pending list, in delivery order
     */
    protected List<MessageAndRouting> reload() {
        return new ArrayList<MessageAndRouting>();
    }

    /** Appends the result of {@link #reload()} to the pending list. */
    private void reloadPending() {
        final List<MessageAndRouting> fresh = this.reload();
        // Treat a null result from a subclass as "nothing new" rather than
        // failing with a NullPointerException inside addAll.
        if (fresh != null && !fresh.isEmpty()) {
            this.fPending.addAll(fresh);
        }
    }
}

