/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Inbox;
import com.hazelcast.jet.Outbox;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * Base class for tasklets that drive a {@link Processor}. It owns the
 * processor's inbox, the inbound edge streams (grouped by priority) and the
 * outbound edge streams (sorted by ordinal), and offers
 * {@link #tryFillInbox()} to move items from the inbound streams into the
 * inbox.
 */
abstract class ProcessorTaskletBase implements Tasklet {
    /** Progress tracker shared with the inbox and updated by {@link #tryFillInbox()}. */
    final ProgressTracker progTracker = new ProgressTracker();
    final Processor processor;
    /** Outbound streams, sorted by {@code OutboundEdgeStream::ordinal}. */
    final OutboundEdgeStream[] outstreams;
    /** The inbound stream most recently drained by {@link #tryFillInbox()}. */
    InboundEdgeStream currInstream;
    private final Contexts.ProcCtx context;
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(this.progTracker);
    /**
     * Remaining groups of inbound streams, one group per distinct
     * {@code InboundEdgeStream::priority} value, in ascending order of that value.
     */
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    /** Cursor over the group currently being drained; {@code null} once all groups are exhausted. */
    private CircularListCursor<InboundEdgeStream> instreamCursor;

    /**
     * @param context    processor context, handed to the processor in {@link #initProcessor}
     * @param processor  the processor to drive; must not be {@code null}
     * @param instreams  inbound edge streams; they are grouped by priority
     * @param outstreams outbound edge streams; they are stored sorted by ordinal
     */
    ProcessorTaskletBase(Contexts.ProcCtx context, Processor processor, List<InboundEdgeStream> instreams, List<OutboundEdgeStream> outstreams) {
        Preconditions.checkNotNull(processor, "processor");
        this.context = context;
        this.processor = processor;
        // TreeMap keeps the groups ordered by ascending priority value, so values()
        // yields them in the order in which they must be drained.
        this.instreamGroupQueue = instreams.stream()
                .collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new, Collectors.toCollection(ArrayList::new)))
                .values()
                .stream()
                .collect(Collectors.toCollection(ArrayDeque::new));
        this.outstreams = outstreams.stream()
                .sorted(Comparator.comparing(OutboundEdgeStream::ordinal))
                .toArray(OutboundEdgeStream[]::new);
        this.instreamCursor = this.popInstreamGroup();
    }

    /** Returns the inbox that {@link #tryFillInbox()} drains into. */
    Inbox inbox() {
        return this.inbox;
    }

    /**
     * Registers the job future with the context and initializes the processor
     * with the given outbox and this tasklet's context.
     */
    void initProcessor(Outbox outbox, CompletableFuture<Void> jobFuture) {
        this.context.initJobFuture(jobFuture);
        this.processor.init(outbox, this.context);
    }

    /**
     * Drains inbound streams of the current priority group into the inbox,
     * visiting them round-robin until one of them reports progress or every
     * stream of the group has been visited once. Streams that report "done" are
     * removed from the group; when the group becomes empty the next group is
     * popped and the method returns. Does nothing once all groups are exhausted.
     */
    void tryFillInbox() {
        if (this.instreamCursor == null) {
            return;
        }
        this.progTracker.notDone();
        // Sentinel marking where the round started; null means "re-anchor on the
        // next stream visited".
        InboundEdgeStream first = null;
        ProgressState result;
        do {
            this.currInstream = this.instreamCursor.value();
            if (first == null) {
                first = this.currInstream;
            }
            result = this.currInstream.drainTo(this.inbox);
            this.progTracker.madeProgress(result.isMadeProgress());
            if (result.isDone()) {
                this.instreamCursor.remove();
                if (this.currInstream == first) {
                    // The sentinel is no longer in the group, so the cursor can never
                    // come back to it. Without re-anchoring, the wrap-around check
                    // below would never fire and the loop could spin for as long as
                    // the remaining streams report neither progress nor completion.
                    first = null;
                }
            }
            if (!this.instreamCursor.advance()) {
                // Current group is empty: move on to the next priority group (or null).
                this.instreamCursor = this.popInstreamGroup();
                return;
            }
        } while (!result.isMadeProgress() && this.instreamCursor.value() != first);
    }

    /**
     * Polls the next group of inbound streams and wraps it in a cursor.
     *
     * @return a cursor over the next group, or {@code null} if no groups remain
     */
    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        ArrayList<InboundEdgeStream> nextGroup = this.instreamGroupQueue.poll();
        return nextGroup == null ? null : new CircularListCursor<>(nextGroup);
    }

    @Override
    public String toString() {
        return "ProcessorTasklet{vertex=" + this.context.vertexName() + ", processor=" + this.processor + '}';
    }
}

